/* * Async syscalls * Copyright (C) Volker Lendecke 2012 * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "asys.h" #include #include #include "../pthreadpool/pthreadpool.h" struct asys_pwrite_args { int fildes; const void *buf; size_t nbyte; off_t offset; }; struct asys_pread_args { int fildes; void *buf; size_t nbyte; off_t offset; }; struct asys_fsync_args { int fildes; }; union asys_job_args { struct asys_pwrite_args pwrite_args; struct asys_pread_args pread_args; struct asys_fsync_args fsync_args; }; struct asys_job { void *private_data; union asys_job_args args; ssize_t ret; int err; char busy; char canceled; }; struct asys_context { struct pthreadpool *pool; int pthreadpool_fd; unsigned num_jobs; struct asys_job **jobs; }; struct asys_creds_context { int dummy; }; int asys_context_init(struct asys_context **pctx, unsigned max_parallel) { struct asys_context *ctx; int ret; ctx = calloc(1, sizeof(struct asys_context)); if (ctx == NULL) { return ENOMEM; } ret = pthreadpool_init(max_parallel, &ctx->pool); if (ret != 0) { free(ctx); return ret; } ctx->pthreadpool_fd = pthreadpool_signal_fd(ctx->pool); *pctx = ctx; return 0; } int asys_signalfd(struct asys_context *ctx) { return ctx->pthreadpool_fd; } int asys_context_destroy(struct asys_context *ctx) { int ret; unsigned i; for (i=0; inum_jobs; i++) { if (ctx->jobs[i]->busy) { return EBUSY; } } ret = pthreadpool_destroy(ctx->pool); if (ret != 0) { return ret; } for (i=0; inum_jobs; i++) { free(ctx->jobs[i]); } free(ctx->jobs); free(ctx); return 0; } static int asys_new_job(struct asys_context *ctx, int *jobid, struct asys_job **pjob) { struct asys_job **tmp; struct asys_job *job; unsigned i; for (i=0; inum_jobs; i++) { job = ctx->jobs[i]; if (!job->busy) { job->err = 0; *pjob = job; *jobid = i; return 0; } } if (ctx->num_jobs+1 == 0) { return EBUSY; /* overflow */ } tmp = realloc(ctx->jobs, sizeof(struct asys_job *)*(ctx->num_jobs+1)); if (tmp == NULL) { return ENOMEM; } ctx->jobs = tmp; job = calloc(1, sizeof(struct asys_job)); if (job == NULL) { return ENOMEM; } ctx->jobs[ctx->num_jobs] = job; *jobid = ctx->num_jobs; *pjob = job; ctx->num_jobs += 1; return 0; } static void asys_pwrite_do(void *private_data); int asys_pwrite(struct asys_context *ctx, int fildes, const void *buf, size_t nbyte, off_t offset, void *private_data) { struct asys_job *job; struct asys_pwrite_args *args; int jobid; int ret; ret = asys_new_job(ctx, &jobid, &job); if (ret != 0) { return ret; } job->private_data = private_data; args = &job->args.pwrite_args; args->fildes = fildes; args->buf = buf; args->nbyte = nbyte; args->offset = offset; ret = pthreadpool_add_job(ctx->pool, jobid, asys_pwrite_do, job); if (ret != 0) { return ret; } job->busy = 1; return 0; } static void asys_pwrite_do(void *private_data) { struct asys_job *job = (struct asys_job *)private_data; struct asys_pwrite_args *args = &job->args.pwrite_args; job->ret = pwrite(args->fildes, args->buf, args->nbyte, args->offset); if (job->ret == -1) { job->err = errno; } } static void asys_pread_do(void *private_data); int asys_pread(struct asys_context *ctx, int fildes, void *buf, size_t nbyte, off_t offset, void *private_data) { struct asys_job *job; struct asys_pread_args *args; int jobid; int ret; ret = asys_new_job(ctx, &jobid, &job); if (ret != 0) { return ret; } job->private_data = private_data; args = &job->args.pread_args; args->fildes = fildes; args->buf = buf; args->nbyte = nbyte; args->offset = offset; ret = pthreadpool_add_job(ctx->pool, jobid, asys_pread_do, job); if (ret != 0) { return ret; } job->busy = 1; return 0; } static void asys_pread_do(void *private_data) { struct asys_job *job = (struct asys_job *)private_data; struct asys_pread_args *args = &job->args.pread_args; job->ret = pread(args->fildes, args->buf, args->nbyte, args->offset); if (job->ret == -1) { job->err = errno; } } static void asys_fsync_do(void *private_data); int asys_fsync(struct asys_context *ctx, int fildes, void *private_data) { struct asys_job *job; struct asys_fsync_args *args; int jobid; int ret; ret = asys_new_job(ctx, &jobid, &job); if (ret != 0) { return ret; } job->private_data = private_data; args = &job->args.fsync_args; args->fildes = fildes; ret = pthreadpool_add_job(ctx->pool, jobid, asys_fsync_do, job); if (ret != 0) { return ret; } job->busy = 1; return 0; } static void asys_fsync_do(void *private_data) { struct asys_job *job = (struct asys_job *)private_data; struct asys_fsync_args *args = &job->args.fsync_args; job->ret = fsync(args->fildes); if (job->ret == -1) { job->err = errno; } } void asys_cancel(struct asys_context *ctx, void *private_data) { unsigned i; for (i=0; inum_jobs; i++) { struct asys_job *job = ctx->jobs[i]; if (job->private_data == private_data) { job->canceled = 1; } } } int asys_result(struct asys_context *ctx, ssize_t *pret, int *perrno, void *pdata) { void **pprivate_data = (void **)pdata; struct asys_job *job; int ret, jobid; ret = pthreadpool_finished_job(ctx->pool, &jobid); if (ret != 0) { return ret; } if ((jobid < 0) || (jobid >= ctx->num_jobs)) { return EIO; } job = ctx->jobs[jobid]; if (job->canceled) { return ECANCELED; } *pret = job->ret; *perrno = job->err; *pprivate_data = job->private_data; job->busy = 0; return 0; }