From fa71f3241110fbe079de6d6cb0c8f3df001f4c65 Mon Sep 17 00:00:00 2001 From: Volker Lendecke Date: Mon, 13 Aug 2012 16:06:01 +0200 Subject: lib/tevent: Add a thread-safe tevent backend Signed-off-by: Stefan Metzmacher --- lib/tevent/tevent_poll.c | 161 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 158 insertions(+), 3 deletions(-) (limited to 'lib/tevent/tevent_poll.c') diff --git a/lib/tevent/tevent_poll.c b/lib/tevent/tevent_poll.c index 2639143998..da8cc0137a 100644 --- a/lib/tevent/tevent_poll.c +++ b/lib/tevent/tevent_poll.c @@ -34,7 +34,8 @@ struct poll_event_context { struct tevent_context *ev; /* - * A DLIST for fresh fde's + * A DLIST for fresh fde's added by poll_event_add_fd but not + * picked up yet by poll_event_loop_once */ struct tevent_fd *fresh; @@ -45,6 +46,11 @@ struct poll_event_context { struct tevent_fd **fdes; unsigned num_fds; + /* + * Signal fd to wake the poll() thread + */ + int signal_fd; + /* information for exiting from the event loop */ int exit_code; }; @@ -61,17 +67,125 @@ static int poll_event_context_init(struct tevent_context *ev) return -1; } poll_ev->ev = ev; + poll_ev->signal_fd = -1; ev->additional_data = poll_ev; return 0; } +static int poll_event_mt_destructor(struct poll_event_context *poll_ev) +{ + if (poll_ev->signal_fd != -1) { + close(poll_ev->signal_fd); + poll_ev->signal_fd = -1; + } + if (poll_ev->num_fds == 0) { + return 0; + } + if (poll_ev->fds[0].fd != -1) { + close(poll_ev->fds[0].fd); + poll_ev->fds[0].fd = -1; + } + return 0; +} + +static bool set_nonblock(int fd) +{ + int val; + + val = fcntl(fd, F_GETFL, 0); + if (val == -1) { + return false; + } + val |= O_NONBLOCK; + + return (fcntl(fd, F_SETFL, val) != -1); +} + +static int poll_event_context_init_mt(struct tevent_context *ev) +{ + struct poll_event_context *poll_ev; + struct pollfd *pfd; + int fds[2]; + int ret; + + ret = poll_event_context_init(ev); + if (ret == -1) { + return ret; + } + + poll_ev = talloc_get_type_abort( + ev->additional_data, struct poll_event_context); + + poll_ev->fds = talloc_zero(poll_ev, struct pollfd); + if (poll_ev->fds == NULL) { + return -1; + } + + ret = pipe(fds); + if (ret == -1) { + return -1; + } + + if (!set_nonblock(fds[0]) || !set_nonblock(fds[1])) { + close(fds[0]); + close(fds[1]); + return -1; + } + + poll_ev->signal_fd = fds[1]; + + pfd = &poll_ev->fds[0]; + pfd->fd = fds[0]; + pfd->events = (POLLIN|POLLHUP); + + poll_ev->num_fds = 1; + + talloc_set_destructor(poll_ev, poll_event_mt_destructor); + + return 0; +} + +static void poll_event_wake_pollthread(struct poll_event_context *poll_ev) +{ + char c; + ssize_t ret; + + if (poll_ev->signal_fd == -1) { + return; + } + c = 0; + do { + ret = write(poll_ev->signal_fd, &c, sizeof(c)); + } while ((ret == -1) && (errno == EINTR)); +} + +static void poll_event_drain_signal_fd(struct poll_event_context *poll_ev) +{ + char buf[16]; + ssize_t ret; + int fd; + + if (poll_ev->signal_fd == -1) { + return; + } + + if (poll_ev->num_fds < 1) { + return; + } + fd = poll_ev->fds[0].fd; + + do { + ret = read(fd, buf, sizeof(buf)); + } while (ret == sizeof(buf)); +} + /* destroy an fd_event */ static int poll_event_fd_destructor(struct tevent_fd *fde) { struct tevent_context *ev = fde->event_ctx; - struct poll_event_context *poll_ev = NULL; + struct poll_event_context *poll_ev; uint64_t del_idx = fde->additional_flags; if (ev == NULL) { @@ -82,6 +196,7 @@ static int poll_event_fd_destructor(struct tevent_fd *fde) ev->additional_data, struct poll_event_context); poll_ev->fdes[del_idx] = NULL; + poll_event_wake_pollthread(poll_ev); done: return tevent_common_fd_destructor(fde); } @@ -94,6 +209,21 @@ static int poll_fresh_fde_destructor(struct tevent_fd *fde) return 0; } +static void poll_event_schedule_immediate(struct tevent_immediate *im, + struct tevent_context *ev, + tevent_immediate_handler_t handler, + void *private_data, + const char *handler_name, + const char *location) +{ + struct poll_event_context *poll_ev = talloc_get_type_abort( + ev->additional_data, struct poll_event_context); + + tevent_common_schedule_immediate(im, ev, handler, private_data, + handler_name, location); + poll_event_wake_pollthread(poll_ev); +} + /* add a fd based event return NULL on failure (memory allocation error) @@ -131,6 +261,7 @@ static struct tevent_fd *poll_event_add_fd(struct tevent_context *ev, DLIST_ADD(poll_ev->fresh, fde); talloc_set_destructor(fde, poll_fresh_fde_destructor); + poll_event_wake_pollthread(poll_ev); /* * poll_event_loop_poll will take care of the rest in @@ -159,6 +290,7 @@ static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags) poll_ev->fds[idx].events = pollflags; fde->flags = flags; + poll_event_wake_pollthread(poll_ev); } static bool poll_event_setup_fresh(struct tevent_context *ev, @@ -246,6 +378,7 @@ static int poll_event_loop_poll(struct tevent_context *ev, ev->additional_data, struct poll_event_context); int pollrtn; int timeout = -1; + unsigned first_fd; unsigned i; if (ev->signal_events && tevent_common_check_signal(ev)) { @@ -257,6 +390,8 @@ static int poll_event_loop_poll(struct tevent_context *ev, timeout += (tvalp->tv_usec + 999) / 1000; } + poll_event_drain_signal_fd(poll_ev); + if (!poll_event_setup_fresh(ev, poll_ev)) { return -1; } @@ -283,11 +418,13 @@ static int poll_event_loop_poll(struct tevent_context *ev, return 0; } + first_fd = (poll_ev->signal_fd != -1) ? 1 : 0; + /* at least one file descriptor is ready - check which ones and call the handler, being careful to allow the handler to remove itself when called */ - for (i=0; inum_fds; i++) { + for (i=first_fd; inum_fds; i++) { struct pollfd *pfd; struct tevent_fd *fde; uint16_t flags = 0; @@ -379,3 +516,21 @@ _PRIVATE_ bool tevent_poll_init(void) { return tevent_register_backend("poll", &poll_event_ops); } + +static const struct tevent_ops poll_event_mt_ops = { + .context_init = poll_event_context_init_mt, + .add_fd = poll_event_add_fd, + .set_fd_close_fn = tevent_common_fd_set_close_fn, + .get_fd_flags = tevent_common_fd_get_flags, + .set_fd_flags = poll_event_set_fd_flags, + .add_timer = tevent_common_add_timer, + .schedule_immediate = poll_event_schedule_immediate, + .add_signal = tevent_common_add_signal, + .loop_once = poll_event_loop_once, + .loop_wait = tevent_common_loop_wait, +}; + +_PRIVATE_ bool tevent_poll_mt_init(void) +{ + return tevent_register_backend("poll_mt", &poll_event_mt_ops); +} -- cgit