annotate ezBAMQC/src/htslib/cram/thread_pool.h @ 11:5bfcc6c131ed

Uploaded
author cshl-bsr
date Wed, 30 Mar 2016 12:14:21 -0400
parents dfa3745e5fd8
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
0
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
1 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
2 Copyright (c) 2013 Genome Research Ltd.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
3 Author: James Bonfield <jkb@sanger.ac.uk>
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
4
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
5 Redistribution and use in source and binary forms, with or without
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
6 modification, are permitted provided that the following conditions are met:
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
7
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
8 1. Redistributions of source code must retain the above copyright notice,
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
9 this list of conditions and the following disclaimer.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
10
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
11 2. Redistributions in binary form must reproduce the above copyright notice,
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
12 this list of conditions and the following disclaimer in the documentation
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
13 and/or other materials provided with the distribution.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
14
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
15 3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
16 Institute nor the names of its contributors may be used to endorse or promote
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
17 products derived from this software without specific prior written permission.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
18
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
19 THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
20 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
22 DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
29 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
30
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
31 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
32 * This file implements a thread pool for multi-threading applications.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
33 * It consists of two distinct interfaces: thread pools and results queues.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
34 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
35 * The pool of threads is given a function pointer and void* data to pass in.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
36 * This means the pool can run jobs of multiple types, albeit first come
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
37 * first served with no job scheduling.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
38 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
39 * Upon completion, the return value from the function pointer is added to
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
40 * a results queue. We may have multiple queues in use for the one pool.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
41 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
42 * An example: reading from BAM and writing to CRAM with 10 threads. We'll
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
43 * have a pool of 10 threads and two results queues holding decoded BAM blocks
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
44 * and encoded CRAM blocks respectively.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
45 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
46
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
47 #ifndef _THREAD_POOL_H_
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
48 #define _THREAD_POOL_H_
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
49
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
50 #include <pthread.h>
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
51
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
52 struct t_pool;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
53 struct t_results_queue;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
54
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
55 typedef struct t_pool_job {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
56 void *(*func)(void *arg);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
57 void *arg;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
58 struct t_pool_job *next;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
59
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
60 struct t_pool *p;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
61 struct t_results_queue *q;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
62 int serial;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
63 } t_pool_job;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
64
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
65 typedef struct t_res {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
66 struct t_res *next;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
67 int serial; // sequential number for ordering
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
68 void *data; // result itself
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
69 } t_pool_result;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
70
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
71 struct t_pool;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
72
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
73 typedef struct {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
74 struct t_pool *p;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
75 int idx;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
76 pthread_t tid;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
77 pthread_cond_t pending_c;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
78 long long wait_time;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
79 } t_pool_worker_t;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
80
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
81 typedef struct t_pool {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
82 int qsize; // size of queue
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
83 int njobs; // pending job count
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
84 int nwaiting; // how many workers waiting for new jobs
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
85 int shutdown; // true if pool is being destroyed
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
86
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
87 // queue of pending jobs
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
88 t_pool_job *head, *tail;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
89
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
90 // threads
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
91 int tsize; // number of worker threads
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
92 t_pool_worker_t *t;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
93
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
94 // Mutexes
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
95 pthread_mutex_t pool_m; // used when updating head/tail
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
96
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
97 pthread_cond_t empty_c;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
98 pthread_cond_t pending_c; // not empty
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
99 pthread_cond_t full_c;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
100
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
101 // array of worker IDs free
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
102 int *t_stack, t_stack_top;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
103
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
104 // Debugging to check wait time
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
105 long long total_time, wait_time;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
106 } t_pool;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
107
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
108 typedef struct t_results_queue {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
109 t_pool_result *result_head;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
110 t_pool_result *result_tail;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
111 int next_serial;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
112 int curr_serial;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
113 int queue_len; // number of items in queue
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
114 int pending; // number of pending items (in progress or in pool list)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
115 pthread_mutex_t result_m;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
116 pthread_cond_t result_avail_c;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
117 } t_results_queue;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
118
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
119
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
120 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
121 * Creates a worker pool of length qsize with tsize worker threads.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
122 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
123 * Returns pool pointer on success;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
124 * NULL on failure
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
125 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
126 t_pool *t_pool_init(int qsize, int tsize);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
127
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
128 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
129 * Adds an item to the work pool.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
130 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
131 * FIXME: Maybe return 1,0,-1 and distinguish between job dispatched vs
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
132 * result returned. Ie rather than blocking on full queue we're permitted
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
133 * to return early on "result available" event too.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
134 * Caller would then have a while loop around t_pool_dispatch.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
135 * Or, return -1 and set errno to EAGAIN to indicate job not yet submitted.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
136 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
137 * Returns 0 on success
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
138 * -1 on failure
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
139 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
140 int t_pool_dispatch(t_pool *p, t_results_queue *q,
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
141 void *(*func)(void *arg), void *arg);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
142 int t_pool_dispatch2(t_pool *p, t_results_queue *q,
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
143 void *(*func)(void *arg), void *arg, int nonblock);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
144
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
145 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
146 * Flushes the pool, but doesn't exit. This simply drains the queue and
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
147 * ensures all worker threads have finished their current task.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
148 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
149 * Returns 0 on success;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
150 * -1 on failure
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
151 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
152 int t_pool_flush(t_pool *p);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
153
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
154 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
155 * Destroys a thread pool. If 'kill' is true the threads are terminated now,
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
156 * otherwise they are joined into the main thread so they will finish their
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
157 * current work load.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
158 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
159 * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
160 * t_pool_destroy(p,1) to quickly exit after a fatal error.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
161 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
162 void t_pool_destroy(t_pool *p, int kill);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
163
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
164 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
165 * Pulls a result off the head of the result queue. Caller should
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
166 * free it (and any internals as appropriate) after use. This doesn't
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
167 * wait for a result to be present.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
168 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
169 * Results will be returned in strict order.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
170 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
171 * Returns t_pool_result pointer if a result is ready.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
172 * NULL if not.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
173 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
174 t_pool_result *t_pool_next_result(t_results_queue *q);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
175 t_pool_result *t_pool_next_result_wait(t_results_queue *q);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
176
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
177 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
178 * Frees a result 'r' and if free_data is true also frees
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
179 * the internal r->data result too.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
180 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
181 void t_pool_delete_result(t_pool_result *r, int free_data);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
182
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
183 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
184 * Initialises a results queue.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
185 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
186 * Returns results queue pointer on success;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
187 * NULL on failure
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
188 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
189 t_results_queue *t_results_queue_init(void);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
190
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
191 /* Deallocates memory for a results queue */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
192 void t_results_queue_destroy(t_results_queue *q);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
193
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
194 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
195 * Returns true if there are no items on the finished results queue and
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
196 * also none still pending.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
197 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
198 int t_pool_results_queue_empty(t_results_queue *q);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
199
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
200 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
201 * Returns the number of completed jobs on the results queue.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
202 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
203 int t_pool_results_queue_len(t_results_queue *q);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
204
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
205 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
206 * Returns the number of completed jobs plus the number queued up to run.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
207 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
208 int t_pool_results_queue_sz(t_results_queue *q);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
209
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
210 #endif /* _THREAD_POOL_H_ */