diff options
-rw-r--r-- | source3/configure.in | 2 | ||||
-rw-r--r-- | source3/lib/fncall.c | 2 | ||||
-rw-r--r-- | source3/lib/pthreadpool/Makefile | 9 | ||||
-rw-r--r-- | source3/lib/pthreadpool/pthreadpool.c (renamed from source3/lib/pthreadpool.c) | 259 | ||||
-rw-r--r-- | source3/lib/pthreadpool/pthreadpool.h (renamed from source3/include/pthreadpool.h) | 0 | ||||
-rw-r--r-- | source3/lib/pthreadpool/tests.c | 362 |
6 files changed, 546 insertions, 88 deletions
diff --git a/source3/configure.in b/source3/configure.in index b2c1856bec..b4ac2114d4 100644 --- a/source3/configure.in +++ b/source3/configure.in @@ -6647,7 +6647,7 @@ if test x"$enable_pthreadpool" = x"yes" -a x"$samba_cv_HAVE_PTHREAD" = x"yes"; t LIBS="$LIBS $PTHREAD_LDFLAGS" CFLAGS="$CFLAGS $PTHREAD_CFLAGS" AC_DEFINE(WITH_PTHREADPOOL, 1, [Whether to include pthreadpool helpers]) - AC_SUBST(PTHREADPOOL_OBJ, "lib/pthreadpool.o") + AC_SUBST(PTHREADPOOL_OBJ, "lib/pthreadpool/pthreadpool.o") fi ################################################# diff --git a/source3/lib/fncall.c b/source3/lib/fncall.c index e810b6814e..bd06be2cd8 100644 --- a/source3/lib/fncall.c +++ b/source3/lib/fncall.c @@ -21,7 +21,7 @@ #if WITH_PTHREADPOOL -#include "pthreadpool.h" +#include "lib/pthreadpool/pthreadpool.h" struct fncall_state { struct fncall_context *ctx; diff --git a/source3/lib/pthreadpool/Makefile b/source3/lib/pthreadpool/Makefile new file mode 100644 index 0000000000..48626bd2c0 --- /dev/null +++ b/source3/lib/pthreadpool/Makefile @@ -0,0 +1,9 @@ +all: tests + +CFLAGS=-O3 -g -Wall + +pthreadpool.o: pthreadpool.c pthreadpool.h + gcc -c -O3 -o pthreadpool.o pthreadpool.c -I../../.. + +tests: tests.o pthreadpool.o + gcc -o tests tests.o pthreadpool.o -lpthread
\ No newline at end of file diff --git a/source3/lib/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c index b62bab0a2e..4605538cf2 100644 --- a/source3/lib/pthreadpool.c +++ b/source3/lib/pthreadpool/pthreadpool.c @@ -26,8 +26,10 @@ #include <signal.h> #include <assert.h> #include <fcntl.h> +#include <sys/time.h> #include "pthreadpool.h" +#include "lib/util/dlinklist.h" struct pthreadpool_job { struct pthreadpool_job *next; @@ -38,6 +40,11 @@ struct pthreadpool_job { struct pthreadpool { /* + * List pthreadpools for fork safety + */ + struct pthreadpool *prev, *next; + + /* * Control access to this struct */ pthread_mutex_t mutex; @@ -85,6 +92,12 @@ struct pthreadpool { pthread_t exited[1]; /* We alloc more */ }; +static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER; +static struct pthreadpool *pthreadpools = NULL; +static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT; + +static void pthreadpool_prep_atfork(void); + /* * Initialize a thread pool */ @@ -95,13 +108,21 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult) size_t size; int ret; - size = sizeof(struct pthreadpool) + max_threads * sizeof(pthread_t); + size = sizeof(struct pthreadpool) + + (max_threads-1) * sizeof(pthread_t); pool = (struct pthreadpool *)malloc(size); if (pool == NULL) { return ENOMEM; } + ret = pipe(pool->sig_pipe); + if (ret == -1) { + int err = errno; + free(pool); + return err; + } + ret = pthread_mutex_init(&pool->mutex, NULL); if (ret != 0) { free(pool); @@ -121,44 +142,117 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult) pool->num_exited = 0; pool->max_threads = max_threads; pool->num_idle = 0; - pool->sig_pipe[0] = -1; - pool->sig_pipe[1] = -1; + + ret = pthread_mutex_lock(&pthreadpools_mutex); + if (ret != 0) { + pthread_cond_destroy(&pool->condvar); + pthread_mutex_destroy(&pool->mutex); + free(pool); + return ret; + } + DLIST_ADD(pthreadpools, pool); + + ret = pthread_mutex_unlock(&pthreadpools_mutex); + assert(ret == 0); + + pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork); *presult = pool; + return 0; } -/* - * Create and return a file descriptor which becomes readable when a job has - * finished - */ - -int pthreadpool_sig_fd(struct pthreadpool *pool) +static void pthreadpool_prepare(void) { - int result, ret; + int ret; + struct pthreadpool *pool; - ret = pthread_mutex_lock(&pool->mutex); - if (ret != 0) { - errno = ret; - return -1; + ret = pthread_mutex_lock(&pthreadpools_mutex); + assert(ret == 0); + + pool = pthreadpools; + + while (pool != NULL) { + ret = pthread_mutex_lock(&pool->mutex); + assert(ret == 0); + pool = pool->next; } +} + +static void pthreadpool_parent(void) +{ + int ret; + struct pthreadpool *pool; + + pool = DLIST_TAIL(pthreadpools); + + while (1) { + ret = pthread_mutex_unlock(&pool->mutex); + assert(ret == 0); - if (pool->sig_pipe[0] != -1) { - result = pool->sig_pipe[0]; - goto done; + if (pool == pthreadpools) { + break; + } + pool = pool->prev; } - ret = pipe(pool->sig_pipe); - if (ret == -1) { - result = -1; - goto done; + ret = pthread_mutex_unlock(&pthreadpools_mutex); + assert(ret == 0); +} + +static void pthreadpool_child(void) +{ + int ret; + struct pthreadpool *pool; + + pool = DLIST_TAIL(pthreadpools); + + while (1) { + close(pool->sig_pipe[0]); + close(pool->sig_pipe[1]); + + ret = pipe(pool->sig_pipe); + assert(ret == 0); + + pool->num_threads = 0; + pool->num_exited = 0; + pool->num_idle = 0; + + while (pool->jobs != NULL) { + struct pthreadpool_job *job; + job = pool->jobs; + pool->jobs = job->next; + free(job); + } + pool->last_job = NULL; + + ret = pthread_mutex_unlock(&pool->mutex); + assert(ret == 0); + + if (pool == pthreadpools) { + break; + } + pool = pool->prev; } - result = pool->sig_pipe[0]; -done: - ret = pthread_mutex_unlock(&pool->mutex); + ret = pthread_mutex_unlock(&pthreadpools_mutex); assert(ret == 0); - return result; +} + +static void pthreadpool_prep_atfork(void) +{ + pthread_atfork(pthreadpool_prepare, pthreadpool_parent, + pthreadpool_child); +} + +/* + * Return the file descriptor which becomes readable when a job has + * finished + */ + +int pthreadpool_sig_fd(struct pthreadpool *pool) +{ + return pool->sig_pipe[0]; } /* @@ -181,59 +275,21 @@ static void pthreadpool_join_children(struct pthreadpool *pool) int pthreadpool_finished_job(struct pthreadpool *pool) { - int result, ret, fd; + int result; ssize_t nread; - ret = pthread_mutex_lock(&pool->mutex); - if (ret != 0) { - errno = ret; - return -1; - } - - /* - * Just some cleanup under the mutex - */ - pthreadpool_join_children(pool); - - fd = pool->sig_pipe[0]; - - ret = pthread_mutex_unlock(&pool->mutex); - assert(ret == 0); - - if (fd == -1) { - errno = EINVAL; - return -1; - } - nread = -1; errno = EINTR; while ((nread == -1) && (errno == EINTR)) { - nread = read(fd, &result, sizeof(int)); + nread = read(pool->sig_pipe[0], &result, sizeof(int)); } - - /* - * TODO: handle nread > 0 && nread < sizeof(int) - */ - - /* - * Lock the mutex to provide a memory barrier for data from the worker - * thread to the main thread. The pipe access itself does not have to - * be locked, for sizeof(int) the write to a pipe is atomic, and only - * one thread reads from it. But we need to lock the mutex briefly - * even if we don't do anything under the lock, to make sure we can - * see all memory the helper thread has written. - */ - - ret = pthread_mutex_lock(&pool->mutex); - if (ret == -1) { - errno = ret; - return -1; + if (nread == -1) { + return errno; + } + if (nread != sizeof(int)) { + return EINVAL; } - - ret = pthread_mutex_unlock(&pool->mutex); - assert(ret == 0); - return result; } @@ -250,6 +306,12 @@ int pthreadpool_destroy(struct pthreadpool *pool) return ret; } + if ((pool->jobs != NULL) || pool->shutdown) { + ret = pthread_mutex_unlock(&pool->mutex); + assert(ret == 0); + return EBUSY; + } + if (pool->num_threads > 0) { /* * We have active threads, tell them to finish, wait for that. @@ -294,14 +356,30 @@ int pthreadpool_destroy(struct pthreadpool *pool) ret = pthread_mutex_destroy(&pool->mutex); ret1 = pthread_cond_destroy(&pool->condvar); - if ((ret == 0) && (ret1 == 0)) { - free(pool); + if (ret != 0) { + return ret; + } + if (ret1 != 0) { + return ret1; } + ret = pthread_mutex_lock(&pthreadpools_mutex); if (ret != 0) { return ret; } - return ret1; + DLIST_REMOVE(pthreadpools, pool); + ret = pthread_mutex_unlock(&pthreadpools_mutex); + assert(ret == 0); + + close(pool->sig_pipe[0]); + pool->sig_pipe[0] = -1; + + close(pool->sig_pipe[1]); + pool->sig_pipe[1] = -1; + + free(pool); + + return 0; } /* @@ -325,7 +403,8 @@ static void *pthreadpool_server(void *arg) } while (1) { - struct timespec timeout; + struct timeval tv; + struct timespec ts; struct pthreadpool_job *job; /* @@ -333,14 +412,15 @@ static void *pthreadpool_server(void *arg) * time, exit this thread. */ - timeout.tv_sec = time(NULL) + 1; - timeout.tv_nsec = 0; + gettimeofday(&tv, NULL); + ts.tv_sec = tv.tv_sec + 1; + ts.tv_nsec = tv.tv_usec*1000; while ((pool->jobs == NULL) && (pool->shutdown == 0)) { pool->num_idle += 1; res = pthread_cond_timedwait( - &pool->condvar, &pool->mutex, &timeout); + &pool->condvar, &pool->mutex, &ts); pool->num_idle -= 1; if (res == ETIMEDOUT) { @@ -363,7 +443,6 @@ static void *pthreadpool_server(void *arg) job = pool->jobs; if (job != NULL) { - int fd = pool->sig_pipe[1]; ssize_t written; /* @@ -376,7 +455,7 @@ static void *pthreadpool_server(void *arg) } /* - * Do the work with the mutex unlocked :-) + * Do the work with the mutex unlocked */ res = pthread_mutex_unlock(&pool->mutex); @@ -384,17 +463,14 @@ static void *pthreadpool_server(void *arg) job->fn(job->private_data); - written = sizeof(int); + written = write(pool->sig_pipe[1], &job->id, + sizeof(int)); + + free(job); res = pthread_mutex_lock(&pool->mutex); assert(res == 0); - if (fd != -1) { - written = write(fd, &job->id, sizeof(int)); - } - - free(job); - if (written != sizeof(int)) { pthreadpool_server_exit(pool); pthread_mutex_unlock(&pool->mutex); @@ -447,6 +523,17 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, return res; } + if (pool->shutdown) { + /* + * Protect against the pool being shut down while + * trying to add a job + */ + res = pthread_mutex_unlock(&pool->mutex); + assert(res == 0); + free(job); + return EINVAL; + } + /* * Just some cleanup under the mutex */ diff --git a/source3/include/pthreadpool.h b/source3/lib/pthreadpool/pthreadpool.h index 7ef7ddf419..7ef7ddf419 100644 --- a/source3/include/pthreadpool.h +++ b/source3/lib/pthreadpool/pthreadpool.h diff --git a/source3/lib/pthreadpool/tests.c b/source3/lib/pthreadpool/tests.c new file mode 100644 index 0000000000..d365fbd5b6 --- /dev/null +++ b/source3/lib/pthreadpool/tests.c @@ -0,0 +1,362 @@ +#include <stdio.h> +#include <string.h> +#include <poll.h> +#include <errno.h> +#include <stdlib.h> +#include <pthread.h> +#include <unistd.h> +#include "pthreadpool.h" + +static int test_init(void) +{ + struct pthreadpool *p; + int ret; + + ret = pthreadpool_init(1, &p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_init failed: %s\n", + strerror(ret)); + return -1; + } + ret = pthreadpool_destroy(p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_init failed: %s\n", + strerror(ret)); + return -1; + } + return 0; +} + +static void test_sleep(void *ptr) +{ + int *ptimeout = (int *)ptr; + int ret; + ret = poll(NULL, 0, *ptimeout); + if (ret != 0) { + fprintf(stderr, "poll returned %d (%s)\n", + ret, strerror(errno)); + } +} + +static int test_jobs(int num_threads, int num_jobs) +{ + char *finished; + struct pthreadpool *p; + int timeout = 1; + int i, ret; + + finished = (char *)calloc(1, num_jobs); + if (finished == NULL) { + fprintf(stderr, "calloc failed\n"); + return -1; + } + + ret = pthreadpool_init(num_threads, &p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_init failed: %s\n", + strerror(ret)); + return -1; + } + + for (i=0; i<num_jobs; i++) { + ret = pthreadpool_add_job(p, i, test_sleep, &timeout); + if (ret != 0) { + fprintf(stderr, "pthreadpool_add_job failed: %s\n", + strerror(ret)); + return -1; + } + } + + for (i=0; i<num_jobs; i++) { + ret = pthreadpool_finished_job(p); + if ((ret < 0) || (ret >= num_jobs)) { + fprintf(stderr, "invalid job number %d\n", ret); + return -1; + } + finished[ret] += 1; + } + + for (i=0; i<num_jobs; i++) { + if (finished[i] != 1) { + fprintf(stderr, "finished[%d] = %d\n", + i, finished[i]); + return -1; + } + } + + ret = pthreadpool_destroy(p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_destroy failed: %s\n", + strerror(ret)); + return -1; + } + + free(finished); + return 0; +} + +static int test_busydestroy(void) +{ + struct pthreadpool *p; + int timeout = 50; + struct pollfd pfd; + int ret; + + ret = pthreadpool_init(1, &p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_init failed: %s\n", + strerror(ret)); + return -1; + } + ret = pthreadpool_add_job(p, 1, test_sleep, &timeout); + if (ret != 0) { + fprintf(stderr, "pthreadpool_add_job failed: %s\n", + strerror(ret)); + return -1; + } + ret = pthreadpool_destroy(p); + if (ret != EBUSY) { + fprintf(stderr, "Could destroy a busy pool\n"); + return -1; + } + + pfd.fd = pthreadpool_sig_fd(p); + pfd.events = POLLIN|POLLERR; + + poll(&pfd, 1, -1); + + ret = pthreadpool_destroy(p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_destroy failed: %s\n", + strerror(ret)); + return -1; + } + return 0; +} + +struct threaded_state { + pthread_t tid; + struct pthreadpool *p; + int start_job; + int num_jobs; + int timeout; +}; + +static void *test_threaded_worker(void *p) +{ + struct threaded_state *state = (struct threaded_state *)p; + int i; + + for (i=0; i<state->num_jobs; i++) { + int ret = pthreadpool_add_job(state->p, state->start_job + i, + test_sleep, &state->timeout); + if (ret != 0) { + fprintf(stderr, "pthreadpool_add_job failed: %s\n", + strerror(ret)); + return NULL; + } + } + return NULL; +} + +static int test_threaded_addjob(int num_pools, int num_threads, int poolsize, + int num_jobs) +{ + struct pthreadpool **pools; + struct threaded_state *states; + struct threaded_state *state; + struct pollfd *pfds; + char *finished; + pid_t child; + int i, ret, poolnum; + int received; + + states = calloc(num_threads, sizeof(struct threaded_state)); + if (states == NULL) { + fprintf(stderr, "calloc failed\n"); + return -1; + } + + finished = calloc(num_threads * num_jobs, 1); + if (finished == NULL) { + fprintf(stderr, "calloc failed\n"); + return -1; + } + + pools = calloc(num_pools, sizeof(struct pthreadpool *)); + if (pools == NULL) { + fprintf(stderr, "calloc failed\n"); + return -1; + } + + pfds = calloc(num_pools, sizeof(struct pollfd)); + if (pfds == NULL) { + fprintf(stderr, "calloc failed\n"); + return -1; + } + + for (i=0; i<num_pools; i++) { + ret = pthreadpool_init(poolsize, &pools[i]); + if (ret != 0) { + fprintf(stderr, "pthreadpool_init failed: %s\n", + strerror(ret)); + return -1; + } + pfds[i].fd = pthreadpool_sig_fd(pools[i]); + pfds[i].events = POLLIN|POLLHUP; + } + + poolnum = 0; + + for (i=0; i<num_threads; i++) { + state = &states[i]; + + state->p = pools[poolnum]; + poolnum = (poolnum + 1) % num_pools; + + state->num_jobs = num_jobs; + state->timeout = 1; + state->start_job = i * num_jobs; + + ret = pthread_create(&state->tid, NULL, test_threaded_worker, + state); + if (ret != 0) { + fprintf(stderr, "pthread_create failed: %s\n", + strerror(ret)); + return -1; + } + } + + if (random() % 1) { + poll(NULL, 0, 1); + } + + child = fork(); + if (child < 0) { + fprintf(stderr, "fork failed: %s\n", strerror(errno)); + return -1; + } + if (child == 0) { + for (i=0; i<num_pools; i++) { + ret = pthreadpool_destroy(pools[i]); + if (ret != 0) { + fprintf(stderr, "pthreadpool_destroy failed: " + "%s\n", strerror(ret)); + exit(1); + } + } + /* child */ + exit(0); + } + + for (i=0; i<num_threads; i++) { + ret = pthread_join(states[i].tid, NULL); + if (ret != 0) { + fprintf(stderr, "pthread_join(%d) failed: %s\n", + i, strerror(ret)); + return -1; + } + } + + received = 0; + + while (received < num_threads*num_jobs) { + int j; + + ret = poll(pfds, num_pools, 1000); + if (ret == -1) { + fprintf(stderr, "poll failed: %s\n", + strerror(errno)); + return -1; + } + if (ret == 0) { + fprintf(stderr, "\npoll timed out\n"); + break; + } + + for (j=0; j<num_pools; j++) { + + if ((pfds[j].revents & (POLLIN|POLLHUP)) == 0) { + continue; + } + + ret = pthreadpool_finished_job(pools[j]); + if ((ret < 0) || (ret >= num_jobs * num_threads)) { + fprintf(stderr, "invalid job number %d\n", + ret); + return -1; + } + finished[ret] += 1; + received += 1; + } + } + + for (i=0; i<num_threads*num_jobs; i++) { + if (finished[i] != 1) { + fprintf(stderr, "finished[%d] = %d\n", + i, finished[i]); + return -1; + } + } + + for (i=0; i<num_pools; i++) { + ret = pthreadpool_destroy(pools[i]); + if (ret != 0) { + fprintf(stderr, "pthreadpool_destroy failed: %s\n", + strerror(ret)); + return -1; + } + } + + free(pfds); + free(pools); + free(states); + free(finished); + + return 0; +} + +int main(void) +{ + int ret; + + ret = test_init(); + if (ret != 0) { + fprintf(stderr, "test_init failed\n"); + return 1; + } + + ret = test_jobs(10, 10000); + if (ret != 0) { + fprintf(stderr, "test_jobs failed\n"); + return 1; + } + + ret = test_busydestroy(); + if (ret != 0) { + fprintf(stderr, "test_busydestroy failed\n"); + return 1; + } + + /* + * Test 10 threads adding jobs on a single pool + */ + ret = test_threaded_addjob(1, 10, 5, 5000); + if (ret != 0) { + fprintf(stderr, "test_jobs failed\n"); + return 1; + } + + /* + * Test 10 threads on 3 pools to verify our fork handling + * works right. + */ + ret = test_threaded_addjob(3, 10, 5, 5000); + if (ret != 0) { + fprintf(stderr, "test_jobs failed\n"); + return 1; + } + + printf("success\n"); + return 0; +} |