diff options
Diffstat (limited to 'source4/lib/events/events_aio.c')
-rw-r--r-- | source4/lib/events/events_aio.c | 259 |
1 files changed, 121 insertions, 138 deletions
diff --git a/source4/lib/events/events_aio.c b/source4/lib/events/events_aio.c index 97305fdeb3..1c2735c200 100644 --- a/source4/lib/events/events_aio.c +++ b/source4/lib/events/events_aio.c @@ -39,8 +39,10 @@ #include "lib/events/events_internal.h" #include <libaio.h> -#define MAX_AIO_QUEUE_DEPTH 10 +#define MAX_AIO_QUEUE_DEPTH 100 +#ifndef IOCB_CMD_EPOLL_WAIT #define IOCB_CMD_EPOLL_WAIT 9 +#endif struct aio_event_context { /* a pointer back to the generic event_context */ @@ -49,22 +51,24 @@ struct aio_event_context { /* number of registered fd event handlers */ int num_fd_events; - /* list of timed events */ - struct timed_event *timed_events; - uint32_t destruction_count; io_context_t ioctx; - struct io_event events[MAX_AIO_QUEUE_DEPTH]; - struct epoll_event epevent; + struct epoll_event epevent[MAX_AIO_QUEUE_DEPTH]; struct iocb *epoll_iocb; int epoll_fd; + int is_epoll_set; }; -static void aio_event_loop_timer(struct aio_event_context *aio_ev); +struct aio_event { + struct event_context *event_ctx; + struct iocb iocb; + void *private_data; + event_aio_handler_t handler; +}; /* map from EVENT_FD_* to EPOLLIN/EPOLLOUT @@ -82,6 +86,7 @@ static uint32_t epoll_map_flags(uint16_t flags) */ static int aio_ctx_destructor(struct aio_event_context *aio_ev) { + io_queue_release(aio_ev->ioctx); close(aio_ev->epoll_fd); aio_ev->epoll_fd = -1; return 0; @@ -187,20 +192,24 @@ static void epoll_change_event(struct aio_event_context *aio_ev, struct fd_event static int setup_epoll_wait(struct aio_event_context *aio_ev) { - struct io_event r; - + if (aio_ev->is_epoll_set) { + return 0; + } memset(aio_ev->epoll_iocb, 0, sizeof(*aio_ev->epoll_iocb)); aio_ev->epoll_iocb->aio_fildes = aio_ev->epoll_fd; aio_ev->epoll_iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT; aio_ev->epoll_iocb->aio_reqprio = 0; - aio_ev->epoll_iocb->u.c.nbytes = 1; + aio_ev->epoll_iocb->u.c.nbytes = MAX_AIO_QUEUE_DEPTH; aio_ev->epoll_iocb->u.c.offset = -1; - aio_ev->epoll_iocb->u.c.buf = &aio_ev->epevent; + aio_ev->epoll_iocb->u.c.buf = aio_ev->epevent; + aio_ev->is_epoll_set = 1; if (io_submit(aio_ev->ioctx, 1, &aio_ev->epoll_iocb) != 1) { return -1; } + + return 0; } @@ -212,6 +221,7 @@ static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tval int ret, i; uint32_t destruction_count = aio_ev->destruction_count; struct timespec timeout; + struct io_event events[8]; if (aio_ev->epoll_fd == -1) return -1; @@ -221,50 +231,74 @@ static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tval timeout.tv_nsec *= 1000; } - setup_epoll_wait(aio_ev); + if (setup_epoll_wait(aio_ev) < 0) + return -1; - ret = io_getevents(aio_ev->ioctx, 1, MAX_AIO_QUEUE_DEPTH, - aio_ev->events, tvalp?&timeout:NULL); + ret = io_getevents(aio_ev->ioctx, 1, 8, + events, tvalp?&timeout:NULL); if (ret == -EINTR) { return 0; } if (ret == 0 && tvalp) { - aio_event_loop_timer(aio_ev); + common_event_loop_timer(aio_ev->ev); return 0; } for (i=0;i<ret;i++) { - struct iocb *finished = aio_ev->events[i].obj; + struct io_event *event = &events[i]; + struct iocb *finished = event->obj; + switch (finished->aio_lio_opcode) { + case IO_CMD_PWRITE: + case IO_CMD_PREAD: { + struct aio_event *ae = talloc_get_type(finished->data, + struct aio_event); + if (ae) { + talloc_set_destructor(ae, NULL); + ae->handler(ae->event_ctx, ae, + event->res, ae->private_data); + talloc_free(ae); + } + break; + } case IOCB_CMD_EPOLL_WAIT: { struct epoll_event *ep = (struct epoll_event *)finished->u.c.buf; - struct fd_event *fde = talloc_get_type(ep->data.ptr, - struct fd_event); + struct fd_event *fde; uint16_t flags = 0; + int j; - if (fde == NULL) { - return -1; - } - if (ep->events & (EPOLLHUP|EPOLLERR)) { - fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR; - if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) { - epoll_del_event(aio_ev, fde); - continue; +// DEBUG(0,("EVENT finished=%p fde=%p ep=%p\n", finished, fde, ep)); + //printf("GOT %d aio epoll events\n", event->res); + + aio_ev->is_epoll_set = 0; + + for (j=0; j<event->res; j++, ep++) { + fde = talloc_get_type(ep->data.ptr, + struct fd_event); + if (fde == NULL) { + return -1; } - flags |= EVENT_FD_READ; - } - if (ep->events & EPOLLIN) flags |= EVENT_FD_READ; - if (ep->events & EPOLLOUT) flags |= EVENT_FD_WRITE; - if (flags) { - fde->handler(aio_ev->ev, fde, flags, fde->private_data); - if (destruction_count != aio_ev->destruction_count) { - return 0; + if (ep->events & (EPOLLHUP|EPOLLERR)) { + fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR; + if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) { + epoll_del_event(aio_ev, fde); + continue; + } + flags |= EVENT_FD_READ; + } + if (ep->events & EPOLLIN) flags |= EVENT_FD_READ; + if (ep->events & EPOLLOUT) flags |= EVENT_FD_WRITE; + if (flags) { + fde->handler(aio_ev->ev, fde, flags, fde->private_data); } } break; } } + if (destruction_count != aio_ev->destruction_count) { + return 0; + } } return 0; @@ -273,7 +307,7 @@ static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tval /* create a aio_event_context structure. */ -static int aio_event_context_init(struct event_context *ev, void *private_data) +static int aio_event_context_init(struct event_context *ev) { struct aio_event_context *aio_ev; @@ -373,93 +407,6 @@ static void aio_event_set_fd_flags(struct fd_event *fde, uint16_t flags) } /* - destroy a timed event -*/ -static int aio_event_timed_destructor(struct timed_event *te) -{ - struct aio_event_context *aio_ev = talloc_get_type(te->event_ctx->additional_data, - struct aio_event_context); - DLIST_REMOVE(aio_ev->timed_events, te); - return 0; -} - -static int aio_event_timed_deny_destructor(struct timed_event *te) -{ - return -1; -} - -/* - add a timed event - return NULL on failure (memory allocation error) -*/ -static struct timed_event *aio_event_add_timed(struct event_context *ev, TALLOC_CTX *mem_ctx, - struct timeval next_event, - event_timed_handler_t handler, - void *private_data) -{ - struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data, - struct aio_event_context); - struct timed_event *te, *last_te, *cur_te; - - te = talloc(mem_ctx?mem_ctx:ev, struct timed_event); - if (te == NULL) return NULL; - - te->event_ctx = ev; - te->next_event = next_event; - te->handler = handler; - te->private_data = private_data; - te->additional_data = NULL; - - /* keep the list ordered */ - last_te = NULL; - for (cur_te = aio_ev->timed_events; cur_te; cur_te = cur_te->next) { - /* if the new event comes before the current one break */ - if (!timeval_is_zero(&cur_te->next_event) && - timeval_compare(&te->next_event, - &cur_te->next_event) < 0) { - break; - } - - last_te = cur_te; - } - - DLIST_ADD_AFTER(aio_ev->timed_events, te, last_te); - - talloc_set_destructor(te, aio_event_timed_destructor); - - return te; -} - -/* - a timer has gone off - call it -*/ -static void aio_event_loop_timer(struct aio_event_context *aio_ev) -{ - struct timeval t = timeval_current(); - struct timed_event *te = aio_ev->timed_events; - - if (te == NULL) { - return; - } - - /* deny the handler to free the event */ - talloc_set_destructor(te, aio_event_timed_deny_destructor); - - /* We need to remove the timer from the list before calling the - * handler because in a semi-async inner event loop called from the - * handler we don't want to come across this event again -- vl */ - DLIST_REMOVE(aio_ev->timed_events, te); - - te->handler(aio_ev->ev, te, t, te->private_data); - - /* The destructor isn't necessary anymore, we've already removed the - * event from the list. */ - talloc_set_destructor(te, NULL); - - talloc_free(te); -} - -/* do a single event loop using the events defined in ev */ static int aio_event_loop_once(struct event_context *ev) @@ -468,19 +415,11 @@ static int aio_event_loop_once(struct event_context *ev) struct aio_event_context); struct timeval tval; - /* work out the right timeout for all timed events */ - if (aio_ev->timed_events) { - struct timeval t = timeval_current(); - tval = timeval_until(&t, &aio_ev->timed_events->next_event); - if (timeval_is_zero(&tval)) { - aio_event_loop_timer(aio_ev); - return 0; - } - } else { - /* have a default tick time of 30 seconds. This guarantees - that code that uses its own timeout checking will be - able to proceeed eventually */ - tval = timeval_set(30, 0); + tval = common_event_loop_delay(ev); + + if (timeval_is_zero(&tval)) { + common_event_loop_timer(ev); + return 0; } return aio_event_loop(aio_ev, &tval); @@ -502,17 +441,61 @@ static int aio_event_loop_wait(struct event_context *ev) return 0; } +/* + called when a disk IO event needs to be cancelled +*/ +static int aio_destructor(struct aio_event *ae) +{ + struct event_context *ev = ae->event_ctx; + struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data, + struct aio_event_context); + struct io_event result; + io_cancel(aio_ev->ioctx, &ae->iocb, &result); + /* TODO: handle errors from io_cancel()! */ + return 0; +} + +/* submit an aio disk IO event */ +static struct aio_event *aio_event_add_aio(struct event_context *ev, + TALLOC_CTX *mem_ctx, + struct iocb *iocb, + event_aio_handler_t handler, + void *private_data) +{ + struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data, + struct aio_event_context); + struct iocb *iocbp; + struct aio_event *ae = talloc(mem_ctx?mem_ctx:ev, struct aio_event); + if (ae == NULL) return NULL; + + ae->event_ctx = ev; + ae->iocb = *iocb; + ae->handler = handler; + ae->private_data = private_data; + iocbp = &ae->iocb; + + if (io_submit(aio_ev->ioctx, 1, &iocbp) != 1) { + talloc_free(ae); + return NULL; + } + ae->iocb.data = ae; + talloc_set_destructor(ae, aio_destructor); + + return ae; +} + static const struct event_ops aio_event_ops = { .context_init = aio_event_context_init, .add_fd = aio_event_add_fd, + .add_aio = aio_event_add_aio, .get_fd_flags = aio_event_get_fd_flags, .set_fd_flags = aio_event_set_fd_flags, - .add_timed = aio_event_add_timed, + .add_timed = common_event_add_timed, .loop_once = aio_event_loop_once, .loop_wait = aio_event_loop_wait, }; -const struct event_ops *event_aio_get_ops(void) +NTSTATUS events_aio_init(void) { - return &aio_event_ops; + return event_register_backend("aio", &aio_event_ops); } |