summaryrefslogtreecommitdiff
path: root/source4/lib/events/events_aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'source4/lib/events/events_aio.c')
-rw-r--r--source4/lib/events/events_aio.c259
1 files changed, 121 insertions, 138 deletions
diff --git a/source4/lib/events/events_aio.c b/source4/lib/events/events_aio.c
index 97305fdeb3..1c2735c200 100644
--- a/source4/lib/events/events_aio.c
+++ b/source4/lib/events/events_aio.c
@@ -39,8 +39,10 @@
#include "lib/events/events_internal.h"
#include <libaio.h>
-#define MAX_AIO_QUEUE_DEPTH 10
+#define MAX_AIO_QUEUE_DEPTH 100
+#ifndef IOCB_CMD_EPOLL_WAIT
#define IOCB_CMD_EPOLL_WAIT 9
+#endif
struct aio_event_context {
/* a pointer back to the generic event_context */
@@ -49,22 +51,24 @@ struct aio_event_context {
/* number of registered fd event handlers */
int num_fd_events;
- /* list of timed events */
- struct timed_event *timed_events;
-
uint32_t destruction_count;
io_context_t ioctx;
- struct io_event events[MAX_AIO_QUEUE_DEPTH];
- struct epoll_event epevent;
+ struct epoll_event epevent[MAX_AIO_QUEUE_DEPTH];
struct iocb *epoll_iocb;
int epoll_fd;
+ int is_epoll_set;
};
-static void aio_event_loop_timer(struct aio_event_context *aio_ev);
+struct aio_event {
+ struct event_context *event_ctx;
+ struct iocb iocb;
+ void *private_data;
+ event_aio_handler_t handler;
+};
/*
map from EVENT_FD_* to EPOLLIN/EPOLLOUT
@@ -82,6 +86,7 @@ static uint32_t epoll_map_flags(uint16_t flags)
*/
static int aio_ctx_destructor(struct aio_event_context *aio_ev)
{
+ io_queue_release(aio_ev->ioctx);
close(aio_ev->epoll_fd);
aio_ev->epoll_fd = -1;
return 0;
@@ -187,20 +192,24 @@ static void epoll_change_event(struct aio_event_context *aio_ev, struct fd_event
static int setup_epoll_wait(struct aio_event_context *aio_ev)
{
- struct io_event r;
-
+ if (aio_ev->is_epoll_set) {
+ return 0;
+ }
memset(aio_ev->epoll_iocb, 0, sizeof(*aio_ev->epoll_iocb));
aio_ev->epoll_iocb->aio_fildes = aio_ev->epoll_fd;
aio_ev->epoll_iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT;
aio_ev->epoll_iocb->aio_reqprio = 0;
- aio_ev->epoll_iocb->u.c.nbytes = 1;
+ aio_ev->epoll_iocb->u.c.nbytes = MAX_AIO_QUEUE_DEPTH;
aio_ev->epoll_iocb->u.c.offset = -1;
- aio_ev->epoll_iocb->u.c.buf = &aio_ev->epevent;
+ aio_ev->epoll_iocb->u.c.buf = aio_ev->epevent;
+ aio_ev->is_epoll_set = 1;
if (io_submit(aio_ev->ioctx, 1, &aio_ev->epoll_iocb) != 1) {
return -1;
}
+
+ return 0;
}
@@ -212,6 +221,7 @@ static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tval
int ret, i;
uint32_t destruction_count = aio_ev->destruction_count;
struct timespec timeout;
+ struct io_event events[8];
if (aio_ev->epoll_fd == -1) return -1;
@@ -221,50 +231,74 @@ static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tval
timeout.tv_nsec *= 1000;
}
- setup_epoll_wait(aio_ev);
+ if (setup_epoll_wait(aio_ev) < 0)
+ return -1;
- ret = io_getevents(aio_ev->ioctx, 1, MAX_AIO_QUEUE_DEPTH,
- aio_ev->events, tvalp?&timeout:NULL);
+ ret = io_getevents(aio_ev->ioctx, 1, 8,
+ events, tvalp?&timeout:NULL);
if (ret == -EINTR) {
return 0;
}
if (ret == 0 && tvalp) {
- aio_event_loop_timer(aio_ev);
+ common_event_loop_timer(aio_ev->ev);
return 0;
}
for (i=0;i<ret;i++) {
- struct iocb *finished = aio_ev->events[i].obj;
+ struct io_event *event = &events[i];
+ struct iocb *finished = event->obj;
+
switch (finished->aio_lio_opcode) {
+ case IO_CMD_PWRITE:
+ case IO_CMD_PREAD: {
+ struct aio_event *ae = talloc_get_type(finished->data,
+ struct aio_event);
+ if (ae) {
+ talloc_set_destructor(ae, NULL);
+ ae->handler(ae->event_ctx, ae,
+ event->res, ae->private_data);
+ talloc_free(ae);
+ }
+ break;
+ }
case IOCB_CMD_EPOLL_WAIT: {
struct epoll_event *ep = (struct epoll_event *)finished->u.c.buf;
- struct fd_event *fde = talloc_get_type(ep->data.ptr,
- struct fd_event);
+ struct fd_event *fde;
uint16_t flags = 0;
+ int j;
- if (fde == NULL) {
- return -1;
- }
- if (ep->events & (EPOLLHUP|EPOLLERR)) {
- fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
- if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
- epoll_del_event(aio_ev, fde);
- continue;
+// DEBUG(0,("EVENT finished=%p fde=%p ep=%p\n", finished, fde, ep));
+ //printf("GOT %d aio epoll events\n", event->res);
+
+ aio_ev->is_epoll_set = 0;
+
+ for (j=0; j<event->res; j++, ep++) {
+ fde = talloc_get_type(ep->data.ptr,
+ struct fd_event);
+ if (fde == NULL) {
+ return -1;
}
- flags |= EVENT_FD_READ;
- }
- if (ep->events & EPOLLIN) flags |= EVENT_FD_READ;
- if (ep->events & EPOLLOUT) flags |= EVENT_FD_WRITE;
- if (flags) {
- fde->handler(aio_ev->ev, fde, flags, fde->private_data);
- if (destruction_count != aio_ev->destruction_count) {
- return 0;
+ if (ep->events & (EPOLLHUP|EPOLLERR)) {
+ fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
+ if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
+ epoll_del_event(aio_ev, fde);
+ continue;
+ }
+ flags |= EVENT_FD_READ;
+ }
+ if (ep->events & EPOLLIN) flags |= EVENT_FD_READ;
+ if (ep->events & EPOLLOUT) flags |= EVENT_FD_WRITE;
+ if (flags) {
+ fde->handler(aio_ev->ev, fde, flags, fde->private_data);
}
}
break;
}
}
+ if (destruction_count != aio_ev->destruction_count) {
+ return 0;
+ }
}
return 0;
@@ -273,7 +307,7 @@ static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tval
/*
create a aio_event_context structure.
*/
-static int aio_event_context_init(struct event_context *ev, void *private_data)
+static int aio_event_context_init(struct event_context *ev)
{
struct aio_event_context *aio_ev;
@@ -373,93 +407,6 @@ static void aio_event_set_fd_flags(struct fd_event *fde, uint16_t flags)
}
/*
- destroy a timed event
-*/
-static int aio_event_timed_destructor(struct timed_event *te)
-{
- struct aio_event_context *aio_ev = talloc_get_type(te->event_ctx->additional_data,
- struct aio_event_context);
- DLIST_REMOVE(aio_ev->timed_events, te);
- return 0;
-}
-
-static int aio_event_timed_deny_destructor(struct timed_event *te)
-{
- return -1;
-}
-
-/*
- add a timed event
- return NULL on failure (memory allocation error)
-*/
-static struct timed_event *aio_event_add_timed(struct event_context *ev, TALLOC_CTX *mem_ctx,
- struct timeval next_event,
- event_timed_handler_t handler,
- void *private_data)
-{
- struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
- struct aio_event_context);
- struct timed_event *te, *last_te, *cur_te;
-
- te = talloc(mem_ctx?mem_ctx:ev, struct timed_event);
- if (te == NULL) return NULL;
-
- te->event_ctx = ev;
- te->next_event = next_event;
- te->handler = handler;
- te->private_data = private_data;
- te->additional_data = NULL;
-
- /* keep the list ordered */
- last_te = NULL;
- for (cur_te = aio_ev->timed_events; cur_te; cur_te = cur_te->next) {
- /* if the new event comes before the current one break */
- if (!timeval_is_zero(&cur_te->next_event) &&
- timeval_compare(&te->next_event,
- &cur_te->next_event) < 0) {
- break;
- }
-
- last_te = cur_te;
- }
-
- DLIST_ADD_AFTER(aio_ev->timed_events, te, last_te);
-
- talloc_set_destructor(te, aio_event_timed_destructor);
-
- return te;
-}
-
-/*
- a timer has gone off - call it
-*/
-static void aio_event_loop_timer(struct aio_event_context *aio_ev)
-{
- struct timeval t = timeval_current();
- struct timed_event *te = aio_ev->timed_events;
-
- if (te == NULL) {
- return;
- }
-
- /* deny the handler to free the event */
- talloc_set_destructor(te, aio_event_timed_deny_destructor);
-
- /* We need to remove the timer from the list before calling the
- * handler because in a semi-async inner event loop called from the
- * handler we don't want to come across this event again -- vl */
- DLIST_REMOVE(aio_ev->timed_events, te);
-
- te->handler(aio_ev->ev, te, t, te->private_data);
-
- /* The destructor isn't necessary anymore, we've already removed the
- * event from the list. */
- talloc_set_destructor(te, NULL);
-
- talloc_free(te);
-}
-
-/*
do a single event loop using the events defined in ev
*/
static int aio_event_loop_once(struct event_context *ev)
@@ -468,19 +415,11 @@ static int aio_event_loop_once(struct event_context *ev)
struct aio_event_context);
struct timeval tval;
- /* work out the right timeout for all timed events */
- if (aio_ev->timed_events) {
- struct timeval t = timeval_current();
- tval = timeval_until(&t, &aio_ev->timed_events->next_event);
- if (timeval_is_zero(&tval)) {
- aio_event_loop_timer(aio_ev);
- return 0;
- }
- } else {
- /* have a default tick time of 30 seconds. This guarantees
- that code that uses its own timeout checking will be
- able to proceeed eventually */
- tval = timeval_set(30, 0);
+ tval = common_event_loop_delay(ev);
+
+ if (timeval_is_zero(&tval)) {
+ common_event_loop_timer(ev);
+ return 0;
}
return aio_event_loop(aio_ev, &tval);
@@ -502,17 +441,61 @@ static int aio_event_loop_wait(struct event_context *ev)
return 0;
}
+/*
+ called when a disk IO event needs to be cancelled
+*/
+static int aio_destructor(struct aio_event *ae)
+{
+ struct event_context *ev = ae->event_ctx;
+ struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
+ struct aio_event_context);
+ struct io_event result;
+ io_cancel(aio_ev->ioctx, &ae->iocb, &result);
+ /* TODO: handle errors from io_cancel()! */
+ return 0;
+}
+
+/* submit an aio disk IO event */
+static struct aio_event *aio_event_add_aio(struct event_context *ev,
+ TALLOC_CTX *mem_ctx,
+ struct iocb *iocb,
+ event_aio_handler_t handler,
+ void *private_data)
+{
+ struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
+ struct aio_event_context);
+ struct iocb *iocbp;
+ struct aio_event *ae = talloc(mem_ctx?mem_ctx:ev, struct aio_event);
+ if (ae == NULL) return NULL;
+
+ ae->event_ctx = ev;
+ ae->iocb = *iocb;
+ ae->handler = handler;
+ ae->private_data = private_data;
+ iocbp = &ae->iocb;
+
+ if (io_submit(aio_ev->ioctx, 1, &iocbp) != 1) {
+ talloc_free(ae);
+ return NULL;
+ }
+ ae->iocb.data = ae;
+ talloc_set_destructor(ae, aio_destructor);
+
+ return ae;
+}
+
static const struct event_ops aio_event_ops = {
.context_init = aio_event_context_init,
.add_fd = aio_event_add_fd,
+ .add_aio = aio_event_add_aio,
.get_fd_flags = aio_event_get_fd_flags,
.set_fd_flags = aio_event_set_fd_flags,
- .add_timed = aio_event_add_timed,
+ .add_timed = common_event_add_timed,
.loop_once = aio_event_loop_once,
.loop_wait = aio_event_loop_wait,
};
-const struct event_ops *event_aio_get_ops(void)
+NTSTATUS events_aio_init(void)
{
- return &aio_event_ops;
+ return event_register_backend("aio", &aio_event_ops);
}