diff ezBAMQC/src/htslib/cram/thread_pool.c @ 0:dfa3745e5fd8

Uploaded
author youngkim
date Thu, 24 Mar 2016 17:12:52 -0400
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/ezBAMQC/src/htslib/cram/thread_pool.c	Thu Mar 24 17:12:52 2016 -0400
@@ -0,0 +1,811 @@
+/*
+Copyright (c) 2013 Genome Research Ltd.
+Author: James Bonfield <jkb@sanger.ac.uk>
+
+Redistribution and use in source and binary forms, with or without 
+modification, are permitted provided that the following conditions are met:
+
+   1. Redistributions of source code must retain the above copyright notice, 
+this list of conditions and the following disclaimer.
+
+   2. Redistributions in binary form must reproduce the above copyright notice, 
+this list of conditions and the following disclaimer in the documentation 
+and/or other materials provided with the distribution.
+
+   3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
+Institute nor the names of its contributors may be used to endorse or promote
+products derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND 
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
+DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#include <stdlib.h>
+
+#include <signal.h>
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/time.h>
+#include <assert.h>
+
+#include "cram/thread_pool.h"
+
+//#define DEBUG
+//#define DEBUG_TIME
+
+#define IN_ORDER
+
+#ifdef DEBUG
+static int worker_id(t_pool *p) {
+    int i;
+    pthread_t s = pthread_self();
+    for (i = 0; i < p->tsize; i++) {
+	if (pthread_equal(s, p->t[i].tid))
+	    return i;
+    }
+    return -1;
+}
+#endif
+
+/* ----------------------------------------------------------------------------
+ * A queue to hold results from the thread pool.
+ *
+ * Each thread pool may have jobs of multiple types being queued up and
+ * interleaved, so we allow several results queues per pool.
+ *
+ * The jobs themselves are expected to push their results onto their
+ * appropriate results queue.
+ */
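+
+/*
+ * Illustrative sketch only (not part of the library): a typical
+ * producer/consumer round trip with one pool and one results queue.
+ * Here "my_func" and "my_arg" are hypothetical placeholders for a job
+ * function and its argument; my_func must return malloc'd data as
+ * t_pool_delete_result(r, 1) frees it with free().
+ *
+ *     t_pool *p = t_pool_init(16, 8);         // queue of 16 jobs, 8 threads
+ *     t_results_queue *q = t_results_queue_init();
+ *     t_pool_dispatch(p, q, my_func, my_arg);
+ *     t_pool_result *r;
+ *     while ((r = t_pool_next_result(q)))     // NULL when nothing is ready
+ *         t_pool_delete_result(r, 1);         // 1 => also free r->data
+ */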
+
+/*
+ * Adds a result to the end of the result queue.
+ *
+ * Returns 0 on success;
+ *        -1 on failure
+ */
+static int t_pool_add_result(t_pool_job *j, void *data) {
+    t_results_queue *q = j->q;
+    t_pool_result *r;
+
+#ifdef DEBUG
+    fprintf(stderr, "%d: Adding resulting to queue %p, serial %d\n",
+	    worker_id(j->p), q, j->serial);
+#endif
+
+    /* No results queue is fine if we don't want any results back */
+    if (!q)
+	return 0;
+
+    if (!(r = malloc(sizeof(*r))))
+	return -1;
+
+    r->next = NULL;
+    r->data = data;
+    r->serial = j->serial;
+
+    pthread_mutex_lock(&q->result_m);
+    if (q->result_tail) {
+	q->result_tail->next = r;
+	q->result_tail = r;
+    } else {
+	q->result_head = q->result_tail = r;
+    }
+    q->queue_len++;
+    q->pending--;
+
+#ifdef DEBUG
+    fprintf(stderr, "%d: Broadcasting result_avail (id %d)\n",
+	    worker_id(j->p), r->serial);
+#endif
+    pthread_cond_signal(&q->result_avail_c);
+#ifdef DEBUG
+    fprintf(stderr, "%d: Broadcast complete\n", worker_id(j->p));
+#endif
+
+    pthread_mutex_unlock(&q->result_m);
+
+    return 0;
+}
+
+/* Core of t_pool_next_result() */
+static t_pool_result *t_pool_next_result_locked(t_results_queue *q) {
+    t_pool_result *r, *last;
+
+    for (last = NULL, r = q->result_head; r; last = r, r = r->next) {
+	if (r->serial == q->next_serial)
+	    break;
+    }
+
+    if (r) {
+	if (q->result_head == r)
+	    q->result_head = r->next;
+	else
+	    last->next = r->next;
+
+	if (q->result_tail == r)
+	    q->result_tail = last;
+
+	if (!q->result_head)
+	    q->result_tail = NULL;
+
+	q->next_serial++;
+	q->queue_len--;
+    }
+
+    return r;
+}
+
+/*
+ * Pulls a result off the head of the result queue. Caller should
+ * free it (and any internals as appropriate) after use. This doesn't
+ * wait for a result to be present.
+ *
+ * Results will be returned in strict order.
+ * 
+ * Returns t_pool_result pointer if a result is ready.
+ *         NULL if not.
+ */
+t_pool_result *t_pool_next_result(t_results_queue *q) {
+    t_pool_result *r;
+
+#ifdef DEBUG
+    fprintf(stderr, "Requesting next result on queue %p\n", q);
+#endif
+
+    pthread_mutex_lock(&q->result_m);
+    r = t_pool_next_result_locked(q);
+    pthread_mutex_unlock(&q->result_m);
+
+#ifdef DEBUG
+    fprintf(stderr, "(q=%p) Found %p\n", q, r);
+#endif
+
+    return r;
+}
+
+t_pool_result *t_pool_next_result_wait(t_results_queue *q) {
+    t_pool_result *r;
+
+#ifdef DEBUG
+    fprintf(stderr, "Waiting for result %d...\n", q->next_serial);
+#endif
+
+    pthread_mutex_lock(&q->result_m);
+    while (!(r = t_pool_next_result_locked(q))) {
+	/* The race is avoided via the _locked() call, but use a timed wait just in case */
+	struct timeval now;
+	struct timespec timeout;
+
+	gettimeofday(&now, NULL);
+	timeout.tv_sec = now.tv_sec + 10;
+	timeout.tv_nsec = now.tv_usec * 1000;
+
+	pthread_cond_timedwait(&q->result_avail_c, &q->result_m, &timeout);
+    }
+    pthread_mutex_unlock(&q->result_m);
+
+    return r;
+}
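+
+/*
+ * Illustrative sketch only: results come back in strict serial order, so
+ * t_pool_next_result_wait() can consume exactly one result per dispatched
+ * job.  "njobs" is a hypothetical count of jobs already dispatched to q.
+ *
+ *     int n;
+ *     for (n = 0; n < njobs; n++) {
+ *         t_pool_result *r = t_pool_next_result_wait(q);
+ *         // ... process r->data ...
+ *         t_pool_delete_result(r, 1);
+ *     }
+ */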
+
+/*
+ * Returns true if there are no items on the finished results queue and
+ * also none still pending.
+ */
+int t_pool_results_queue_empty(t_results_queue *q) {
+    int empty;
+
+    pthread_mutex_lock(&q->result_m);
+    empty = q->queue_len == 0 && q->pending == 0;
+    pthread_mutex_unlock(&q->result_m);
+
+    return empty;
+}
+
+
+/*
+ * Returns the number of completed jobs on the results queue.
+ */
+int t_pool_results_queue_len(t_results_queue *q) {
+    int len;
+
+    pthread_mutex_lock(&q->result_m);
+    len = q->queue_len;
+    pthread_mutex_unlock(&q->result_m);
+
+    return len;
+}
+
+int t_pool_results_queue_sz(t_results_queue *q) {
+    int len;
+
+    pthread_mutex_lock(&q->result_m);
+    len = q->queue_len + q->pending;
+    pthread_mutex_unlock(&q->result_m);
+
+    return len;
+}
+
+/*
+ * Frees a result 'r' and if free_data is true also frees
+ * the internal r->data result too.
+ */
+void t_pool_delete_result(t_pool_result *r, int free_data) {
+    if (!r)
+	return;
+
+    if (free_data && r->data)
+	free(r->data);
+
+    free(r);
+}
+
+/*
+ * Initialises a results queue.
+ *
+ * Returns results queue pointer on success;
+ *         NULL on failure
+ */
+t_results_queue *t_results_queue_init(void) {
+    t_results_queue *q = malloc(sizeof(*q));
+
+    if (!q)
+	return NULL;
+
+    pthread_mutex_init(&q->result_m, NULL);
+    pthread_cond_init(&q->result_avail_c, NULL);
+
+    q->result_head = NULL;
+    q->result_tail = NULL;
+    q->next_serial = 0;
+    q->curr_serial = 0;
+    q->queue_len   = 0;
+    q->pending     = 0;
+
+    return q;
+}
+
+/* Deallocates memory for a results queue */
+void t_results_queue_destroy(t_results_queue *q) {
+#ifdef DEBUG
+    fprintf(stderr, "Destroying results queue %p\n", q);
+#endif
+
+    if (!q)
+	return;
+
+    pthread_mutex_destroy(&q->result_m);
+    pthread_cond_destroy(&q->result_avail_c);
+
+    memset(q, 0xbb, sizeof(*q));
+    free(q);
+
+#ifdef DEBUG
+    fprintf(stderr, "Destroyed results queue %p\n", q);
+#endif
+}
+
+/* ----------------------------------------------------------------------------
+ * The thread pool.
+ */
+
+#define TDIFF(t2,t1) ((t2.tv_sec-t1.tv_sec)*1000000 + t2.tv_usec-t1.tv_usec)
+
+/*
+ * A worker thread.
+ *
+ * Each thread waits until the pool queue is non-empty.  As soon as a
+ * job is available, one thread acquires the pool lock, takes the job
+ * off the queue, and executes it.
+ */
+static void *t_pool_worker(void *arg) {
+    t_pool_worker_t *w = (t_pool_worker_t *)arg;
+    t_pool *p = w->p;
+    t_pool_job *j;
+#ifdef DEBUG_TIME
+    struct timeval t1, t2, t3;
+#endif
+
+    for (;;) {
+	// Pop an item off the pool queue
+#ifdef DEBUG_TIME
+	gettimeofday(&t1, NULL);
+#endif
+
+	pthread_mutex_lock(&p->pool_m);
+
+#ifdef DEBUG_TIME
+	gettimeofday(&t2, NULL);
+	p->wait_time += TDIFF(t2,t1);
+	w->wait_time += TDIFF(t2,t1);
+#endif
+
+	// If there is something on the job list and a higher priority
+	// thread waiting, let it handle this instead.
+//	while (p->head && p->t_stack_top != -1 && p->t_stack_top < w->idx) {
+//	    pthread_mutex_unlock(&p->pool_m);
+//	    pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
+//	    pthread_mutex_lock(&p->pool_m);
+//	}
+
+	while (!p->head && !p->shutdown) {
+	    p->nwaiting++;
+
+	    if (p->njobs == 0)
+		pthread_cond_signal(&p->empty_c);
+#ifdef DEBUG_TIME
+	    gettimeofday(&t2, NULL);
+#endif
+
+#ifdef IN_ORDER
+	    // Push this thread to the top of the waiting stack
+	    if (p->t_stack_top == -1 || p->t_stack_top > w->idx)
+		p->t_stack_top = w->idx;
+
+	    p->t_stack[w->idx] = 1;
+	    pthread_cond_wait(&w->pending_c, &p->pool_m);
+	    p->t_stack[w->idx] = 0;
+
+	    /* Find new t_stack_top */
+	    {
+		int i;
+		p->t_stack_top = -1;
+		for (i = 0; i < p->tsize; i++) {
+		    if (p->t_stack[i]) {
+			p->t_stack_top = i;
+			break;
+		    }
+		}
+	    }
+#else
+	    pthread_cond_wait(&p->pending_c, &p->pool_m);
+#endif
+
+#ifdef DEBUG_TIME
+	    gettimeofday(&t3, NULL);
+	    p->wait_time += TDIFF(t3,t2);
+	    w->wait_time += TDIFF(t3,t2);
+#endif
+	    p->nwaiting--;
+	}
+
+	if (p->shutdown) {
+#ifdef DEBUG_TIME
+	    p->total_time += TDIFF(t3,t1);
+#endif
+#ifdef DEBUG
+	    fprintf(stderr, "%d: Shutting down\n", worker_id(p));
+#endif
+	    pthread_mutex_unlock(&p->pool_m);
+	    pthread_exit(NULL);
+	}
+
+	j = p->head;
+	if (!(p->head = j->next))
+	    p->tail = NULL;
+
+	if (p->njobs-- >= p->qsize)
+	    pthread_cond_signal(&p->full_c);
+
+	if (p->njobs == 0)
+	    pthread_cond_signal(&p->empty_c);
+
+	pthread_mutex_unlock(&p->pool_m);
+	    
+	// We have job 'j' - now execute it.
+	t_pool_add_result(j, j->func(j->arg));	
+#ifdef DEBUG_TIME
+	pthread_mutex_lock(&p->pool_m);
+	gettimeofday(&t3, NULL);
+	p->total_time += TDIFF(t3,t1);
+	pthread_mutex_unlock(&p->pool_m);
+#endif
+	memset(j, 0xbb, sizeof(*j));
+	free(j);
+    }
+
+    return NULL;
+}
+
+/*
+ * Creates a worker pool of length qsize with tsize worker threads.
+ *
+ * Returns pool pointer on success;
+ *         NULL on failure
+ */
+t_pool *t_pool_init(int qsize, int tsize) {
+    int i;
+    t_pool *p = malloc(sizeof(*p));
+    if (!p)
+	return NULL;
+    p->qsize = qsize;
+    p->tsize = tsize;
+    p->njobs = 0;
+    p->nwaiting = 0;
+    p->shutdown = 0;
+    p->head = p->tail = NULL;
+    p->t_stack = NULL;
+#ifdef DEBUG_TIME
+    p->total_time = p->wait_time = 0;
+#endif
+
+    p->t = malloc(tsize * sizeof(p->t[0]));
+    if (!p->t) {
+	free(p);
+	return NULL;
+    }
+
+    pthread_mutex_init(&p->pool_m, NULL);
+    pthread_cond_init(&p->empty_c, NULL);
+    pthread_cond_init(&p->full_c, NULL);
+
+    pthread_mutex_lock(&p->pool_m);
+
+#ifdef IN_ORDER
+    if (!(p->t_stack = malloc(tsize * sizeof(*p->t_stack)))) {
+	pthread_mutex_unlock(&p->pool_m);
+	return NULL;
+    }
+    p->t_stack_top = -1;
+
+    for (i = 0; i < tsize; i++) {
+	t_pool_worker_t *w = &p->t[i];
+	p->t_stack[i] = 0;
+	w->p = p;
+	w->idx = i;
+	w->wait_time = 0;
+	pthread_cond_init(&w->pending_c, NULL);
+	if (0 != pthread_create(&w->tid, NULL, t_pool_worker, w)) {
+	    pthread_mutex_unlock(&p->pool_m);
+	    return NULL;
+	}
+    }
+#else
+    pthread_cond_init(&p->pending_c, NULL);
+
+    for (i = 0; i < tsize; i++) {
+	t_pool_worker_t *w = &p->t[i];
+	w->p = p;
+	w->idx = i;
+	pthread_cond_init(&w->pending_c, NULL);
+	if (0 != pthread_create(&w->tid, NULL, t_pool_worker, w)) {
+	    pthread_mutex_unlock(&p->pool_m);
+	    return NULL;
+	}
+    }
+#endif
+
+    pthread_mutex_unlock(&p->pool_m);
+
+    return p;
+}
+
+/*
+ * Adds an item to the work pool.
+ *
+ * FIXME: Maybe return 1,0,-1 and distinguish between job dispatched vs
+ * result returned. I.e. rather than blocking on a full queue we're permitted
+ * to return early on "result available" event too.
+ * Caller would then have a while loop around t_pool_dispatch.
+ * Or, return -1 and set errno to EAGAIN to indicate job not yet submitted.
+ *
+ * Returns 0 on success
+ *        -1 on failure
+ */
+int t_pool_dispatch(t_pool *p, t_results_queue *q,
+		    void *(*func)(void *arg), void *arg) {
+    t_pool_job *j = malloc(sizeof(*j));
+
+    if (!j)
+	return -1;
+    j->func = func;
+    j->arg = arg;
+    j->next = NULL;
+    j->p = p;
+    j->q = q;
+    if (q) {
+	pthread_mutex_lock(&q->result_m);
+	j->serial = q->curr_serial++;
+	q->pending++;
+	pthread_mutex_unlock(&q->result_m);
+    } else {
+	j->serial = 0;
+    }
+
+#ifdef DEBUG
+    fprintf(stderr, "Dispatching job %p for queue %p, serial %d\n", j, q, j->serial);
+#endif
+
+    pthread_mutex_lock(&p->pool_m);
+
+    // Check if queue is full
+    while (p->njobs >= p->qsize)
+	pthread_cond_wait(&p->full_c, &p->pool_m);
+
+    p->njobs++;
+
+    if (p->tail) {
+	p->tail->next = j;
+	p->tail = j;
+    } else {
+	p->head = p->tail = j;
+    }
+
+    // Let a worker know we have data.
+#ifdef IN_ORDER
+    if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
+	pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
+#else
+    pthread_cond_signal(&p->pending_c);
+#endif
+    pthread_mutex_unlock(&p->pool_m);
+
+#ifdef DEBUG
+    fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
+#endif
+
+    return 0;
+}
+
+/*
+ * As above, but with an optional non-blocking flag (see the usage sketch below).
+ *
+ * nonblock  0 => block if input queue is full
+ * nonblock +1 => don't block if input queue is full, but do not add task
+ * nonblock -1 => add task regardless of whether queue is full (over-size)
+ */
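+
+/*
+ * Illustrative sketch only: retrying a non-blocking dispatch when the
+ * input queue is full (nonblock == +1 sets errno to EAGAIN).  "my_func"
+ * and "my_arg" are hypothetical placeholders.
+ *
+ *     while (t_pool_dispatch2(p, q, my_func, my_arg, 1) == -1 &&
+ *            errno == EAGAIN) {
+ *         // Queue full: drain a result (or do other work) and retry.
+ *         t_pool_result *r = t_pool_next_result(q);
+ *         if (r) t_pool_delete_result(r, 1);
+ *     }
+ */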
+int t_pool_dispatch2(t_pool *p, t_results_queue *q,
+		     void *(*func)(void *arg), void *arg, int nonblock) {
+    t_pool_job *j;
+
+#ifdef DEBUG
+    fprintf(stderr, "Dispatching job for queue %p, serial %d\n", q, q->curr_serial);
+#endif
+
+    pthread_mutex_lock(&p->pool_m);
+
+    if (p->njobs >= p->qsize && nonblock == 1) {
+	pthread_mutex_unlock(&p->pool_m);
+	errno = EAGAIN;
+	return -1;
+    }
+
+    if (!(j = malloc(sizeof(*j)))) {
+	pthread_mutex_unlock(&p->pool_m);
+	return -1;
+    }
+    j->func = func;
+    j->arg = arg;
+    j->next = NULL;
+    j->p = p;
+    j->q = q;
+    if (q) {
+	pthread_mutex_lock(&q->result_m);
+	j->serial = q->curr_serial++;
+	q->pending++;
+	pthread_mutex_unlock(&q->result_m);
+    } else {
+	j->serial = 0;
+    }
+
+    // Check if queue is full
+    if (nonblock == 0)
+	while (p->njobs >= p->qsize)
+	    pthread_cond_wait(&p->full_c, &p->pool_m);
+
+    p->njobs++;
+    
+//    if (q->curr_serial % 100 == 0)
+//	fprintf(stderr, "p->njobs = %d    p->qsize = %d\n", p->njobs, p->qsize);
+
+    if (p->tail) {
+	p->tail->next = j;
+	p->tail = j;
+    } else {
+	p->head = p->tail = j;
+    }
+
+#ifdef DEBUG
+    fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
+#endif
+
+    // Let a worker know we have data.
+#ifdef IN_ORDER
+    // Keep the incoming queue at roughly one job per running thread, so
+    // there is always something waiting when a thread ends its current
+    // task.  If we go above this, signal more threads to start (if any
+    // are available).  This has the effect of concentrating jobs on fewer
+    // cores when we are I/O bound, which in turn benefits systems with
+    // auto CPU frequency scaling.
+    if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
+	pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
+#else
+    pthread_cond_signal(&p->pending_c);
+#endif
+
+    pthread_mutex_unlock(&p->pool_m);
+
+    return 0;
+}
+
+/*
+ * Flushes the pool, but doesn't exit. This simply drains the queue and
+ * ensures all worker threads have finished their current task.
+ *
+ * Returns 0 on success;
+ *        -1 on failure
+ */
+int t_pool_flush(t_pool *p) {
+    int i;
+
+#ifdef DEBUG
+    fprintf(stderr, "Flushing pool %p\n", p);
+#endif
+
+    // Drains the queue
+    pthread_mutex_lock(&p->pool_m);
+
+    // Wake up everything for the final sprint!
+#ifdef IN_ORDER
+    for (i = 0; i < p->tsize; i++)
+	if (p->t_stack[i])
+	    pthread_cond_signal(&p->t[i].pending_c);
+#else
+    pthread_cond_broadcast(&p->pending_c);
+#endif
+
+    while (p->njobs || p->nwaiting != p->tsize)
+	pthread_cond_wait(&p->empty_c, &p->pool_m);
+
+    pthread_mutex_unlock(&p->pool_m);
+
+#ifdef DEBUG
+    fprintf(stderr, "Flushed complete for pool %p, njobs=%d, nwaiting=%d\n",
+	    p, p->njobs, p->nwaiting);
+#endif
+
+    return 0;
+}
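+
+/*
+ * Illustrative sketch only: the usual end-of-input sequence is to flush
+ * the pool, drain any remaining results, then destroy both structures.
+ *
+ *     t_pool_flush(p);
+ *     t_pool_result *r;
+ *     while ((r = t_pool_next_result(q)))
+ *         t_pool_delete_result(r, 1);
+ *     t_pool_destroy(p, 0);
+ *     t_results_queue_destroy(q);
+ */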
+
+/*
+ * Destroys a thread pool. If 'kill' is true the threads are terminated now,
+ * otherwise they are joined into the main thread so they will finish their
+ * current workload.
+ *
+ * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
+ * t_pool_destroy(p,1) to quickly exit after a fatal error.
+ */
+void t_pool_destroy(t_pool *p, int kill) {
+    int i;
+    
+#ifdef DEBUG
+    fprintf(stderr, "Destroying pool %p, kill=%d\n", p, kill);
+#endif
+
+    /* Send shutdown message to worker threads */
+    if (!kill) {
+	pthread_mutex_lock(&p->pool_m);
+	p->shutdown = 1;
+
+#ifdef DEBUG
+	fprintf(stderr, "Sending shutdown request\n");
+#endif
+
+#ifdef IN_ORDER
+	for (i = 0; i < p->tsize; i++)
+	    pthread_cond_signal(&p->t[i].pending_c);
+#else
+	pthread_cond_broadcast(&p->pending_c);
+#endif
+	pthread_mutex_unlock(&p->pool_m);
+
+#ifdef DEBUG
+	fprintf(stderr, "Shutdown complete\n");
+#endif
+	for (i = 0; i < p->tsize; i++)
+	    pthread_join(p->t[i].tid, NULL);
+    } else {
+	for (i = 0; i < p->tsize; i++)
+	    pthread_kill(p->t[i].tid, SIGINT);
+    }
+
+    pthread_mutex_destroy(&p->pool_m);
+    pthread_cond_destroy(&p->empty_c);
+    pthread_cond_destroy(&p->full_c);
+#ifdef IN_ORDER
+    for (i = 0; i < p->tsize; i++)
+	pthread_cond_destroy(&p->t[i].pending_c);
+#else
+    pthread_cond_destroy(&p->pending_c);
+#endif
+
+#ifdef DEBUG_TIME
+    fprintf(stderr, "Total time=%f\n", p->total_time / 1000000.0);
+    fprintf(stderr, "Wait  time=%f\n", p->wait_time  / 1000000.0);
+    fprintf(stderr, "%d%% utilisation\n",
+	    (int)(100 - (100.0 * p->wait_time) / p->total_time + 0.5));
+    for (i = 0; i < p->tsize; i++)
+	fprintf(stderr, "%d: Wait time=%f\n", i,
+		p->t[i].wait_time / 1000000.0);
+#endif
+
+    if (p->t_stack)
+	free(p->t_stack);
+
+    free(p->t);
+    free(p);
+
+#ifdef DEBUG
+    fprintf(stderr, "Destroyed pool %p\n", p);
+#endif
+}
+
+
+/* ----------------------------------------------------------------------------
+ * Test app.
+ */
+
+#ifdef TEST_MAIN
+
+#include <stdio.h>
+#include <math.h>
+#include <unistd.h>  /* for usleep() */
+
+void *doit(void *arg) {
+    int i, k, x = 0;
+    int job = *(int *)arg;
+    int *res;
+
+    printf("Worker: execute job %d\n", job);
+
+    usleep(random() % 1000000); // to coerce job completion out of order
+    if (0) {
+	for (k = 0; k < 100; k++) {
+	    for (i = 0; i < 100000; i++) {
+		x++;
+		x += x * sin(i);
+		x += x * cos(x);
+	    }
+	}
+	x *= 100;
+	x += job;
+    } else {
+	x = job*job;
+    }
+
+    printf("Worker: job %d terminating, x=%d\n", job, x);
+
+    free(arg);
+
+    res = malloc(sizeof(*res));
+    *res = x;
+
+    return res;
+}
+
+#define NTHREADS 8
+
+int main(int argc, char **argv) {
+    t_pool *p = t_pool_init(NTHREADS*2, NTHREADS);
+    t_results_queue *q = t_results_queue_init();
+    int i;
+    t_pool_result *r;
+
+    // Dispatch jobs
+    for (i = 0; i < 20; i++) {
+	int *ip = malloc(sizeof(*ip));
+	*ip = i;
+	printf("Submitting %d\n", i);
+	t_pool_dispatch(p, q, doit, ip);
+	
+	// Check for results
+	if ((r = t_pool_next_result(q))) {
+	    printf("RESULT: %d\n", *(int *)r->data);
+	    t_pool_delete_result(r, 1);
+	}
+    }
+
+    t_pool_flush(p);
+
+    while ((r = t_pool_next_result(q))) {
+	printf("RESULT: %d\n", *(int *)r->data);
+	t_pool_delete_result(r, 1);
+    }
+
+    t_pool_destroy(p, 0);
+    t_results_queue_destroy(q);
+
+    return 0;
+}
+#endif