env/lib/python3.7/site-packages/planemo/galaxy/activity.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
1 """Module provides generic interface to running Galaxy tools and workflows."""
2
3 import json
4 import os
5 import tempfile
6 import time
7
8 import bioblend
9 import requests
10 import yaml
11 from bioblend.galaxy.client import Client
12 from bioblend.util import attach_file
13 from galaxy.tool_util.cwl.util import (
14 DirectoryUploadTarget,
15 FileUploadTarget,
16 galactic_job_json,
17 invocation_to_output,
18 output_properties,
19 output_to_cwl_json,
20 path_or_uri_to_uri,
21 tool_response_to_output,
22 )
23 from galaxy.tool_util.parser import get_tool_source
24 from galaxy.util import (
25 safe_makedirs,
26 unicodify,
27 )
28
29 from planemo.galaxy.api import summarize_history
30 from planemo.io import wait_on
31 from planemo.runnable import (
32 ErrorRunResponse,
33 get_outputs,
34 RunnableType,
35 SuccessfulRunResponse,
36 )
37
38 DEFAULT_HISTORY_NAME = "CWL Target History"
39 ERR_NO_SUCH_TOOL = ("Failed to find tool with ID [%s] in Galaxy - cannot execute job. "
40 "You may need to enable verbose logging and determine why the tool did not load. [%s]")
41
42
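# Public entry point for planemo's Galaxy engines; any exception raised while
# staging inputs or running the activity is normalized into an ErrorRunResponse.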
def execute(ctx, config, runnable, job_path, **kwds):
    """Execute a Galaxy activity."""
    try:
        return _execute(ctx, config, runnable, job_path, **kwds)
    except Exception as e:
        return ErrorRunResponse(unicodify(e))
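
# A minimal usage sketch (hedged: `ctx` and `config` normally come from
# planemo's engine layer; `was_successful` and `outputs_dict` are provided by
# the planemo.runnable response classes):
#
#     response = execute(ctx, config, runnable, "job.yml")
#     if response.was_successful:
#         print(response.outputs_dict)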


def _verified_tool_id(runnable, user_gi):
    tool_id = _tool_id(runnable.path)
    try:
        user_gi.tools.show_tool(tool_id)
    except Exception as e:
        raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e))
    return tool_id


def _inputs_representation(runnable):
    if runnable.type == RunnableType.cwl_tool:
        inputs_representation = "cwl"
    else:
        inputs_representation = "galaxy"
    return inputs_representation


def log_contents_str(config):
    if hasattr(config, "log_contents"):
        return config.log_contents
    else:
        return "No log for this engine type."


def _execute(ctx, config, runnable, job_path, **kwds):
    user_gi = config.user_gi
    admin_gi = config.gi

    history_id = _history_id(user_gi, **kwds)

    job_dict, _ = stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds)

    if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
        response_class = GalaxyToolRunResponse
        tool_id = _verified_tool_id(runnable, user_gi)
        inputs_representation = _inputs_representation(runnable)
        run_tool_payload = dict(
            history_id=history_id,
            tool_id=tool_id,
            inputs=job_dict,
            inputs_representation=inputs_representation,
        )
        ctx.vlog("Post to Galaxy tool API with payload [%s]" % run_tool_payload)
        tool_run_response = user_gi.tools._post(run_tool_payload)

        job = tool_run_response["jobs"][0]
        job_id = job["id"]
        try:
            final_state = _wait_for_job(user_gi, job_id)
        except Exception:
            summarize_history(ctx, user_gi, history_id)
            raise
        if final_state != "ok":
            msg = "Failed to run tool, final job state is [%s]." % final_state
            summarize_history(ctx, user_gi, history_id)
            with open("errored_galaxy.log", "w") as f:
                f.write(log_contents_str(config))
            raise Exception(msg)

        ctx.vlog("Final job state was ok, fetching details for job [%s]" % job_id)
        job_info = admin_gi.jobs.show_job(job_id)
        response_kwds = {
            'job_info': job_info,
            'api_run_response': tool_run_response,
        }
        if ctx.verbose:
            summarize_history(ctx, user_gi, history_id)
    elif runnable.type in [RunnableType.galaxy_workflow, RunnableType.cwl_workflow]:
        response_class = GalaxyWorkflowRunResponse
        workflow_id = config.workflow_id(runnable.path)
        ctx.vlog("Found Galaxy workflow ID [%s] for path [%s]" % (workflow_id, runnable.path))
        # TODO: update bioblend to allow inputs_by.
        # invocation = user_gi.workflows.invoke_workflow(
        #     workflow_id,
        #     history_id=history_id,
        #     inputs=job_dict,
        # )
        payload = dict(
            workflow_id=workflow_id,
            history_id=history_id,
            inputs=job_dict,
            inputs_by="name",
            allow_tool_state_corrections=True,
        )
        invocations_url = "%s/%s/invocations" % (
            user_gi._make_url(user_gi.workflows),
            workflow_id,
        )
        invocation = Client._post(user_gi.workflows, payload, url=invocations_url)
        invocation_id = invocation["id"]
        ctx.vlog("Waiting for invocation [%s]" % invocation_id)
        polling_backoff = kwds.get("polling_backoff", 0)
        try:
            final_invocation_state = _wait_for_invocation(ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff)
        except Exception:
            ctx.vlog("Problem waiting on invocation...")
            summarize_history(ctx, user_gi, history_id)
            raise
        ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
        final_state = _wait_for_history(ctx, user_gi, history_id, polling_backoff)
        if final_state != "ok":
            msg = "Failed to run workflow, final history state is [%s]." % final_state
            summarize_history(ctx, user_gi, history_id)
            with open("errored_galaxy.log", "w") as f:
                f.write(log_contents_str(config))
            raise Exception(msg)
        ctx.vlog("Final history state is 'ok'")
        response_kwds = {
            'workflow_id': workflow_id,
            'invocation_id': invocation_id,
        }
    else:
        raise NotImplementedError()

    run_response = response_class(
        ctx=ctx,
        runnable=runnable,
        user_gi=user_gi,
        history_id=history_id,
        log=log_contents_str(config),
        **response_kwds
    )
    output_directory = kwds.get("output_directory", None)
    ctx.vlog("collecting outputs from run...")
    run_response.collect_outputs(ctx, output_directory)
    ctx.vlog("collecting outputs complete")
    return run_response


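# stage_in uploads the job's inputs (files, directories, and raw expression
# values) into the target history and rewrites the job dictionary so it
# references the uploaded Galaxy datasets.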
def stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds):
    files_attached = [False]

    def upload_func(upload_target):

        def _attach_file(upload_payload, uri, index=0):
            uri = path_or_uri_to_uri(uri)
            is_path = uri.startswith("file://")
            if not is_path or config.use_path_paste:
                upload_payload["inputs"]["files_%d|url_paste" % index] = uri
            else:
                files_attached[0] = True
                path = uri[len("file://"):]
                upload_payload["files_%d|file_data" % index] = attach_file(path)

        if isinstance(upload_target, FileUploadTarget):
            file_path = upload_target.path
            upload_payload = user_gi.tools._upload_payload(
                history_id,
                file_type=upload_target.properties.get('filetype', None) or "auto",
            )
            name = os.path.basename(file_path)
            upload_payload["inputs"]["files_0|auto_decompress"] = False
            upload_payload["inputs"]["auto_decompress"] = False
            _attach_file(upload_payload, file_path)
            upload_payload["inputs"]["files_0|NAME"] = name
            if upload_target.secondary_files:
                _attach_file(upload_payload, upload_target.secondary_files, index=1)
                upload_payload["inputs"]["files_1|type"] = "upload_dataset"
                upload_payload["inputs"]["files_1|auto_decompress"] = True
                upload_payload["inputs"]["file_count"] = "2"
                upload_payload["inputs"]["force_composite"] = "True"

            ctx.vlog("upload_payload is %s" % upload_payload)
            return user_gi.tools._post(upload_payload, files_attached=files_attached[0])
        elif isinstance(upload_target, DirectoryUploadTarget):
            tar_path = upload_target.tar_path

            upload_payload = user_gi.tools._upload_payload(
                history_id,
                file_type="tar",
            )
            upload_payload["inputs"]["files_0|auto_decompress"] = False
            _attach_file(upload_payload, tar_path)
            tar_upload_response = user_gi.tools._post(upload_payload, files_attached=files_attached[0])
            convert_response = user_gi.tools.run_tool(
                tool_id="CONVERTER_tar_to_directory",
                tool_inputs={"input1": {"src": "hda", "id": tar_upload_response["outputs"][0]["id"]}},
                history_id=history_id,
            )
            assert "outputs" in convert_response, convert_response
            return convert_response
        else:
            content = json.dumps(upload_target.object)
            return user_gi.tools.paste_content(
                content,
                history_id,
                file_type="expression.json",
            )

    def create_collection_func(element_identifiers, collection_type):
        payload = {
            "name": "dataset collection",
            "instance_type": "history",
            "history_id": history_id,
            "element_identifiers": element_identifiers,
            "collection_type": collection_type,
            "fields": None if collection_type != "record" else "auto",
        }
        dataset_collections_url = user_gi.url + "/dataset_collections"
        dataset_collection = Client._post(user_gi.histories, payload, url=dataset_collections_url)
        return dataset_collection

    with open(job_path, "r") as f:
        job = yaml.safe_load(f)

    # Figure out what "." should be here instead.
    job_dir = os.path.dirname(job_path)
    job_dict, datasets = galactic_job_json(
        job,
        job_dir,
        upload_func,
        create_collection_func,
        tool_or_workflow="tool" if runnable.type in [RunnableType.cwl_tool, RunnableType.galaxy_tool] else "workflow",
    )

    if datasets:
        final_state = _wait_for_history(ctx, user_gi, history_id)

        for (dataset, path) in datasets:
            dataset_details = user_gi.histories.show_dataset(
                history_id,
                dataset["id"],
            )
            ctx.vlog("Uploaded dataset for path [%s] with metadata [%s]" % (path, dataset_details))
    else:
        # Mark uploads as ok because nothing to do.
        final_state = "ok"

    ctx.vlog("final state is %s" % final_state)
    if final_state != "ok":
        msg = "Failed to stage in job inputs, final upload state is [%s]." % final_state
        summarize_history(ctx, user_gi, history_id)
        with open("errored_galaxy.log", "w") as f:
            f.write(log_contents_str(config))
        raise Exception(msg)

    return job_dict, datasets


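# For orientation, a hedged sketch of the sort of job file stage_in() parses
# (the keys and the path are hypothetical and must match the runnable's inputs):
#
#     input1:
#       class: File
#       path: input.bed
#     threshold: 5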
class GalaxyBaseRunResponse(SuccessfulRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
    ):
        self._ctx = ctx
        self._runnable = runnable
        self._user_gi = user_gi
        self._history_id = history_id
        self._log = log

        self._job_info = None

        self._outputs_dict = None

    def to_galaxy_output(self, output):
        """Convert runnable output to a GalaxyOutput object.

        Subclasses for workflow and tool execution override this.
        """
        raise NotImplementedError()

    def _get_extra_files(self, dataset_details):
        extra_files_url = "%s/%s/contents/%s/extra_files" % (
            self._user_gi._make_url(self._user_gi.histories), self._history_id, dataset_details["id"]
        )
        extra_files = Client._get(self._user_gi.jobs, url=extra_files_url)
        return extra_files

    def _get_metadata(self, history_content_type, content_id):
        if history_content_type == "dataset":
            return self._user_gi.histories.show_dataset(
                self._history_id,
                content_id,
            )
        elif history_content_type == "dataset_collection":
            return self._user_gi.histories.show_dataset_collection(
                self._history_id,
                content_id,
            )
        else:
            raise Exception("Unknown history content type encountered [%s]" % history_content_type)

    def collect_outputs(self, ctx, output_directory):
        assert self._outputs_dict is None, "collect_outputs pre-condition violated"

        outputs_dict = {}
        if not output_directory:
            # TODO: rather than creating a directory just use
            # Galaxy paths if they are available in this
            # configuration.
            output_directory = tempfile.mkdtemp()

        def get_dataset(dataset_details, filename=None):
            parent_basename = dataset_details.get("cwl_file_name")
            if not parent_basename:
                parent_basename = dataset_details.get("name")
            file_ext = dataset_details["file_ext"]
            if file_ext == "directory":
                # TODO: rename output_directory to outputs_directory because we can have output directories
                # and this is confusing...
                the_output_directory = os.path.join(output_directory, parent_basename)
                safe_makedirs(the_output_directory)
                destination = self.download_output_to(dataset_details, the_output_directory, filename=filename)
            else:
                destination = self.download_output_to(dataset_details, output_directory, filename=filename)
            if filename is None:
                basename = parent_basename
            else:
                basename = os.path.basename(filename)

            return {"path": destination, "basename": basename}

        ctx.vlog("collecting outputs to directory %s" % output_directory)
        for runnable_output in get_outputs(self._runnable):
            output_id = runnable_output.get_id()
            if not output_id:
                ctx.vlog("Workflow output identified without an ID (label), skipping")
                continue
            output_dict_value = None
            if self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]:
                galaxy_output = self.to_galaxy_output(runnable_output)
                cwl_output = output_to_cwl_json(
                    galaxy_output,
                    self._get_metadata,
                    get_dataset,
                    self._get_extra_files,
                    # NB: "pseduo_location" matches the (misspelled) upstream keyword.
                    pseduo_location=True,
                )
                output_dict_value = cwl_output
            else:
                # TODO: deprecate this route for finding workflow outputs,
                # it is a brittle and bad approach...
                output_dataset_id = self.output_dataset_id(runnable_output)
                dataset = self._get_metadata("dataset", output_dataset_id)
                dataset_dict = get_dataset(dataset)
                ctx.vlog("populated destination [%s]" % dataset_dict["path"])

                if dataset["file_ext"] == "expression.json":
                    with open(dataset_dict["path"], "r") as f:
                        output_dict_value = json.load(f)
                else:
                    output_dict_value = output_properties(**dataset_dict)

            outputs_dict[output_id] = output_dict_value

        self._outputs_dict = outputs_dict
        ctx.vlog("collected outputs [%s]" % self._outputs_dict)

    @property
    def log(self):
        return self._log

    @property
    def job_info(self):
        if self._job_info is not None:
            return dict(
                stdout=self._job_info["stdout"],
                stderr=self._job_info["stderr"],
                command_line=self._job_info["command_line"],
            )
        return None

    @property
    def outputs_dict(self):
        return self._outputs_dict

    def download_output_to(self, dataset_details, output_directory, filename=None):
        if filename is None:
            local_filename = dataset_details.get("cwl_file_name") or dataset_details.get("name")
        else:
            local_filename = filename
        destination = os.path.join(output_directory, local_filename)
        self._history_content_download(
            self._history_id,
            dataset_details["id"],
            to_path=destination,
            filename=filename,
        )
        return destination

    def _history_content_download(self, history_id, dataset_id, to_path, filename=None):
        user_gi = self._user_gi
        url = user_gi.url + "/histories/%s/contents/%s/display" % (history_id, dataset_id)

        data = {}
        if filename:
            data["filename"] = filename

        r = requests.get(url, params=data, verify=user_gi.verify, stream=True, timeout=user_gi.timeout)
        r.raise_for_status()

        with open(to_path, 'wb') as fp:
            for chunk in r.iter_content(chunk_size=bioblend.CHUNK_SIZE):
                if chunk:
                    fp.write(chunk)


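# Tool runs report their outputs directly in the tool API response; this
# subclass resolves runnable outputs against that payload.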
class GalaxyToolRunResponse(GalaxyBaseRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        job_info,
        api_run_response,
    ):
        super(GalaxyToolRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._job_info = job_info
        self.api_run_response = api_run_response

    def is_collection(self, output):
        # TODO: Make this more rigorous - search both output and output
        # collections - throw an exception if not found in either place instead
        # of just assuming all non-datasets are collections.
        return self.output_dataset_id(output) is None

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        return tool_response_to_output(self.api_run_response, self._history_id, output_id)

    def output_dataset_id(self, output):
        outputs = self.api_run_response["outputs"]
        output_id = output.get_id()
        output_dataset_id = None
        self._ctx.vlog("Looking for id [%s] in outputs [%s]" % (output_id, outputs))
        for tool_output in outputs:
            if tool_output["output_name"] == output_id:
                output_dataset_id = tool_output["id"]

        return output_dataset_id


class GalaxyWorkflowRunResponse(GalaxyBaseRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        workflow_id,
        invocation_id,
    ):
        super(GalaxyWorkflowRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._workflow_id = workflow_id
        self._invocation_id = invocation_id

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        self._ctx.vlog("checking for output in invocation [%s]" % self._invocation)
        return invocation_to_output(self._invocation, self._history_id, output_id)

    def output_dataset_id(self, output):
        invocation = self._invocation
        if "outputs" in invocation:
            # Use newer workflow outputs API.

            output_name = output.get_id()
            if output_name in invocation["outputs"]:
                return invocation["outputs"][output_name]["id"]
            else:
                raise Exception("Failed to find output [%s] in invocation outputs [%s]" % (output_name, invocation["outputs"]))
        else:
            # Assume the output knows its order_index and such - older line of
            # development not worth pursuing.
            workflow_output = output.workflow_output
            order_index = workflow_output.order_index

            invocation_steps = invocation["steps"]
            output_steps = [s for s in invocation_steps if s["order_index"] == order_index]
            assert len(output_steps) == 1, "More than one step matching outputs, behavior undefined."
            output_step = output_steps[0]
            job_id = output_step["job_id"]
            assert job_id, "Output doesn't define a job_id, behavior undefined."
            job_info = self._user_gi.jobs.show_job(job_id, full_details=True)
            job_outputs = job_info["outputs"]
            output_name = workflow_output.output_name
            assert output_name in job_outputs, "No output [%s] found for output job." % output_name
            job_output = job_outputs[output_name]
            assert "id" in job_output, "Job output [%s] does not contain 'id'." % job_output
            return job_output["id"]

    @property
    def _invocation(self):
        # Note: fetched from the API on each access rather than cached.
        invocation = self._user_gi.workflows.show_invocation(
            self._workflow_id,
            self._invocation_id,
        )
        return invocation


def _tool_id(tool_path):
    tool_source = get_tool_source(tool_path)
    return tool_source.parse_id()


def _history_id(gi, **kwds):
    history_id = kwds.get("history_id", None)
    if history_id is None:
        history_name = kwds.get("history_name", DEFAULT_HISTORY_NAME)
        history_id = gi.histories.create_history(history_name)["id"]
    return history_id


def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0):

    def state_func():
        if _retry_on_timeouts(ctx, gi, lambda gi: has_jobs_in_states(gi, history_id, ["error", "deleted", "deleted_new"])):
            raise Exception("Problem running workflow, one or more jobs failed.")

        return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))

    return _wait_on_state(state_func, polling_backoff)


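# Busy Galaxy servers can hang long enough for the client to time out; the
# helper below retries a callable a few times, but only when the failure looks
# like a timeout (the call consumed most of the 60 second client window).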
def _retry_on_timeouts(ctx, gi, f):
    gi.timeout = 60
    try_count = 5
    try:
        for try_num in range(try_count):
            start_time = time.time()
            try:
                return f(gi)
            except Exception:
                end_time = time.time()
                if end_time - start_time > 45 and (try_num + 1) < try_count:
                    ctx.vlog("Galaxy seems to have timed out, retrying status fetch.")
                    continue
                else:
                    raise
    finally:
        gi.timeout = None


def has_jobs_in_states(gi, history_id, states):
    params = {"history_id": history_id}
    jobs_url = gi._make_url(gi.jobs)
    jobs = Client._get(gi.jobs, params=params, url=jobs_url)

    target_jobs = [j for j in jobs if j["state"] in states]

    return len(target_jobs) > 0


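# _wait_for_history polls in two phases: first until no job in the history is
# still in an active state, then on the history itself so dataset states have
# settled before outputs are collected.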
def _wait_for_history(ctx, gi, history_id, polling_backoff=0):

    def has_active_jobs(gi):
        if has_jobs_in_states(gi, history_id, ["new", "upload", "waiting", "queued", "running"]):
            return True
        else:
            return None

    timeout = 60 * 60 * 24
    wait_on(lambda: _retry_on_timeouts(ctx, gi, has_active_jobs), "active jobs", timeout, polling_backoff)

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

    return _wait_on_state(state_func, polling_backoff)


def _wait_for_job(gi, job_id):
    def state_func():
        return gi.jobs.show_job(job_id, full_details=True)

    return _wait_on_state(state_func)


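# Any state other than running/queued/new/ready is treated as terminal; "ok"
# indicates success and other terminal states (e.g. "error") are handled by
# the callers above.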
def _wait_on_state(state_func, polling_backoff=0):

    def get_state():
        response = state_func()
        state = response["state"]
        if str(state) not in ["running", "queued", "new", "ready"]:
            return state
        else:
            return None

    timeout = 60 * 60 * 24
    final_state = wait_on(get_state, "state", timeout, polling_backoff)
    return final_state


__all__ = (
    "execute",
)