comparison ezBAMQC/src/htslib/cram/thread_pool.h @ 0:dfa3745e5fd8

Uploaded
author youngkim
date Thu, 24 Mar 2016 17:12:52 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:dfa3745e5fd8
1 /*
2 Copyright (c) 2013 Genome Research Ltd.
3 Author: James Bonfield <jkb@sanger.ac.uk>
4
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7
8 1. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
10
11 2. Redistributions in binary form must reproduce the above copyright notice,
12 this list of conditions and the following disclaimer in the documentation
13 and/or other materials provided with the distribution.
14
15 3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
16 Institute nor the names of its contributors may be used to endorse or promote
17 products derived from this software without specific prior written permission.
18
19 THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND
20 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
/*
 * This file implements a thread pool for multi-threaded applications.
 * It consists of two distinct interfaces: thread pools and results queues.
 *
 * The pool of threads is given a function pointer and void* data to pass in.
 * This means the pool can run jobs of multiple types, albeit first come
 * first served with no job scheduling.
 *
 * Upon completion, the return value from the function pointer is added to
 * a results queue. We may have multiple queues in use for the one pool.
 *
 * An example: reading from BAM and writing to CRAM with 10 threads. We'll
 * have a pool of 10 threads and two results queues holding decoded BAM blocks
 * and encoded CRAM blocks respectively.
 */
46
47 #ifndef _THREAD_POOL_H_
48 #define _THREAD_POOL_H_
49
50 #include <pthread.h>
51
/*
 * Forward declarations so the job and worker structures below can hold
 * pointers to a pool and a results queue before those types are defined.
 */
struct t_pool;
struct t_results_queue;
54
/*
 * A unit of work queued on a t_pool. A worker thread calls func(arg); the
 * return value is added to the results queue 'q'. Pending jobs are chained
 * through 'next' (see t_pool.head / t_pool.tail).
 */
typedef struct t_pool_job {
    void *(*func)(void *arg);   /* job function; its return value is the result */
    void *arg;                  /* opaque argument passed to func */
    struct t_pool_job *next;    /* next job in the pool's pending list */

    struct t_pool *p;           /* pool this job was dispatched to */
    struct t_results_queue *q;  /* results queue that receives func's return value */
    int serial;                 /* ordering number; presumably copied to t_pool_result.serial */
} t_pool_job;
64
/*
 * The output of one completed job, held on a t_results_queue until the
 * caller pulls it with t_pool_next_result() and releases it with
 * t_pool_delete_result().
 */
typedef struct t_res {
    struct t_res *next; /* next result in the queue */
    int serial;         /* sequential number for ordering */
    void *data;         /* result itself (the job function's return value) */
} t_pool_result;
70
/* Per-thread state for one worker of a t_pool. */
typedef struct {
    struct t_pool *p;          /* owning pool */
    int idx;                   /* this worker's index (presumably into p->t) */
    pthread_t tid;             /* thread handle */
    pthread_cond_t pending_c;  /* per-worker condition variable; presumably
                                * signalled to wake this worker when work is
                                * pending -- NOTE(review): confirm in thread_pool.c */
    long long wait_time;       /* debugging statistic; presumably time spent waiting */
} t_pool_worker_t;
80
81 typedef struct t_pool {
82 int qsize; // size of queue
83 int njobs; // pending job count
84 int nwaiting; // how many workers waiting for new jobs
85 int shutdown; // true if pool is being destroyed
86
87 // queue of pending jobs
88 t_pool_job *head, *tail;
89
90 // threads
91 int tsize; // maximum number of jobs
92 t_pool_worker_t *t;
93
94 // Mutexes
95 pthread_mutex_t pool_m; // used when updating head/tail
96
97 pthread_cond_t empty_c;
98 pthread_cond_t pending_c; // not empty
99 pthread_cond_t full_c;
100
101 // array of worker IDs free
102 int *t_stack, t_stack_top;
103
104 // Debugging to check wait time
105 long long total_time, wait_time;
106 } t_pool;
107
108 typedef struct t_results_queue {
109 t_pool_result *result_head;
110 t_pool_result *result_tail;
111 int next_serial;
112 int curr_serial;
113 int queue_len; // number of items in queue
114 int pending; // number of pending items (in progress or in pool list)
115 pthread_mutex_t result_m;
116 pthread_cond_t result_avail_c;
117 } t_results_queue;
118
119
120 /*
121 * Creates a worker pool of length qsize with tsize worker threads.
122 *
123 * Returns pool pointer on success;
124 * NULL on failure
125 */
126 t_pool *t_pool_init(int qsize, int tsize);
127
128 /*
129 * Adds an item to the work pool.
130 *
131 * FIXME: Maybe return 1,0,-1 and distinguish between job dispathed vs
132 * result returned. Ie rather than blocking on full queue we're permitted
133 * to return early on "result available" event too.
134 * Caller would then have a while loop around t_pool_dispatch.
135 * Or, return -1 and set errno to E_AGAIN to indicate job not yet submitted.
136 *
137 * Returns 0 on success
138 * -1 on failure
139 */
140 int t_pool_dispatch(t_pool *p, t_results_queue *q,
141 void *(*func)(void *arg), void *arg);
142 int t_pool_dispatch2(t_pool *p, t_results_queue *q,
143 void *(*func)(void *arg), void *arg, int nonblock);
144
145 /*
146 * Flushes the pool, but doesn't exit. This simply drains the queue and
147 * ensures all worker threads have finished their current task.
148 *
149 * Returns 0 on success;
150 * -1 on failure
151 */
152 int t_pool_flush(t_pool *p);
153
154 /*
155 * Destroys a thread pool. If 'kill' is true the threads are terminated now,
156 * otherwise they are joined into the main thread so they will finish their
157 * current work load.
158 *
159 * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
160 * t_pool_destroy(p,1) to quickly exit after a fatal error.
161 */
162 void t_pool_destroy(t_pool *p, int kill);
163
164 /*
165 * Pulls a result off the head of the result queue. Caller should
166 * free it (and any internals as appropriate) after use. This doesn't
167 * wait for a result to be present.
168 *
169 * Results will be returned in strict order.
170 *
171 * Returns t_pool_result pointer if a result is ready.
172 * NULL if not.
173 */
174 t_pool_result *t_pool_next_result(t_results_queue *q);
175 t_pool_result *t_pool_next_result_wait(t_results_queue *q);
176
177 /*
178 * Frees a result 'r' and if free_data is true also frees
179 * the internal r->data result too.
180 */
181 void t_pool_delete_result(t_pool_result *r, int free_data);
182
183 /*
184 * Initialises a results queue.
185 *
186 * Results queue pointer on success;
187 * NULL on failure
188 */
189 t_results_queue *t_results_queue_init(void);
190
191 /* Deallocates memory for a results queue */
192 void t_results_queue_destroy(t_results_queue *q);
193
194 /*
195 * Returns true if there are no items on the finished results queue and
196 * also none still pending.
197 */
198 int t_pool_results_queue_empty(t_results_queue *q);
199
200 /*
201 * Returns the number of completed jobs on the results queue.
202 */
203 int t_pool_results_queue_len(t_results_queue *q);
204
205 /*
206 * Returns the number of completed jobs plus the number queued up to run.
207 */
208 int t_pool_results_queue_sz(t_results_queue *q);
209
210 #endif /* _THREAD_POOL_H_ */