summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVolker Lendecke <vl@samba.org>2012-08-13 16:06:01 +0200
committerStefan Metzmacher <metze@samba.org>2012-08-16 20:49:11 +0200
commitfa71f3241110fbe079de6d6cb0c8f3df001f4c65 (patch)
tree0711e760783e2d936ab7e605a0ae3492d4d55f05
parentd860aa2cacc783434973c14f7ae21964ca050e6f (diff)
downloadsamba-fa71f3241110fbe079de6d6cb0c8f3df001f4c65.tar.gz
samba-fa71f3241110fbe079de6d6cb0c8f3df001f4c65.tar.bz2
samba-fa71f3241110fbe079de6d6cb0c8f3df001f4c65.zip
lib/tevent: Add a thread-safe tevent backend
Signed-off-by: Stefan Metzmacher <metze@samba.org>
-rw-r--r--lib/tevent/tevent.c1
-rw-r--r--lib/tevent/tevent_internal.h1
-rw-r--r--lib/tevent/tevent_poll.c161
3 files changed, 160 insertions, 3 deletions
diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
index 61ffc7edaa..fa842e4208 100644
--- a/lib/tevent/tevent.c
+++ b/lib/tevent/tevent.c
@@ -114,6 +114,7 @@ static void tevent_backend_init(void)
{
tevent_select_init();
tevent_poll_init();
+ tevent_poll_mt_init();
tevent_standard_init();
#ifdef HAVE_EPOLL
tevent_epoll_init();
diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
index 877510f9f4..f09cf576b2 100644
--- a/lib/tevent/tevent_internal.h
+++ b/lib/tevent/tevent_internal.h
@@ -315,6 +315,7 @@ void tevent_cleanup_pending_signal_handlers(struct tevent_signal *se);
bool tevent_standard_init(void);
bool tevent_select_init(void);
bool tevent_poll_init(void);
+bool tevent_poll_mt_init(void);
#ifdef HAVE_EPOLL
bool tevent_epoll_init(void);
#endif
diff --git a/lib/tevent/tevent_poll.c b/lib/tevent/tevent_poll.c
index 2639143998..da8cc0137a 100644
--- a/lib/tevent/tevent_poll.c
+++ b/lib/tevent/tevent_poll.c
@@ -34,7 +34,8 @@ struct poll_event_context {
struct tevent_context *ev;
/*
- * A DLIST for fresh fde's
+ * A DLIST for fresh fde's added by poll_event_add_fd but not
+ * picked up yet by poll_event_loop_once
*/
struct tevent_fd *fresh;
@@ -45,6 +46,11 @@ struct poll_event_context {
struct tevent_fd **fdes;
unsigned num_fds;
+ /*
+ * Signal fd to wake the poll() thread
+ */
+ int signal_fd;
+
/* information for exiting from the event loop */
int exit_code;
};
@@ -61,17 +67,125 @@ static int poll_event_context_init(struct tevent_context *ev)
return -1;
}
poll_ev->ev = ev;
+ poll_ev->signal_fd = -1;
ev->additional_data = poll_ev;
return 0;
}
+static int poll_event_mt_destructor(struct poll_event_context *poll_ev)
+{
+ if (poll_ev->signal_fd != -1) {
+ close(poll_ev->signal_fd);
+ poll_ev->signal_fd = -1;
+ }
+ if (poll_ev->num_fds == 0) {
+ return 0;
+ }
+ if (poll_ev->fds[0].fd != -1) {
+ close(poll_ev->fds[0].fd);
+ poll_ev->fds[0].fd = -1;
+ }
+ return 0;
+}
+
+static bool set_nonblock(int fd)
+{
+ int val;
+
+ val = fcntl(fd, F_GETFL, 0);
+ if (val == -1) {
+ return false;
+ }
+ val |= O_NONBLOCK;
+
+ return (fcntl(fd, F_SETFL, val) != -1);
+}
+
+static int poll_event_context_init_mt(struct tevent_context *ev)
+{
+ struct poll_event_context *poll_ev;
+ struct pollfd *pfd;
+ int fds[2];
+ int ret;
+
+ ret = poll_event_context_init(ev);
+ if (ret == -1) {
+ return ret;
+ }
+
+ poll_ev = talloc_get_type_abort(
+ ev->additional_data, struct poll_event_context);
+
+ poll_ev->fds = talloc_zero(poll_ev, struct pollfd);
+ if (poll_ev->fds == NULL) {
+ return -1;
+ }
+
+ ret = pipe(fds);
+ if (ret == -1) {
+ return -1;
+ }
+
+ if (!set_nonblock(fds[0]) || !set_nonblock(fds[1])) {
+ close(fds[0]);
+ close(fds[1]);
+ return -1;
+ }
+
+ poll_ev->signal_fd = fds[1];
+
+ pfd = &poll_ev->fds[0];
+ pfd->fd = fds[0];
+ pfd->events = (POLLIN|POLLHUP);
+
+ poll_ev->num_fds = 1;
+
+ talloc_set_destructor(poll_ev, poll_event_mt_destructor);
+
+ return 0;
+}
+
+static void poll_event_wake_pollthread(struct poll_event_context *poll_ev)
+{
+ char c;
+ ssize_t ret;
+
+ if (poll_ev->signal_fd == -1) {
+ return;
+ }
+ c = 0;
+ do {
+ ret = write(poll_ev->signal_fd, &c, sizeof(c));
+ } while ((ret == -1) && (errno == EINTR));
+}
+
+static void poll_event_drain_signal_fd(struct poll_event_context *poll_ev)
+{
+ char buf[16];
+ ssize_t ret;
+ int fd;
+
+ if (poll_ev->signal_fd == -1) {
+ return;
+ }
+
+ if (poll_ev->num_fds < 1) {
+ return;
+ }
+ fd = poll_ev->fds[0].fd;
+
+ do {
+ ret = read(fd, buf, sizeof(buf));
+ } while (ret == sizeof(buf));
+}
+
/*
destroy an fd_event
*/
static int poll_event_fd_destructor(struct tevent_fd *fde)
{
struct tevent_context *ev = fde->event_ctx;
- struct poll_event_context *poll_ev = NULL;
+ struct poll_event_context *poll_ev;
uint64_t del_idx = fde->additional_flags;
if (ev == NULL) {
@@ -82,6 +196,7 @@ static int poll_event_fd_destructor(struct tevent_fd *fde)
ev->additional_data, struct poll_event_context);
poll_ev->fdes[del_idx] = NULL;
+ poll_event_wake_pollthread(poll_ev);
done:
return tevent_common_fd_destructor(fde);
}
@@ -94,6 +209,21 @@ static int poll_fresh_fde_destructor(struct tevent_fd *fde)
return 0;
}
+static void poll_event_schedule_immediate(struct tevent_immediate *im,
+ struct tevent_context *ev,
+ tevent_immediate_handler_t handler,
+ void *private_data,
+ const char *handler_name,
+ const char *location)
+{
+ struct poll_event_context *poll_ev = talloc_get_type_abort(
+ ev->additional_data, struct poll_event_context);
+
+ tevent_common_schedule_immediate(im, ev, handler, private_data,
+ handler_name, location);
+ poll_event_wake_pollthread(poll_ev);
+}
+
/*
add a fd based event
return NULL on failure (memory allocation error)
@@ -131,6 +261,7 @@ static struct tevent_fd *poll_event_add_fd(struct tevent_context *ev,
DLIST_ADD(poll_ev->fresh, fde);
talloc_set_destructor(fde, poll_fresh_fde_destructor);
+ poll_event_wake_pollthread(poll_ev);
/*
* poll_event_loop_poll will take care of the rest in
@@ -159,6 +290,7 @@ static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
poll_ev->fds[idx].events = pollflags;
fde->flags = flags;
+ poll_event_wake_pollthread(poll_ev);
}
static bool poll_event_setup_fresh(struct tevent_context *ev,
@@ -246,6 +378,7 @@ static int poll_event_loop_poll(struct tevent_context *ev,
ev->additional_data, struct poll_event_context);
int pollrtn;
int timeout = -1;
+ unsigned first_fd;
unsigned i;
if (ev->signal_events && tevent_common_check_signal(ev)) {
@@ -257,6 +390,8 @@ static int poll_event_loop_poll(struct tevent_context *ev,
timeout += (tvalp->tv_usec + 999) / 1000;
}
+ poll_event_drain_signal_fd(poll_ev);
+
if (!poll_event_setup_fresh(ev, poll_ev)) {
return -1;
}
@@ -283,11 +418,13 @@ static int poll_event_loop_poll(struct tevent_context *ev,
return 0;
}
+ first_fd = (poll_ev->signal_fd != -1) ? 1 : 0;
+
/* at least one file descriptor is ready - check
which ones and call the handler, being careful to allow
the handler to remove itself when called */
- for (i=0; i<poll_ev->num_fds; i++) {
+ for (i=first_fd; i<poll_ev->num_fds; i++) {
struct pollfd *pfd;
struct tevent_fd *fde;
uint16_t flags = 0;
@@ -379,3 +516,21 @@ _PRIVATE_ bool tevent_poll_init(void)
{
return tevent_register_backend("poll", &poll_event_ops);
}
+
+static const struct tevent_ops poll_event_mt_ops = {
+ .context_init = poll_event_context_init_mt,
+ .add_fd = poll_event_add_fd,
+ .set_fd_close_fn = tevent_common_fd_set_close_fn,
+ .get_fd_flags = tevent_common_fd_get_flags,
+ .set_fd_flags = poll_event_set_fd_flags,
+ .add_timer = tevent_common_add_timer,
+ .schedule_immediate = poll_event_schedule_immediate,
+ .add_signal = tevent_common_add_signal,
+ .loop_once = poll_event_loop_once,
+ .loop_wait = tevent_common_loop_wait,
+};
+
+_PRIVATE_ bool tevent_poll_mt_init(void)
+{
+ return tevent_register_backend("poll_mt", &poll_event_mt_ops);
+}