diff options
author | Volker Lendecke <vl@samba.org> | 2012-07-09 11:10:30 +0200 |
---|---|---|
committer | Jeremy Allison <jra@samba.org> | 2012-07-18 15:42:22 -0700 |
commit | 9e1c873a9e5034244a58940ebd32c7ba0f6b9e01 (patch) | |
tree | 35ec41824a6ba80055e51ccf149a508735f85de1 | |
parent | bf8696fe5dee4a829b6f7444cdd4e9a9aad2d110 (diff) | |
download | samba-9e1c873a9e5034244a58940ebd32c7ba0f6b9e01.tar.gz samba-9e1c873a9e5034244a58940ebd32c7ba0f6b9e01.tar.bz2 samba-9e1c873a9e5034244a58940ebd32c7ba0f6b9e01.zip |
s3: Convert aio_fork to pread/pwrite_send/recv
Signed-off-by: Jeremy Allison <jra@samba.org>
-rw-r--r-- | source3/modules/vfs_aio_fork.c | 497 |
1 files changed, 174 insertions, 323 deletions
diff --git a/source3/modules/vfs_aio_fork.c b/source3/modules/vfs_aio_fork.c index 21a7f1df42..76153e38bc 100644 --- a/source3/modules/vfs_aio_fork.c +++ b/source3/modules/vfs_aio_fork.c @@ -24,7 +24,8 @@ #include "system/shmem.h" #include "smbd/smbd.h" #include "smbd/globals.h" -#include <aio.h> +#include "lib/async_req/async_sock.h" +#include "lib/util/tevent_unix.h" #ifndef MAP_FILE #define MAP_FILE 0 @@ -94,17 +95,11 @@ struct aio_child_list; struct aio_child { struct aio_child *prev, *next; struct aio_child_list *list; - SMB_STRUCT_AIOCB *aiocb; pid_t pid; int sockfd; - struct fd_event *sock_event; - struct rw_ret retval; - struct mmap_area *map; /* ==NULL means write request */ + struct mmap_area *map; bool dont_delete; /* Marked as in use since last cleanup */ - bool cancelled; - bool read_cmd; - bool called_from_suspend; - bool completion_done; + bool busy; }; struct aio_child_list { @@ -241,7 +236,7 @@ static void aio_child_cleanup(struct event_context *event_ctx, for (child = list->children; child != NULL; child = next) { next = child->next; - if (child->aiocb != NULL) { + if (child->busy) { DEBUG(10, ("child %d currently active\n", (int)child->pid)); continue; @@ -395,62 +390,11 @@ static void aio_child_loop(int sockfd, struct mmap_area *map) } } -static void handle_aio_completion(struct event_context *event_ctx, - struct fd_event *event, uint16 flags, - void *p) -{ - struct aio_extra *aio_ex = NULL; - struct aio_child *child = (struct aio_child *)p; - NTSTATUS status; - - DEBUG(10, ("handle_aio_completion called with flags=%d\n", flags)); - - if ((flags & EVENT_FD_READ) == 0) { - return; - } - - status = read_data(child->sockfd, (char *)&child->retval, - sizeof(child->retval)); - - if (!NT_STATUS_IS_OK(status)) { - DEBUG(1, ("aio child %d died: %s\n", (int)child->pid, - nt_errstr(status))); - child->retval.size = -1; - child->retval.ret_errno = EIO; - } - - if (child->aiocb == NULL) { - DEBUG(1, ("Inactive child died\n")); - TALLOC_FREE(child); - return; - } - - if (child->cancelled) { - child->aiocb = NULL; - child->cancelled = false; - return; - } - - if (child->read_cmd && (child->retval.size > 0)) { - SMB_ASSERT(child->retval.size <= child->aiocb->aio_nbytes); - memcpy((void *)child->aiocb->aio_buf, (void *)child->map->ptr, - child->retval.size); - } - - if (child->called_from_suspend) { - child->completion_done = true; - return; - } - aio_ex = (struct aio_extra *)child->aiocb->aio_sigevent.sigev_value.sival_ptr; - smbd_aio_complete_aio_ex(aio_ex); - TALLOC_FREE(aio_ex); -} - static int aio_child_destructor(struct aio_child *child) { char c=0; - SMB_ASSERT((child->aiocb == NULL) || child->cancelled); + SMB_ASSERT(!child->busy); DEBUG(10, ("aio_child_destructor: removing child %d on fd %d\n", child->pid, child->sockfd)); @@ -531,16 +475,6 @@ static int create_aio_child(struct smbd_server_connection *sconn, result->sockfd = fdpair[0]; close(fdpair[1]); - result->sock_event = event_add_fd(server_event_context(), result, - result->sockfd, EVENT_FD_READ, - handle_aio_completion, - result); - if (result->sock_event == NULL) { - ret = ENOMEM; - DEBUG(0, ("event_add_fd failed\n")); - goto fail; - } - result->list = children; DLIST_ADD(children->children, result); @@ -570,8 +504,7 @@ static int get_idle_child(struct vfs_handle_struct *handle, } for (child = children->children; child != NULL; child = child->next) { - if (child->aiocb == NULL) { - /* idle */ + if (!child->busy) { break; } } @@ -591,314 +524,234 @@ static int get_idle_child(struct vfs_handle_struct *handle, } child->dont_delete = true; + child->busy = true; *pchild = child; return 0; } -static int aio_fork_read(struct vfs_handle_struct *handle, - struct files_struct *fsp, SMB_STRUCT_AIOCB *aiocb) -{ +struct aio_fork_pread_state { struct aio_child *child; - struct rw_cmd cmd; ssize_t ret; int err; +}; - if (aiocb->aio_nbytes > 128*1024) { - /* TODO: support variable buffers */ - errno = EINVAL; - return -1; - } - - err = get_idle_child(handle, &child); - if (err != 0) { - DEBUG(10, ("Could not get an idle child: %s\n", - strerror(err))); - errno = err; - return -1; - } - - child->read_cmd = true; - child->aiocb = aiocb; - child->retval.ret_errno = EINPROGRESS; - - ZERO_STRUCT(cmd); - cmd.n = aiocb->aio_nbytes; - cmd.offset = aiocb->aio_offset; - cmd.read_cmd = child->read_cmd; - - DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd, - (int)child->pid)); - - ret = write_fd(child->sockfd, &cmd, sizeof(cmd), fsp->fh->fd); - if (ret == -1) { - DEBUG(10, ("write_fd failed: %s\n", strerror(errno))); - return -1; - } - - return 0; -} +static void aio_fork_pread_done(struct tevent_req *subreq); -static int aio_fork_write(struct vfs_handle_struct *handle, - struct files_struct *fsp, SMB_STRUCT_AIOCB *aiocb) +static struct tevent_req *aio_fork_pread_send(struct vfs_handle_struct *handle, + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct files_struct *fsp, + void *data, + size_t n, off_t offset) { - struct aio_child *child; + struct tevent_req *req, *subreq; + struct aio_fork_pread_state *state; struct rw_cmd cmd; - ssize_t ret; + ssize_t written; int err; - if (aiocb->aio_nbytes > 128*1024) { + req = tevent_req_create(mem_ctx, &state, struct aio_fork_pread_state); + if (req == NULL) { + return NULL; + } + + if (n > 128*1024) { /* TODO: support variable buffers */ - errno = EINVAL; - return -1; + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); } - err = get_idle_child(handle, &child); + err = get_idle_child(handle, &state->child); if (err != 0) { - DEBUG(10, ("Could not get an idle child: %s\n", - strerror(err))); - errno = err; - return -1; + tevent_req_error(req, err); + return tevent_req_post(req, ev); } - child->read_cmd = false; - child->aiocb = aiocb; - child->retval.ret_errno = EINPROGRESS; - - memcpy((void *)child->map->ptr, (void *)aiocb->aio_buf, - aiocb->aio_nbytes); - ZERO_STRUCT(cmd); - cmd.n = aiocb->aio_nbytes; - cmd.offset = aiocb->aio_offset; - cmd.read_cmd = child->read_cmd; + cmd.n = n; + cmd.offset = offset; + cmd.read_cmd = true; DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd, - (int)child->pid)); + (int)state->child->pid)); - ret = write_fd(child->sockfd, &cmd, sizeof(cmd), fsp->fh->fd); - if (ret == -1) { - DEBUG(10, ("write_fd failed: %s\n", strerror(errno))); - return -1; + /* + * Not making this async. We're writing into an empty unix + * domain socket. This should never block. + */ + written = write_fd(state->child->sockfd, &cmd, sizeof(cmd), + fsp->fh->fd); + if (written == -1) { + err = errno; + + TALLOC_FREE(state->child); + + DEBUG(10, ("write_fd failed: %s\n", strerror(err))); + tevent_req_error(req, err); + return tevent_req_post(req, ev); } - return 0; + subreq = read_packet_send(state, ev, state->child->sockfd, + sizeof(struct rw_ret), NULL, NULL); + if (tevent_req_nomem(subreq, req)) { + TALLOC_FREE(state->child); /* we sent sth down */ + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, aio_fork_pread_done, req); + return req; } -static struct aio_child *aio_fork_find_child(struct vfs_handle_struct *handle, - SMB_STRUCT_AIOCB *aiocb) +static void aio_fork_pread_done(struct tevent_req *subreq) { - struct aio_child_list *children; - struct aio_child *child; + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct aio_fork_pread_state *state = tevent_req_data( + req, struct aio_fork_pread_state); + ssize_t nread; + uint8_t *buf; + int err; + struct rw_ret *retbuf; - children = init_aio_children(handle); - if (children == NULL) { - return NULL; + nread = read_packet_recv(subreq, talloc_tos(), &buf, &err); + TALLOC_FREE(subreq); + if (nread == -1) { + TALLOC_FREE(state->child); + tevent_req_error(req, err); + return; } - for (child = children->children; child != NULL; child = child->next) { - if (child->aiocb == aiocb) { - return child; - } - } + state->child->busy = false; - return NULL; + retbuf = (struct rw_ret *)buf; + state->ret = retbuf->size; + state->err = retbuf->ret_errno; + tevent_req_done(req); } -static ssize_t aio_fork_return_fn(struct vfs_handle_struct *handle, - struct files_struct *fsp, - SMB_STRUCT_AIOCB *aiocb) +static ssize_t aio_fork_pread_recv(struct tevent_req *req, int *err) { - struct aio_child *child = aio_fork_find_child(handle, aiocb); + struct aio_fork_pread_state *state = tevent_req_data( + req, struct aio_fork_pread_state); - if (child == NULL) { - errno = EINVAL; - DEBUG(0, ("returning EINVAL\n")); + if (tevent_req_is_unix_error(req, err)) { return -1; } - - child->aiocb = NULL; - - if (child->cancelled) { - errno = ECANCELED; - return -1; + if (state->ret == -1) { + *err = state->err; } + return state->ret; +} - if (child->retval.size == -1) { - errno = child->retval.ret_errno; - } +struct aio_fork_pwrite_state { + struct aio_child *child; + ssize_t ret; + int err; +}; - return child->retval.size; -} +static void aio_fork_pwrite_done(struct tevent_req *subreq); -static int aio_fork_cancel(struct vfs_handle_struct *handle, - struct files_struct *fsp, - SMB_STRUCT_AIOCB *aiocb) +static struct tevent_req *aio_fork_pwrite_send( + struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx, + struct tevent_context *ev, struct files_struct *fsp, + const void *data, size_t n, off_t offset) { - struct aio_child_list *children; - struct aio_child *child; + struct tevent_req *req, *subreq; + struct aio_fork_pwrite_state *state; + struct rw_cmd cmd; + ssize_t written; + int err; - children = init_aio_children(handle); - if (children == NULL) { - errno = EINVAL; - return -1; + req = tevent_req_create(mem_ctx, &state, struct aio_fork_pwrite_state); + if (req == NULL) { + return NULL; } - for (child = children->children; child != NULL; child = child->next) { - if (child->aiocb == NULL) { - continue; - } - if (child->aiocb->aio_fildes != fsp->fh->fd) { - continue; - } - if ((aiocb != NULL) && (child->aiocb != aiocb)) { - continue; - } - - /* - * We let the child do its job, but we discard the result when - * it's finished. - */ + if (n > 128*1024) { + /* TODO: support variable buffers */ + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } - child->cancelled = true; + err = get_idle_child(handle, &state->child); + if (err != 0) { + tevent_req_error(req, err); + return tevent_req_post(req, ev); } - return AIO_CANCELED; -} + ZERO_STRUCT(cmd); + cmd.n = n; + cmd.offset = offset; + cmd.read_cmd = false; -static int aio_fork_error_fn(struct vfs_handle_struct *handle, - struct files_struct *fsp, - SMB_STRUCT_AIOCB *aiocb) -{ - struct aio_child *child = aio_fork_find_child(handle, aiocb); + DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd, + (int)state->child->pid)); - if (child == NULL) { - errno = EINVAL; - return -1; - } + /* + * Not making this async. We're writing into an empty unix + * domain socket. This should never block. + */ + written = write_fd(state->child->sockfd, &cmd, sizeof(cmd), + fsp->fh->fd); + if (written == -1) { + err = errno; - return child->retval.ret_errno; -} + TALLOC_FREE(state->child); -static void aio_fork_suspend_timed_out(struct tevent_context *event_ctx, - struct tevent_timer *te, - struct timeval now, - void *private_data) -{ - bool *timed_out = (bool *)private_data; - /* Remove this timed event handler. */ - TALLOC_FREE(te); - *timed_out = true; + DEBUG(10, ("write_fd failed: %s\n", strerror(err))); + tevent_req_error(req, err); + return tevent_req_post(req, ev); + } + + subreq = read_packet_send(state, ev, state->child->sockfd, + sizeof(struct rw_ret), NULL, NULL); + if (tevent_req_nomem(subreq, req)) { + TALLOC_FREE(state->child); /* we sent sth down */ + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, aio_fork_pwrite_done, req); + return req; } -static int aio_fork_suspend(struct vfs_handle_struct *handle, - struct files_struct *fsp, - const SMB_STRUCT_AIOCB * const aiocb_array[], - int n, - const struct timespec *timeout) +static void aio_fork_pwrite_done(struct tevent_req *subreq) { - struct aio_child_list *children = NULL; - TALLOC_CTX *frame = talloc_stackframe(); - struct event_context *ev = NULL; - int i; - int ret = -1; - bool timed_out = false; + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct aio_fork_pwrite_state *state = tevent_req_data( + req, struct aio_fork_pwrite_state); + ssize_t nread; + uint8_t *buf; int err; + struct rw_ret *retbuf; - children = init_aio_children(handle); - if (children == NULL) { - errno = EINVAL; - goto out; - } - - /* This is a blocking call, and has to use a sub-event loop. */ - ev = event_context_init(frame); - if (ev == NULL) { - errno = ENOMEM; - goto out; - } - - if (timeout) { - struct timeval tv = convert_timespec_to_timeval(*timeout); - struct tevent_timer *te = tevent_add_timer(ev, - frame, - timeval_current_ofs(tv.tv_sec, - tv.tv_usec), - aio_fork_suspend_timed_out, - &timed_out); - if (!te) { - errno = ENOMEM; - goto out; - } + nread = read_packet_recv(subreq, talloc_tos(), &buf, &err); + TALLOC_FREE(subreq); + if (nread == -1) { + TALLOC_FREE(state->child); + tevent_req_error(req, err); + return; } - for (i = 0; i < n; i++) { - struct aio_child *child = NULL; - const SMB_STRUCT_AIOCB *aiocb = aiocb_array[i]; + state->child->busy = false; - if (!aiocb) { - continue; - } + retbuf = (struct rw_ret *)buf; + state->ret = retbuf->size; + state->err = retbuf->ret_errno; + tevent_req_done(req); +} - /* - * We're going to cheat here. We know that smbd/aio.c - * only calls this when it's waiting for every single - * outstanding call to finish on a close, so just wait - * individually for each IO to complete. We don't care - * what order they finish - only that they all do. JRA. - */ +static ssize_t aio_fork_pwrite_recv(struct tevent_req *req, int *err) +{ + struct aio_fork_pwrite_state *state = tevent_req_data( + req, struct aio_fork_pwrite_state); - for (child = children->children; child != NULL; child = child->next) { - struct tevent_fd *event; - - if (child->aiocb == NULL) { - continue; - } - if (child->aiocb->aio_fildes != fsp->fh->fd) { - continue; - } - if (child->aiocb != aiocb) { - continue; - } - - if (child->aiocb->aio_sigevent.sigev_value.sival_ptr == NULL) { - continue; - } - - event = event_add_fd(ev, - frame, - child->sockfd, - EVENT_FD_READ, - handle_aio_completion, - child); - if (event == NULL) { - errno = ENOMEM; - goto out; - } - - child->called_from_suspend = true; - - while (!child->completion_done) { - if (tevent_loop_once(ev) == -1) { - goto out; - } - - if (timed_out) { - errno = EAGAIN; - goto out; - } - } - } + if (tevent_req_is_unix_error(req, err)) { + return -1; } - - ret = 0; - - out: - - err = errno; - TALLOC_FREE(frame); - errno = err; - return ret; + if (state->ret == -1) { + *err = state->err; + } + return state->ret; } static int aio_fork_connect(vfs_handle_struct *handle, const char *service, @@ -920,12 +773,10 @@ static int aio_fork_connect(vfs_handle_struct *handle, const char *service, static struct vfs_fn_pointers vfs_aio_fork_fns = { .connect_fn = aio_fork_connect, - .aio_read_fn = aio_fork_read, - .aio_write_fn = aio_fork_write, - .aio_return_fn = aio_fork_return_fn, - .aio_cancel_fn = aio_fork_cancel, - .aio_error_fn = aio_fork_error_fn, - .aio_suspend_fn = aio_fork_suspend, + .pread_send_fn = aio_fork_pread_send, + .pread_recv_fn = aio_fork_pread_recv, + .pwrite_send_fn = aio_fork_pwrite_send, + .pwrite_recv_fn = aio_fork_pwrite_recv, }; NTSTATUS vfs_aio_fork_init(void); |