diff options
-rw-r--r-- | source3/Makefile.in | 5 | ||||
-rw-r--r-- | source3/configure.in | 4 | ||||
-rw-r--r-- | source3/modules/vfs_aio_pthread.c | 637 | ||||
-rw-r--r-- | source3/modules/wscript_build | 10 | ||||
-rw-r--r-- | source3/wscript | 3 |
5 files changed, 659 insertions, 0 deletions
diff --git a/source3/Makefile.in b/source3/Makefile.in index cd73263aa9..4bd1b40b64 100644 --- a/source3/Makefile.in +++ b/source3/Makefile.in @@ -868,6 +868,7 @@ VFS_READAHEAD_OBJ = modules/vfs_readahead.o VFS_TSMSM_OBJ = modules/vfs_tsmsm.o VFS_FILEID_OBJ = modules/vfs_fileid.o VFS_AIO_FORK_OBJ = modules/vfs_aio_fork.o +VFS_AIO_PTHREAD_OBJ = modules/vfs_aio_pthread.o VFS_PREOPEN_OBJ = modules/vfs_preopen.o VFS_SYNCOPS_OBJ = modules/vfs_syncops.o VFS_ACL_XATTR_OBJ = modules/vfs_acl_xattr.o @@ -3054,6 +3055,10 @@ bin/aio_fork.@SHLIBEXT@: $(BINARY_PREREQS) $(VFS_AIO_FORK_OBJ) @echo "Building plugin $@" @$(SHLD_MODULE) $(VFS_AIO_FORK_OBJ) +bin/aio_pthread.@SHLIBEXT@: $(BINARY_PREREQS) $(VFS_AIO_PTHREAD_OBJ) + @echo "Building plugin $@" + @$(SHLD_MODULE) $(VFS_AIO_PTHREAD_OBJ) + bin/preopen.@SHLIBEXT@: $(BINARY_PREREQS) $(VFS_PREOPEN_OBJ) @echo "Building plugin $@" @$(SHLD_MODULE) $(VFS_PREOPEN_OBJ) diff --git a/source3/configure.in b/source3/configure.in index 6f4300cc48..7ed093cd22 100644 --- a/source3/configure.in +++ b/source3/configure.in @@ -6724,6 +6724,9 @@ if test x"$enable_pthreadpool" = x"yes" -a x"$samba_cv_HAVE_PTHREAD" = x"yes"; t AC_SUBST(PTHREADPOOL_OBJ, "lib/pthreadpool/pthreadpool.o") PTHREADPOOLTEST="bin/pthreadpooltest\$(EXEEXT)" AC_SUBST(PTHREADPOOLTEST) + if test x"$samba_cv_HAVE_AIO" = x"yes"; then + default_shared_modules="$default_shared_modules vfs_aio_pthread" + fi fi ################################################# @@ -6947,6 +6950,7 @@ SMB_MODULE(vfs_readahead, \$(VFS_READAHEAD_OBJ), "bin/readahead.$SHLIBEXT", VFS) SMB_MODULE(vfs_tsmsm, \$(VFS_TSMSM_OBJ), "bin/tsmsm.$SHLIBEXT", VFS) SMB_MODULE(vfs_fileid, \$(VFS_FILEID_OBJ), "bin/fileid.$SHLIBEXT", VFS) SMB_MODULE(vfs_aio_fork, \$(VFS_AIO_FORK_OBJ), "bin/aio_fork.$SHLIBEXT", VFS) +SMB_MODULE(vfs_aio_pthread, \$(VFS_AIO_PTHREAD_OBJ), "bin/aio_pthread.$SHLIBEXT", VFS) SMB_MODULE(vfs_preopen, \$(VFS_PREOPEN_OBJ), "bin/preopen.$SHLIBEXT", VFS) SMB_MODULE(vfs_syncops, \$(VFS_SYNCOPS_OBJ), "bin/syncops.$SHLIBEXT", VFS) SMB_MODULE(vfs_zfsacl, \$(VFS_ZFSACL_OBJ), "bin/zfsacl.$SHLIBEXT", VFS) diff --git a/source3/modules/vfs_aio_pthread.c b/source3/modules/vfs_aio_pthread.c new file mode 100644 index 0000000000..56a2abd0d3 --- /dev/null +++ b/source3/modules/vfs_aio_pthread.c @@ -0,0 +1,637 @@ +/* + * Simulate Posix AIO using pthreads. + * + * Based on the aio_fork work from Volker and Volker's pthreadpool library. + * + * Copyright (C) Volker Lendecke 2008 + * Copyright (C) Jeremy Allison 2012 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include "includes.h" +#include "system/filesys.h" +#include "system/shmem.h" +#include "smbd/smbd.h" +#include "pthreadpool.h" + +struct aio_extra; +static struct pthreadpool *pool; +static int aio_pthread_jobid; + +struct aio_private_data { + struct aio_private_data *prev, *next; + int jobid; + SMB_STRUCT_AIOCB *aiocb; + ssize_t ret_size; + int ret_errno; + bool cancelled; + bool write_command; +}; + +/* List of outstanding requests we have. */ +struct aio_private_data *pd_list; + +static void aio_pthread_handle_completion(struct event_context *event_ctx, + struct fd_event *event, + uint16 flags, + void *p); + +/************************************************************************ + How many threads to initialize ? +***********************************************************************/ + +static int aio_get_num_threads(void) +{ + return 10; +} + +#if 0 +/************************************************************************ + Called every 30 seconds to destroy pool if it's idle. +***********************************************************************/ + +static void idle_pool_destroy_timer(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval current_time, + void *private_data) +{ + struct timeval ne; + + TALLOC_FREE(te); + + if (pool && pd_list == NULL) { + if (pthreadpool_destroy(pool) == 0) { + pool = NULL; + } + DEBUG(10,("idle_pool_destroy_timer: destroyed AIO pool.\n")); + return; + } + + /* Here, the IO is still active. */ + + /* Set an event up for 30 seconds time - if we have + no outstanding IO at this time shut the threadpool + down. */ + ne = tevent_timeval_current_ofs(30, 0); + tevent_add_timer(server_event_context(), + NULL, + ne, + idle_pool_destroy_timer, + NULL); +} +#endif + +/************************************************************************ + Ensure thread pool is initialized. +***********************************************************************/ + +static bool init_aio_threadpool(void) +{ + struct fd_event *sock_event = NULL; + int ret = 0; + int num_threads = aio_get_num_threads(); +#if 0 + struct timeval ne; +#endif + + if (pool) { + return true; + } + + ret = pthreadpool_init(num_threads, &pool); + if (ret) { + errno = ret; + return false; + } + sock_event = tevent_add_fd(server_event_context(), + NULL, + pthreadpool_signal_fd(pool), + TEVENT_FD_READ, + aio_pthread_handle_completion, + NULL); + if (sock_event == NULL) { + pthreadpool_destroy(pool); + pool = NULL; + return false; + } + +#if 0 + /* Set an event up for 30 seconds time - if we have + no outstanding IO at this time shut the threadpool + down. */ + ne = tevent_timeval_current_ofs(30, 0); + tevent_add_timer(server_event_context(), + NULL, + ne, + idle_pool_destroy_timer, + NULL); +#endif + + DEBUG(10,("init_aio_threadpool: initialized with %d threads\n", + num_threads)); + + return true; +} + + +/************************************************************************ + Worker function - core of the pthread aio engine. + This is the function that actually does the IO. +***********************************************************************/ + +static void aio_worker(void *private_data) +{ + struct aio_private_data *pd = + (struct aio_private_data *)private_data; + + if (pd->write_command) { + pd->ret_size = pwrite(pd->aiocb->aio_fildes, + (const void *)pd->aiocb->aio_buf, + pd->aiocb->aio_nbytes, + pd->aiocb->aio_offset); + } else { + pd->ret_size = pread(pd->aiocb->aio_fildes, + (void *)pd->aiocb->aio_buf, + pd->aiocb->aio_nbytes, + pd->aiocb->aio_offset); + } + if (pd->ret_size == -1) { + pd->ret_errno = errno; + } else { + pd->ret_errno = 0; + } +} + +/************************************************************************ + Private data destructor. +***********************************************************************/ + +static int pd_destructor(struct aio_private_data *pd) +{ + DLIST_REMOVE(pd_list, pd); + return 0; +} + +/************************************************************************ + Create and initialize a private data struct. +***********************************************************************/ + +static struct aio_private_data *create_private_data(TALLOC_CTX *ctx, + SMB_STRUCT_AIOCB *aiocb) +{ + struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data); + if (!pd) { + return NULL; + } + pd->jobid = aio_pthread_jobid++; + pd->aiocb = aiocb; + pd->ret_size = -1; + pd->ret_errno = EINPROGRESS; + talloc_set_destructor(pd, pd_destructor); + DLIST_ADD_END(pd_list, pd, struct aio_private_data *); + return pd; +} + +/************************************************************************ + Spin off a threadpool (if needed) and initiate a pread call. +***********************************************************************/ + +static int aio_pthread_read(struct vfs_handle_struct *handle, + struct files_struct *fsp, + SMB_STRUCT_AIOCB *aiocb) +{ + struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr; + struct aio_private_data *pd = NULL; + int ret; + + if (!init_aio_threadpool()) { + return -1; + } + + pd = create_private_data(aio_ex, aiocb); + if (pd == NULL) { + DEBUG(10, ("aio_pthread_read: Could not create private data.\n")); + return -1; + } + + ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd); + if (ret) { + errno = ret; + return -1; + } + + return 0; +} + +/************************************************************************ + Spin off a threadpool (if needed) and initiate a pwrite call. +***********************************************************************/ + +static int aio_pthread_write(struct vfs_handle_struct *handle, + struct files_struct *fsp, + SMB_STRUCT_AIOCB *aiocb) +{ + struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr; + struct aio_private_data *pd = NULL; + int ret; + + if (!init_aio_threadpool()) { + return -1; + } + + pd = create_private_data(aio_ex, aiocb); + if (pd == NULL) { + DEBUG(10, ("aio_pthread_write: Could not create private data.\n")); + return -1; + } + + pd->write_command = true; + + ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd); + if (ret) { + errno = ret; + return -1; + } + + return 0; +} + +/************************************************************************ + Find the private data by jobid. +***********************************************************************/ + +static struct aio_private_data *find_private_data_by_jobid(int jobid) +{ + struct aio_private_data *pd; + + for (pd = pd_list; pd != NULL; pd = pd->next) { + if (pd->jobid == jobid) { + return pd; + } + } + + return NULL; +} + +/************************************************************************ + Callback when an IO completes. +***********************************************************************/ + +static void aio_pthread_handle_completion(struct event_context *event_ctx, + struct fd_event *event, + uint16 flags, + void *p) +{ + struct aio_extra *aio_ex = NULL; + struct aio_private_data *pd = NULL; + int jobid = 0; + int ret; + + DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n", + (int)flags)); + + if ((flags & EVENT_FD_READ) == 0) { + return; + } + + ret = pthreadpool_finished_job(pool, &jobid); + if (ret) { + smb_panic("aio_pthread_handle_completion"); + return; + } + + pd = find_private_data_by_jobid(jobid); + if (pd == NULL) { + DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n", + jobid)); + return; + } + + aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr; + smbd_aio_complete_aio_ex(aio_ex); +} + +/************************************************************************ + Find the private data by aiocb. +***********************************************************************/ + +static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb) +{ + struct aio_private_data *pd; + + for (pd = pd_list; pd != NULL; pd = pd->next) { + if (pd->aiocb == aiocb) { + return pd; + } + } + + return NULL; +} + +/************************************************************************ + Called to return the result of a completed AIO. + Should only be called if aio_error returns something other than EINPROGRESS. + Returns: + Any other value - return from IO operation. +***********************************************************************/ + +static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle, + struct files_struct *fsp, + SMB_STRUCT_AIOCB *aiocb) +{ + struct aio_private_data *pd = find_private_data_by_aiocb(aiocb); + + if (pd == NULL) { + errno = EINVAL; + DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n")); + return -1; + } + + pd->aiocb = NULL; + + if (pd->ret_size == -1) { + errno = pd->ret_errno; + } + + return pd->ret_size; +} + +/************************************************************************ + Called to check the result of an AIO. + Returns: + EINPROGRESS - still in progress. + EINVAL - invalid aiocb. + ECANCELED - request was cancelled. + 0 - request completed successfully. + Any other value - errno from IO operation. +***********************************************************************/ + +static int aio_pthread_error_fn(struct vfs_handle_struct *handle, + struct files_struct *fsp, + SMB_STRUCT_AIOCB *aiocb) +{ + struct aio_private_data *pd = find_private_data_by_aiocb(aiocb); + + if (pd == NULL) { + return EINVAL; + } + if (pd->cancelled) { + return ECANCELED; + } + return pd->ret_errno; +} + +/************************************************************************ + Called to request the cancel of an AIO, or all of them on a specific + fsp if aiocb == NULL. +***********************************************************************/ + +static int aio_pthread_cancel(struct vfs_handle_struct *handle, + struct files_struct *fsp, + SMB_STRUCT_AIOCB *aiocb) +{ + struct aio_private_data *pd = NULL; + + for (pd = pd_list; pd != NULL; pd = pd->next) { + if (pd->aiocb == NULL) { + continue; + } + if (pd->aiocb->aio_fildes != fsp->fh->fd) { + continue; + } + if ((aiocb != NULL) && (pd->aiocb != aiocb)) { + continue; + } + + /* + * We let the child do its job, but we discard the result when + * it's finished. + */ + + pd->cancelled = true; + } + + return AIO_CANCELED; +} + +/************************************************************************ + Callback for a previously detected job completion. +***********************************************************************/ + +static void aio_pthread_handle_immediate(struct tevent_context *ctx, + struct tevent_immediate *im, + void *private_data) +{ + struct aio_extra *aio_ex = NULL; + int *pjobid = (int *)private_data; + struct aio_private_data *pd = find_private_data_by_jobid(*pjobid); + + if (pd == NULL) { + DEBUG(1, ("aio_pthread_handle_immediate cannot find jobid %d\n", + *pjobid)); + TALLOC_FREE(pjobid); + return; + } + + TALLOC_FREE(pjobid); + aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr; + smbd_aio_complete_aio_ex(aio_ex); +} + +/************************************************************************ + Private data struct used in suspend completion code. +***********************************************************************/ + +struct suspend_private { + int num_entries; + int num_finished; + const SMB_STRUCT_AIOCB * const *aiocb_array; +}; + +/************************************************************************ + Callback when an IO completes from a suspend call. +***********************************************************************/ + +static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx, + struct fd_event *event, + uint16 flags, + void *p) +{ + struct suspend_private *sp = (struct suspend_private *)p; + struct aio_private_data *pd = NULL; + struct tevent_immediate *im = NULL; + int *pjobid = NULL; + int i; + + DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n", + (int)flags)); + + if ((flags & EVENT_FD_READ) == 0) { + return; + } + + pjobid = talloc_array(NULL, int, 1); + if (pjobid) { + smb_panic("aio_pthread_handle_suspend_completion: no memory."); + } + + if (pthreadpool_finished_job(pool, pjobid)) { + smb_panic("aio_pthread_handle_suspend_completion: can't find job."); + return; + } + + pd = find_private_data_by_jobid(*pjobid); + if (pd == NULL) { + DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n", + *pjobid)); + TALLOC_FREE(pjobid); + return; + } + + /* Is this a jobid with an aiocb we're interested in ? */ + for (i = 0; i < sp->num_entries; i++) { + if (sp->aiocb_array[i] == pd->aiocb) { + sp->num_finished++; + TALLOC_FREE(pjobid); + return; + } + } + + /* Jobid completed we weren't waiting for. + We must reshedule this as an immediate event + on the main event context. */ + im = tevent_create_immediate(NULL); + if (!im) { + exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory"); + } + + DEBUG(10,("aio_pthread_handle_suspend_completion: " + "re-scheduling job id %d\n", + *pjobid)); + + tevent_schedule_immediate(im, + server_event_context(), + aio_pthread_handle_immediate, + (void *)pjobid); +} + + +static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx, + struct tevent_timer *te, + struct timeval now, + void *private_data) +{ + bool *timed_out = (bool *)private_data; + /* Remove this timed event handler. */ + TALLOC_FREE(te); + *timed_out = true; +} + +/************************************************************************ + Called to request everything to stop until all IO is completed. +***********************************************************************/ + +static int aio_pthread_suspend(struct vfs_handle_struct *handle, + struct files_struct *fsp, + const SMB_STRUCT_AIOCB * const aiocb_array[], + int n, + const struct timespec *timeout) +{ + struct event_context *ev = NULL; + struct fd_event *sock_event = NULL; + int ret = -1; + struct suspend_private sp; + bool timed_out = false; + TALLOC_CTX *frame = talloc_stackframe(); + + /* This is a blocking call, and has to use a sub-event loop. */ + ev = event_context_init(frame); + if (ev == NULL) { + errno = ENOMEM; + goto out; + } + + if (timeout) { + struct timeval tv = convert_timespec_to_timeval(*timeout); + struct tevent_timer *te = tevent_add_timer(ev, + frame, + timeval_current_ofs(tv.tv_sec, + tv.tv_usec), + aio_pthread_suspend_timed_out, + &timed_out); + if (!te) { + errno = ENOMEM; + goto out; + } + } + + ZERO_STRUCT(sp); + sp.num_entries = n; + sp.aiocb_array = aiocb_array; + sp.num_finished = 0; + + sock_event = tevent_add_fd(ev, + frame, + pthreadpool_signal_fd(pool), + TEVENT_FD_READ, + aio_pthread_handle_suspend_completion, + (void *)&sp); + if (sock_event == NULL) { + pthreadpool_destroy(pool); + pool = NULL; + goto out; + } + /* + * We're going to cheat here. We know that smbd/aio.c + * only calls this when it's waiting for every single + * outstanding call to finish on a close, so just wait + * individually for each IO to complete. We don't care + * what order they finish - only that they all do. JRA. + */ + while (sp.num_entries != sp.num_finished) { + if (tevent_loop_once(ev) == -1) { + goto out; + } + + if (timed_out) { + errno = EAGAIN; + goto out; + } + } + + ret = 0; + + out: + + TALLOC_FREE(frame); + return ret; +} + +static struct vfs_fn_pointers vfs_aio_pthread_fns = { + .aio_read_fn = aio_pthread_read, + .aio_write_fn = aio_pthread_write, + .aio_return_fn = aio_pthread_return_fn, + .aio_cancel_fn = aio_pthread_cancel, + .aio_error_fn = aio_pthread_error_fn, + .aio_suspend_fn = aio_pthread_suspend, +}; + +NTSTATUS vfs_aio_pthread_init(void); +NTSTATUS vfs_aio_pthread_init(void) +{ + return smb_register_vfs(SMB_VFS_INTERFACE_VERSION, + "aio_pthread", &vfs_aio_pthread_fns); +} diff --git a/source3/modules/wscript_build b/source3/modules/wscript_build index 86fc64781a..d59e29ccd9 100644 --- a/source3/modules/wscript_build +++ b/source3/modules/wscript_build @@ -36,6 +36,7 @@ VFS_READAHEAD_SRC = 'vfs_readahead.c' VFS_TSMSM_SRC = 'vfs_tsmsm.c' VFS_FILEID_SRC = 'vfs_fileid.c' VFS_AIO_FORK_SRC = 'vfs_aio_fork.c' +VFS_AIO_PTHREAD_SRC = 'vfs_aio_pthread.c' VFS_PREOPEN_SRC = 'vfs_preopen.c' VFS_SYNCOPS_SRC = 'vfs_syncops.c' VFS_ACL_XATTR_SRC = 'vfs_acl_xattr.c' @@ -348,6 +349,15 @@ bld.SAMBA3_MODULE('vfs_aio_fork', enabled=bld.SAMBA3_IS_ENABLED_MODULE('vfs_aio_fork'), allow_undefined_symbols=True) +bld.SAMBA3_MODULE('vfs_aio_pthread', + subsystem='vfs', + source=VFS_AIO_PTHREAD_SRC, + deps='samba-util', + init_function='', + internal_module=bld.SAMBA3_IS_STATIC_MODULE('vfs_aio_pthread'), + enabled=bld.SAMBA3_IS_ENABLED_MODULE('vfs_aio_pthread'), + allow_undefined_symbols=True) + bld.SAMBA3_MODULE('vfs_preopen', subsystem='vfs', source=VFS_PREOPEN_SRC, diff --git a/source3/wscript b/source3/wscript index cc925a617a..846917c13b 100644 --- a/source3/wscript +++ b/source3/wscript @@ -1683,6 +1683,9 @@ main() { if conf.CONFIG_SET('HAVE_AIO') and (conf.CONFIG_SET('HAVE_MSGHDR_MSG_CONTROL') or conf.CONFIG_SET('HAVE_MSGHDR_MSG_ACCTRIGHTS')): default_shared_modules.extend(TO_LIST('vfs_aio_fork')) + if conf.CONFIG_SET('HAVE_AIO') and Options.options.with_pthreadpool: + default_shared_modules.extend(TO_LIST('vfs_aio_pthread')) + if conf.CONFIG_SET('HAVE_LDAP'): default_static_modules.extend(TO_LIST('pdb_ldap idmap_ldap')) |