summaryrefslogtreecommitdiff
path: root/source3/lib/pthreadpool.c
diff options
context:
space:
mode:
authorVolker Lendecke <vl@samba.org>2011-04-22 11:47:11 +0200
committerVolker Lendecke <vl@samba.org>2011-04-25 09:50:32 +0200
commit62689d8166b8e070f855e6910470796dd7e1b2c8 (patch)
tree3ff4c20867ed0401753fa880b949fa98dc795012 /source3/lib/pthreadpool.c
parent23a6af46c84cd9b738af403d80c5187d858eac03 (diff)
downloadsamba-62689d8166b8e070f855e6910470796dd7e1b2c8.tar.gz
samba-62689d8166b8e070f855e6910470796dd7e1b2c8.tar.bz2
samba-62689d8166b8e070f855e6910470796dd7e1b2c8.zip
s3: Many pthreadpool fixes
In particular, this makes it fork-safe
Diffstat (limited to 'source3/lib/pthreadpool.c')
-rw-r--r--source3/lib/pthreadpool.c505
1 files changed, 0 insertions, 505 deletions
diff --git a/source3/lib/pthreadpool.c b/source3/lib/pthreadpool.c
deleted file mode 100644
index b62bab0a2e..0000000000
--- a/source3/lib/pthreadpool.c
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Unix SMB/CIFS implementation.
- * thread pool implementation
- * Copyright (C) Volker Lendecke 2009
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include <errno.h>
-#include <stdio.h>
-#include <unistd.h>
-#include <stdlib.h>
-#include <string.h>
-#include <pthread.h>
-#include <signal.h>
-#include <assert.h>
-#include <fcntl.h>
-
-#include "pthreadpool.h"
-
-struct pthreadpool_job {
- struct pthreadpool_job *next;
- int id;
- void (*fn)(void *private_data);
- void *private_data;
-};
-
-struct pthreadpool {
- /*
- * Control access to this struct
- */
- pthread_mutex_t mutex;
-
- /*
- * Threads waiting for work do so here
- */
- pthread_cond_t condvar;
-
- /*
- * List of work jobs
- */
- struct pthreadpool_job *jobs, *last_job;
-
- /*
- * pipe for signalling
- */
- int sig_pipe[2];
-
- /*
- * indicator to worker threads that they should shut down
- */
- int shutdown;
-
- /*
- * maximum number of threads
- */
- int max_threads;
-
- /*
- * Number of threads
- */
- int num_threads;
-
- /*
- * Number of idle threads
- */
- int num_idle;
-
- /*
- * An array of threads that require joining, the array has
- * "max_threads" elements. It contains "num_exited" ids.
- */
- int num_exited;
- pthread_t exited[1]; /* We alloc more */
-};
-
-/*
- * Initialize a thread pool
- */
-
-int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
-{
- struct pthreadpool *pool;
- size_t size;
- int ret;
-
- size = sizeof(struct pthreadpool) + max_threads * sizeof(pthread_t);
-
- pool = (struct pthreadpool *)malloc(size);
- if (pool == NULL) {
- return ENOMEM;
- }
-
- ret = pthread_mutex_init(&pool->mutex, NULL);
- if (ret != 0) {
- free(pool);
- return ret;
- }
-
- ret = pthread_cond_init(&pool->condvar, NULL);
- if (ret != 0) {
- pthread_mutex_destroy(&pool->mutex);
- free(pool);
- return ret;
- }
-
- pool->shutdown = 0;
- pool->jobs = pool->last_job = NULL;
- pool->num_threads = 0;
- pool->num_exited = 0;
- pool->max_threads = max_threads;
- pool->num_idle = 0;
- pool->sig_pipe[0] = -1;
- pool->sig_pipe[1] = -1;
-
- *presult = pool;
- return 0;
-}
-
-/*
- * Create and return a file descriptor which becomes readable when a job has
- * finished
- */
-
-int pthreadpool_sig_fd(struct pthreadpool *pool)
-{
- int result, ret;
-
- ret = pthread_mutex_lock(&pool->mutex);
- if (ret != 0) {
- errno = ret;
- return -1;
- }
-
- if (pool->sig_pipe[0] != -1) {
- result = pool->sig_pipe[0];
- goto done;
- }
-
- ret = pipe(pool->sig_pipe);
- if (ret == -1) {
- result = -1;
- goto done;
- }
-
- result = pool->sig_pipe[0];
-done:
- ret = pthread_mutex_unlock(&pool->mutex);
- assert(ret == 0);
- return result;
-}
-
-/*
- * Do a pthread_join() on all children that have exited, pool->mutex must be
- * locked
- */
-static void pthreadpool_join_children(struct pthreadpool *pool)
-{
- int i;
-
- for (i=0; i<pool->num_exited; i++) {
- pthread_join(pool->exited[i], NULL);
- }
- pool->num_exited = 0;
-}
-
-/*
- * Fetch a finished job number from the signal pipe
- */
-
-int pthreadpool_finished_job(struct pthreadpool *pool)
-{
- int result, ret, fd;
- ssize_t nread;
-
- ret = pthread_mutex_lock(&pool->mutex);
- if (ret != 0) {
- errno = ret;
- return -1;
- }
-
- /*
- * Just some cleanup under the mutex
- */
- pthreadpool_join_children(pool);
-
- fd = pool->sig_pipe[0];
-
- ret = pthread_mutex_unlock(&pool->mutex);
- assert(ret == 0);
-
- if (fd == -1) {
- errno = EINVAL;
- return -1;
- }
-
- nread = -1;
- errno = EINTR;
-
- while ((nread == -1) && (errno == EINTR)) {
- nread = read(fd, &result, sizeof(int));
- }
-
- /*
- * TODO: handle nread > 0 && nread < sizeof(int)
- */
-
- /*
- * Lock the mutex to provide a memory barrier for data from the worker
- * thread to the main thread. The pipe access itself does not have to
- * be locked, for sizeof(int) the write to a pipe is atomic, and only
- * one thread reads from it. But we need to lock the mutex briefly
- * even if we don't do anything under the lock, to make sure we can
- * see all memory the helper thread has written.
- */
-
- ret = pthread_mutex_lock(&pool->mutex);
- if (ret == -1) {
- errno = ret;
- return -1;
- }
-
- ret = pthread_mutex_unlock(&pool->mutex);
- assert(ret == 0);
-
- return result;
-}
-
-/*
- * Destroy a thread pool, finishing all threads working for it
- */
-
-int pthreadpool_destroy(struct pthreadpool *pool)
-{
- int ret, ret1;
-
- ret = pthread_mutex_lock(&pool->mutex);
- if (ret != 0) {
- return ret;
- }
-
- if (pool->num_threads > 0) {
- /*
- * We have active threads, tell them to finish, wait for that.
- */
-
- pool->shutdown = 1;
-
- if (pool->num_idle > 0) {
- /*
- * Wake the idle threads. They will find pool->quit to
- * be set and exit themselves
- */
- ret = pthread_cond_broadcast(&pool->condvar);
- if (ret != 0) {
- pthread_mutex_unlock(&pool->mutex);
- return ret;
- }
- }
-
- while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
-
- if (pool->num_exited > 0) {
- pthreadpool_join_children(pool);
- continue;
- }
- /*
- * A thread that shuts down will also signal
- * pool->condvar
- */
- ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
- if (ret != 0) {
- pthread_mutex_unlock(&pool->mutex);
- return ret;
- }
- }
- }
-
- ret = pthread_mutex_unlock(&pool->mutex);
- if (ret != 0) {
- return ret;
- }
- ret = pthread_mutex_destroy(&pool->mutex);
- ret1 = pthread_cond_destroy(&pool->condvar);
-
- if ((ret == 0) && (ret1 == 0)) {
- free(pool);
- }
-
- if (ret != 0) {
- return ret;
- }
- return ret1;
-}
-
-/*
- * Prepare for pthread_exit(), pool->mutex must be locked
- */
-static void pthreadpool_server_exit(struct pthreadpool *pool)
-{
- pool->num_threads -= 1;
- pool->exited[pool->num_exited] = pthread_self();
- pool->num_exited += 1;
-}
-
-static void *pthreadpool_server(void *arg)
-{
- struct pthreadpool *pool = (struct pthreadpool *)arg;
- int res;
-
- res = pthread_mutex_lock(&pool->mutex);
- if (res != 0) {
- return NULL;
- }
-
- while (1) {
- struct timespec timeout;
- struct pthreadpool_job *job;
-
- /*
- * idle-wait at most 1 second. If nothing happens in that
- * time, exit this thread.
- */
-
- timeout.tv_sec = time(NULL) + 1;
- timeout.tv_nsec = 0;
-
- while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
-
- pool->num_idle += 1;
- res = pthread_cond_timedwait(
- &pool->condvar, &pool->mutex, &timeout);
- pool->num_idle -= 1;
-
- if (res == ETIMEDOUT) {
-
- if (pool->jobs == NULL) {
- /*
- * we timed out and still no work for
- * us. Exit.
- */
- pthreadpool_server_exit(pool);
- pthread_mutex_unlock(&pool->mutex);
- return NULL;
- }
-
- break;
- }
- assert(res == 0);
- }
-
- job = pool->jobs;
-
- if (job != NULL) {
- int fd = pool->sig_pipe[1];
- ssize_t written;
-
- /*
- * Ok, there's work for us to do, remove the job from
- * the pthreadpool list
- */
- pool->jobs = job->next;
- if (pool->last_job == job) {
- pool->last_job = NULL;
- }
-
- /*
- * Do the work with the mutex unlocked :-)
- */
-
- res = pthread_mutex_unlock(&pool->mutex);
- assert(res == 0);
-
- job->fn(job->private_data);
-
- written = sizeof(int);
-
- res = pthread_mutex_lock(&pool->mutex);
- assert(res == 0);
-
- if (fd != -1) {
- written = write(fd, &job->id, sizeof(int));
- }
-
- free(job);
-
- if (written != sizeof(int)) {
- pthreadpool_server_exit(pool);
- pthread_mutex_unlock(&pool->mutex);
- return NULL;
- }
- }
-
- if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
- /*
- * No more work to do and we're asked to shut down, so
- * exit
- */
- pthreadpool_server_exit(pool);
-
- if (pool->num_threads == 0) {
- /*
- * Ping the main thread waiting for all of us
- * workers to have quit.
- */
- pthread_cond_broadcast(&pool->condvar);
- }
-
- pthread_mutex_unlock(&pool->mutex);
- return NULL;
- }
- }
-}
-
-int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
- void (*fn)(void *private_data), void *private_data)
-{
- struct pthreadpool_job *job;
- pthread_t thread_id;
- int res;
- sigset_t mask, omask;
-
- job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
- if (job == NULL) {
- return ENOMEM;
- }
-
- job->fn = fn;
- job->private_data = private_data;
- job->id = job_id;
- job->next = NULL;
-
- res = pthread_mutex_lock(&pool->mutex);
- if (res != 0) {
- free(job);
- return res;
- }
-
- /*
- * Just some cleanup under the mutex
- */
- pthreadpool_join_children(pool);
-
- /*
- * Add job to the end of the queue
- */
- if (pool->jobs == NULL) {
- pool->jobs = job;
- }
- else {
- pool->last_job->next = job;
- }
- pool->last_job = job;
-
- if (pool->num_idle > 0) {
- /*
- * We have idle threads, wake one.
- */
- res = pthread_cond_signal(&pool->condvar);
- pthread_mutex_unlock(&pool->mutex);
- return res;
- }
-
- if (pool->num_threads >= pool->max_threads) {
- /*
- * No more new threads, we just queue the request
- */
- pthread_mutex_unlock(&pool->mutex);
- return 0;
- }
-
- /*
- * Create a new worker thread. It should not receive any signals.
- */
-
- sigfillset(&mask);
-
- res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
- if (res != 0) {
- pthread_mutex_unlock(&pool->mutex);
- return res;
- }
-
- res = pthread_create(&thread_id, NULL, pthreadpool_server,
- (void *)pool);
- if (res == 0) {
- pool->num_threads += 1;
- }
-
- assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
-
- pthread_mutex_unlock(&pool->mutex);
- return res;
-}