From b9354f72291836e155ed5af56ab98b6a8537ceef Mon Sep 17 00:00:00 2001 From: Simo Sorce Date: Tue, 3 May 2011 18:55:25 -0400 Subject: s3-prefork: add asynchronous functions To get a client connection it is now possible to use asynchronous functions. Signed-off-by: Andreas Schneider --- source3/lib/server_prefork.c | 260 ++++++++++++++++++++++++++++++++++++++++++- source3/lib/server_prefork.h | 8 ++ 2 files changed, 267 insertions(+), 1 deletion(-) (limited to 'source3/lib') diff --git a/source3/lib/server_prefork.c b/source3/lib/server_prefork.c index 4aa3f482cc..b337fa0c3b 100644 --- a/source3/lib/server_prefork.c +++ b/source3/lib/server_prefork.c @@ -24,6 +24,7 @@ #include "system/filesys.h" #include "server_prefork.h" #include "../lib/util/util.h" +#include "../lib/util/tevent_unix.h" struct prefork_pool { @@ -94,7 +95,6 @@ bool prefork_create_pool(struct tevent_context *ev_ctx, case 0: /* THE CHILD */ pfp->pool[i].status = PF_WORKER_IDLE; - ret = pfp->main_fn(ev_ctx, &pfp->pool[i], pfp->listen_fd, pfp->lock_fd, pfp->private_data); @@ -500,3 +500,261 @@ int prefork_wait_for_client(struct pf_worker_data *pf, *fd = sd; return 0; } + +/* ==== async code ==== */ + +#define PF_ASYNC_LOCK_GRAB 0x01 +#define PF_ASYNC_LOCK_RELEASE 0x02 +#define PF_ASYNC_ACTION_MASK 0x03 +#define PF_ASYNC_LOCK_DONE 0x04 + +struct pf_lock_state { + struct pf_worker_data *pf; + int lock_fd; + int flags; +}; + +static void prefork_lock_handler(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval curtime, void *pvt); + +static struct tevent_req *prefork_lock_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct pf_worker_data *pf, + int lock_fd, int action) +{ + struct tevent_req *req; + struct pf_lock_state *state; + + req = tevent_req_create(mem_ctx, &state, struct pf_lock_state); + if (!req) { + return NULL; + } + + state->pf = pf; + state->lock_fd = lock_fd; + state->flags = action; + + /* try once immediately */ + prefork_lock_handler(ev, NULL, tevent_timeval_zero(), req); + if (state->flags & PF_ASYNC_LOCK_DONE) { + tevent_req_post(req, ev); + } + + return req; +} + +static void prefork_lock_handler(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval curtime, void *pvt) +{ + struct tevent_req *req; + struct pf_lock_state *state; + int ret; + + req = talloc_get_type_abort(pvt, struct tevent_req); + state = tevent_req_data(req, struct pf_lock_state); + + switch (state->flags & PF_ASYNC_ACTION_MASK) { + case PF_ASYNC_LOCK_GRAB: + ret = prefork_grab_lock(state->pf, state->lock_fd, 0); + break; + case PF_ASYNC_LOCK_RELEASE: + ret = prefork_release_lock(state->pf, state->lock_fd, 0); + break; + default: + ret = EINVAL; + break; + } + + switch (ret) { + case 0: + state->flags |= PF_ASYNC_LOCK_DONE; + tevent_req_done(req); + return; + case -1: + te = tevent_add_timer(ev, state, + tevent_timeval_current_ofs(1, 0), + prefork_lock_handler, req); + tevent_req_nomem(te, req); + return; + case -2: + /* server tells us to stop */ + state->flags |= PF_ASYNC_LOCK_DONE; + tevent_req_error(req, -2); + return; + default: + state->flags |= PF_ASYNC_LOCK_DONE; + tevent_req_error(req, ret); + return; + } +} + +static int prefork_lock_recv(struct tevent_req *req) +{ + int ret; + + if (!tevent_req_is_unix_error(req, &ret)) { + ret = 0; + } + + tevent_req_received(req); + return ret; +} + +struct pf_listen_state { + struct tevent_context *ev; + struct pf_worker_data *pf; + + int lock_fd; + int listen_fd; + + struct sockaddr *addr; + socklen_t *addrlen; + + int accept_fd; + + int error; +}; + +static void prefork_listen_lock_done(struct tevent_req *subreq); +static void prefork_listen_accept_handler(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, void *pvt); +static void prefork_listen_release_done(struct tevent_req *subreq); + +struct tevent_req *prefork_listen_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct pf_worker_data *pf, + int lock_fd, int listen_fd, + struct sockaddr *addr, + socklen_t *addrlen) +{ + struct tevent_req *req, *subreq; + struct pf_listen_state *state; + + req = tevent_req_create(mem_ctx, &state, struct pf_listen_state); + if (!req) { + return NULL; + } + + state->ev = ev; + state->pf = pf; + state->lock_fd = lock_fd; + state->listen_fd = listen_fd; + state->addr = addr; + state->addrlen = addrlen; + state->accept_fd = -1; + state->error = 0; + + subreq = prefork_lock_send(state, state->ev, state->pf, + state->lock_fd, PF_ASYNC_LOCK_GRAB); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + + tevent_req_set_callback(subreq, prefork_listen_lock_done, req); + return req; +} + +static void prefork_listen_lock_done(struct tevent_req *subreq) +{ + struct tevent_req *req; + struct pf_listen_state *state; + struct tevent_fd *fde; + int ret; + + req = tevent_req_callback_data(subreq, struct tevent_req); + state = tevent_req_data(req, struct pf_listen_state); + + ret = prefork_lock_recv(subreq); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + /* next step, accept */ + fde = tevent_add_fd(state->ev, state, + state->listen_fd, TEVENT_FD_READ, + prefork_listen_accept_handler, req); + tevent_req_nomem(fde, req); +} + +static void prefork_listen_accept_handler(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, void *pvt) +{ + struct pf_listen_state *state; + struct tevent_req *req, *subreq; + int err = 0; + int sd = -1; + + req = talloc_get_type_abort(pvt, struct tevent_req); + state = tevent_req_data(req, struct pf_listen_state); + + sd = accept(state->listen_fd, state->addr, state->addrlen); + if (sd == -1) { + if (errno == EINTR) { + /* keep trying */ + return; + } + err = errno; + DEBUG(6, ("Accept failed! (%d, %s)\n", err, strerror(err))); + + } + + /* do not track the listen fd anymore */ + talloc_free(fde); + if (err) { + tevent_req_error(req, err); + return; + } + + state->accept_fd = sd; + + /* release lock now */ + subreq = prefork_lock_send(state, state->ev, state->pf, + state->lock_fd, PF_ASYNC_LOCK_RELEASE); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, prefork_listen_release_done, req); +} + +static void prefork_listen_release_done(struct tevent_req *subreq) +{ + struct tevent_req *req; + int ret; + + req = tevent_req_callback_data(subreq, struct tevent_req); + + ret = prefork_lock_recv(subreq); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +int prefork_listen_recv(struct tevent_req *req, int *fd) +{ + struct pf_listen_state *state; + int ret; + + state = tevent_req_data(req, struct pf_listen_state); + + if (tevent_req_is_unix_error(req, &ret)) { + if (state->accept_fd != -1) { + close(state->accept_fd); + } + } else { + *fd = state->accept_fd; + ret = 0; + state->pf->status = PF_WORKER_BUSY; + state->pf->num_clients++; + } + + tevent_req_received(req); + return ret; +} diff --git a/source3/lib/server_prefork.h b/source3/lib/server_prefork.h index d6d7bf95c9..7e95602e81 100644 --- a/source3/lib/server_prefork.h +++ b/source3/lib/server_prefork.h @@ -74,3 +74,11 @@ int prefork_wait_for_client(struct pf_worker_data *pf, int lock_fd, int listen_fd, struct sockaddr *addr, socklen_t *addrlen, int *fd); + +struct tevent_req *prefork_listen_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct pf_worker_data *pf, + int lock_fd, int listen_fd, + struct sockaddr *addr, + socklen_t *addrlen); +int prefork_listen_recv(struct tevent_req *req, int *fd); -- cgit