comparison planemo/lib/python3.7/site-packages/galaxy/tool_util/cwl/parser.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d30785e31577
1 """ This module provides proxy objects around objects from the common
2 workflow language reference implementation library cwltool. These proxies
3 adapt cwltool to Galaxy features and abstract the library away from the rest
4 of the framework.
5 """
6 from __future__ import absolute_import
7
8 import base64
9 import copy
10 import json
11 import logging
12 import os
13 import pickle
14 from abc import ABCMeta, abstractmethod
15 from collections import OrderedDict
16 from uuid import uuid4
17
18 import six
19
20 from galaxy.exceptions import MessageException
21 from galaxy.util import (
22 listify,
23 safe_makedirs,
24 unicodify,
25 )
26 from galaxy.util.bunch import Bunch
27 from .cwltool_deps import (
28 beta_relaxed_fmt_check,
29 ensure_cwltool_available,
30 getdefault,
31 pathmapper,
32 process,
33 ref_resolver,
34 relink_initialworkdir,
35 RuntimeContext,
36 sourceline,
37 StdFsAccess,
38 )
39 from .representation import (
40 field_to_field_type,
41 FIELD_TYPE_REPRESENTATION,
42 INPUT_TYPE,
43 type_descriptions_for_field_types,
44 USE_FIELD_TYPES,
45 USE_STEP_PARAMETERS,
46 )
47 from .schema import (
48 non_strict_non_validating_schema_loader,
49 schema_loader,
50 )
51 from .util import SECONDARY_FILES_EXTRA_PREFIX
52
log = logging.getLogger(__name__)

# Name of the JSON file used to persist job state inside a job directory
# (written by JobProxy.save_job, read by load_job_proxy).
JOB_JSON_FILE = ".cwl_job.json"

DOCKER_REQUIREMENT = "DockerRequirement"
# CWL requirement classes Galaxy can handle for tools; anything outside this
# list causes check_requirements() to raise.
SUPPORTED_TOOL_REQUIREMENTS = [
    "CreateFileRequirement",
    "DockerRequirement",
    "EnvVarRequirement",
    "InitialWorkDirRequirement",
    "InlineJavascriptRequirement",
    "ResourceRequirement",
    "ShellCommandRequirement",
    "ScatterFeatureRequirement",
    "SchemaDefRequirement",
    "SubworkflowFeatureRequirement",
    "StepInputExpressionRequirement",
    "MultipleInputFeatureRequirement",
]


# Workflows currently support exactly the same requirement set as tools.
SUPPORTED_WORKFLOW_REQUIREMENTS = SUPPORTED_TOOL_REQUIREMENTS + [
]

# Strategy used by ToolProxy.to_persistent_representation() /
# tool_proxy_from_persistent_representation(): pickle the loaded cwltool
# object ("cwl_tool_object") rather than the raw process reference.
PERSISTED_REPRESENTATION = "cwl_tool_object"
79
def tool_proxy(tool_path=None, tool_object=None, strict_cwl_validation=True, tool_directory=None, uuid=None):
    """Provide a proxy object around cwltool data structures to just
    grab relevant data.
    """
    ensure_cwltool_available()
    # TODO(review): consider requiring a non-None uuid here
    # (raise if uuid is None).
    return _to_cwl_tool_object(
        tool_path=tool_path,
        tool_object=tool_object,
        strict_cwl_validation=strict_cwl_validation,
        tool_directory=tool_directory,
        uuid=uuid,
    )
95
96
def tool_proxy_from_persistent_representation(persisted_tool, strict_cwl_validation=True, tool_directory=None):
    """Load a ToolProxy from a previously persisted representation."""
    ensure_cwltool_available()
    # Both strategies rehydrate via the same classmethod; only the keyword
    # passed to _to_cwl_tool_object differs.
    rehydrated = ToolProxy.from_persistent_representation(persisted_tool)
    if PERSISTED_REPRESENTATION == "cwl_tool_object":
        kwds = {"cwl_tool_object": rehydrated}
    else:
        # ??? - legacy raw process reference variant.
        kwds = {"raw_process_reference": rehydrated}
    return _to_cwl_tool_object(
        uuid=persisted_tool["uuid"],
        strict_cwl_validation=strict_cwl_validation,
        tool_directory=tool_directory,
        **kwds
    )
108
109
def workflow_proxy(workflow_path, strict_cwl_validation=True):
    """Build a WorkflowProxy for the CWL workflow at ``workflow_path``."""
    ensure_cwltool_available()
    return _to_cwl_workflow_object(workflow_path, strict_cwl_validation=strict_cwl_validation)
114
115
def load_job_proxy(job_directory, strict_cwl_validation=True):
    """Reconstruct a JobProxy from the state persisted in ``job_directory``.

    Reads the JSON document written by ``JobProxy.save_job()`` and rebuilds
    the tool proxy either from a tool path (older variant) or from the
    persisted tool representation.
    """
    ensure_cwltool_available()
    job_objects_path = os.path.join(job_directory, JOB_JSON_FILE)
    # Fix: close the file deterministically instead of leaking the handle
    # as the previous ``json.load(open(...))`` form did.
    with open(job_objects_path, "r") as f:
        job_objects = json.load(f)
    job_inputs = job_objects["job_inputs"]
    output_dict = job_objects["output_dict"]
    # Any reason to retain older tool_path variant of this? Probably not?
    if "tool_path" in job_objects:
        tool_path = job_objects["tool_path"]
        cwl_tool = tool_proxy(tool_path, strict_cwl_validation=strict_cwl_validation)
    else:
        persisted_tool = job_objects["tool_representation"]
        cwl_tool = tool_proxy_from_persistent_representation(persisted_tool=persisted_tool, strict_cwl_validation=strict_cwl_validation)
    cwl_job = cwl_tool.job_proxy(job_inputs, output_dict, job_directory=job_directory)
    return cwl_job
131
132
def _to_cwl_tool_object(tool_path=None, tool_object=None, cwl_tool_object=None, raw_process_reference=None, strict_cwl_validation=False, tool_directory=None, uuid=None):
    """Load a cwltool tool object and wrap it in a Galaxy ToolProxy.

    Exactly one source should be supplied: ``tool_path`` (load from file),
    ``tool_object`` (a JSON-able dict), or ``cwl_tool_object`` (an
    already-loaded cwltool object, optionally with
    ``raw_process_reference``).
    """
    if uuid is None:
        uuid = str(uuid4())
    # NOTE: intentionally shadows the module-level ``schema_loader`` import;
    # picks the strict or relaxed loader.
    schema_loader = _schema_loader(strict_cwl_validation)
    if raw_process_reference is None and tool_path is not None:
        assert cwl_tool_object is None
        assert tool_object is None

        raw_process_reference = schema_loader.raw_process_reference(tool_path)
        cwl_tool = schema_loader.tool(
            raw_process_reference=raw_process_reference,
        )
    elif tool_object is not None:
        assert raw_process_reference is None
        assert cwl_tool_object is None

        # Allow loading tools from YAML...
        # Round-trip through JSON then ruamel so line/column info can be
        # attached for error reporting below.
        from ruamel import yaml as ryaml
        as_str = json.dumps(tool_object)
        tool_object = ryaml.round_trip_load(as_str)
        path = tool_directory
        if path is None:
            path = os.getcwd()
        uri = ref_resolver.file_uri(path) + "/"
        sourceline.add_lc_filename(tool_object, uri)
        raw_process_reference = schema_loader.raw_process_reference_for_object(
            tool_object,
            uri=uri
        )
        cwl_tool = schema_loader.tool(
            raw_process_reference=raw_process_reference,
        )
    else:
        cwl_tool = cwl_tool_object

    if isinstance(cwl_tool, int):
        # presumably the loader returned an error code instead of a tool
        # object - TODO confirm against the schema loader implementation.
        raise Exception("Failed to load tool.")

    raw_tool = cwl_tool.tool
    # Apply Galaxy hacks to CWL tool representation to bridge semantic differences
    # between Galaxy and cwltool.
    _hack_cwl_requirements(cwl_tool)
    check_requirements(raw_tool)
    return _cwl_tool_object_to_proxy(cwl_tool, uuid, raw_process_reference=raw_process_reference, tool_path=tool_path)
177
178
179 def _cwl_tool_object_to_proxy(cwl_tool, uuid, raw_process_reference=None, tool_path=None):
180 raw_tool = cwl_tool.tool
181 if "class" not in raw_tool:
182 raise Exception("File does not declare a class, not a valid Draft 3+ CWL tool.")
183
184 process_class = raw_tool["class"]
185 if process_class == "CommandLineTool":
186 proxy_class = CommandLineToolProxy
187 elif process_class == "ExpressionTool":
188 proxy_class = ExpressionToolProxy
189 else:
190 raise Exception("File not a CWL CommandLineTool.")
191 top_level_object = tool_path is not None
192 if top_level_object and ("cwlVersion" not in raw_tool):
193 raise Exception("File does not declare a CWL version, pre-draft 3 CWL tools are not supported.")
194
195 proxy = proxy_class(cwl_tool, uuid, raw_process_reference, tool_path)
196 return proxy
197
198
def _to_cwl_workflow_object(workflow_path, strict_cwl_validation=None):
    """Load the workflow at ``workflow_path`` and wrap it in a WorkflowProxy."""
    cwl_workflow = _schema_loader(strict_cwl_validation).tool(path=workflow_path)
    check_requirements(cwl_workflow.tool, tool=False)
    return WorkflowProxy(cwl_workflow, workflow_path)
207
208
def _schema_loader(strict_cwl_validation):
    """Pick the strict schema loader, or the relaxed one when not validating."""
    if strict_cwl_validation:
        return schema_loader
    return non_strict_non_validating_schema_loader
212
213
214 def _hack_cwl_requirements(cwl_tool):
215 move_to_hints = []
216 for i, requirement in enumerate(cwl_tool.requirements):
217 if requirement["class"] == DOCKER_REQUIREMENT:
218 move_to_hints.insert(0, i)
219
220 for i in move_to_hints:
221 del cwl_tool.requirements[i]
222 cwl_tool.hints.append(requirement)
223
224
def check_requirements(rec, tool=True):
    """Recursively reject requirement classes Galaxy does not support.

    Walks nested dicts/lists of ``rec`` and raises when any
    ``requirements`` entry names a class outside the supported set for
    tools (or for workflows when ``tool`` is False).
    """
    if isinstance(rec, dict):
        if "requirements" in rec:
            supported = SUPPORTED_TOOL_REQUIREMENTS if tool else SUPPORTED_WORKFLOW_REQUIREMENTS
            for requirement in rec["requirements"]:
                if requirement["class"] not in supported:
                    raise Exception("Unsupported requirement %s" % requirement["class"])
        for key in rec:
            check_requirements(rec[key], tool=tool)
    if isinstance(rec, list):
        for entry in rec:
            check_requirements(entry, tool=tool)
240
241
@six.add_metaclass(ABCMeta)
class ToolProxy(object):
    """Abstract base class for proxies around cwltool tool objects.

    Subclasses define ``_class`` (the CWL process class name recorded
    during persistence) and implement the abstract mapping methods below.
    """

    def __init__(self, tool, uuid, raw_process_reference=None, tool_path=None):
        # ``tool`` is the loaded cwltool object; ``uuid`` identifies this
        # proxy when no usable tool id is available.
        self._tool = tool
        self._uuid = uuid
        self._tool_path = tool_path
        self._raw_process_reference = raw_process_reference

    def job_proxy(self, input_dict, output_dict, job_directory="."):
        """ Build a cwltool.job.Job describing computation using a input_json
        Galaxy will generate mapping the Galaxy description of the inputs into
        a cwltool compatible variant.
        """
        return JobProxy(self, input_dict, output_dict, job_directory=job_directory)

    @property
    def id(self):
        """Raw CWL ``id`` of the underlying tool (may be None)."""
        raw_id = self._tool.tool.get("id", None)
        return raw_id

    def galaxy_id(self):
        """Derive a Galaxy tool id from the CWL id, falling back to the uuid."""
        raw_id = self.id
        tool_id = None
        # don't reduce "search.cwl#index" to search
        if raw_id:
            tool_id = os.path.basename(raw_id)
            # tool_id = os.path.splitext(os.path.basename(raw_id))[0]
        if not tool_id:
            return self._uuid
        assert tool_id
        if tool_id.startswith("#"):
            tool_id = tool_id[1:]
        return tool_id

    @abstractmethod
    def input_instances(self):
        """ Return InputInstance objects describing mapping to Galaxy inputs. """

    @abstractmethod
    def output_instances(self):
        """ Return OutputInstance objects describing mapping to Galaxy inputs. """

    @abstractmethod
    def docker_identifier(self):
        """ Return docker identifier for embedding in tool description. """

    @abstractmethod
    def description(self):
        """ Return description to tool. """

    @abstractmethod
    def label(self):
        """ Return label for tool. """

    def to_persistent_representation(self):
        """Return a JSON representation of this tool. Not for serialization
        over the wire, but serialization in a database."""
        # TODO: Replace this with some more readable serialization,
        # I really don't like using pickle here.
        if PERSISTED_REPRESENTATION == "cwl_tool_object":
            persisted_obj = remove_pickle_problems(self._tool)
        else:
            persisted_obj = self._raw_process_reference
        return {
            "class": self._class,
            "pickle": unicodify(base64.b64encode(pickle.dumps(persisted_obj, pickle.HIGHEST_PROTOCOL))),
            "uuid": self._uuid,
        }

    @staticmethod
    def from_persistent_representation(as_object):
        """Recover an object serialized with to_persistent_representation."""
        if "class" not in as_object:
            raise Exception("Failed to deserialize tool proxy from JSON object - no class found.")
        if "pickle" not in as_object:
            raise Exception("Failed to deserialize tool proxy from JSON object - no pickle representation found.")
        if "uuid" not in as_object:
            raise Exception("Failed to deserialize tool proxy from JSON object - no uuid found.")
        to_unpickle = base64.b64decode(as_object["pickle"])
        # NOTE(review): pickle.loads is unsafe on untrusted data; assumed
        # here to come only from Galaxy's own database - confirm.
        loaded_object = pickle.loads(to_unpickle)
        return loaded_object
324
325
class CommandLineToolProxy(ToolProxy):
    """ToolProxy implementation for CWL CommandLineTool objects."""

    _class = "CommandLineTool"

    def description(self):
        # Don't use description - typically too verbose.
        return ''

    def doc(self):
        """Return the tool's raw ``doc`` value (may be None)."""
        # TODO: parse multiple lines and merge - valid in cwl-1.1
        doc = self._tool.tool.get('doc')
        return doc

    def label(self):
        """Return the portion of ``label`` before the first colon, or ''."""
        label = self._tool.tool.get('label')

        if label is not None:
            return label.partition(":")[0]  # return substring before ':'
        else:
            return ''

    def input_fields(self):
        """Return the input record fields with schema-def references inlined."""
        input_records_schema = self._eval_schema(self._tool.inputs_record_schema)
        if input_records_schema["type"] != "record":
            raise Exception("Unhandled CWL tool input structure")

        # TODO: handle this somewhere else?
        # schemadef_req_tool_param
        rval = []
        for input in input_records_schema["fields"]:
            input_copy = copy.deepcopy(input)
            input_type = input.get("type")
            # Union (list) and nested schema (dict) types pass through as-is.
            if isinstance(input_type, (list, dict)):
                rval.append(input_copy)
                continue

            if input_type in self._tool.schemaDefs:
                input_copy["type"] = self._tool.schemaDefs[input_type]

            rval.append(input_copy)
        return rval

    def _eval_schema(self, io_schema):
        """Dereference a top-level schema-def reference if one is present."""
        schema_type = io_schema.get("type")
        if schema_type in self._tool.schemaDefs:
            io_schema = self._tool.schemaDefs[schema_type]
        return io_schema

    def input_instances(self):
        return [_outer_field_to_input_instance(_) for _ in self.input_fields()]

    def output_instances(self):
        outputs_schema = self._eval_schema(self._tool.outputs_record_schema)
        if outputs_schema["type"] != "record":
            raise Exception("Unhandled CWL tool output structure")

        rval = []
        for output in outputs_schema["fields"]:
            rval.append(_simple_field_to_output(output))

        return rval

    def docker_identifier(self):
        """Return the Docker image for this tool or None when not declared.

        Fix: previously a DockerRequirement declaring neither
        ``dockerImageId`` nor ``dockerPull`` (e.g. only ``dockerFile``)
        raised KeyError; such hints are now skipped.
        """
        for hint in self.hints_or_requirements_of_class("DockerRequirement"):
            if "dockerImageId" in hint:
                return hint["dockerImageId"]
            elif "dockerPull" in hint:
                return hint["dockerPull"]

        return None

    def hints_or_requirements_of_class(self, class_name):
        """Yield requirement/hint dicts whose class matches ``class_name``."""
        tool = self._tool.tool
        reqs_and_hints = tool.get("requirements", []) + tool.get("hints", [])
        for hint in reqs_and_hints:
            if hint["class"] == class_name:
                yield hint

    def software_requirements(self):
        """Return (package, first_version_or_None) pairs from SoftwareRequirement hints."""
        # Roughest imaginable pass at parsing requirements, really need to take in specs, handle
        # multiple versions, etc...
        tool = self._tool.tool
        reqs_and_hints = tool.get("requirements", []) + tool.get("hints", [])
        requirements = []
        for hint in reqs_and_hints:
            if hint["class"] == "SoftwareRequirement":
                packages = hint.get("packages", [])
                for package in packages:
                    versions = package.get("version", [])
                    first_version = None if not versions else versions[0]
                    requirements.append((package["package"], first_version))
        return requirements

    @property
    def requirements(self):
        return getattr(self._tool, "requirements", [])
421
422
class ExpressionToolProxy(CommandLineToolProxy):
    # Expression tools reuse the CommandLineTool handling here; only the
    # CWL process class recorded during persistence differs.
    _class = "ExpressionTool"
425
426
class JobProxy(object):
    """Proxy around a single cwltool job for a tool proxy plus concrete inputs.

    Lazily constructs the underlying cwltool job and adapts staging, output
    collection, and persistence to Galaxy's job directory layout.
    """

    def __init__(self, tool_proxy, input_dict, output_dict, job_directory):
        self._tool_proxy = tool_proxy
        self._input_dict = input_dict
        self._output_dict = output_dict
        self._job_directory = job_directory

        self._final_output = None
        self._ok = True
        self._cwl_job = None  # built lazily by _ensure_cwl_job_initialized()
        self._is_command_line_job = None

        self._normalize_job()

    def cwl_job(self):
        """Return the underlying cwltool job, initializing it on first use."""
        self._ensure_cwl_job_initialized()
        return self._cwl_job

    @property
    def is_command_line_job(self):
        self._ensure_cwl_job_initialized()
        assert self._is_command_line_job is not None
        return self._is_command_line_job

    def _ensure_cwl_job_initialized(self):
        if self._cwl_job is None:
            job_args = dict(
                basedir=self._job_directory,
                select_resources=self._select_resources,
                outdir=os.path.join(self._job_directory, "working"),
                tmpdir=os.path.join(self._job_directory, "cwltmp"),
                stagedir=os.path.join(self._job_directory, "cwlstagedir"),
                use_container=False,
                beta_relaxed_fmt_check=beta_relaxed_fmt_check,
            )

            # Newer cwltool takes a RuntimeContext positionally; when it is
            # unavailable fall back to passing the values as keywords.
            args = []
            kwargs = {}
            if RuntimeContext is not None:
                args.append(RuntimeContext(job_args))
            else:
                kwargs = job_args
            self._cwl_job = next(self._tool_proxy._tool.job(
                self._input_dict,
                self._output_callback,
                *args, **kwargs
            ))
            # Jobs without a command_line attribute are treated as
            # expression (non-command-line) jobs below.
            self._is_command_line_job = hasattr(self._cwl_job, "command_line")

    def _normalize_job(self):
        # Somehow reuse whatever causes validate in cwltool... maybe?
        def pathToLoc(p):
            if "location" not in p and "path" in p:
                p["location"] = p["path"]
                del p["path"]

        runtime_context = RuntimeContext({})
        make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess)
        fs_access = make_fs_access(runtime_context.basedir)
        process.fill_in_defaults(self._tool_proxy._tool.tool["inputs"], self._input_dict, fs_access)
        process.visit_class(self._input_dict, ("File", "Directory"), pathToLoc)
        # TODO: Why doesn't fillInDefault fill in locations instead of paths?
        process.normalizeFilesDirs(self._input_dict)
        # TODO: validate like cwltool process _init_job.
        # validate.validate_ex(self.names.get_name("input_record_schema", ""), builder.job,
        #                      strict=False, logger=_logger_validation_warnings)

    def rewrite_inputs_for_staging(self):
        """Rewrite input locations so inputs resolve under the stage directory."""
        if hasattr(self._cwl_job, "pathmapper"):
            pass
            # DO SOMETHING LIKE THE FOLLOWING?
            # path_rewrites = {}
            # for f, p in self._cwl_job.pathmapper.items():
            #     if not p.staged:
            #         continue
            #     if p.type in ("File", "Directory"):
            #         path_rewrites[p.resolved] = p.target
            # for key, value in self._input_dict.items():
            #     if key in path_rewrites:
            #         self._input_dict[key]["location"] = path_rewrites[value]
        else:
            stagedir = os.path.join(self._job_directory, "cwlstagedir")
            safe_makedirs(stagedir)

            def stage_recursive(value):
                is_list = isinstance(value, list)
                is_dict = isinstance(value, dict)
                log.info("handling value %s, is_list %s, is_dict %s" % (value, is_list, is_dict))
                if is_list:
                    for val in value:
                        stage_recursive(val)
                elif is_dict:
                    if "location" in value and "basename" in value:
                        location = value["location"]
                        basename = value["basename"]
                        if not location.endswith(basename):  # TODO: sep()[-1]
                            staged_loc = os.path.join(stagedir, basename)
                            if not os.path.exists(staged_loc):
                                os.symlink(location, staged_loc)
                            value["location"] = staged_loc
                    for key, dict_value in value.items():
                        stage_recursive(dict_value)
                else:
                    log.info("skipping simple value...")
            stage_recursive(self._input_dict)

    def _select_resources(self, request, runtime_context=None):
        # Defer the core count to Galaxy's job runner via $GALAXY_SLOTS.
        new_request = request.copy()
        new_request["cores"] = "$GALAXY_SLOTS"
        return new_request

    @property
    def command_line(self):
        if self.is_command_line_job:
            return self.cwl_job().command_line
        else:
            # Non-command-line (expression) jobs run a no-op command.
            return ["true"]

    @property
    def stdin(self):
        if self.is_command_line_job:
            return self.cwl_job().stdin
        else:
            return None

    @property
    def stdout(self):
        if self.is_command_line_job:
            return self.cwl_job().stdout
        else:
            return None

    @property
    def stderr(self):
        if self.is_command_line_job:
            return self.cwl_job().stderr
        else:
            return None

    @property
    def environment(self):
        if self.is_command_line_job:
            return self.cwl_job().environment
        else:
            return {}

    @property
    def generate_files(self):
        if self.is_command_line_job:
            return self.cwl_job().generatefiles
        else:
            return {}

    def _output_callback(self, out, process_status):
        # Invoked by cwltool when the process finishes; records final state.
        self._process_status = process_status
        if process_status == "success":
            self._final_output = out
        else:
            self._ok = False

        log.info("Output are %s, status is %s" % (out, process_status))

    def collect_outputs(self, tool_working_directory, rcode):
        if not self.is_command_line_job:
            # Expression job - evaluate it now and return its final output.
            cwl_job = self.cwl_job()
            if RuntimeContext is not None:
                cwl_job.run(
                    RuntimeContext({})
                )
            else:
                cwl_job.run()
            if not self._ok:
                raise Exception("Final process state not ok, [%s]" % self._process_status)
            return self._final_output
        else:
            return self.cwl_job().collect_outputs(tool_working_directory, rcode)

    def save_job(self):
        """Persist job state so load_job_proxy() can rebuild this proxy."""
        job_file = JobProxy._job_file(self._job_directory)
        job_objects = {
            # "tool_path": os.path.abspath(self._tool_proxy._tool_path),
            "tool_representation": self._tool_proxy.to_persistent_representation(),
            "job_inputs": self._input_dict,
            "output_dict": self._output_dict,
        }
        # Fix: close the file deterministically instead of leaking the
        # handle as the previous ``json.dump(obj, open(...))`` form did.
        with open(job_file, "w") as f:
            json.dump(job_objects, f)

    def _output_extra_files_dir(self, output_name):
        output_id = self.output_id(output_name)
        return os.path.join(self._job_directory, "dataset_%s_files" % output_id)

    def output_id(self, output_name):
        output_id = self._output_dict[output_name]["id"]
        return output_id

    def output_path(self, output_name):
        output_id = self._output_dict[output_name]["path"]
        return output_id

    def output_directory_contents_dir(self, output_name, create=False):
        extra_files_dir = self._output_extra_files_dir(output_name)
        return extra_files_dir

    def output_secondary_files_dir(self, output_name, create=False):
        extra_files_dir = self._output_extra_files_dir(output_name)
        secondary_files_dir = os.path.join(extra_files_dir, SECONDARY_FILES_EXTRA_PREFIX)
        if create and not os.path.exists(secondary_files_dir):
            safe_makedirs(secondary_files_dir)
        return secondary_files_dir

    def stage_files(self):
        """Symlink input and generated files into place for execution."""
        cwl_job = self.cwl_job()

        def stageFunc(resolved_path, target_path):
            log.info("resolving %s to %s" % (resolved_path, target_path))
            try:
                os.symlink(resolved_path, target_path)
            except OSError:
                # Best effort - ignore symlink failures (e.g. an existing
                # target from a previous staging pass).
                pass

        if hasattr(cwl_job, "pathmapper"):
            process.stage_files(cwl_job.pathmapper, stageFunc, ignore_writable=True, symlink=False)

        if hasattr(cwl_job, "generatefiles"):
            outdir = os.path.join(self._job_directory, "working")
            # TODO: Why doesn't cwl_job.generatemapper work?
            generate_mapper = pathmapper.PathMapper(cwl_job.generatefiles["listing"],
                                                    outdir, outdir, separateDirs=False)
            # TODO: figure out what inplace_update should be.
            inplace_update = getattr(cwl_job, "inplace_update")
            process.stage_files(generate_mapper, stageFunc, ignore_writable=inplace_update, symlink=False)
            relink_initialworkdir(generate_mapper, outdir, outdir, inplace_update=inplace_update)
        # else: expression tools do not have a path mapper.

    @staticmethod
    def _job_file(job_directory):
        return os.path.join(job_directory, JOB_JSON_FILE)
665
666
class WorkflowProxy(object):
    """Proxy adapting a cwltool workflow object to Galaxy workflow dicts."""

    def __init__(self, workflow, workflow_path=None):
        self._workflow = workflow
        self._workflow_path = workflow_path
        self._step_proxies = None  # built lazily by step_proxies()

    @property
    def cwl_id(self):
        return self._workflow.tool["id"]

    def get_outputs_for_label(self, label):
        """Return workflow outputs sourced from the step labelled ``label``."""
        outputs = []
        for output in self._workflow.tool['outputs']:
            step, output_name = split_step_references(
                output["outputSource"],
                multiple=False,
                workflow_id=self.cwl_id,
            )
            if step == label:
                output_id = output["id"]
                # Ids nested under a fragment-bearing ("#") workflow id use
                # "/" separators; top-level ids use "#".
                if "#" not in self.cwl_id:
                    _, output_label = output_id.rsplit("#", 1)
                else:
                    _, output_label = output_id.rsplit("/", 1)

                outputs.append({
                    "output_name": output_name,
                    "label": output_label,
                })
        return outputs

    def tool_reference_proxies(self):
        """Fetch tool source definitions for all referenced tools."""
        references = []
        for step in self.step_proxies():
            references.extend(step.tool_reference_proxies())
        return references

    def step_proxies(self):
        """Build (once) and return proxies for every workflow step."""
        if self._step_proxies is None:
            proxies = []
            # Galaxy step indices for real steps start after one synthetic
            # step per workflow input.
            num_input_steps = len(self._workflow.tool['inputs'])
            for i, step in enumerate(self._workflow.steps):
                proxies.append(build_step_proxy(self, step, i + num_input_steps))
            self._step_proxies = proxies
        return self._step_proxies

    @property
    def runnables(self):
        # Embedded ``run`` objects for each step that declares one.
        runnables = []
        for step in self._workflow.steps:
            if "run" in step.tool:
                runnables.append(step.tool["run"])
        return runnables

    def cwl_ids_to_index(self, step_proxies):
        """Map CWL ids (inputs first, then steps) to Galaxy step indices."""
        index = 0
        cwl_ids_to_index = {}
        for i, input_dict in enumerate(self._workflow.tool['inputs']):
            cwl_ids_to_index[input_dict["id"]] = index
            index += 1

        for step_proxy in step_proxies:
            cwl_ids_to_index[step_proxy.cwl_id] = index
            index += 1

        return cwl_ids_to_index

    @property
    def output_labels(self):
        return [self.jsonld_id_to_label(o['id']) for o in self._workflow.tool['outputs']]

    def input_connections_by_step(self, step_proxies):
        """Build Galaxy input-connection dicts, one per step proxy."""
        cwl_ids_to_index = self.cwl_ids_to_index(step_proxies)
        input_connections_by_step = []
        for step_proxy in step_proxies:
            input_connections_step = {}
            for input_proxy in step_proxy.input_proxies:
                cwl_source_id = input_proxy.cwl_source_id
                input_name = input_proxy.input_name
                # Consider only allow multiple if MultipleInputFeatureRequirement is enabled
                for (output_step_name, output_name) in split_step_references(cwl_source_id, workflow_id=self.cwl_id):
                    if "#" in self.cwl_id:
                        sep_on = "/"
                    else:
                        sep_on = "#"
                    output_step_id = self.cwl_id + sep_on + output_step_name

                    if output_step_id not in cwl_ids_to_index:
                        template = "Output [%s] does not appear in ID-to-index map [%s]."
                        msg = template % (output_step_id, cwl_ids_to_index.keys())
                        raise AssertionError(msg)

                    if input_name not in input_connections_step:
                        input_connections_step[input_name] = []

                    input_connections_step[input_name].append({
                        "id": cwl_ids_to_index[output_step_id],
                        "output_name": output_name,
                        "input_type": "dataset"
                    })

            input_connections_by_step.append(input_connections_step)

        return input_connections_by_step

    def to_dict(self):
        """Render the workflow as a Galaxy workflow dictionary."""
        name = os.path.basename(self._workflow.tool.get('label') or self._workflow_path or 'TODO - derive a name from ID')
        steps = {}

        step_proxies = self.step_proxies()
        input_connections_by_step = self.input_connections_by_step(step_proxies)
        index = 0
        # Inputs become synthetic Galaxy steps first, then the real steps.
        for i, input_dict in enumerate(self._workflow.tool['inputs']):
            steps[index] = self.cwl_input_to_galaxy_step(input_dict, i)
            index += 1

        for i, step_proxy in enumerate(step_proxies):
            input_connections = input_connections_by_step[i]
            steps[index] = step_proxy.to_dict(input_connections)
            index += 1

        return {
            'name': name,
            'steps': steps,
            'annotation': self.cwl_object_to_annotation(self._workflow.tool),
        }

    def find_inputs_step_index(self, label):
        """Return the index of the workflow input whose label matches."""
        for i, input in enumerate(self._workflow.tool['inputs']):
            if self.jsonld_id_to_label(input["id"]) == label:
                return i

        raise Exception("Failed to find index for label %s" % label)

    def jsonld_id_to_label(self, id):
        """Strip the workflow-id prefix from a JSON-LD id, leaving the label."""
        if "#" in self.cwl_id:
            return id.rsplit("/", 1)[-1]
        else:
            return id.rsplit("#", 1)[-1]

    def cwl_input_to_galaxy_step(self, input, i):
        """Convert a CWL workflow input into a Galaxy input step dict."""
        input_type = input["type"]
        label = self.jsonld_id_to_label(input["id"])
        input_as_dict = {
            "id": i,
            "label": label,
            "position": {"left": 0, "top": 0},
            "annotation": self.cwl_object_to_annotation(input),
            "input_connections": {},  # Should the Galaxy API really require this? - Seems to.
            "workflow_outputs": self.get_outputs_for_label(label),
        }

        if input_type == "File" and "default" not in input:
            input_as_dict["type"] = "data_input"
        elif isinstance(input_type, dict) and input_type.get("type") == "array":
            input_as_dict["type"] = "data_collection_input"
            input_as_dict["collection_type"] = "list"
        elif isinstance(input_type, dict) and input_type.get("type") == "record":
            input_as_dict["type"] = "data_collection_input"
            input_as_dict["collection_type"] = "record"
        else:
            if USE_STEP_PARAMETERS:
                input_as_dict["type"] = "parameter_input"
                # TODO: dispatch on actual type so this doesn't always need
                # to be field - simpler types could be simpler inputs.
                tool_state = {}
                tool_state["parameter_type"] = "field"
                default_value = input.get("default")
                optional = False
                if isinstance(input_type, list) and "null" in input_type:
                    optional = True
                if not optional and isinstance(input_type, dict) and "type" in input_type:
                    assert False
                tool_state["default_value"] = {"src": "json", "value": default_value}
                tool_state["optional"] = optional
                input_as_dict["tool_state"] = tool_state
            else:
                input_as_dict["type"] = "data_input"
                # TODO: format = expression.json

        return input_as_dict

    def cwl_object_to_annotation(self, cwl_obj):
        # Galaxy annotations come from the CWL ``doc`` field.
        return cwl_obj.get("doc", None)
853
854
def split_step_references(step_references, workflow_id=None, multiple=True):
    """Split a CWL step input or output reference into (step id, name) pairs."""
    split_references = []

    for step_reference in listify(step_references):
        # Trim off the workflow id part of the reference.
        if workflow_id is None:
            # This path works fine for some simple workflows - but not so much
            # for subworkflows (maybe same for $graph workflows?)
            assert "#" in step_reference
            _, step_reference = step_reference.split("#", 1)
        else:
            sep_on = "/" if "#" in workflow_id else "#"
            expected_prefix = workflow_id + sep_on
            if not step_reference.startswith(expected_prefix):
                raise AssertionError("step_reference [%s] doesn't start with %s" % (step_reference, expected_prefix))
            step_reference = step_reference[len(expected_prefix):]

        # Now just grab the step name and input/output name.
        assert "#" not in step_reference
        if "/" in step_reference:
            step_name, io_name = step_reference.split("/", 1)
        else:
            # Referencing an input, not a step.
            # In Galaxy workflows input steps have an implicit output named
            # "output" for consistency with tools - in cwl land
            # just the input name is referenced.
            step_name, io_name = step_reference, "output"
        split_references.append((step_name, io_name))

    if not multiple:
        assert len(split_references) == 1
        return split_references[0]
    return split_references
895
896
def build_step_proxy(workflow_proxy, step, index):
    """Create the appropriate proxy for ``step`` (subworkflow vs. tool step)."""
    if step.embedded_tool.tool["class"] == "Workflow":
        proxy_class = SubworkflowStepProxy
    else:
        proxy_class = ToolStepProxy
    return proxy_class(workflow_proxy, step, index)
903
904
class BaseStepProxy(object):
    """Shared behavior for workflow step proxies (tool and subworkflow steps)."""

    def __init__(self, workflow_proxy, step, index):
        self._workflow_proxy = workflow_proxy
        self._step = step
        self._index = index
        self._uuid = str(uuid4())
        self._input_proxies = None  # built lazily by input_proxies

    @property
    def step_class(self):
        """CWL process class of the embedded tool."""
        return self.cwl_tool_object.tool["class"]

    @property
    def cwl_id(self):
        return self._step.id

    @property
    def cwl_workflow_id(self):
        """Id of the workflow containing this step."""
        return self._workflow_proxy.cwl_id

    @property
    def requirements(self):
        return self._step.requirements

    @property
    def hints(self):
        return self._step.hints

    @property
    def label(self):
        """Human-readable label derived from the step's JSON-LD id."""
        label = self._workflow_proxy.jsonld_id_to_label(self._step.id)
        return label

    def galaxy_workflow_outputs_list(self):
        """Workflow outputs sourced from this step."""
        return self._workflow_proxy.get_outputs_for_label(self.label)

    @property
    def cwl_tool_object(self):
        """The cwltool object embedded in this step."""
        return self._step.embedded_tool

    @property
    def input_proxies(self):
        """InputProxy objects for this step's inputs (built lazily)."""
        if self._input_proxies is None:
            input_proxies = []
            cwl_inputs = self._step.tool["inputs"]
            for cwl_input in cwl_inputs:
                input_proxies.append(InputProxy(self, cwl_input))
            self._input_proxies = input_proxies
        return self._input_proxies

    def inputs_to_dicts(self):
        """Serialize each input proxy via InputProxy.to_dict()."""
        inputs_as_dicts = []
        for input_proxy in self.input_proxies:
            inputs_as_dicts.append(input_proxy.to_dict())
        return inputs_as_dicts
961
962
class InputProxy(object):
    """Proxy for a single workflow step input connection."""

    def __init__(self, step_proxy, cwl_input):
        self._cwl_input = cwl_input
        self.step_proxy = step_proxy
        self.workflow_proxy = step_proxy._workflow_proxy

        cwl_input_id = cwl_input["id"]
        cwl_source_id = cwl_input.get("source", None)
        if cwl_source_id is None:
            # A source-less input is only valid with valueFrom or default.
            if "valueFrom" not in cwl_input and "default" not in cwl_input:
                msg = "Workflow step input must define a source, a valueFrom, or a default value. Obtained [%s]." % cwl_input
                raise MessageException(msg)

        assert cwl_input_id
        step_name, input_name = split_step_references(
            cwl_input_id,
            multiple=False,
            workflow_id=step_proxy.cwl_workflow_id
        )
        self.step_name = step_name
        self.input_name = input_name

        self.cwl_input_id = cwl_input_id
        self.cwl_source_id = cwl_source_id

        # Inputs named in the step's "scatter" declaration are scattered over.
        scatter_inputs = [split_step_references(
            i, multiple=False, workflow_id=step_proxy.cwl_workflow_id
        )[1] for i in listify(step_proxy._step.tool.get("scatter", []))]
        scatter = self.input_name in scatter_inputs
        self.scatter = scatter

    def to_dict(self):
        """Serialize connection metadata (merge/scatter/valueFrom/default)."""
        as_dict = {
            "name": self.input_name,
        }
        if "linkMerge" in self._cwl_input:
            as_dict["merge_type"] = self._cwl_input["linkMerge"]
        if "scatterMethod" in self.step_proxy._step.tool:
            as_dict["scatter_type"] = self.step_proxy._step.tool.get("scatterMethod", "dotproduct")
        else:
            as_dict["scatter_type"] = "dotproduct" if self.scatter else "disabled"
        if "valueFrom" in self._cwl_input:
            # TODO: Add a table for expressions - mark the type as CWL 1.0 JavaScript.
            as_dict["value_from"] = self._cwl_input["valueFrom"]
        if "default" in self._cwl_input:
            as_dict["default"] = self._cwl_input["default"]
        return as_dict
1011
1012
class ToolStepProxy(BaseStepProxy):
    """Step proxy for a workflow step that runs a concrete (non-workflow) tool."""

    def __init__(self, workflow_proxy, step, index):
        super(ToolStepProxy, self).__init__(workflow_proxy, step, index)
        self._tool_proxy = None

    @property
    def tool_proxy(self):
        # Needs to be cached so UUID that is loaded matches UUID generated with to_dict.
        if self._tool_proxy is None:
            self._tool_proxy = _cwl_tool_object_to_proxy(self.cwl_tool_object, uuid=str(uuid4()))
        return self._tool_proxy

    def tool_reference_proxies(self):
        # Return a list so we can handle subworkflows recursively.
        return [self.tool_proxy]

    def to_dict(self, input_connections):
        """Serialize this step to a Galaxy workflow step dictionary.

        NOTE(review): the original body built a ``tool_state`` dict with a
        None entry per input connection ("stub out null entries for things
        getting replaced by connections") but never used it anywhere -
        removed here as dead code; behavior is unchanged.
        """
        outputs = self.galaxy_workflow_outputs_list()
        return {
            "id": self._index,
            "tool_uuid": self.tool_proxy._uuid,  # TODO: make sure this is respected...
            "label": self.label,
            "position": {"left": 0, "top": 0},
            "type": "tool",
            "annotation": self._workflow_proxy.cwl_object_to_annotation(self._step.tool),
            "input_connections": input_connections,
            "inputs": self.inputs_to_dicts(),
            "workflow_outputs": outputs,
        }
1050
1051
class SubworkflowStepProxy(BaseStepProxy):
    """Step proxy for a workflow step whose embedded tool is itself a workflow."""

    def __init__(self, workflow_proxy, step, index):
        super(SubworkflowStepProxy, self).__init__(workflow_proxy, step, index)
        self._subworkflow_proxy = None

    def to_dict(self, input_connections):
        outputs = self.galaxy_workflow_outputs_list()
        subworkflow_proxy = self.subworkflow_proxy
        # Tag each incoming connection with the index of the matching input
        # step inside the subworkflow.
        for key, connection_list in input_connections.items():
            subworkflow_step_id = subworkflow_proxy.find_inputs_step_index(key)
            for connection in connection_list:
                connection["input_subworkflow_step_id"] = subworkflow_step_id

        return {
            "id": self._index,
            "label": self.label,
            "position": {"left": 0, "top": 0},
            "type": "subworkflow",
            "subworkflow": subworkflow_proxy.to_dict(),
            "annotation": subworkflow_proxy.cwl_object_to_annotation(self._step.tool),
            "input_connections": input_connections,
            "inputs": self.inputs_to_dicts(),
            "workflow_outputs": outputs,
        }

    def tool_reference_proxies(self):
        return self.subworkflow_proxy.tool_reference_proxies()

    @property
    def subworkflow_proxy(self):
        # Lazily wrap the embedded workflow in its own proxy.
        if self._subworkflow_proxy is None:
            self._subworkflow_proxy = WorkflowProxy(self.cwl_tool_object)
        return self._subworkflow_proxy
1087
1088
def remove_pickle_problems(obj):
    """Recursively clear attributes that prevent pickling and return ``obj``.

    ``doc_loader`` does not pickle correctly, so it is set to None; any
    ``embedded_tool`` and ``steps`` attributes are cleaned the same way.
    The object is modified in place and also returned for convenience.
    """
    if hasattr(obj, "doc_loader"):
        obj.doc_loader = None
    if hasattr(obj, "embedded_tool"):
        obj.embedded_tool = remove_pickle_problems(obj.embedded_tool)
    if hasattr(obj, "steps"):
        cleaned_steps = []
        for step in obj.steps:
            cleaned_steps.append(remove_pickle_problems(step))
        obj.steps = cleaned_steps
    return obj
1098
1099
@six.add_metaclass(ABCMeta)
class WorkflowToolReference(object):
    """Abstract marker base for references to tools used by a workflow.

    Subclassed below by EmbeddedWorkflowToolReference and
    ExternalWorkflowToolReference; carries no behavior itself.
    """
    pass
1103
1104
class EmbeddedWorkflowToolReference(WorkflowToolReference):
    """Marker subclass of WorkflowToolReference (no behavior).

    Name suggests a tool embedded inline in the workflow document -
    confirm against callers.
    """
    pass
1107
1108
class ExternalWorkflowToolReference(WorkflowToolReference):
    """Marker subclass of WorkflowToolReference (no behavior).

    Name suggests a tool referenced outside the workflow document -
    confirm against callers.
    """
    pass
1111
1112
def _outer_field_to_input_instance(field):
    """Convert a top-level CWL input field into a Galaxy input instance.

    When the field admits several type representations and field types are
    not in use, a ConditionalInstance is produced so the user can pick the
    form the parameter is submitted in; otherwise a single plain
    InputInstance is returned.
    """
    field_type = field_to_field_type(field)  # Must be a list if in here?
    field_types = field_type if isinstance(field_type, list) else [field_type]

    name, label, description = _field_metadata(field)

    case_name = "_cwl__type_"
    case_label = "Specify Parameter %s As" % label

    def value_input(type_description):
        # The value parameter nested under the type selector.
        return InputInstance(
            "_cwl__value_",
            label,
            description,
            input_type=type_description.galaxy_param_type,
            collection_type=type_description.collection_type,
        )

    select_options = []
    case_options = []
    type_descriptions = type_descriptions_for_field_types(field_types)
    for type_description in type_descriptions:
        select_options.append({"value": type_description.name, "label": type_description.label})
        if type_description.uses_param():
            instances = [value_input(type_description)]
        else:
            instances = []
        case_options.append((type_description.name, instances))

    # If there is more than one way to represent this parameter - produce a
    # conditional requesting the user to say what form they want to submit
    # the data in, else just map a simple Galaxy parameter.
    if len(case_options) > 1 and not USE_FIELD_TYPES:
        case_input = SelectInputInstance(
            name=case_name,
            label=case_label,
            description=False,
            options=select_options,
        )
        return ConditionalInstance(name, case_input, case_options)

    if len(case_options) > 1:
        only_type_description = FIELD_TYPE_REPRESENTATION
    else:
        only_type_description = type_descriptions[0]

    return InputInstance(
        name,
        label,
        description,
        input_type=only_type_description.galaxy_param_type,
        collection_type=only_type_description.collection_type,
    )

    # NOTE: an older array-to-repeat mapping used to live here (commented
    # out); arrays are now represented as dataset collections - a blended
    # approach could be offered in the future.
1182
1183
1184 def _field_metadata(field):
1185 name = field["name"]
1186 label = field.get("label", None)
1187 description = field.get("doc", None)
1188 return name, label, description
1189
1190
def _simple_field_to_output(field):
    """Build a glob-collected OutputInstance from a simple CWL output field."""
    return OutputInstance(
        field["name"],
        output_data_type=field["type"],
        output_type=OUTPUT_TYPE.GLOB,
    )
1200
1201
class ConditionalInstance(object):
    """Galaxy conditional input: a type selector plus per-type input blocks."""

    def __init__(self, name, case, whens):
        self.input_type = INPUT_TYPE.CONDITIONAL
        self.name = name
        self.case = case
        self.whens = whens

    def to_dict(self):
        # Preserve the declared order of the when branches.
        when = OrderedDict()
        for value, block in self.whens:
            when[value] = [input_instance.to_dict() for input_instance in block]
        return dict(
            name=self.name,
            type=INPUT_TYPE.CONDITIONAL,
            test=self.case.to_dict(),
            when=when,
        )
1222
1223
class SelectInputInstance(object):
    """Galaxy select parameter used to pick among type representations."""

    def __init__(self, name, label, description, options):
        self.input_type = INPUT_TYPE.SELECT
        self.name = name
        self.label = label
        self.description = description
        self.options = options

    def to_dict(self):
        # TODO: serialize options...
        return dict(
            name=self.name,
            label=self.label or self.name,
            help=self.description,
            type=self.input_type,
            options=self.options,
        )
1243
1244
class InputInstance(object):
    """Galaxy input parameter derived from a CWL input field."""

    def __init__(self, name, label, description, input_type, array=False, area=False, collection_type=None):
        self.name = name
        self.label = label
        self.description = description
        self.input_type = input_type
        self.collection_type = collection_type
        self.required = True
        self.array = array
        self.area = area

    def to_dict(self, itemwise=True):
        if itemwise and self.array:
            # Arrays render as a Galaxy repeat wrapping the scalar form.
            return dict(
                type="repeat",
                name="%s_repeat" % self.name,
                title="%s" % self.name,
                blocks=[
                    self.to_dict(itemwise=False)
                ]
            )

        as_dict = dict(
            name=self.name,
            label=self.label or self.name,
            help=self.description,
            type=self.input_type,
            optional=not self.required,
        )
        if self.area:
            as_dict["area"] = True

        if self.input_type == INPUT_TYPE.INTEGER:
            as_dict["value"] = "0"
        if self.input_type == INPUT_TYPE.FLOAT:
            as_dict["value"] = "0.0"
        elif self.input_type == INPUT_TYPE.DATA_COLLECTON:
            as_dict["collection_type"] = self.collection_type

        return as_dict
1286
1287
# Output collection kinds referenced by OutputInstance consumers
# (e.g. _simple_field_to_output uses OUTPUT_TYPE.GLOB).
OUTPUT_TYPE = Bunch(
    GLOB="glob",
    STDOUT="stdout",
)
1292
1293
# TODO: Different subclasses - this is representing different types of things.
class OutputInstance(object):
    """Description of a single output of a tool or workflow step."""

    def __init__(self, name, output_data_type, output_type, path=None, fields=None):
        self.name = name
        self.path = path
        self.fields = fields
        self.output_data_type = output_data_type
        self.output_type = output_type
1303
1304
# Explicit public API of this module for ``import *`` consumers.
__all__ = (
    'tool_proxy',
    'load_job_proxy',
)