0
|
1 /*
|
|
2 Copyright (c) 2013 Genome Research Ltd.
|
|
3 Author: James Bonfield <jkb@sanger.ac.uk>
|
|
4
|
|
5 Redistribution and use in source and binary forms, with or without
|
|
6 modification, are permitted provided that the following conditions are met:
|
|
7
|
|
8 1. Redistributions of source code must retain the above copyright notice,
|
|
9 this list of conditions and the following disclaimer.
|
|
10
|
|
11 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
12 this list of conditions and the following disclaimer in the documentation
|
|
13 and/or other materials provided with the distribution.
|
|
14
|
|
15 3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
|
|
16 Institute nor the names of its contributors may be used to endorse or promote
|
|
17 products derived from this software without specific prior written permission.
|
|
18
|
|
19 THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND
|
|
20 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
|
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
|
22 DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
|
|
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
|
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
|
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
|
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
|
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
29 */
|
|
30
|
|
31 /*
|
|
32 * This file implements a thread pool for multi-threading applications.
|
|
33 * It consists of two distinct interfaces: thread pools an results queues.
|
|
34 *
|
|
35 * The pool of threads is given a function pointer and void* data to pass in.
|
|
36 * This means the pool can run jobs of multiple types, albeit first come
|
|
37 * first served with no job scheduling.
|
|
38 *
|
|
39 * Upon completion, the return value from the function pointer is added to
|
|
40 * a results queue. We may have multiple queues in use for the one pool.
|
|
41 *
|
|
42 * An example: reading from BAM and writing to CRAM with 10 threads. We'll
|
|
43 * have a pool of 10 threads and two results queues holding decoded BAM blocks
|
|
44 * and encoded CRAM blocks respectively.
|
|
45 */
|
|
46
|
|
47 #ifndef _THREAD_POOL_H_
|
|
48 #define _THREAD_POOL_H_
|
|
49
|
|
50 #include <pthread.h>
|
|
51
|
|
52 struct t_pool;
|
|
53 struct t_results_queue;
|
|
54
|
|
55 typedef struct t_pool_job {
|
|
56 void *(*func)(void *arg);
|
|
57 void *arg;
|
|
58 struct t_pool_job *next;
|
|
59
|
|
60 struct t_pool *p;
|
|
61 struct t_results_queue *q;
|
|
62 int serial;
|
|
63 } t_pool_job;
|
|
64
|
|
65 typedef struct t_res {
|
|
66 struct t_res *next;
|
|
67 int serial; // sequential number for ordering
|
|
68 void *data; // result itself
|
|
69 } t_pool_result;
|
|
70
|
|
71 struct t_pool;
|
|
72
|
|
73 typedef struct {
|
|
74 struct t_pool *p;
|
|
75 int idx;
|
|
76 pthread_t tid;
|
|
77 pthread_cond_t pending_c;
|
|
78 long long wait_time;
|
|
79 } t_pool_worker_t;
|
|
80
|
|
81 typedef struct t_pool {
|
|
82 int qsize; // size of queue
|
|
83 int njobs; // pending job count
|
|
84 int nwaiting; // how many workers waiting for new jobs
|
|
85 int shutdown; // true if pool is being destroyed
|
|
86
|
|
87 // queue of pending jobs
|
|
88 t_pool_job *head, *tail;
|
|
89
|
|
90 // threads
|
|
91 int tsize; // maximum number of jobs
|
|
92 t_pool_worker_t *t;
|
|
93
|
|
94 // Mutexes
|
|
95 pthread_mutex_t pool_m; // used when updating head/tail
|
|
96
|
|
97 pthread_cond_t empty_c;
|
|
98 pthread_cond_t pending_c; // not empty
|
|
99 pthread_cond_t full_c;
|
|
100
|
|
101 // array of worker IDs free
|
|
102 int *t_stack, t_stack_top;
|
|
103
|
|
104 // Debugging to check wait time
|
|
105 long long total_time, wait_time;
|
|
106 } t_pool;
|
|
107
|
|
108 typedef struct t_results_queue {
|
|
109 t_pool_result *result_head;
|
|
110 t_pool_result *result_tail;
|
|
111 int next_serial;
|
|
112 int curr_serial;
|
|
113 int queue_len; // number of items in queue
|
|
114 int pending; // number of pending items (in progress or in pool list)
|
|
115 pthread_mutex_t result_m;
|
|
116 pthread_cond_t result_avail_c;
|
|
117 } t_results_queue;
|
|
118
|
|
119
|
|
120 /*
|
|
121 * Creates a worker pool of length qsize with tsize worker threads.
|
|
122 *
|
|
123 * Returns pool pointer on success;
|
|
124 * NULL on failure
|
|
125 */
|
|
126 t_pool *t_pool_init(int qsize, int tsize);
|
|
127
|
|
128 /*
|
|
129 * Adds an item to the work pool.
|
|
130 *
|
|
131 * FIXME: Maybe return 1,0,-1 and distinguish between job dispathed vs
|
|
132 * result returned. Ie rather than blocking on full queue we're permitted
|
|
133 * to return early on "result available" event too.
|
|
134 * Caller would then have a while loop around t_pool_dispatch.
|
|
135 * Or, return -1 and set errno to E_AGAIN to indicate job not yet submitted.
|
|
136 *
|
|
137 * Returns 0 on success
|
|
138 * -1 on failure
|
|
139 */
|
|
140 int t_pool_dispatch(t_pool *p, t_results_queue *q,
|
|
141 void *(*func)(void *arg), void *arg);
|
|
142 int t_pool_dispatch2(t_pool *p, t_results_queue *q,
|
|
143 void *(*func)(void *arg), void *arg, int nonblock);
|
|
144
|
|
145 /*
|
|
146 * Flushes the pool, but doesn't exit. This simply drains the queue and
|
|
147 * ensures all worker threads have finished their current task.
|
|
148 *
|
|
149 * Returns 0 on success;
|
|
150 * -1 on failure
|
|
151 */
|
|
152 int t_pool_flush(t_pool *p);
|
|
153
|
|
154 /*
|
|
155 * Destroys a thread pool. If 'kill' is true the threads are terminated now,
|
|
156 * otherwise they are joined into the main thread so they will finish their
|
|
157 * current work load.
|
|
158 *
|
|
159 * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
|
|
160 * t_pool_destroy(p,1) to quickly exit after a fatal error.
|
|
161 */
|
|
162 void t_pool_destroy(t_pool *p, int kill);
|
|
163
|
|
164 /*
|
|
165 * Pulls a result off the head of the result queue. Caller should
|
|
166 * free it (and any internals as appropriate) after use. This doesn't
|
|
167 * wait for a result to be present.
|
|
168 *
|
|
169 * Results will be returned in strict order.
|
|
170 *
|
|
171 * Returns t_pool_result pointer if a result is ready.
|
|
172 * NULL if not.
|
|
173 */
|
|
174 t_pool_result *t_pool_next_result(t_results_queue *q);
|
|
175 t_pool_result *t_pool_next_result_wait(t_results_queue *q);
|
|
176
|
|
177 /*
|
|
178 * Frees a result 'r' and if free_data is true also frees
|
|
179 * the internal r->data result too.
|
|
180 */
|
|
181 void t_pool_delete_result(t_pool_result *r, int free_data);
|
|
182
|
|
183 /*
|
|
184 * Initialises a results queue.
|
|
185 *
|
|
186 * Results queue pointer on success;
|
|
187 * NULL on failure
|
|
188 */
|
|
189 t_results_queue *t_results_queue_init(void);
|
|
190
|
|
191 /* Deallocates memory for a results queue */
|
|
192 void t_results_queue_destroy(t_results_queue *q);
|
|
193
|
|
194 /*
|
|
195 * Returns true if there are no items on the finished results queue and
|
|
196 * also none still pending.
|
|
197 */
|
|
198 int t_pool_results_queue_empty(t_results_queue *q);
|
|
199
|
|
200 /*
|
|
201 * Returns the number of completed jobs on the results queue.
|
|
202 */
|
|
203 int t_pool_results_queue_len(t_results_queue *q);
|
|
204
|
|
205 /*
|
|
206 * Returns the number of completed jobs plus the number queued up to run.
|
|
207 */
|
|
208 int t_pool_results_queue_sz(t_results_queue *q);
|
|
209
|
|
210 #endif /* _THREAD_POOL_H_ */
|