From 8ee30be431246863a0ddb2f942b7b6fe51788d6d Mon Sep 17 00:00:00 2001 From: Jeremy Allison Date: Tue, 10 Jul 2012 20:59:27 -0700 Subject: Add in the threaded async open engine. Fixes all issues raised originally. This code will only do threaded opens with thread-specific credentials (Linux for now) and changes credentials before doing the call. Also only fires on O_CREAT|O_EXCL so will only create new files, never open old ones async. Volker, this is isolated enough that it shouldn't prevent you from refactoring it into a new module when the aio pread/pwrite code is moved into the default aio path. Autobuild-User(master): Jeremy Allison Autobuild-Date(master): Wed Jul 11 08:04:56 CEST 2012 on sn-devel-104 --- source3/modules/vfs_aio_pthread.c | 393 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 393 insertions(+) (limited to 'source3') diff --git a/source3/modules/vfs_aio_pthread.c b/source3/modules/vfs_aio_pthread.c index 7167818608..d62af57256 100644 --- a/source3/modules/vfs_aio_pthread.c +++ b/source3/modules/vfs_aio_pthread.c @@ -590,6 +590,396 @@ static int aio_pthread_suspend(struct vfs_handle_struct *handle, return ret; } +#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS) +/* + * We must have openat() to do any thread-based + * asynchronous opens. We also must be using + * thread-specific credentials (Linux-only + * for now). + */ + +/* + * NB. This threadpool is shared over all + * instances of this VFS module in this + * process, as is the current jobid. + */ + +static struct pthreadpool *open_pool; +static int aio_pthread_open_jobid; + +struct aio_open_private_data { + struct aio_open_private_data *prev, *next; + /* Inputs. */ + int jobid; + int dir_fd; + int flags; + mode_t mode; + uint64_t mid; + bool in_progress; + const char *fname; + char *dname; + struct smbd_server_connection *sconn; + const struct security_unix_token *ux_tok; + /* Returns. */ + int ret_fd; + int ret_errno; +}; + +/* List of outstanding requests we have. */ +static struct aio_open_private_data *open_pd_list; + +/************************************************************************ + Find the open private data by jobid. +***********************************************************************/ + +static struct aio_open_private_data *find_open_private_data_by_jobid(int jobid) +{ + struct aio_open_private_data *opd; + + for (opd = open_pd_list; opd != NULL; opd = opd->next) { + if (opd->jobid == jobid) { + return opd; + } + } + + return NULL; +} + +/************************************************************************ + Find the open private data by mid. +***********************************************************************/ + +static struct aio_open_private_data *find_open_private_data_by_mid(uint64_t mid) +{ + struct aio_open_private_data *opd; + + for (opd = open_pd_list; opd != NULL; opd = opd->next) { + if (opd->mid == mid) { + return opd; + } + } + + return NULL; +} + +/************************************************************************ + Callback when an open completes. 
+***********************************************************************/ + +static void aio_open_handle_completion(struct event_context *event_ctx, + struct fd_event *event, + uint16 flags, + void *p) +{ + struct aio_open_private_data *opd = NULL; + int jobid = 0; + int ret; + + DEBUG(10, ("aio_open_handle_completion called with flags=%d\n", + (int)flags)); + + if ((flags & EVENT_FD_READ) == 0) { + return; + } + + ret = pthreadpool_finished_job(open_pool, &jobid); + if (ret) { + smb_panic("aio_open_handle_completion"); + /* notreached. */ + return; + } + + opd = find_open_private_data_by_jobid(jobid); + if (opd == NULL) { + DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n", + jobid)); + smb_panic("aio_open_handle_completion - no jobid"); + /* notreached. */ + return; + } + + DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu " + "for file %s/%s completed\n", + jobid, + (unsigned long long)opd->mid, + opd->dname, + opd->fname)); + + opd->in_progress = false; + + /* Find outstanding event and reschdule. */ + if (!schedule_deferred_open_message_smb(opd->sconn, opd->mid)) { + /* + * Outstanding event didn't exist or was + * cancelled. Free up the fd and throw + * away the result. + */ + if (opd->ret_fd != -1) { + close(opd->ret_fd); + opd->ret_fd = -1; + } + TALLOC_FREE(opd); + } +} + +/***************************************************************** + The core of the async open code - the worker function. Note we + use the new openat() system call to avoid any problems with + current working directory changes plus we change credentials + on the thread to prevent any security race conditions. +*****************************************************************/ + +static void aio_open_worker(void *private_data) +{ + struct aio_open_private_data *opd = + (struct aio_open_private_data *)private_data; + + /* Become the correct credential on this thread. */ + if (set_thread_credentials(opd->ux_tok->uid, + opd->ux_tok->gid, + (size_t)opd->ux_tok->ngroups, + opd->ux_tok->groups) != 0) { + opd->ret_fd = -1; + opd->ret_errno = errno; + return; + } + + opd->ret_fd = openat(opd->dir_fd, + opd->fname, + opd->flags, + opd->mode); + + if (opd->ret_fd == -1) { + opd->ret_errno = errno; + } else { + /* Create was successful. */ + opd->ret_errno = 0; + } +} + +/************************************************************************ + Open private data destructor. +***********************************************************************/ + +static int opd_destructor(struct aio_open_private_data *opd) +{ + if (opd->dir_fd != -1) { + close(opd->dir_fd); + } + DLIST_REMOVE(open_pd_list, opd); + return 0; +} + +/************************************************************************ + Create and initialize a private data struct for async open. +***********************************************************************/ + +static struct aio_open_private_data *create_private_open_data(const files_struct *fsp, + int flags, + mode_t mode) +{ + struct aio_open_private_data *opd = talloc_zero(NULL, + struct aio_open_private_data); + const char *fname = NULL; + + if (!opd) { + return NULL; + } + + opd->jobid = aio_pthread_open_jobid++; + opd->dir_fd = -1; + opd->ret_fd = -1; + opd->ret_errno = EINPROGRESS; + opd->flags = flags; + opd->mode = mode; + opd->mid = fsp->mid; + opd->in_progress = true; + opd->sconn = fsp->conn->sconn; + + /* Copy our current credentials. 
*/ + opd->ux_tok = copy_unix_token(opd, get_current_utok(fsp->conn)); + if (opd->ux_tok == NULL) { + TALLOC_FREE(opd); + return NULL; + } + + /* + * Copy the parent directory name and the + * relative path within it. + */ + if (parent_dirname(opd, + fsp->fsp_name->base_name, + &opd->dname, + &fname) == false) { + TALLOC_FREE(opd); + return NULL; + } + opd->fname = talloc_strdup(opd, fname); + if (opd->fname == NULL) { + TALLOC_FREE(opd); + return NULL; + } + +#if defined(O_DIRECTORY) + opd->dir_fd = open(opd->dname, O_RDONLY|O_DIRECTORY); +#else + opd->dir_fd = open(opd->dname, O_RDONLY); +#endif + if (opd->dir_fd == -1) { + TALLOC_FREE(opd); + return NULL; + } + + talloc_set_destructor(opd, opd_destructor); + DLIST_ADD_END(open_pd_list, opd, struct aio_open_private_data *); + return opd; +} + +/***************************************************************** + Setup an async open. +*****************************************************************/ + +static int open_async(const files_struct *fsp, + int flags, + mode_t mode) +{ + struct aio_open_private_data *opd = NULL; + int ret; + + if (!init_aio_threadpool(fsp->conn->sconn->ev_ctx, + &open_pool, + aio_open_handle_completion)) { + return -1; + } + + opd = create_private_open_data(fsp, flags, mode); + if (opd == NULL) { + DEBUG(10, ("open_async: Could not create private data.\n")); + return -1; + } + + ret = pthreadpool_add_job(open_pool, + opd->jobid, + aio_open_worker, + (void *)opd); + if (ret) { + errno = ret; + return -1; + } + + DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n", + (unsigned long long)opd->mid, + opd->jobid, + opd->dname, + opd->fname)); + + /* Cause the calling code to reschedule us. */ + errno = EINTR; /* Maps to NT_STATUS_RETRY. */ + return -1; +} + +/***************************************************************** + Look for a matching SMB2 mid. If we find it we're rescheduled, + just return the completed open. +*****************************************************************/ + +static bool find_completed_open(files_struct *fsp, + int *p_fd, + int *p_errno) +{ + struct aio_open_private_data *opd; + + opd = find_open_private_data_by_mid(fsp->mid); + if (!opd) { + return false; + } + + if (opd->in_progress) { + DEBUG(0,("find_completed_open: mid %llu " + "jobid %d still in progress for " + "file %s/%s. PANIC !\n", + (unsigned long long)opd->mid, + opd->jobid, + opd->dname, + opd->fname)); + /* Disaster ! This is an open timeout. Just panic. */ + smb_panic("find_completed_open - in_progress\n"); + /* notreached. */ + return false; + } + + *p_fd = opd->ret_fd; + *p_errno = opd->ret_errno; + + DEBUG(5,("find_completed_open: mid %llu returning " + "fd = %d, errno = %d (%s) " + "jobid (%d) for file %s\n", + (unsigned long long)opd->mid, + opd->ret_fd, + opd->ret_errno, + strerror(opd->ret_errno), + opd->jobid, + smb_fname_str_dbg(fsp->fsp_name))); + + /* Now we can free the opd. */ + TALLOC_FREE(opd); + return true; +} + +/***************************************************************** + The core open function. Only go async on O_CREAT|O_EXCL + opens to prevent any race conditions. +*****************************************************************/ + +static int aio_pthread_open_fn(vfs_handle_struct *handle, + struct smb_filename *smb_fname, + files_struct *fsp, + int flags, + mode_t mode) +{ + int my_errno = 0; + int fd = -1; + bool aio_allow_open = lp_parm_bool( + SNUM(handle->conn), "aio_pthread", "aio open", false); + + if (smb_fname->stream_name) { + /* Don't handle stream opens. 
*/ + errno = ENOENT; + return -1; + } + + if (!aio_allow_open) { + /* aio opens turned off. */ + return open(smb_fname->base_name, flags, mode); + } + + if (!(flags & O_CREAT)) { + /* Only creates matter. */ + return open(smb_fname->base_name, flags, mode); + } + + if (!(flags & O_EXCL)) { + /* Only creates with O_EXCL matter. */ + return open(smb_fname->base_name, flags, mode); + } + + /* + * See if this is a reentrant call - i.e. is this a + * restart of an existing open that just completed. + */ + + if (find_completed_open(fsp, + &fd, + &my_errno)) { + errno = my_errno; + return fd; + } + + /* Ok, it's a create exclusive call - pass it to a thread helper. */ + return open_async(fsp, flags, mode); +} +#endif + static int aio_pthread_connect(vfs_handle_struct *handle, const char *service, const char *user) { @@ -610,6 +1000,9 @@ static int aio_pthread_connect(vfs_handle_struct *handle, const char *service, static struct vfs_fn_pointers vfs_aio_pthread_fns = { .connect_fn = aio_pthread_connect, +#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS) + .open_fn = aio_pthread_open_fn, +#endif .aio_read_fn = aio_pthread_read, .aio_write_fn = aio_pthread_write, .aio_return_fn = aio_pthread_return_fn, -- cgit
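
For reference, a minimal smb.conf sketch showing how the new open path would
be enabled, derived from the lp_parm_bool() call in aio_pthread_open_fn()
above. The share name and path are illustrative assumptions only, and the
option defaults to off:

    [scratch]
        path = /data/scratch
        vfs objects = aio_pthread
        # Hand O_CREAT|O_EXCL opens to the pthread-based async open engine.
        aio_pthread:aio open = yes

Note that the async path is only compiled in when both HAVE_OPENAT and
USE_LINUX_THREAD_CREDENTIALS are defined, and even then it only fires for
O_CREAT|O_EXCL creates; stream opens are rejected and all other opens fall
straight through to a plain open().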