diff options
author | Volker Lendecke <vl@samba.org> | 2012-07-29 13:05:36 +0200 |
---|---|---|
committer | Stefan Metzmacher <metze@samba.org> | 2012-08-16 20:48:32 +0200 |
commit | d860aa2cacc783434973c14f7ae21964ca050e6f (patch) | |
tree | d664a95f270552b593eb15ab22600c88cb80fa6a | |
parent | cbe25105c7fc51b178bf4df217812ccf48c472a1 (diff) | |
download | samba-d860aa2cacc783434973c14f7ae21964ca050e6f.tar.gz samba-d860aa2cacc783434973c14f7ae21964ca050e6f.tar.bz2 samba-d860aa2cacc783434973c14f7ae21964ca050e6f.zip |
tevent_poll: Decouple poll_ev->fds handling from adding/removing fds
Step 1 in a python backend for multiple threads
Signed-off-by: Stefan Metzmacher <metze@samba.org>
-rw-r--r-- | lib/tevent/tevent_poll.c | 207 |
1 files changed, 135 insertions, 72 deletions
diff --git a/lib/tevent/tevent_poll.c b/lib/tevent/tevent_poll.c index 7ae3c42188..2639143998 100644 --- a/lib/tevent/tevent_poll.c +++ b/lib/tevent/tevent_poll.c @@ -34,11 +34,16 @@ struct poll_event_context { struct tevent_context *ev; /* + * A DLIST for fresh fde's + */ + struct tevent_fd *fresh; + + /* * These two arrays are maintained together. */ struct pollfd *fds; - struct tevent_fd **fd_events; - uint64_t num_fds; + struct tevent_fd **fdes; + unsigned num_fds; /* information for exiting from the event loop */ int exit_code; @@ -67,7 +72,6 @@ static int poll_event_fd_destructor(struct tevent_fd *fde) { struct tevent_context *ev = fde->event_ctx; struct poll_event_context *poll_ev = NULL; - struct tevent_fd *moved_fde; uint64_t del_idx = fde->additional_flags; if (ev == NULL) { @@ -77,16 +81,19 @@ static int poll_event_fd_destructor(struct tevent_fd *fde) poll_ev = talloc_get_type_abort( ev->additional_data, struct poll_event_context); - moved_fde = poll_ev->fd_events[poll_ev->num_fds-1]; - poll_ev->fd_events[del_idx] = moved_fde; - poll_ev->fds[del_idx] = poll_ev->fds[poll_ev->num_fds-1]; - moved_fde->additional_flags = del_idx; - - poll_ev->num_fds -= 1; + poll_ev->fdes[del_idx] = NULL; done: return tevent_common_fd_destructor(fde); } +static int poll_fresh_fde_destructor(struct tevent_fd *fde) +{ + struct poll_event_context *poll_ev = talloc_get_type_abort( + fde->event_ctx->additional_data, struct poll_event_context); + DLIST_REMOVE(poll_ev->fresh, fde); + return 0; +} + /* add a fd based event return NULL on failure (memory allocation error) @@ -101,60 +108,34 @@ static struct tevent_fd *poll_event_add_fd(struct tevent_context *ev, { struct poll_event_context *poll_ev = talloc_get_type_abort( ev->additional_data, struct poll_event_context); - struct pollfd *pfd; struct tevent_fd *fde; - fde = tevent_common_add_fd(ev, mem_ctx, fd, flags, - handler, private_data, - handler_name, location); - if (fde == NULL) { + if (fd < 0) { return NULL; } - /* we allocate 16 slots to avoid a lot of reallocations */ - if (talloc_array_length(poll_ev->fds) == poll_ev->num_fds) { - struct pollfd *tmp_fds; - struct tevent_fd **tmp_fd_events; - tmp_fds = talloc_realloc( - poll_ev, poll_ev->fds, struct pollfd, - poll_ev->num_fds + 16); - if (tmp_fds == NULL) { - TALLOC_FREE(fde); - return NULL; - } - poll_ev->fds = tmp_fds; - - tmp_fd_events = talloc_realloc( - poll_ev, poll_ev->fd_events, struct tevent_fd *, - poll_ev->num_fds + 16); - if (tmp_fd_events == NULL) { - TALLOC_FREE(fde); - return NULL; - } - poll_ev->fd_events = tmp_fd_events; - } - - pfd = &poll_ev->fds[poll_ev->num_fds]; - - pfd->fd = fd; - - pfd->events = 0; - pfd->revents = 0; - - if (flags & TEVENT_FD_READ) { - pfd->events |= (POLLIN|POLLHUP); - } - if (flags & TEVENT_FD_WRITE) { - pfd->events |= (POLLOUT); + fde = talloc(mem_ctx ? mem_ctx : ev, struct tevent_fd); + if (fde == NULL) { + return NULL; } + fde->event_ctx = ev; + fde->fd = fd; + fde->flags = flags; + fde->handler = handler; + fde->close_fn = NULL; + fde->private_data = private_data; + fde->handler_name = handler_name; + fde->location = location; + fde->additional_flags = 0; + fde->additional_data = NULL; + + DLIST_ADD(poll_ev->fresh, fde); + talloc_set_destructor(fde, poll_fresh_fde_destructor); - fde->additional_flags = poll_ev->num_fds; - poll_ev->fd_events[poll_ev->num_fds] = fde; - - poll_ev->num_fds += 1; - - talloc_set_destructor(fde, poll_event_fd_destructor); - + /* + * poll_event_loop_poll will take care of the rest in + * poll_event_setup_fresh + */ return fde; } @@ -180,6 +161,81 @@ static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags) fde->flags = flags; } +static bool poll_event_setup_fresh(struct tevent_context *ev, + struct poll_event_context *poll_ev) +{ + struct tevent_fd *fde, *next; + unsigned num_fresh, num_fds; + + if (poll_ev->fresh == NULL) { + return true; + } + + num_fresh = 0; + for (fde = poll_ev->fresh; fde; fde = fde->next) { + num_fresh += 1; + } + num_fds = poll_ev->num_fds + num_fresh; + + /* + * We check the length of fdes here. It is the last one + * enlarged, so if the realloc for poll_fd->fdes fails, + * poll_fd->fds will have at least the size of poll_fd->fdes + */ + + if (num_fds >= talloc_array_length(poll_ev->fdes)) { + struct pollfd *tmp_fds; + struct tevent_fd **tmp_fdes; + unsigned array_length; + + array_length = (num_fds + 15) & ~15; /* round up to 16 */ + + tmp_fds = talloc_realloc( + poll_ev, poll_ev->fds, struct pollfd, array_length); + if (tmp_fds == NULL) { + return false; + } + poll_ev->fds = tmp_fds; + + tmp_fdes = talloc_realloc( + poll_ev, poll_ev->fdes, struct tevent_fd *, + array_length); + if (tmp_fdes == NULL) { + return false; + } + poll_ev->fdes = tmp_fdes; + } + + for (fde = poll_ev->fresh; fde; fde = next) { + struct pollfd *pfd; + + pfd = &poll_ev->fds[poll_ev->num_fds]; + + pfd->fd = fde->fd; + pfd->events = 0; + pfd->revents = 0; + + if (fde->flags & TEVENT_FD_READ) { + pfd->events |= (POLLIN|POLLHUP); + } + if (fde->flags & TEVENT_FD_WRITE) { + pfd->events |= (POLLOUT); + } + + fde->additional_flags = poll_ev->num_fds; + poll_ev->fdes[poll_ev->num_fds] = fde; + + next = fde->next; + DLIST_REMOVE(poll_ev->fresh, fde); + DLIST_ADD(ev->fd_events, fde); + + talloc_set_destructor(fde, poll_event_fd_destructor); + + poll_ev->num_fds += 1; + } + return true; +} + /* event loop handling using poll() */ @@ -188,9 +244,9 @@ static int poll_event_loop_poll(struct tevent_context *ev, { struct poll_event_context *poll_ev = talloc_get_type_abort( ev->additional_data, struct poll_event_context); - struct tevent_fd *fde; int pollrtn; int timeout = -1; + unsigned i; if (ev->signal_events && tevent_common_check_signal(ev)) { return 0; @@ -201,6 +257,10 @@ static int poll_event_loop_poll(struct tevent_context *ev, timeout += (tvalp->tv_usec + 999) / 1000; } + if (!poll_event_setup_fresh(ev, poll_ev)) { + return -1; + } + tevent_trace_point_callback(poll_ev->ev, TEVENT_TRACE_BEFORE_WAIT); pollrtn = poll(poll_ev->fds, poll_ev->num_fds, timeout); tevent_trace_point_callback(poll_ev->ev, TEVENT_TRACE_AFTER_WAIT); @@ -210,18 +270,6 @@ static int poll_event_loop_poll(struct tevent_context *ev, return 0; } - if (pollrtn == -1 && errno == EBADF) { - /* the socket is dead! this should never - happen as the socket should have first been - made readable and that should have removed - the event, so this must be a bug. This is a - fatal error. */ - tevent_debug(ev, TEVENT_DEBUG_FATAL, - "ERROR: EBADF on poll_event_loop_once\n"); - poll_ev->exit_code = EBADF; - return -1; - } - if (pollrtn == 0 && tvalp) { /* we don't care about a possible delay here */ tevent_common_loop_timer_delay(ev); @@ -239,12 +287,27 @@ static int poll_event_loop_poll(struct tevent_context *ev, which ones and call the handler, being careful to allow the handler to remove itself when called */ - for (fde = ev->fd_events; fde; fde = fde->next) { + for (i=0; i<poll_ev->num_fds; i++) { struct pollfd *pfd; - uint64_t pfd_idx = fde->additional_flags; + struct tevent_fd *fde; uint16_t flags = 0; - pfd = &poll_ev->fds[pfd_idx]; + fde = poll_ev->fdes[i]; + if (fde == NULL) { + /* + * This fde was talloc_free()'ed. Delete it + * from the arrays + */ + poll_ev->num_fds -= 1; + poll_ev->fds[i] = poll_ev->fds[poll_ev->num_fds]; + poll_ev->fdes[i] = poll_ev->fdes[poll_ev->num_fds]; + if (poll_ev->fdes[i] != NULL) { + poll_ev->fdes[i]->additional_flags = i; + } + continue; + } + + pfd = &poll_ev->fds[i]; if (pfd->revents & (POLLHUP|POLLERR)) { /* If we only wait for TEVENT_FD_WRITE, we |