summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVolker Lendecke <vl@samba.org>2012-07-29 13:05:36 +0200
committerStefan Metzmacher <metze@samba.org>2012-08-16 20:48:32 +0200
commitd860aa2cacc783434973c14f7ae21964ca050e6f (patch)
treed664a95f270552b593eb15ab22600c88cb80fa6a
parentcbe25105c7fc51b178bf4df217812ccf48c472a1 (diff)
downloadsamba-d860aa2cacc783434973c14f7ae21964ca050e6f.tar.gz
samba-d860aa2cacc783434973c14f7ae21964ca050e6f.tar.bz2
samba-d860aa2cacc783434973c14f7ae21964ca050e6f.zip
tevent_poll: Decouple poll_ev->fds handling from adding/removing fds
Step 1 in a python backend for multiple threads Signed-off-by: Stefan Metzmacher <metze@samba.org>
-rw-r--r--lib/tevent/tevent_poll.c207
1 files changed, 135 insertions, 72 deletions
diff --git a/lib/tevent/tevent_poll.c b/lib/tevent/tevent_poll.c
index 7ae3c42188..2639143998 100644
--- a/lib/tevent/tevent_poll.c
+++ b/lib/tevent/tevent_poll.c
@@ -34,11 +34,16 @@ struct poll_event_context {
struct tevent_context *ev;
/*
+ * A DLIST for fresh fde's
+ */
+ struct tevent_fd *fresh;
+
+ /*
* These two arrays are maintained together.
*/
struct pollfd *fds;
- struct tevent_fd **fd_events;
- uint64_t num_fds;
+ struct tevent_fd **fdes;
+ unsigned num_fds;
/* information for exiting from the event loop */
int exit_code;
@@ -67,7 +72,6 @@ static int poll_event_fd_destructor(struct tevent_fd *fde)
{
struct tevent_context *ev = fde->event_ctx;
struct poll_event_context *poll_ev = NULL;
- struct tevent_fd *moved_fde;
uint64_t del_idx = fde->additional_flags;
if (ev == NULL) {
@@ -77,16 +81,19 @@ static int poll_event_fd_destructor(struct tevent_fd *fde)
poll_ev = talloc_get_type_abort(
ev->additional_data, struct poll_event_context);
- moved_fde = poll_ev->fd_events[poll_ev->num_fds-1];
- poll_ev->fd_events[del_idx] = moved_fde;
- poll_ev->fds[del_idx] = poll_ev->fds[poll_ev->num_fds-1];
- moved_fde->additional_flags = del_idx;
-
- poll_ev->num_fds -= 1;
+ poll_ev->fdes[del_idx] = NULL;
done:
return tevent_common_fd_destructor(fde);
}
+static int poll_fresh_fde_destructor(struct tevent_fd *fde)
+{
+ struct poll_event_context *poll_ev = talloc_get_type_abort(
+ fde->event_ctx->additional_data, struct poll_event_context);
+ DLIST_REMOVE(poll_ev->fresh, fde);
+ return 0;
+}
+
/*
add a fd based event
return NULL on failure (memory allocation error)
@@ -101,60 +108,34 @@ static struct tevent_fd *poll_event_add_fd(struct tevent_context *ev,
{
struct poll_event_context *poll_ev = talloc_get_type_abort(
ev->additional_data, struct poll_event_context);
- struct pollfd *pfd;
struct tevent_fd *fde;
- fde = tevent_common_add_fd(ev, mem_ctx, fd, flags,
- handler, private_data,
- handler_name, location);
- if (fde == NULL) {
+ if (fd < 0) {
return NULL;
}
- /* we allocate 16 slots to avoid a lot of reallocations */
- if (talloc_array_length(poll_ev->fds) == poll_ev->num_fds) {
- struct pollfd *tmp_fds;
- struct tevent_fd **tmp_fd_events;
- tmp_fds = talloc_realloc(
- poll_ev, poll_ev->fds, struct pollfd,
- poll_ev->num_fds + 16);
- if (tmp_fds == NULL) {
- TALLOC_FREE(fde);
- return NULL;
- }
- poll_ev->fds = tmp_fds;
-
- tmp_fd_events = talloc_realloc(
- poll_ev, poll_ev->fd_events, struct tevent_fd *,
- poll_ev->num_fds + 16);
- if (tmp_fd_events == NULL) {
- TALLOC_FREE(fde);
- return NULL;
- }
- poll_ev->fd_events = tmp_fd_events;
- }
-
- pfd = &poll_ev->fds[poll_ev->num_fds];
-
- pfd->fd = fd;
-
- pfd->events = 0;
- pfd->revents = 0;
-
- if (flags & TEVENT_FD_READ) {
- pfd->events |= (POLLIN|POLLHUP);
- }
- if (flags & TEVENT_FD_WRITE) {
- pfd->events |= (POLLOUT);
+ fde = talloc(mem_ctx ? mem_ctx : ev, struct tevent_fd);
+ if (fde == NULL) {
+ return NULL;
}
+ fde->event_ctx = ev;
+ fde->fd = fd;
+ fde->flags = flags;
+ fde->handler = handler;
+ fde->close_fn = NULL;
+ fde->private_data = private_data;
+ fde->handler_name = handler_name;
+ fde->location = location;
+ fde->additional_flags = 0;
+ fde->additional_data = NULL;
+
+ DLIST_ADD(poll_ev->fresh, fde);
+ talloc_set_destructor(fde, poll_fresh_fde_destructor);
- fde->additional_flags = poll_ev->num_fds;
- poll_ev->fd_events[poll_ev->num_fds] = fde;
-
- poll_ev->num_fds += 1;
-
- talloc_set_destructor(fde, poll_event_fd_destructor);
-
+ /*
+ * poll_event_loop_poll will take care of the rest in
+ * poll_event_setup_fresh
+ */
return fde;
}
@@ -180,6 +161,81 @@ static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
fde->flags = flags;
}
+static bool poll_event_setup_fresh(struct tevent_context *ev,
+ struct poll_event_context *poll_ev)
+{
+ struct tevent_fd *fde, *next;
+ unsigned num_fresh, num_fds;
+
+ if (poll_ev->fresh == NULL) {
+ return true;
+ }
+
+ num_fresh = 0;
+ for (fde = poll_ev->fresh; fde; fde = fde->next) {
+ num_fresh += 1;
+ }
+ num_fds = poll_ev->num_fds + num_fresh;
+
+ /*
+ * We check the length of fdes here. It is the last one
+ * enlarged, so if the realloc for poll_fd->fdes fails,
+ * poll_fd->fds will have at least the size of poll_fd->fdes
+ */
+
+ if (num_fds >= talloc_array_length(poll_ev->fdes)) {
+ struct pollfd *tmp_fds;
+ struct tevent_fd **tmp_fdes;
+ unsigned array_length;
+
+ array_length = (num_fds + 15) & ~15; /* round up to 16 */
+
+ tmp_fds = talloc_realloc(
+ poll_ev, poll_ev->fds, struct pollfd, array_length);
+ if (tmp_fds == NULL) {
+ return false;
+ }
+ poll_ev->fds = tmp_fds;
+
+ tmp_fdes = talloc_realloc(
+ poll_ev, poll_ev->fdes, struct tevent_fd *,
+ array_length);
+ if (tmp_fdes == NULL) {
+ return false;
+ }
+ poll_ev->fdes = tmp_fdes;
+ }
+
+ for (fde = poll_ev->fresh; fde; fde = next) {
+ struct pollfd *pfd;
+
+ pfd = &poll_ev->fds[poll_ev->num_fds];
+
+ pfd->fd = fde->fd;
+ pfd->events = 0;
+ pfd->revents = 0;
+
+ if (fde->flags & TEVENT_FD_READ) {
+ pfd->events |= (POLLIN|POLLHUP);
+ }
+ if (fde->flags & TEVENT_FD_WRITE) {
+ pfd->events |= (POLLOUT);
+ }
+
+ fde->additional_flags = poll_ev->num_fds;
+ poll_ev->fdes[poll_ev->num_fds] = fde;
+
+ next = fde->next;
+ DLIST_REMOVE(poll_ev->fresh, fde);
+ DLIST_ADD(ev->fd_events, fde);
+
+ talloc_set_destructor(fde, poll_event_fd_destructor);
+
+ poll_ev->num_fds += 1;
+ }
+ return true;
+}
+
/*
event loop handling using poll()
*/
@@ -188,9 +244,9 @@ static int poll_event_loop_poll(struct tevent_context *ev,
{
struct poll_event_context *poll_ev = talloc_get_type_abort(
ev->additional_data, struct poll_event_context);
- struct tevent_fd *fde;
int pollrtn;
int timeout = -1;
+ unsigned i;
if (ev->signal_events && tevent_common_check_signal(ev)) {
return 0;
@@ -201,6 +257,10 @@ static int poll_event_loop_poll(struct tevent_context *ev,
timeout += (tvalp->tv_usec + 999) / 1000;
}
+ if (!poll_event_setup_fresh(ev, poll_ev)) {
+ return -1;
+ }
+
tevent_trace_point_callback(poll_ev->ev, TEVENT_TRACE_BEFORE_WAIT);
pollrtn = poll(poll_ev->fds, poll_ev->num_fds, timeout);
tevent_trace_point_callback(poll_ev->ev, TEVENT_TRACE_AFTER_WAIT);
@@ -210,18 +270,6 @@ static int poll_event_loop_poll(struct tevent_context *ev,
return 0;
}
- if (pollrtn == -1 && errno == EBADF) {
- /* the socket is dead! this should never
- happen as the socket should have first been
- made readable and that should have removed
- the event, so this must be a bug. This is a
- fatal error. */
- tevent_debug(ev, TEVENT_DEBUG_FATAL,
- "ERROR: EBADF on poll_event_loop_once\n");
- poll_ev->exit_code = EBADF;
- return -1;
- }
-
if (pollrtn == 0 && tvalp) {
/* we don't care about a possible delay here */
tevent_common_loop_timer_delay(ev);
@@ -239,12 +287,27 @@ static int poll_event_loop_poll(struct tevent_context *ev,
which ones and call the handler, being careful to allow
the handler to remove itself when called */
- for (fde = ev->fd_events; fde; fde = fde->next) {
+ for (i=0; i<poll_ev->num_fds; i++) {
struct pollfd *pfd;
- uint64_t pfd_idx = fde->additional_flags;
+ struct tevent_fd *fde;
uint16_t flags = 0;
- pfd = &poll_ev->fds[pfd_idx];
+ fde = poll_ev->fdes[i];
+ if (fde == NULL) {
+ /*
+ * This fde was talloc_free()'ed. Delete it
+ * from the arrays
+ */
+ poll_ev->num_fds -= 1;
+ poll_ev->fds[i] = poll_ev->fds[poll_ev->num_fds];
+ poll_ev->fdes[i] = poll_ev->fdes[poll_ev->num_fds];
+ if (poll_ev->fdes[i] != NULL) {
+ poll_ev->fdes[i]->additional_flags = i;
+ }
+ continue;
+ }
+
+ pfd = &poll_ev->fds[i];
if (pfd->revents & (POLLHUP|POLLERR)) {
/* If we only wait for TEVENT_FD_WRITE, we