annotate ezBAMQC/src/htslib/cram/thread_pool.c @ 0:dfa3745e5fd8

Uploaded
author youngkim
date Thu, 24 Mar 2016 17:12:52 -0400
parents
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
0
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
1 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
2 Copyright (c) 2013 Genome Research Ltd.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
3 Author: James Bonfield <jkb@sanger.ac.uk>
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
4
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
5 Redistribution and use in source and binary forms, with or without
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
6 modification, are permitted provided that the following conditions are met:
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
7
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
8 1. Redistributions of source code must retain the above copyright notice,
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
9 this list of conditions and the following disclaimer.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
10
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
11 2. Redistributions in binary form must reproduce the above copyright notice,
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
12 this list of conditions and the following disclaimer in the documentation
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
13 and/or other materials provided with the distribution.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
14
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
15 3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
16 Institute nor the names of its contributors may be used to endorse or promote
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
17 products derived from this software without specific prior written permission.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
18
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
19 THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
20 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
22 DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
29 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
30
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
31 #include <stdlib.h>
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
32
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
33 #include <signal.h>
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
34 #include <errno.h>
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
35 #include <stdio.h>
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
36 #include <string.h>
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
37 #include <sys/time.h>
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
38 #include <assert.h>
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
39
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
40 #include "cram/thread_pool.h"
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
41
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
42 //#define DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
43 //#define DEBUG_TIME
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
44
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
45 #define IN_ORDER
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
46
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
47 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
48 static int worker_id(t_pool *p) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
49 int i;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
50 pthread_t s = pthread_self();
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
51 for (i = 0; i < p->tsize; i++) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
52 if (pthread_equal(s, p->t[i].tid))
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
53 return i;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
54 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
55 return -1;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
56 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
57 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
58
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
59 /* ----------------------------------------------------------------------------
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
60 * A queue to hold results from the thread pool.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
61 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
62 * Each thread pool may have jobs of multiple types being queued up and
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
63 * interleaved, so we allow several results queues per pool.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
64 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
65 * The jobs themselves are expected to push their results onto their
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
66 * appropriate results queue.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
67 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
68
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
69 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
70 * Adds a result to the end of the result queue.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
71 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
72 * Returns 0 on success;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
73 * -1 on failure
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
74 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
75 static int t_pool_add_result(t_pool_job *j, void *data) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
76 t_results_queue *q = j->q;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
77 t_pool_result *r;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
78
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
79 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
80 fprintf(stderr, "%d: Adding resulting to queue %p, serial %d\n",
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
81 worker_id(j->p), q, j->serial);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
82 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
83
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
84 /* No results queue is fine if we don't want any results back */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
85 if (!q)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
86 return 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
87
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
88 if (!(r = malloc(sizeof(*r))))
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
89 return -1;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
90
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
91 r->next = NULL;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
92 r->data = data;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
93 r->serial = j->serial;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
94
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
95 pthread_mutex_lock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
96 if (q->result_tail) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
97 q->result_tail->next = r;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
98 q->result_tail = r;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
99 } else {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
100 q->result_head = q->result_tail = r;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
101 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
102 q->queue_len++;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
103 q->pending--;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
104
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
105 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
106 fprintf(stderr, "%d: Broadcasting result_avail (id %d)\n",
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
107 worker_id(j->p), r->serial);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
108 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
109 pthread_cond_signal(&q->result_avail_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
110 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
111 fprintf(stderr, "%d: Broadcast complete\n", worker_id(j->p));
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
112 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
113
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
114 pthread_mutex_unlock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
115
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
116 return 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
117 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
118
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
119 /* Core of t_pool_next_result() */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
120 static t_pool_result *t_pool_next_result_locked(t_results_queue *q) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
121 t_pool_result *r, *last;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
122
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
123 for (last = NULL, r = q->result_head; r; last = r, r = r->next) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
124 if (r->serial == q->next_serial)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
125 break;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
126 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
127
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
128 if (r) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
129 if (q->result_head == r)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
130 q->result_head = r->next;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
131 else
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
132 last->next = r->next;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
133
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
134 if (q->result_tail == r)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
135 q->result_tail = last;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
136
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
137 if (!q->result_head)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
138 q->result_tail = NULL;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
139
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
140 q->next_serial++;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
141 q->queue_len--;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
142 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
143
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
144 return r;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
145 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
146
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
147 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
148 * Pulls a result off the head of the result queue. Caller should
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
149 * free it (and any internals as appropriate) after use. This doesn't
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
150 * wait for a result to be present.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
151 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
152 * Results will be returned in strict order.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
153 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
154 * Returns t_pool_result pointer if a result is ready.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
155 * NULL if not.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
156 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
157 t_pool_result *t_pool_next_result(t_results_queue *q) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
158 t_pool_result *r;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
159
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
160 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
161 fprintf(stderr, "Requesting next result on queue %p\n", q);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
162 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
163
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
164 pthread_mutex_lock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
165 r = t_pool_next_result_locked(q);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
166 pthread_mutex_unlock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
167
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
168 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
169 fprintf(stderr, "(q=%p) Found %p\n", q, r);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
170 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
171
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
172 return r;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
173 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
174
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
175 t_pool_result *t_pool_next_result_wait(t_results_queue *q) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
176 t_pool_result *r;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
177
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
178 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
179 fprintf(stderr, "Waiting for result %d...\n", q->next_serial);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
180 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
181
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
182 pthread_mutex_lock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
183 while (!(r = t_pool_next_result_locked(q))) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
184 /* Possible race here now avoided via _locked() call, but incase... */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
185 struct timeval now;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
186 struct timespec timeout;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
187
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
188 gettimeofday(&now, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
189 timeout.tv_sec = now.tv_sec + 10;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
190 timeout.tv_nsec = now.tv_usec * 1000;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
191
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
192 pthread_cond_timedwait(&q->result_avail_c, &q->result_m, &timeout);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
193 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
194 pthread_mutex_unlock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
195
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
196 return r;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
197 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
198
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
199 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
200 * Returns true if there are no items on the finished results queue and
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
201 * also none still pending.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
202 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
203 int t_pool_results_queue_empty(t_results_queue *q) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
204 int empty;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
205
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
206 pthread_mutex_lock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
207 empty = q->queue_len == 0 && q->pending == 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
208 pthread_mutex_unlock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
209
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
210 return empty;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
211 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
212
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
213
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
214 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
215 * Returns the number of completed jobs on the results queue.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
216 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
217 int t_pool_results_queue_len(t_results_queue *q) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
218 int len;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
219
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
220 pthread_mutex_lock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
221 len = q->queue_len;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
222 pthread_mutex_unlock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
223
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
224 return len;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
225 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
226
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
227 int t_pool_results_queue_sz(t_results_queue *q) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
228 int len;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
229
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
230 pthread_mutex_lock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
231 len = q->queue_len + q->pending;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
232 pthread_mutex_unlock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
233
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
234 return len;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
235 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
236
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
237 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
238 * Frees a result 'r' and if free_data is true also frees
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
239 * the internal r->data result too.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
240 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
241 void t_pool_delete_result(t_pool_result *r, int free_data) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
242 if (!r)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
243 return;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
244
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
245 if (free_data && r->data)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
246 free(r->data);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
247
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
248 free(r);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
249 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
250
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
251 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
252 * Initialises a results queue.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
253 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
254 * Results queue pointer on success;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
255 * NULL on failure
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
256 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
257 t_results_queue *t_results_queue_init(void) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
258 t_results_queue *q = malloc(sizeof(*q));
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
259
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
260 pthread_mutex_init(&q->result_m, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
261 pthread_cond_init(&q->result_avail_c, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
262
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
263 q->result_head = NULL;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
264 q->result_tail = NULL;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
265 q->next_serial = 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
266 q->curr_serial = 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
267 q->queue_len = 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
268 q->pending = 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
269
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
270 return q;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
271 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
272
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
273 /* Deallocates memory for a results queue */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
274 void t_results_queue_destroy(t_results_queue *q) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
275 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
276 fprintf(stderr, "Destroying results queue %p\n", q);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
277 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
278
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
279 if (!q)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
280 return;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
281
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
282 pthread_mutex_destroy(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
283 pthread_cond_destroy(&q->result_avail_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
284
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
285 memset(q, 0xbb, sizeof(*q));
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
286 free(q);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
287
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
288 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
289 fprintf(stderr, "Destroyed results queue %p\n", q);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
290 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
291 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
292
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
293 /* ----------------------------------------------------------------------------
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
294 * The thread pool.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
295 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
296
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
/*
 * Microseconds elapsed from struct timeval t1 to struct timeval t2.
 * Arguments are parenthesised so that expressions such as *ptr or
 * arr[i] can be passed safely.
 */
#define TDIFF(t2,t1) (((t2).tv_sec-(t1).tv_sec)*1000000 + (t2).tv_usec-(t1).tv_usec)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
298
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
299 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
300 * A worker thread.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
301 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
302 * Each thread waits for the pool to be non-empty.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
303 * As soon as this applies, one of them succeeds in getting the lock
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
304 * and then executes the job.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
305 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
306 static void *t_pool_worker(void *arg) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
307 t_pool_worker_t *w = (t_pool_worker_t *)arg;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
308 t_pool *p = w->p;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
309 t_pool_job *j;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
310 #ifdef DEBUG_TIME
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
311 struct timeval t1, t2, t3;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
312 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
313
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
314 for (;;) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
315 // Pop an item off the pool queue
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
316 #ifdef DEBUG_TIME
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
317 gettimeofday(&t1, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
318 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
319
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
320 pthread_mutex_lock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
321
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
322 #ifdef DEBUG_TIME
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
323 gettimeofday(&t2, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
324 p->wait_time += TDIFF(t2,t1);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
325 w->wait_time += TDIFF(t2,t1);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
326 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
327
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
328 // If there is something on the job list and a higher priority
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
329 // thread waiting, let it handle this instead.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
330 // while (p->head && p->t_stack_top != -1 && p->t_stack_top < w->idx) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
331 // pthread_mutex_unlock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
332 // pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
333 // pthread_mutex_lock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
334 // }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
335
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
336 while (!p->head && !p->shutdown) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
337 p->nwaiting++;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
338
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
339 if (p->njobs == 0)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
340 pthread_cond_signal(&p->empty_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
341 #ifdef DEBUG_TIME
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
342 gettimeofday(&t2, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
343 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
344
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
345 #ifdef IN_ORDER
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
346 // Push this thread to the top of the waiting stack
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
347 if (p->t_stack_top == -1 || p->t_stack_top > w->idx)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
348 p->t_stack_top = w->idx;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
349
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
350 p->t_stack[w->idx] = 1;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
351 pthread_cond_wait(&w->pending_c, &p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
352 p->t_stack[w->idx] = 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
353
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
354 /* Find new t_stack_top */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
355 {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
356 int i;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
357 p->t_stack_top = -1;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
358 for (i = 0; i < p->tsize; i++) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
359 if (p->t_stack[i]) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
360 p->t_stack_top = i;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
361 break;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
362 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
363 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
364 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
365 #else
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
366 pthread_cond_wait(&p->pending_c, &p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
367 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
368
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
369 #ifdef DEBUG_TIME
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
370 gettimeofday(&t3, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
371 p->wait_time += TDIFF(t3,t2);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
372 w->wait_time += TDIFF(t3,t2);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
373 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
374 p->nwaiting--;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
375 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
376
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
377 if (p->shutdown) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
378 #ifdef DEBUG_TIME
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
379 p->total_time += TDIFF(t3,t1);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
380 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
381 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
382 fprintf(stderr, "%d: Shutting down\n", worker_id(p));
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
383 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
384 pthread_mutex_unlock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
385 pthread_exit(NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
386 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
387
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
388 j = p->head;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
389 if (!(p->head = j->next))
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
390 p->tail = NULL;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
391
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
392 if (p->njobs-- >= p->qsize)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
393 pthread_cond_signal(&p->full_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
394
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
395 if (p->njobs == 0)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
396 pthread_cond_signal(&p->empty_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
397
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
398 pthread_mutex_unlock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
399
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
400 // We have job 'j' - now execute it.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
401 t_pool_add_result(j, j->func(j->arg));
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
402 #ifdef DEBUG_TIME
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
403 pthread_mutex_lock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
404 gettimeofday(&t3, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
405 p->total_time += TDIFF(t3,t1);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
406 pthread_mutex_unlock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
407 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
408 memset(j, 0xbb, sizeof(*j));
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
409 free(j);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
410 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
411
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
412 return NULL;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
413 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
414
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
415 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
416 * Creates a worker pool of length qsize with tsize worker threads.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
417 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
418 * Returns pool pointer on success;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
419 * NULL on failure
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
420 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
421 t_pool *t_pool_init(int qsize, int tsize) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
422 int i;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
423 t_pool *p = malloc(sizeof(*p));
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
424 p->qsize = qsize;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
425 p->tsize = tsize;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
426 p->njobs = 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
427 p->nwaiting = 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
428 p->shutdown = 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
429 p->head = p->tail = NULL;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
430 p->t_stack = NULL;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
431 #ifdef DEBUG_TIME
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
432 p->total_time = p->wait_time = 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
433 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
434
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
435 p->t = malloc(tsize * sizeof(p->t[0]));
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
436
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
437 pthread_mutex_init(&p->pool_m, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
438 pthread_cond_init(&p->empty_c, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
439 pthread_cond_init(&p->full_c, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
440
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
441 pthread_mutex_lock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
442
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
443 #ifdef IN_ORDER
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
444 if (!(p->t_stack = malloc(tsize * sizeof(*p->t_stack))))
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
445 return NULL;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
446 p->t_stack_top = -1;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
447
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
448 for (i = 0; i < tsize; i++) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
449 t_pool_worker_t *w = &p->t[i];
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
450 p->t_stack[i] = 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
451 w->p = p;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
452 w->idx = i;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
453 w->wait_time = 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
454 pthread_cond_init(&w->pending_c, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
455 if (0 != pthread_create(&w->tid, NULL, t_pool_worker, w))
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
456 return NULL;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
457 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
458 #else
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
459 pthread_cond_init(&p->pending_c, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
460
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
461 for (i = 0; i < tsize; i++) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
462 t_pool_worker_t *w = &p->t[i];
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
463 w->p = p;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
464 w->idx = i;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
465 pthread_cond_init(&w->pending_c, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
466 if (0 != pthread_create(&w->tid, NULL, t_pool_worker, w))
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
467 return NULL;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
468 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
469 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
470
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
471 pthread_mutex_unlock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
472
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
473 return p;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
474 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
475
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
476 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
477 * Adds an item to the work pool.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
478 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
479 * FIXME: Maybe return 1,0,-1 and distinguish between job dispathed vs
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
480 * result returned. Ie rather than blocking on full queue we're permitted
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
481 * to return early on "result available" event too.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
482 * Caller would then have a while loop around t_pool_dispatch.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
483 * Or, return -1 and set errno to EAGAIN to indicate job not yet submitted.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
484 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
485 * Returns 0 on success
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
486 * -1 on failure
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
487 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
488 int t_pool_dispatch(t_pool *p, t_results_queue *q,
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
489 void *(*func)(void *arg), void *arg) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
490 t_pool_job *j = malloc(sizeof(*j));
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
491
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
492 if (!j)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
493 return -1;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
494 j->func = func;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
495 j->arg = arg;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
496 j->next = NULL;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
497 j->p = p;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
498 j->q = q;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
499 if (q) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
500 pthread_mutex_lock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
501 j->serial = q->curr_serial++;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
502 q->pending++;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
503 pthread_mutex_unlock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
504 } else {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
505 j->serial = 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
506 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
507
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
508 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
509 fprintf(stderr, "Dispatching job %p for queue %p, serial %d\n", j, q, j->serial);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
510 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
511
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
512 pthread_mutex_lock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
513
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
514 // Check if queue is full
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
515 while (p->njobs >= p->qsize)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
516 pthread_cond_wait(&p->full_c, &p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
517
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
518 p->njobs++;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
519
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
520 if (p->tail) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
521 p->tail->next = j;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
522 p->tail = j;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
523 } else {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
524 p->head = p->tail = j;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
525 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
526
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
527 // Let a worker know we have data.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
528 #ifdef IN_ORDER
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
529 if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
530 pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
531 #else
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
532 pthread_cond_signal(&p->pending_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
533 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
534 pthread_mutex_unlock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
535
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
536 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
537 fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
538 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
539
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
540 return 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
541 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
542
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
543 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
544 * As above but optional non-block flag.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
545 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
546 * nonblock 0 => block if input queue is full
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
547 * nonblock +1 => don't block if input queue is full, but do not add task
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
548 * nonblock -1 => add task regardless of whether queue is full (over-size)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
549 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
550 int t_pool_dispatch2(t_pool *p, t_results_queue *q,
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
551 void *(*func)(void *arg), void *arg, int nonblock) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
552 t_pool_job *j;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
553
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
554 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
555 fprintf(stderr, "Dispatching job for queue %p, serial %d\n", q, q->curr_serial);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
556 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
557
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
558 pthread_mutex_lock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
559
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
560 if (p->njobs >= p->qsize && nonblock == 1) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
561 pthread_mutex_unlock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
562 errno = EAGAIN;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
563 return -1;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
564 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
565
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
566 if (!(j = malloc(sizeof(*j))))
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
567 return -1;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
568 j->func = func;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
569 j->arg = arg;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
570 j->next = NULL;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
571 j->p = p;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
572 j->q = q;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
573 if (q) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
574 pthread_mutex_lock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
575 j->serial = q->curr_serial;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
576 pthread_mutex_unlock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
577 } else {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
578 j->serial = 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
579 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
580
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
581 if (q) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
582 pthread_mutex_lock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
583 q->curr_serial++;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
584 q->pending++;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
585 pthread_mutex_unlock(&q->result_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
586 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
587
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
588 // Check if queue is full
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
589 if (nonblock == 0)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
590 while (p->njobs >= p->qsize)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
591 pthread_cond_wait(&p->full_c, &p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
592
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
593 p->njobs++;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
594
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
595 // if (q->curr_serial % 100 == 0)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
596 // fprintf(stderr, "p->njobs = %d p->qsize = %d\n", p->njobs, p->qsize);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
597
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
598 if (p->tail) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
599 p->tail->next = j;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
600 p->tail = j;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
601 } else {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
602 p->head = p->tail = j;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
603 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
604
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
605 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
606 fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
607 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
608
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
609 // Let a worker know we have data.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
610 #ifdef IN_ORDER
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
611 // Keep incoming queue at 1 per running thread, so there is always
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
612 // something waiting when they end their current task. If we go above
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
613 // this signal to start more threads (if available). This has the effect
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
614 // of concentrating jobs to fewer cores when we are I/O bound, which in
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
615 // turn benefits systems with auto CPU frequency scaling.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
616 if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
617 pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
618 #else
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
619 pthread_cond_signal(&p->pending_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
620 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
621
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
622 pthread_mutex_unlock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
623
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
624 return 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
625 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
626
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
627 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
628 * Flushes the pool, but doesn't exit. This simply drains the queue and
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
629 * ensures all worker threads have finished their current task.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
630 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
631 * Returns 0 on success;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
632 * -1 on failure
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
633 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
634 int t_pool_flush(t_pool *p) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
635 int i;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
636
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
637 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
638 fprintf(stderr, "Flushing pool %p\n", p);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
639 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
640
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
641 // Drains the queue
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
642 pthread_mutex_lock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
643
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
644 // Wake up everything for the final sprint!
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
645 for (i = 0; i < p->tsize; i++)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
646 if (p->t_stack[i])
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
647 pthread_cond_signal(&p->t[i].pending_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
648
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
649 while (p->njobs || p->nwaiting != p->tsize)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
650 pthread_cond_wait(&p->empty_c, &p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
651
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
652 pthread_mutex_unlock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
653
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
654 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
655 fprintf(stderr, "Flushed complete for pool %p, njobs=%d, nwaiting=%d\n",
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
656 p, p->njobs, p->nwaiting);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
657 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
658
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
659 return 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
660 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
661
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
662 /*
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
663 * Destroys a thread pool. If 'kill' is true the threads are terminated now,
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
664 * otherwise they are joined into the main thread so they will finish their
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
665 * current work load.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
666 *
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
667 * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
668 * t_pool_destroy(p,1) to quickly exit after a fatal error.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
669 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
670 void t_pool_destroy(t_pool *p, int kill) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
671 int i;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
672
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
673 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
674 fprintf(stderr, "Destroying pool %p, kill=%d\n", p, kill);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
675 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
676
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
677 /* Send shutdown message to worker threads */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
678 if (!kill) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
679 pthread_mutex_lock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
680 p->shutdown = 1;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
681
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
682 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
683 fprintf(stderr, "Sending shutdown request\n");
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
684 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
685
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
686 #ifdef IN_ORDER
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
687 for (i = 0; i < p->tsize; i++)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
688 pthread_cond_signal(&p->t[i].pending_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
689 #else
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
690 pthread_cond_broadcast(&p->pending_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
691 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
692 pthread_mutex_unlock(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
693
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
694 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
695 fprintf(stderr, "Shutdown complete\n");
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
696 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
697 for (i = 0; i < p->tsize; i++)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
698 pthread_join(p->t[i].tid, NULL);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
699 } else {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
700 for (i = 0; i < p->tsize; i++)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
701 pthread_kill(p->t[i].tid, SIGINT);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
702 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
703
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
704 pthread_mutex_destroy(&p->pool_m);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
705 pthread_cond_destroy(&p->empty_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
706 pthread_cond_destroy(&p->full_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
707 #ifdef IN_ORDER
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
708 for (i = 0; i < p->tsize; i++)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
709 pthread_cond_destroy(&p->t[i].pending_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
710 #else
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
711 pthread_cond_destroy(&p->pending_c);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
712 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
713
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
714 #ifdef DEBUG_TIME
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
715 fprintf(stderr, "Total time=%f\n", p->total_time / 1000000.0);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
716 fprintf(stderr, "Wait time=%f\n", p->wait_time / 1000000.0);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
717 fprintf(stderr, "%d%% utilisation\n",
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
718 (int)(100 - ((100.0 * p->wait_time) / p->total_time + 0.5)));
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
719 for (i = 0; i < p->tsize; i++)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
720 fprintf(stderr, "%d: Wait time=%f\n", i,
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
721 p->t[i].wait_time / 1000000.0);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
722 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
723
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
724 if (p->t_stack)
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
725 free(p->t_stack);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
726
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
727 free(p->t);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
728 free(p);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
729
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
730 #ifdef DEBUG
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
731 fprintf(stderr, "Destroyed pool %p\n", p);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
732 #endif
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
733 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
734
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
735
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
736 /*-----------------------------------------------------------------------------
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
737 * Test app.
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
738 */
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
739
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
740 #ifdef TEST_MAIN
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
741
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
742 #include <stdio.h>
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
743 #include <math.h>
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
744
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
/*
 * Test job: takes ownership of a heap allocated int (the job number),
 * sleeps for a random time below one second and hands back a freshly
 * allocated int holding the computed value (job squared with the default
 * workload).  The argument is freed here.
 */
void *doit(void *arg) {
    /* Flip to 1 to swap the cheap workload for a CPU bound busy loop. */
    enum { BURN_CPU = 0 };
    int job = *(int *)arg;
    int x = 0;
    int *res;

    printf("Worker: execute job %d\n", job);

    usleep(random() % 1000000); // to coerce job completion out of order

    if (BURN_CPU) {
        int outer, inner;

        for (outer = 0; outer < 100; outer++) {
            for (inner = 0; inner < 100000; inner++) {
                x++;
                x += x * sin(inner);
                x += x * cos(x);
            }
        }
        x *= 100;
        x += job;
    } else {
        x = job*job;
    }

    printf("Worker: job %d terminating, x=%d\n", job, x);

    free(arg);

    res = malloc(sizeof(*res));
    *res = x;

    return res;
}
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
776
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
777 #define NTHREADS 8
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
778
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
779 int main(int argc, char **argv) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
780 t_pool *p = t_pool_init(NTHREADS*2, NTHREADS);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
781 t_results_queue *q = t_results_queue_init();
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
782 int i;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
783 t_pool_result *r;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
784
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
785 // Dispatch jobs
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
786 for (i = 0; i < 20; i++) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
787 int *ip = malloc(sizeof(*ip));
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
788 *ip = i;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
789 printf("Submitting %d\n", i);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
790 t_pool_dispatch(p, q, doit, ip);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
791
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
792 // Check for results
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
793 if ((r = t_pool_next_result(q))) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
794 printf("RESULT: %d\n", *(int *)r->data);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
795 t_pool_delete_result(r, 1);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
796 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
797 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
798
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
799 t_pool_flush(p);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
800
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
801 while ((r = t_pool_next_result(q))) {
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
802 printf("RESULT: %d\n", *(int *)r->data);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
803 t_pool_delete_result(r, 1);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
804 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
805
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
806 t_pool_destroy(p, 0);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
807 t_results_queue_destroy(q);
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
808
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
809 return 0;
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
810 }
dfa3745e5fd8 Uploaded
youngkim
parents:
diff changeset
811 #endif