env/lib/python3.7/site-packages/planemo/galaxy/activity.py @ 0:26e78fe6e8c4 (draft)
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author: shellac
date: Sat, 02 May 2020 07:14:21 -0400
"""Module provides generic interface to running Galaxy tools and workflows."""

import json
import os
import tempfile
import time

import bioblend
import requests
import yaml
from bioblend.galaxy.client import Client
from bioblend.util import attach_file
from galaxy.tool_util.cwl.util import (
    DirectoryUploadTarget,
    FileUploadTarget,
    galactic_job_json,
    invocation_to_output,
    output_properties,
    output_to_cwl_json,
    path_or_uri_to_uri,
    tool_response_to_output,
)
from galaxy.tool_util.parser import get_tool_source
from galaxy.util import (
    safe_makedirs,
    unicodify,
)

from planemo.galaxy.api import summarize_history
from planemo.io import wait_on
from planemo.runnable import (
    ErrorRunResponse,
    get_outputs,
    RunnableType,
    SuccessfulRunResponse,
)

DEFAULT_HISTORY_NAME = "CWL Target History"
ERR_NO_SUCH_TOOL = ("Failed to find tool with ID [%s] in Galaxy - cannot execute job. "
                    "You may need to enable verbose logging and determine why the tool did not load. [%s]")


def execute(ctx, config, runnable, job_path, **kwds):
    """Execute a Galaxy activity."""
    try:
        return _execute(ctx, config, runnable, job_path, **kwds)
    except Exception as e:
        return ErrorRunResponse(unicodify(e))


def _verified_tool_id(runnable, user_gi):
    tool_id = _tool_id(runnable.path)
    try:
        user_gi.tools.show_tool(tool_id)
    except Exception as e:
        raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e))
    return tool_id


def _inputs_representation(runnable):
    if runnable.type == RunnableType.cwl_tool:
        inputs_representation = "cwl"
    else:
        inputs_representation = "galaxy"
    return inputs_representation


def log_contents_str(config):
    if hasattr(config, "log_contents"):
        return config.log_contents
    else:
        return "No log for this engine type."


def _execute(ctx, config, runnable, job_path, **kwds):
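    """Stage inputs, run the tool or workflow, and wait for a terminal state.

    Dispatches on ``runnable.type``: tools are posted to the Galaxy tool API and
    the resulting job is awaited; workflows are invoked via the invocations API
    and both the invocation and the history are awaited. Returns a run response
    object with the collected outputs.
    """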
    user_gi = config.user_gi
    admin_gi = config.gi

    history_id = _history_id(user_gi, **kwds)

    job_dict, _ = stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds)

    if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
        response_class = GalaxyToolRunResponse
        tool_id = _verified_tool_id(runnable, user_gi)
        inputs_representation = _inputs_representation(runnable)
        run_tool_payload = dict(
            history_id=history_id,
            tool_id=tool_id,
            inputs=job_dict,
            inputs_representation=inputs_representation,
        )
        ctx.vlog("Post to Galaxy tool API with payload [%s]" % run_tool_payload)
        tool_run_response = user_gi.tools._post(run_tool_payload)

        job = tool_run_response["jobs"][0]
        job_id = job["id"]
        try:
            final_state = _wait_for_job(user_gi, job_id)
        except Exception:
            summarize_history(ctx, user_gi, history_id)
            raise
        if final_state != "ok":
            msg = "Failed to run CWL tool job, final job state is [%s]." % final_state
            summarize_history(ctx, user_gi, history_id)
            with open("errored_galaxy.log", "w") as f:
                f.write(log_contents_str(config))
            raise Exception(msg)

        ctx.vlog("Final job state was ok, fetching details for job [%s]" % job_id)
        job_info = admin_gi.jobs.show_job(job_id)
        response_kwds = {
            'job_info': job_info,
            'api_run_response': tool_run_response,
        }
        if ctx.verbose:
            summarize_history(ctx, user_gi, history_id)
    elif runnable.type in [RunnableType.galaxy_workflow, RunnableType.cwl_workflow]:
        response_class = GalaxyWorkflowRunResponse
        workflow_id = config.workflow_id(runnable.path)
        ctx.vlog("Found Galaxy workflow ID [%s] for path [%s]" % (workflow_id, runnable.path))
        # TODO: update bioblend to allow inputs_by.
        # invocation = user_gi.workflows.invoke_workflow(
        #     workflow_id,
        #     history_id=history_id,
        #     inputs=job_dict,
        # )
        payload = dict(
            workflow_id=workflow_id,
            history_id=history_id,
            inputs=job_dict,
            inputs_by="name",
            allow_tool_state_corrections=True,
        )
        invocations_url = "%s/%s/invocations" % (
            user_gi._make_url(user_gi.workflows),
            workflow_id,
        )
        invocation = Client._post(user_gi.workflows, payload, url=invocations_url)
        invocation_id = invocation["id"]
        ctx.vlog("Waiting for invocation [%s]" % invocation_id)
        polling_backoff = kwds.get("polling_backoff", 0)
        try:
            final_invocation_state = _wait_for_invocation(ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff)
        except Exception:
            ctx.vlog("Problem waiting on invocation...")
            summarize_history(ctx, user_gi, history_id)
            raise
        ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
        final_state = _wait_for_history(ctx, user_gi, history_id, polling_backoff)
        if final_state != "ok":
            msg = "Failed to run workflow, final history state is [%s]." % final_state
            summarize_history(ctx, user_gi, history_id)
            with open("errored_galaxy.log", "w") as f:
                f.write(log_contents_str(config))
            raise Exception(msg)
        ctx.vlog("Final history state is 'ok'")
        response_kwds = {
            'workflow_id': workflow_id,
            'invocation_id': invocation_id,
        }
    else:
        raise NotImplementedError()

    run_response = response_class(
        ctx=ctx,
        runnable=runnable,
        user_gi=user_gi,
        history_id=history_id,
        log=log_contents_str(config),
        **response_kwds
    )
    output_directory = kwds.get("output_directory", None)
    ctx.vlog("collecting outputs from run...")
    run_response.collect_outputs(ctx, output_directory)
    ctx.vlog("collecting outputs complete")
    return run_response


def stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds):
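    """Upload the inputs described by the job file into ``history_id``.

    Builds upload and collection-creation callbacks for ``galactic_job_json``,
    waits for any created datasets to reach a terminal state, and returns the
    translated job dictionary along with the uploaded datasets.
    """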
    files_attached = [False]

    def upload_func(upload_target):

        def _attach_file(upload_payload, uri, index=0):
            uri = path_or_uri_to_uri(uri)
            is_path = uri.startswith("file://")
            if not is_path or config.use_path_paste:
                upload_payload["inputs"]["files_%d|url_paste" % index] = uri
            else:
                files_attached[0] = True
                path = uri[len("file://"):]
                upload_payload["files_%d|file_data" % index] = attach_file(path)

        if isinstance(upload_target, FileUploadTarget):
            file_path = upload_target.path
            upload_payload = user_gi.tools._upload_payload(
                history_id,
                file_type=upload_target.properties.get('filetype', None) or "auto",
            )
            name = os.path.basename(file_path)
            upload_payload["inputs"]["files_0|auto_decompress"] = False
            upload_payload["inputs"]["auto_decompress"] = False
            _attach_file(upload_payload, file_path)
            upload_payload["inputs"]["files_0|NAME"] = name
            if upload_target.secondary_files:
                _attach_file(upload_payload, upload_target.secondary_files, index=1)
                upload_payload["inputs"]["files_1|type"] = "upload_dataset"
                upload_payload["inputs"]["files_1|auto_decompress"] = True
                upload_payload["inputs"]["file_count"] = "2"
                upload_payload["inputs"]["force_composite"] = "True"

            ctx.vlog("upload_payload is %s" % upload_payload)
            return user_gi.tools._post(upload_payload, files_attached=files_attached[0])
        elif isinstance(upload_target, DirectoryUploadTarget):
            tar_path = upload_target.tar_path

            upload_payload = user_gi.tools._upload_payload(
                history_id,
                file_type="tar",
            )
            upload_payload["inputs"]["files_0|auto_decompress"] = False
            _attach_file(upload_payload, tar_path)
            tar_upload_response = user_gi.tools._post(upload_payload, files_attached=files_attached[0])
            convert_response = user_gi.tools.run_tool(
                tool_id="CONVERTER_tar_to_directory",
                tool_inputs={"input1": {"src": "hda", "id": tar_upload_response["outputs"][0]["id"]}},
                history_id=history_id,
            )
            assert "outputs" in convert_response, convert_response
            return convert_response
        else:
            content = json.dumps(upload_target.object)
            return user_gi.tools.paste_content(
                content,
                history_id,
                file_type="expression.json",
            )

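    # Collection inputs are created directly through the dataset collections
    # API rather than through the upload tool.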
    def create_collection_func(element_identifiers, collection_type):
        payload = {
            "name": "dataset collection",
            "instance_type": "history",
            "history_id": history_id,
            "element_identifiers": element_identifiers,
            "collection_type": collection_type,
            "fields": None if collection_type != "record" else "auto",
        }
        dataset_collections_url = user_gi.url + "/dataset_collections"
        dataset_collection = Client._post(user_gi.histories, payload, url=dataset_collections_url)
        return dataset_collection

    with open(job_path, "r") as f:
        job = yaml.safe_load(f)

    # Figure out what "." should be here instead.
    job_dir = os.path.dirname(job_path)
    job_dict, datasets = galactic_job_json(
        job,
        job_dir,
        upload_func,
        create_collection_func,
        tool_or_workflow="tool" if runnable.type in [RunnableType.cwl_tool, RunnableType.galaxy_tool] else "workflow",
    )

    if datasets:
        final_state = _wait_for_history(ctx, user_gi, history_id)

        for (dataset, path) in datasets:
            dataset_details = user_gi.histories.show_dataset(
                history_id,
                dataset["id"],
            )
            ctx.vlog("Uploaded dataset for path [%s] with metadata [%s]" % (path, dataset_details))
    else:
        # Mark uploads as ok because there is nothing to do.
        final_state = "ok"

    ctx.vlog("final state is %s" % final_state)
    if final_state != "ok":
        msg = "Failed to run job, final job state is [%s]." % final_state
        summarize_history(ctx, user_gi, history_id)
        with open("errored_galaxy.log", "w") as f:
            f.write(log_contents_str(config))
        raise Exception(msg)

    return job_dict, datasets


class GalaxyBaseRunResponse(SuccessfulRunResponse):
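    """Base class for responses from Galaxy tool and workflow runs.

    Provides shared logic for locating, downloading, and converting the outputs
    of a finished run into the dictionary returned to callers.
    """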

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
    ):
        self._ctx = ctx
        self._runnable = runnable
        self._user_gi = user_gi
        self._history_id = history_id
        self._log = log

        self._job_info = None

        self._outputs_dict = None

    def to_galaxy_output(self, output):
        """Convert runnable output to a GalaxyOutput object.

        Subclasses for workflow and tool execution override this.
        """
        raise NotImplementedError()

    def _get_extra_files(self, dataset_details):
        extra_files_url = "%s/%s/contents/%s/extra_files" % (
            self._user_gi._make_url(self._user_gi.histories), self._history_id, dataset_details["id"]
        )
        extra_files = Client._get(self._user_gi.jobs, url=extra_files_url)
        return extra_files

    def _get_metadata(self, history_content_type, content_id):
        if history_content_type == "dataset":
            return self._user_gi.histories.show_dataset(
                self._history_id,
                content_id,
            )
        elif history_content_type == "dataset_collection":
            return self._user_gi.histories.show_dataset_collection(
                self._history_id,
                content_id,
            )
        else:
            raise Exception("Unknown history content type encountered [%s]" % history_content_type)

    def collect_outputs(self, ctx, output_directory):
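        """Download this run's declared outputs into ``output_directory``.

        Populates ``self._outputs_dict``, keyed by output ID, with the
        collected output values (parsed JSON for expression outputs).
        """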
        assert self._outputs_dict is None, "collect_outputs pre-condition violated"

        outputs_dict = {}
        if not output_directory:
            # TODO: rather than creating a directory just use
            # Galaxy paths if they are available in this
            # configuration.
            output_directory = tempfile.mkdtemp()

        def get_dataset(dataset_details, filename=None):
            parent_basename = dataset_details.get("cwl_file_name")
            if not parent_basename:
                parent_basename = dataset_details.get("name")
            file_ext = dataset_details["file_ext"]
            if file_ext == "directory":
                # TODO: rename output_directory to outputs_directory because we can have output directories
                # and this is confusing...
                the_output_directory = os.path.join(output_directory, parent_basename)
                safe_makedirs(the_output_directory)
                destination = self.download_output_to(dataset_details, the_output_directory, filename=filename)
            else:
                destination = self.download_output_to(dataset_details, output_directory, filename=filename)
            if filename is None:
                basename = parent_basename
            else:
                basename = os.path.basename(filename)

            return {"path": destination, "basename": basename}

        ctx.vlog("collecting outputs to directory %s" % output_directory)
        for runnable_output in get_outputs(self._runnable):
            output_id = runnable_output.get_id()
            if not output_id:
                ctx.vlog("Workflow output identified without an ID (label), skipping")
                continue
            output_dict_value = None
            if self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]:
                galaxy_output = self.to_galaxy_output(runnable_output)
                cwl_output = output_to_cwl_json(
                    galaxy_output,
                    self._get_metadata,
                    get_dataset,
                    self._get_extra_files,
                    pseduo_location=True,
                )
                output_dict_value = cwl_output
            else:
                # TODO: deprecate this route for finding workflow outputs,
                # it is a brittle and bad approach...
                output_dataset_id = self.output_dataset_id(runnable_output)
                dataset = self._get_metadata("dataset", output_dataset_id)
                dataset_dict = get_dataset(dataset)
                ctx.vlog("populated destination [%s]" % dataset_dict["path"])

                if dataset["file_ext"] == "expression.json":
                    with open(dataset_dict["path"], "r") as f:
                        output_dict_value = json.load(f)
                else:
                    output_dict_value = output_properties(**dataset_dict)

            outputs_dict[output_id] = output_dict_value

        self._outputs_dict = outputs_dict
        ctx.vlog("collected outputs [%s]" % self._outputs_dict)

    @property
    def log(self):
        return self._log

    @property
    def job_info(self):
        if self._job_info is not None:
            return dict(
                stdout=self._job_info["stdout"],
                stderr=self._job_info["stderr"],
                command_line=self._job_info["command_line"],
            )
        return None

    @property
    def outputs_dict(self):
        return self._outputs_dict

    def download_output_to(self, dataset_details, output_directory, filename=None):
        if filename is None:
            local_filename = dataset_details.get("cwl_file_name") or dataset_details.get("name")
        else:
            local_filename = filename
        destination = os.path.join(output_directory, local_filename)
        self._history_content_download(
            self._history_id,
            dataset_details["id"],
            to_path=destination,
            filename=filename,
        )
        return destination

    def _history_content_download(self, history_id, dataset_id, to_path, filename=None):
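        """Stream the dataset contents to ``to_path`` via the display endpoint."""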
        user_gi = self._user_gi
        url = user_gi.url + "/histories/%s/contents/%s/display" % (history_id, dataset_id)

        data = {}
        if filename:
            data["filename"] = filename

        r = requests.get(url, params=data, verify=user_gi.verify, stream=True, timeout=user_gi.timeout)
        r.raise_for_status()

        with open(to_path, 'wb') as fp:
            for chunk in r.iter_content(chunk_size=bioblend.CHUNK_SIZE):
                if chunk:
                    fp.write(chunk)


class GalaxyToolRunResponse(GalaxyBaseRunResponse):
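    """Run response for a single Galaxy tool execution."""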

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        job_info,
        api_run_response,
    ):
        super(GalaxyToolRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._job_info = job_info
        self.api_run_response = api_run_response

    def is_collection(self, output):
        # TODO: Make this more rigorous - search both output and output
        # collections - throw an exception if not found in either place instead
        # of just assuming all non-datasets are collections.
        return self.output_dataset_id(output) is None

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        return tool_response_to_output(self.api_run_response, self._history_id, output_id)

    def output_dataset_id(self, output):
        outputs = self.api_run_response["outputs"]
        output_id = output.get_id()
        output_dataset_id = None
        self._ctx.vlog("Looking for id [%s] in outputs [%s]" % (output_id, outputs))
        for output in outputs:
            if output["output_name"] == output_id:
                output_dataset_id = output["id"]

        return output_dataset_id


class GalaxyWorkflowRunResponse(GalaxyBaseRunResponse):
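    """Run response for a Galaxy workflow invocation."""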

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        workflow_id,
        invocation_id,
    ):
        super(GalaxyWorkflowRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._workflow_id = workflow_id
        self._invocation_id = invocation_id

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        self._ctx.vlog("checking for output in invocation [%s]" % self._invocation)
        return invocation_to_output(self._invocation, self._history_id, output_id)

    def output_dataset_id(self, output):
        invocation = self._invocation
        if "outputs" in invocation:
            # Use newer workflow outputs API.

            output_name = output.get_id()
            if output_name in invocation["outputs"]:
                return invocation["outputs"][output.get_id()]["id"]
            else:
                raise Exception("Failed to find output [%s] in invocation outputs [%s]" % (output_name, invocation["outputs"]))
        else:
            # Assume the output knows its order_index and such - older line of
            # development not worth pursuing.
            workflow_output = output.workflow_output
            order_index = workflow_output.order_index

            invocation_steps = invocation["steps"]
            output_steps = [s for s in invocation_steps if s["order_index"] == order_index]
            assert len(output_steps) == 1, "More than one step matching outputs, behavior undefined."
            output_step = output_steps[0]
            job_id = output_step["job_id"]
            assert job_id, "Output doesn't define a job_id, behavior undefined."
            job_info = self._user_gi.jobs.show_job(job_id, full_details=True)
            job_outputs = job_info["outputs"]
            output_name = workflow_output.output_name
            assert output_name in job_outputs, "No output [%s] found for output job." % output_name
            job_output = job_outputs[output_name]
            assert "id" in job_output, "Job output [%s] does not contain 'id'." % job_output
            return job_output["id"]

    @property
    def _invocation(self):
        invocation = self._user_gi.workflows.show_invocation(
            self._workflow_id,
            self._invocation_id,
        )
        return invocation


def _tool_id(tool_path):
    tool_source = get_tool_source(tool_path)
    return tool_source.parse_id()


def _history_id(gi, **kwds):
    history_id = kwds.get("history_id", None)
    if history_id is None:
        history_name = kwds.get("history_name", DEFAULT_HISTORY_NAME)
        history_id = gi.histories.create_history(history_name)["id"]
    return history_id


def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0):
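    """Poll the invocation until it leaves a non-terminal state.

    Raises if any job in the target history enters an error or deleted state
    while waiting.
    """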

    def state_func():
        if _retry_on_timeouts(ctx, gi, lambda gi: has_jobs_in_states(gi, history_id, ["error", "deleted", "deleted_new"])):
            raise Exception("Problem running workflow, one or more jobs failed.")

        return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))

    return _wait_on_state(state_func, polling_backoff)


def _retry_on_timeouts(ctx, gi, f):
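    """Call ``f(gi)`` with a short client timeout, retrying apparent timeouts.

    The bioblend client timeout is temporarily set to 60 seconds; calls that
    fail after taking longer than 45 seconds are retried up to five times.
    """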
    gi.timeout = 60
    try_count = 5
    try:
        for try_num in range(try_count):
            start_time = time.time()
            try:
                return f(gi)
            except Exception:
                end_time = time.time()
                if end_time - start_time > 45 and (try_num + 1) < try_count:
                    ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.")
                    continue
                else:
                    raise
    finally:
        gi.timeout = None


def has_jobs_in_states(gi, history_id, states):
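    """Return True if the history has any job in one of the given states."""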
    params = {"history_id": history_id}
    jobs_url = gi._make_url(gi.jobs)
    jobs = Client._get(gi.jobs, params=params, url=jobs_url)

    target_jobs = [j for j in jobs if j["state"] in states]

    return len(target_jobs) > 0


def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
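    """Wait for all jobs in the history to finish, then return the history state."""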

    def has_active_jobs(gi):
        if has_jobs_in_states(gi, history_id, ["new", "upload", "waiting", "queued", "running"]):
            return True
        else:
            return None

    timeout = 60 * 60 * 24
    wait_on(lambda: _retry_on_timeouts(ctx, gi, has_active_jobs), "active jobs", timeout, polling_backoff)

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

    return _wait_on_state(state_func, polling_backoff)


def _wait_for_job(gi, job_id):
    def state_func():
        return gi.jobs.show_job(job_id, full_details=True)

    return _wait_on_state(state_func)


def _wait_on_state(state_func, polling_backoff=0):
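    """Poll ``state_func`` until the reported state is terminal and return it."""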

    def get_state():
        response = state_func()
        state = response["state"]
        if str(state) not in ["running", "queued", "new", "ready"]:
            return state
        else:
            return None

    timeout = 60 * 60 * 24
    final_state = wait_on(get_state, "state", timeout, polling_backoff)
    return final_state


__all__ = (
    "execute",
)
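

# A minimal usage sketch (not part of this module): planemo builds a `ctx`,
# an engine `config`, and a `runnable`, then calls `execute` with the path to
# a job file. The keyword arguments shown are the ones read above via kwds
# (`history_name`, `output_directory`, `polling_backoff`); the concrete values
# are illustrative only.
#
#     response = execute(ctx, config, runnable, "job.yml",
#                        history_name="planemo run",
#                        output_directory="/tmp/outputs")
#     print(response.outputs_dict)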