Mercurial > repos > petr-novak > re_utils
comparison parallel.py @ 0:a4cd8608ef6b draft
Uploaded
author | petr-novak |
---|---|
date | Mon, 01 Apr 2019 07:56:36 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:a4cd8608ef6b |
---|---|
1 #!/usr/bin/env python3 | |
2 import multiprocessing | |
3 import os | |
4 import time | |
5 from itertools import cycle | |
6 ''' | |
7 functions for parallel processing of data chunks using worker function | |
8 ''' | |
9 | |
10 | |
def run_multiple_pbs_jobs(cmds, status_files, qsub_params=""):
    '''
    Submit several commands to the PBS cluster and wait for all of them.

    Example of qsub_params:
    -l walltime=1000:00:00,nodes=1:ppn=8,mem=15G
    -l walltime=150:00:00,nodes=1:ppn=1

    Returns a dict with two parallel lists:
      'function' - exit codes of the local submitting processes
      'command'  - contents of the per-job status files
    '''
    # submit everything first so the jobs run concurrently
    submitters = [pbs_send_job(cmd, sf, qsub_params)
                  for cmd, sf in zip(cmds, status_files)]
    status_function = []
    for proc in submitters:
        proc.join()
        status_function.append(proc.exitcode)
    # collect pbs run status written by the jobs themselves
    status_command = []
    for sf in status_files:
        with open(sf) as fh:
            status_command.append(fh.read().strip())
    return {'function': status_function, 'command': status_command}
32 | |
33 | |
def pbs_send_job(cmd, status_file, qsub_params):
    '''Send a job to the pbs cluster asynchronously; a status file is
    required.  Returns the started submitting Process (caller joins it).'''
    submitter = multiprocessing.Process(
        target=pbs_run,
        args=(cmd, status_file, qsub_params))
    submitter.start()
    return submitter
40 | |
41 | |
def pbs_run(cmd, status_file, qsub_params):
    '''
    Run shell command cmd on a pbs cluster via qsub, block until the job
    writes its status file, and return the status string read from it.

    cmd         -- shell command line executed on the cluster node
    status_file -- path the job writes "OK"/"ERROR" to on completion
    qsub_params -- extra parameters appended to the qsub call
    Raises IOError if the status/error file locations are not writable.
    '''
    print(status_file)
    error_file = status_file + ".e"
    # test if both paths are writable before submitting anything
    # (fixed: the original bound the None result of close() to a name)
    try:
        open(status_file, 'w').close()
        open(error_file, 'w').close()
    except IOError:
        print("cannot write to status files, make sure path exists")
        raise  # fixed: re-raise the original error instead of a bare IOError
    # remove the probe file so the wait loop below only sees the status
    # file once the submitted job has written it (fixed: the original
    # tested os.path.exists right after creating the file, always true)
    os.remove(status_file)
    # NOTE(review): cmd is interpolated into a shell line unescaped --
    # it must not contain single quotes or come from untrusted input
    cmd_full = ("echo '{cmd} && echo \"OK\" > {status_file} || echo \"ERROR\""
                " > {status_file}' | qsub -e {err}"
                " {qsub_params} ").format(cmd=cmd, status_file=status_file,
                                          err=error_file,
                                          qsub_params=qsub_params)
    os.system(cmd_full)

    # poll until the cluster job reports its status
    while not os.path.exists(status_file):
        time.sleep(3)
    with open(status_file) as f:
        status = f.read().strip()
    return status
75 | |
76 | |
def spawn(f):
    '''Wrap f so the result of f(x) is sent through a pipe connection.

    The returned callable is suitable as a multiprocessing.Process
    target: it computes f(x), sends the result, and closes its end.
    '''
    def piped(conn, arg):
        conn.send(f(arg))
        conn.close()
    return piped
82 | |
83 | |
def get_max_proc():
    '''Number of cpus to use: taken from config.py if available, else
    from a global PROC, else from the environment variable PROC, else
    set to the system cpu count.'''
    try:
        from config import PROC as max_proc
        return max_proc
    except ImportError:
        pass
    if "PROC" in globals():
        return globals()["PROC"]
    if "PROC" in os.environ:
        return int(os.environ["PROC"])
    return multiprocessing.cpu_count()
98 | |
99 | |
def parmap2(f, X, groups, ppn):
    '''
    Apply f to each element of X in parallel, with scheduling constraints.

    groups -- one group id per job; two jobs sharing a group id are
              never run concurrently
    ppn    -- one load value per job; the sum of loads of concurrently
              running jobs is kept <= 1
    Returns the list of results in the order of X.
    '''
    max_proc = get_max_proc()
    print("running in parallel using ", max_proc, "cpu(s)")
    process_pool = []
    # results are written by index so order of completion does not matter
    output = [None] * len(X)
    # prepare processes
    for x, index in zip(X, list(range(len(X)))):
        # status:
        # 0: waiting, 1: running, 2:collected
        process_pool.append({
            'status': 0,
            'proc': None,
            'pipe': None,
            'index': index,
            'group': groups[index],
            'ppn': ppn[index]

        })

    # run processes
    running = 0
    finished = 0
    sleep_time = 0.001
    while True:
        # count alive processes
        if not sleep_time:
            sleep_time = 0.001
        for i in process_pool:
            # a non-None exitcode means the child has terminated
            if i['status'] == 1 and not (i['proc'].exitcode is None):
                sleep_time = 0.0
                # was running now finished --> collect
                i['status'] = 2
                running -= 1
                finished += 1
                output[i['index']] = collect(i['proc'], i['pipe'])
                # drop references so the process/pipe can be garbage
                # collected and their file descriptors freed
                # NOTE(review): deleting these keys makes a later
                # print_status() call on this pool raise -- confirm
                del i['pipe']
                del i['proc']
            if i['status'] == 0 and running < max_proc:
                # waiting and free --> run
                # check if this group can be run
                running_groups = [pp['group']
                                  for pp in process_pool if pp['status'] == 1]
                # check max load of concurent runs:
                current_load = sum([pp['ppn']
                                    for pp in process_pool if pp['status'] == 1])
                # cond1: adding this job keeps total load within 1.0
                cond1 = (i['ppn'] + current_load) <= 1
                # cond2: no job of the same group is currently running
                cond2 = not i['group'] in running_groups
                if cond1 and cond2:
                    sleep_time = 0.0
                    try:
                        # Pipe() can fail when file descriptors run out
                        i['pipe'] = multiprocessing.Pipe()
                    except OSError as e:
                        print('exception occured:',e)
                        continue
                    i['proc'] = multiprocessing.Process(
                        target=spawn(f),
                        args=(i['pipe'][1], X[i['index']]),
                        name=str(i['index']))
                    i['proc'].start()
                    i['status'] = 1
                    running += 1
        if finished == len(process_pool):
            break
        if sleep_time:
            # sleep only if nothing changed in the last cycle
            time.sleep(sleep_time)
            # sleep time gradually increase to 1 sec
            sleep_time = min(2 * sleep_time, 1)
    return output
169 | |
170 | |
def print_status(pp):
    '''Print a one-line-per-job status table for a parmap2 process pool.

    pp -- list of job dicts as built by parmap2 ('index', 'status',
          'group', 'ppn' and, while running, 'proc').
    '''
    states = ['waiting', 'running', 'collected']
    print("___________________________________")
    print("jobid status group ppn exitcode")
    print("===================================")
    for i in pp:
        # fixed: 'proc' is None before the job starts and the key is
        # deleted after collection, so fetch it defensively instead of
        # crashing with AttributeError/KeyError
        proc = i.get('proc')
        exitcode = proc.exitcode if proc is not None else None
        print(
            i['index'], " ",
            states[i['status']], " ",
            i['group'], " ",
            i['ppn'], " ",
            exitcode
        )
184 | |
185 | |
def collect(pf, pp):
    '''Join a terminated worker process and fetch its result.

    pf -- a Process that has already terminated
    pp -- the (recv_conn, send_conn) Pipe pair used by the worker
    Returns the value sent through the pipe, or None when the worker
    exited with a non-zero code.  Raises Exception when the process is
    not in a collectable state.
    '''
    finished_cleanly = pf.pid and not pf.exitcode and not pf.is_alive()
    if finished_cleanly:
        result = pp[0].recv()
    elif pf.exitcode:
        print("job finished with exit code {}".format(pf.exitcode))
        result = None
    else:
        # still alive or never started -- nothing to collect
        raise Exception('not collected')
    # common tail: reap the child and release both pipe ends
    pf.join()
    pp[0].close()
    pp[1].close()
    return result
202 | |
203 | |
def parmap(f, X):
    '''
    Apply f to each element of X in parallel, at most get_max_proc()
    processes at a time, and return the results in the order of X.
    '''
    max_proc = get_max_proc()

    pipe = []
    proc = []
    # results keyed by process name (the job's original index as str)
    returnvalue = {}

    for x, index in zip(X, list(range(len(X)))):
        pipe.append(multiprocessing.Pipe())
        proc.append(multiprocessing.Process(target=spawn(f),
                                            args=(pipe[-1][1], x), name=str(index)))
        p = proc[-1]
        # count alive processes; block until a slot is free
        while True:
            running = 0
            for i in proc:
                if i.is_alive():
                    running += 1
            # print "running:"+str(running)
            if running < max_proc:
                break
            else:
                time.sleep(0.1)
        p.start()
        # print "process started:"+str(p.pid)
        # check for finished

        # NOTE(review): this loop shadows the outer 'index' variable and
        # deletes from 'proc'/'pipe' while zip() is iterating them, so
        # later entries can be skipped this pass; the final collection
        # loop below appears to pick up anything missed -- confirm
        for pf, pp, index in zip(proc, pipe, range(len(pipe))):
            if pf.pid and not pf.exitcode and not pf.is_alive() and (pf.name not in returnvalue):
                pf.join()
                returnvalue[str(pf.name)] = pp[0].recv()
                pp[0].close()
                pp[1].close()
                # proc must be garbage collected - to free all file connection
                del proc[index]
                del pipe[index]

    # collect the rest:
    [pf.join() for pf in proc]
    for pf, pp in zip(proc, pipe):
        if pf.pid and not pf.exitcode and not pf.is_alive() and (pf.name not in returnvalue):
            returnvalue[str(pf.name)] = pp[0].recv()
            pp[0].close()
            pp[1].close()
    # convert to list in input correct order
    returnvalue = [returnvalue[str(i)] for i in range(len(X))]
    return returnvalue
252 | |
253 | |
def parallel2(command, *args, groups=None, ppn=None):
    ''' same as parallel but groups are used to identify mutually
    exclusive jobs, jobs with the same group id are never run together;
    the ppn param is the 'load' of a job - sum of loads cannot exceed 1

    command -- function executed once per job
    args    -- lists of per-job arguments; length-1 lists are broadcast
               to the longest length
    groups  -- optional per-job group ids (default: all distinct)
    ppn     -- optional per-job loads in the 0-1 range (default: all 0)
    Returns the list of results in job order.
    Raises ValueError on inconsistent argument/group/ppn lengths or
    out-of-range ppn values.
    '''
    # check args, expand if necessary
    args = list(args)
    N = [len(i) for i in args]  # lengths of lists
    Mx = max(N)
    if len(set(N)) == 1:
        # all good
        pass
    elif set(N) == set([1, Mx]):
        # expand args of length 1
        for i in range(len(args)):
            if len(args[i]) == 1:
                args[i] = args[i] * Mx
    else:
        raise ValueError("argument lists must have equal length or length 1")
    if not groups:
        groups = range(Mx)
    elif len(groups) != Mx:
        print("length of groups must be same as number of job or None")
        raise ValueError("length of groups must match the number of jobs")

    if not ppn:
        ppn = [0] * Mx
    elif len(ppn) != Mx:
        print("length of ppn must be same as number of job or None")
        raise ValueError("length of ppn must match the number of jobs")
    elif max(ppn) > 1 or min(ppn) < 0:
        # fixed: the original condition (max > 1 AND min truthy) let
        # invalid values such as [2, 0] pass the range check
        print("ppn values must be in 0 - 1 range")
        raise ValueError("ppn values must be in 0 - 1 range")
    # convert argument to suitable format - 'transpose'
    argsTuples = list(zip(*args))
    args = [list(i) for i in argsTuples]

    def command_star(args):
        # unpack one tuple of per-job arguments
        # NOTE(review): a nested function is not picklable, so this
        # relies on fork-based multiprocessing start -- confirm
        return command(*args)

    x = parmap2(command_star, argsTuples, groups, ppn)
    return x
298 | |
299 | |
def parallel(command, *args):
    ''' Execute command in parallel using multiprocessing
    command is the function to be executed
    args is list of list of arguments
    execution is :
    command(args[0][0],args[1][0],args[2][0],args[3][0],....)
    command(args[0][1],args[1][1],args[2][1],args[3][1],....)
    command(args[0][2],args[1][2],args[2][2],args[3][2],....)
    ...
    output of command is returned as list
    Raises ValueError on inconsistent argument list lengths.
    '''
    # check args, expand if necessary
    args = list(args)
    N = [len(i) for i in args]  # lengths of lists
    Mx = max(N)
    if len(set(N)) == 1:
        # all good
        pass
    elif set(N) == set([1, Mx]):
        # expand args of length 1
        for i in range(len(args)):
            if len(args[i]) == 1:
                args[i] = args[i] * Mx
    else:
        raise ValueError("argument lists must have equal length or length 1")

    # convert argument to suitable format - 'transpose'
    argsTuples = list(zip(*args))
    args = [list(i) for i in argsTuples]

    # fixed: removed a stray multiprocessing.Pool() call that spawned a
    # worker pool which was never used nor closed (leaked processes);
    # parallel2 already had the same line commented out

    def command_star(args):
        # unpack one tuple of per-job arguments
        return command(*args)

    x = parmap(command_star, argsTuples)
    return x
337 | |
338 | |
def worker(*a):
    '''Toy workload used by the self-test: burns some cpu and returns
    the sum of its arguments.

    Prints a message and raises ZeroDivisionError when any argument
    equals 1.1 (used to exercise error propagation in the pool).
    '''
    total = 0
    burn = 0
    for value in a:
        if value == 1.1:
            print("raising exception")
            burn = 1 / 0
        total += value
        # busy work; 'burn' is deliberately thrown away
        for _ in range(10):
            burn += value
        for j in range(100000):
            burn = 1.0 / (float(j) + 1.0)
    return total
352 | |
# test
if __name__ == "__main__":
    # earlier variant kept for reference: includes 1.1, which makes
    # worker raise ZeroDivisionError (tests error propagation)
    # x = parallel2(worker, [1], [2], [3], [4], [1], [1, 2, 3, 7, 10, 1.1, 20, 30, 40, 10, 30, 20, 40, 50, 50], [
    # 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 5, 6, 4, 3, 2])

    # smoke test: 15 jobs; the length-1 argument lists are broadcast,
    # all groups are distinct, and per-job loads stay within 0-1
    x = parallel2(
        worker, [1], [2], [3], [4], [1],
        [1, 2, 3, 7, 10, 1.2, 20, 30, 40, 10, 30, 20, 40, 50, 50],
        [3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 5, 6, 4, 3, 2],
        groups=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15],
        ppn=[0.6, 0.6, 0.2, 0.6, 0.2, 0.2, 0.4,
             0.1, 0.1, 0.3, 0.3, 0.3, 0.1, 0.1, 0.1]
    )
    print(x)