diff options
author | Volker Lendecke <vl@samba.org> | 2012-07-06 13:19:20 +0200 |
---|---|---|
committer | Jeremy Allison <jra@samba.org> | 2012-07-18 15:40:01 -0700 |
commit | 715653a335acec525e8b0aa6d24d840d9d347eb9 (patch) | |
tree | 87a607fcc98ebc0491eecf63559b93e219cb27f5 /source3 | |
parent | 66eb7bc6ecb1d9bada5b738fff6b428c59be29cc (diff) | |
download | samba-715653a335acec525e8b0aa6d24d840d9d347eb9.tar.gz samba-715653a335acec525e8b0aa6d24d840d9d347eb9.tar.bz2 samba-715653a335acec525e8b0aa6d24d840d9d347eb9.zip |
s3:vfs_aio_pthread: Convert to libasys
Signed-off-by: Jeremy Allison <jra@samba.org>
Diffstat (limited to 'source3')
-rw-r--r-- | source3/Makefile.in | 1 | ||||
-rw-r--r-- | source3/modules/vfs_aio_pthread.c | 646 | ||||
-rw-r--r-- | source3/modules/wscript_build | 2 |
3 files changed, 124 insertions, 525 deletions
diff --git a/source3/Makefile.in b/source3/Makefile.in index 672f2acb6c..06090446c3 100644 --- a/source3/Makefile.in +++ b/source3/Makefile.in @@ -453,6 +453,7 @@ LIB_OBJ = $(LIBSAMBAUTIL_OBJ) $(UTIL_OBJ) $(CRYPTO_OBJ) $(LIBTSOCKET_OBJ) \ ../lib/socket/interfaces.o lib/memcache.o \ lib/talloc_dict.o \ lib/serverid.o \ + lib/asys/asys.o \ lib/util_transfer_file.o ../lib/async_req/async_sock.o \ lib/addrchange.o \ $(TDB_LIB_OBJ) \ diff --git a/source3/modules/vfs_aio_pthread.c b/source3/modules/vfs_aio_pthread.c index ae5963b768..06ac8b8667 100644 --- a/source3/modules/vfs_aio_pthread.c +++ b/source3/modules/vfs_aio_pthread.c @@ -27,590 +27,168 @@ #include "smbd/smbd.h" #include "smbd/globals.h" #include "lib/pthreadpool/pthreadpool.h" +#include "lib/asys/asys.h" +#include "lib/util/tevent_unix.h" #ifdef HAVE_LINUX_FALLOC_H #include <linux/falloc.h> #endif -struct aio_extra; -static struct pthreadpool *pool; -static int aio_pthread_jobid; +static struct asys_context *asys_ctx; +struct tevent_fd *asys_fde; -struct aio_private_data { - struct aio_private_data *prev, *next; - int jobid; - SMB_STRUCT_AIOCB *aiocb; - ssize_t ret_size; - int ret_errno; - bool cancelled; - bool write_command; - bool flush_write; +struct aio_pthread_state { + struct tevent_req *req; + ssize_t ret; + int err; }; -/* List of outstanding requests we have. */ -static struct aio_private_data *pd_list; - -static void aio_pthread_handle_completion(struct event_context *event_ctx, - struct fd_event *event, - uint16 flags, - void *p); - - -/************************************************************************ - Ensure thread pool is initialized. -***********************************************************************/ - -static bool init_aio_threadpool(struct event_context *ev_ctx, - struct pthreadpool **pp_pool, - void (*completion_fn)(struct event_context *, - struct fd_event *, - uint16, - void *)) -{ - struct fd_event *sock_event = NULL; - int ret = 0; - - if (*pp_pool) { - return true; - } - - ret = pthreadpool_init(aio_pending_size, pp_pool); - if (ret) { - errno = ret; - return false; - } - sock_event = tevent_add_fd(ev_ctx, - NULL, - pthreadpool_signal_fd(*pp_pool), - TEVENT_FD_READ, - completion_fn, - NULL); - if (sock_event == NULL) { - pthreadpool_destroy(*pp_pool); - *pp_pool = NULL; - return false; - } - - DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n", - aio_pending_size)); - - return true; -} - - -/************************************************************************ - Worker function - core of the pthread aio engine. - This is the function that actually does the IO. -***********************************************************************/ - -static void aio_worker(void *private_data) -{ - struct aio_private_data *pd = - (struct aio_private_data *)private_data; - - if (pd->write_command) { - pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes, - (const void *)pd->aiocb->aio_buf, - pd->aiocb->aio_nbytes, - pd->aiocb->aio_offset); - if (pd->ret_size == -1 && errno == ESPIPE) { - /* Maintain the fiction that pipes can - be seeked (sought?) on. */ - pd->ret_size = sys_write(pd->aiocb->aio_fildes, - (const void *)pd->aiocb->aio_buf, - pd->aiocb->aio_nbytes); - } -#if defined(HAVE_FSYNC) - if (pd->ret_size != -1 && pd->flush_write) { - /* - * Optimization - flush if requested. - * Ignore error as upper layer will - * also do this. - */ - (void)fsync(pd->aiocb->aio_fildes); - } -#endif - } else { - pd->ret_size = sys_pread(pd->aiocb->aio_fildes, - (void *)pd->aiocb->aio_buf, - pd->aiocb->aio_nbytes, - pd->aiocb->aio_offset); - if (pd->ret_size == -1 && errno == ESPIPE) { - /* Maintain the fiction that pipes can - be seeked (sought?) on. */ - pd->ret_size = sys_read(pd->aiocb->aio_fildes, - (void *)pd->aiocb->aio_buf, - pd->aiocb->aio_nbytes); - } - } - if (pd->ret_size == -1) { - pd->ret_errno = errno; - } else { - pd->ret_errno = 0; - } -} - -/************************************************************************ - Private data destructor. -***********************************************************************/ - -static int pd_destructor(struct aio_private_data *pd) +static int aio_pthread_state_destructor(struct aio_pthread_state *s) { - DLIST_REMOVE(pd_list, pd); + asys_cancel(asys_ctx, s->req); return 0; } -/************************************************************************ - Create and initialize a private data struct. -***********************************************************************/ - -static struct aio_private_data *create_private_data(TALLOC_CTX *ctx, - SMB_STRUCT_AIOCB *aiocb) +static struct tevent_req *aio_pthread_pread_send( + struct vfs_handle_struct *handle, + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct files_struct *fsp, void *data, size_t n, off_t offset) { - struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data); - if (!pd) { - return NULL; - } - pd->jobid = aio_pthread_jobid++; - pd->aiocb = aiocb; - pd->ret_size = -1; - pd->ret_errno = EINPROGRESS; - talloc_set_destructor(pd, pd_destructor); - DLIST_ADD_END(pd_list, pd, struct aio_private_data *); - return pd; -} - -/************************************************************************ - Spin off a threadpool (if needed) and initiate a pread call. -***********************************************************************/ - -static int aio_pthread_read(struct vfs_handle_struct *handle, - struct files_struct *fsp, - SMB_STRUCT_AIOCB *aiocb) -{ - struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr; - struct aio_private_data *pd = NULL; + struct tevent_req *req; + struct aio_pthread_state *state; int ret; - if (!init_aio_threadpool(handle->conn->sconn->ev_ctx, - &pool, - aio_pthread_handle_completion)) { - return -1; - } - - pd = create_private_data(aio_ex, aiocb); - if (pd == NULL) { - DEBUG(10, ("aio_pthread_read: Could not create private data.\n")); - return -1; + req = tevent_req_create(mem_ctx, &state, struct aio_pthread_state); + if (req == NULL) { + return NULL; } + state->req = req; - ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd); - if (ret) { - errno = ret; - return -1; + ret = asys_pread(asys_ctx, fsp->fh->fd, data, n, offset, req); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); } + talloc_set_destructor(state, aio_pthread_state_destructor); - DEBUG(10, ("aio_pthread_read: jobid=%d pread requested " - "of %llu bytes at offset %llu\n", - pd->jobid, - (unsigned long long)pd->aiocb->aio_nbytes, - (unsigned long long)pd->aiocb->aio_offset)); - - return 0; + return req; } -/************************************************************************ - Spin off a threadpool (if needed) and initiate a pwrite call. -***********************************************************************/ - -static int aio_pthread_write(struct vfs_handle_struct *handle, - struct files_struct *fsp, - SMB_STRUCT_AIOCB *aiocb) +static struct tevent_req *aio_pthread_pwrite_send( + struct vfs_handle_struct *handle, + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct files_struct *fsp, const void *data, size_t n, off_t offset) { - struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr; - struct aio_private_data *pd = NULL; + struct tevent_req *req; + struct aio_pthread_state *state; int ret; - if (!init_aio_threadpool(handle->conn->sconn->ev_ctx, - &pool, - aio_pthread_handle_completion)) { - return -1; - } - - pd = create_private_data(aio_ex, aiocb); - if (pd == NULL) { - DEBUG(10, ("aio_pthread_write: Could not create private data.\n")); - return -1; - } - - pd->write_command = true; - if (lp_strict_sync(SNUM(fsp->conn)) && - (lp_syncalways(SNUM(fsp->conn)) || - aio_write_through_requested(aio_ex))) { - pd->flush_write = true; - } - - - ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd); - if (ret) { - errno = ret; - return -1; + req = tevent_req_create(mem_ctx, &state, struct aio_pthread_state); + if (req == NULL) { + return NULL; } + state->req = req; - DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested " - "of %llu bytes at offset %llu\n", - pd->jobid, - (unsigned long long)pd->aiocb->aio_nbytes, - (unsigned long long)pd->aiocb->aio_offset)); - - return 0; -} - -/************************************************************************ - Find the private data by jobid. -***********************************************************************/ - -static struct aio_private_data *find_private_data_by_jobid(int jobid) -{ - struct aio_private_data *pd; - - for (pd = pd_list; pd != NULL; pd = pd->next) { - if (pd->jobid == jobid) { - return pd; - } + ret = asys_pwrite(asys_ctx, fsp->fh->fd, data, n, offset, req); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); } + talloc_set_destructor(state, aio_pthread_state_destructor); - return NULL; + return req; } -/************************************************************************ - Callback when an IO completes. -***********************************************************************/ - -static void aio_pthread_handle_completion(struct event_context *event_ctx, - struct fd_event *event, - uint16 flags, - void *p) +static void aio_pthread_finished(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, void *p) { - struct aio_extra *aio_ex = NULL; - struct aio_private_data *pd = NULL; - int jobid = 0; - int ret; - - DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n", - (int)flags)); - - if ((flags & EVENT_FD_READ) == 0) { + struct tevent_req *req; + struct aio_pthread_state *state; + int res; + ssize_t ret; + int err; + void *private_data; + + if ((flags & TEVENT_FD_READ) == 0) { return; } - ret = pthreadpool_finished_job(pool, &jobid); - if (ret) { - smb_panic("aio_pthread_handle_completion"); + res = asys_result(asys_ctx, &ret, &err, &private_data); + if (res == ECANCELED) { return; } - pd = find_private_data_by_jobid(jobid); - if (pd == NULL) { - DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n", - jobid)); + if (res != 0) { + DEBUG(1, ("asys_result returned %s\n", strerror(res))); return; } - aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr; - smbd_aio_complete_aio_ex(aio_ex); + req = talloc_get_type_abort(private_data, struct tevent_req); + state = tevent_req_data(req, struct aio_pthread_state); - DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n", - jobid )); - TALLOC_FREE(aio_ex); -} + talloc_set_destructor(state, NULL); -/************************************************************************ - Find the private data by aiocb. -***********************************************************************/ - -static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb) -{ - struct aio_private_data *pd; - - for (pd = pd_list; pd != NULL; pd = pd->next) { - if (pd->aiocb == aiocb) { - return pd; - } - } - - return NULL; + state->ret = ret; + state->err = err; + tevent_req_done(req); } -/************************************************************************ - Called to return the result of a completed AIO. - Should only be called if aio_error returns something other than EINPROGRESS. - Returns: - Any other value - return from IO operation. -***********************************************************************/ - -static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle, - struct files_struct *fsp, - SMB_STRUCT_AIOCB *aiocb) +static ssize_t aio_pthread_recv(struct tevent_req *req, int *err) { - struct aio_private_data *pd = find_private_data_by_aiocb(aiocb); - - if (pd == NULL) { - errno = EINVAL; - DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n")); - return -1; - } + struct aio_pthread_state *state = tevent_req_data( + req, struct aio_pthread_state); - pd->aiocb = NULL; - - if (pd->cancelled) { - errno = ECANCELED; + if (tevent_req_is_unix_error(req, err)) { return -1; } - - if (pd->ret_size == -1) { - errno = pd->ret_errno; - } - - return pd->ret_size; + *err = state->err; + return state->ret; } -/************************************************************************ - Called to check the result of an AIO. - Returns: - EINPROGRESS - still in progress. - EINVAL - invalid aiocb. - ECANCELED - request was cancelled. - 0 - request completed successfully. - Any other value - errno from IO operation. -***********************************************************************/ - -static int aio_pthread_error_fn(struct vfs_handle_struct *handle, - struct files_struct *fsp, - SMB_STRUCT_AIOCB *aiocb) -{ - struct aio_private_data *pd = find_private_data_by_aiocb(aiocb); - - if (pd == NULL) { - return EINVAL; - } - if (pd->cancelled) { - return ECANCELED; - } - return pd->ret_errno; -} -/************************************************************************ - Called to request the cancel of an AIO, or all of them on a specific - fsp if aiocb == NULL. -***********************************************************************/ - -static int aio_pthread_cancel(struct vfs_handle_struct *handle, - struct files_struct *fsp, - SMB_STRUCT_AIOCB *aiocb) -{ - struct aio_private_data *pd = NULL; - - for (pd = pd_list; pd != NULL; pd = pd->next) { - if (pd->aiocb == NULL) { - continue; - } - if (pd->aiocb->aio_fildes != fsp->fh->fd) { - continue; - } - if ((aiocb != NULL) && (pd->aiocb != aiocb)) { - continue; - } - - /* - * We let the child do its job, but we discard the result when - * it's finished. - */ - - pd->cancelled = true; - } - - return AIO_CANCELED; -} - -/************************************************************************ - Callback for a previously detected job completion. -***********************************************************************/ - -static void aio_pthread_handle_immediate(struct tevent_context *ctx, - struct tevent_immediate *im, - void *private_data) -{ - struct aio_extra *aio_ex = NULL; - struct aio_private_data *pd = (struct aio_private_data *)private_data; - - aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr; - smbd_aio_complete_aio_ex(aio_ex); - TALLOC_FREE(aio_ex); -} - -/************************************************************************ - Private data struct used in suspend completion code. -***********************************************************************/ - -struct suspend_private { - int num_entries; - int num_finished; - const SMB_STRUCT_AIOCB * const *aiocb_array; -}; - -/************************************************************************ - Callback when an IO completes from a suspend call. -***********************************************************************/ - -static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx, - struct fd_event *event, - uint16 flags, - void *p) -{ - struct suspend_private *sp = (struct suspend_private *)p; - struct aio_private_data *pd = NULL; - struct tevent_immediate *im = NULL; - int jobid; - int i; - - DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n", - (int)flags)); - - if ((flags & EVENT_FD_READ) == 0) { - return; - } - - if (pthreadpool_finished_job(pool, &jobid)) { - smb_panic("aio_pthread_handle_suspend_completion: can't find job."); - return; - } - - pd = find_private_data_by_jobid(jobid); - if (pd == NULL) { - DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n", - jobid)); - return; - } - - /* Is this a jobid with an aiocb we're interested in ? */ - for (i = 0; i < sp->num_entries; i++) { - if (sp->aiocb_array[i] == pd->aiocb) { - sp->num_finished++; - return; - } - } - - /* Jobid completed we weren't waiting for. - We must reschedule this as an immediate event - on the main event context. */ - im = tevent_create_immediate(NULL); - if (!im) { - exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory"); - } - - DEBUG(10,("aio_pthread_handle_suspend_completion: " - "re-scheduling job id %d\n", - jobid)); - - tevent_schedule_immediate(im, - server_event_context(), - aio_pthread_handle_immediate, - (void *)pd); -} - - -static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx, - struct tevent_timer *te, - struct timeval now, - void *private_data) -{ - bool *timed_out = (bool *)private_data; - /* Remove this timed event handler. */ - TALLOC_FREE(te); - *timed_out = true; -} +#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS) /************************************************************************ - Called to request everything to stop until all IO is completed. + Ensure thread pool is initialized. ***********************************************************************/ -static int aio_pthread_suspend(struct vfs_handle_struct *handle, - struct files_struct *fsp, - const SMB_STRUCT_AIOCB * const aiocb_array[], - int n, - const struct timespec *timeout) +static bool init_aio_threadpool(struct event_context *ev_ctx, + struct pthreadpool **pp_pool, + void (*completion_fn)(struct event_context *, + struct fd_event *, + uint16, + void *)) { - struct event_context *ev = NULL; struct fd_event *sock_event = NULL; - int ret = -1; - struct suspend_private sp; - bool timed_out = false; - TALLOC_CTX *frame = talloc_stackframe(); - - /* This is a blocking call, and has to use a sub-event loop. */ - ev = event_context_init(frame); - if (ev == NULL) { - errno = ENOMEM; - goto out; - } + int ret = 0; - if (timeout) { - struct timeval tv = convert_timespec_to_timeval(*timeout); - struct tevent_timer *te = tevent_add_timer(ev, - frame, - timeval_current_ofs(tv.tv_sec, - tv.tv_usec), - aio_pthread_suspend_timed_out, - &timed_out); - if (!te) { - errno = ENOMEM; - goto out; - } + if (*pp_pool) { + return true; } - ZERO_STRUCT(sp); - sp.num_entries = n; - sp.aiocb_array = aiocb_array; - sp.num_finished = 0; - - sock_event = tevent_add_fd(ev, - frame, - pthreadpool_signal_fd(pool), + ret = pthreadpool_init(aio_pending_size, pp_pool); + if (ret) { + errno = ret; + return false; + } + sock_event = tevent_add_fd(ev_ctx, + NULL, + pthreadpool_signal_fd(*pp_pool), TEVENT_FD_READ, - aio_pthread_handle_suspend_completion, - (void *)&sp); + completion_fn, + NULL); if (sock_event == NULL) { - pthreadpool_destroy(pool); - pool = NULL; - goto out; - } - /* - * We're going to cheat here. We know that smbd/aio.c - * only calls this when it's waiting for every single - * outstanding call to finish on a close, so just wait - * individually for each IO to complete. We don't care - * what order they finish - only that they all do. JRA. - */ - while (sp.num_entries != sp.num_finished) { - if (tevent_loop_once(ev) == -1) { - goto out; - } - - if (timed_out) { - errno = EAGAIN; - goto out; - } + pthreadpool_destroy(*pp_pool); + *pp_pool = NULL; + return false; } - ret = 0; - - out: + DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n", + aio_pending_size)); - TALLOC_FREE(frame); - return ret; + return true; } -#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS) /* * We must have openat() to do any thread-based * asynchronous opens. We also must be using @@ -1034,6 +612,28 @@ static int aio_pthread_connect(vfs_handle_struct *handle, const char *service, *********************************************************************/ aio_pending_size = lp_parm_int( SNUM(handle->conn), "aio_pthread", "aio num threads", 100); + + if (asys_ctx == NULL) { + int ret; + + ret = asys_context_init(&asys_ctx, aio_pending_size); + if (ret != 0) { + DEBUG(1, ("asys_context_init failed: %s\n", + strerror(ret))); + return -1; + } + + asys_fde = tevent_add_fd(handle->conn->sconn->ev_ctx, NULL, + asys_signalfd(asys_ctx), + TEVENT_FD_READ, aio_pthread_finished, + NULL); + if (asys_fde == NULL) { + DEBUG(1, ("tevent_add_fd failed\n")); + asys_context_destroy(asys_ctx); + asys_ctx = NULL; + return -1; + } + } return SMB_VFS_NEXT_CONNECT(handle, service, user); } @@ -1042,12 +642,10 @@ static struct vfs_fn_pointers vfs_aio_pthread_fns = { #if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS) .open_fn = aio_pthread_open_fn, #endif - .aio_read_fn = aio_pthread_read, - .aio_write_fn = aio_pthread_write, - .aio_return_fn = aio_pthread_return_fn, - .aio_cancel_fn = aio_pthread_cancel, - .aio_error_fn = aio_pthread_error_fn, - .aio_suspend_fn = aio_pthread_suspend, + .pread_send_fn = aio_pthread_pread_send, + .pread_recv_fn = aio_pthread_recv, + .pwrite_send_fn = aio_pthread_pwrite_send, + .pwrite_recv_fn = aio_pthread_recv, }; NTSTATUS vfs_aio_pthread_init(void); diff --git a/source3/modules/wscript_build b/source3/modules/wscript_build index b912b8c1b6..1f3189affb 100644 --- a/source3/modules/wscript_build +++ b/source3/modules/wscript_build @@ -357,7 +357,7 @@ bld.SAMBA3_MODULE('vfs_aio_fork', bld.SAMBA3_MODULE('vfs_aio_pthread', subsystem='vfs', source=VFS_AIO_PTHREAD_SRC, - deps='samba-util tevent', + deps='samba-util tevent LIBASYS', init_function='', internal_module=bld.SAMBA3_IS_STATIC_MODULE('vfs_aio_pthread'), enabled=bld.SAMBA3_IS_ENABLED_MODULE('vfs_aio_pthread'), |