comparison env/lib/python3.7/site-packages/gxformat2/converter.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 """Functionality for converting a Format 2 workflow into a standard Galaxy workflow."""
2 from __future__ import print_function
3
4 import copy
5 import json
6 import logging
7 import os
8 import sys
9 import uuid
10 from collections import OrderedDict
11
12 from ._labels import Labels
13 from ._yaml import ordered_load
14
# Legacy connection syntax is "source: step#output" / "$link: step#output";
# the current syntax is "outputSource: step/output" / "$link: step/output".
# The legacy form is only honoured when this environment variable is "1".
SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1"

# Native step types a step may resolve to; each one has a matching
# ``transform_<type>`` function in this module.
STEP_TYPES = [
    "subworkflow",
    "data_input",
    "data_collection_input",
    "tool",
    "pause",
    "parameter_input",
]

# Short step type names accepted in place of the native names above.
STEP_TYPE_ALIASES = {
    'input': 'data_input',
    'input_collection': 'data_collection_input',
    'parameter': 'parameter_input',
}

# Maps the ``class`` of a ``run`` action to the name of the module-level
# function that turns that action into a step.
RUN_ACTIONS_TO_STEPS = {
    'GalaxyWorkflow': 'run_workflow_to_step',
    'GalaxyTool': 'run_tool_to_step',
}


def _pja_passthrough(value):
    # The declared value is used directly as the action arguments.
    return value


def _pja_newname(value):
    return {'newname': value}


def _pja_newtype(value):
    return {'newtype': value}


def _pja_tags(tags):
    # Tags are declared as a list and stored comma separated.
    return {'tags': ",".join(tags)}


# Per-output keys that translate into post job actions. For each key:
# ``action_class`` is the action type, ``default`` the value assumed when the
# key is absent (always falsy, so no action is produced), and ``arguments``
# converts the declared value into the action arguments.
POST_JOB_ACTIONS = {
    'hide': {
        'action_class': "HideDatasetAction",
        'default': False,
        'arguments': _pja_passthrough,
    },
    'rename': {
        'action_class': 'RenameDatasetAction',
        'default': {},
        'arguments': _pja_newname,
    },
    'delete_intermediate_datasets': {
        'action_class': 'DeleteIntermediatesAction',
        'default': False,
        'arguments': _pja_passthrough,
    },
    'change_datatype': {
        'action_class': 'ChangeDatatypeAction',
        'default': {},
        'arguments': _pja_newtype,
    },
    'set_columns': {
        'action_class': 'ColumnSetAction',
        'default': {},
        'arguments': _pja_passthrough,
    },
    'add_tags': {
        'action_class': 'TagDatasetAction',
        'default': [],
        'arguments': _pja_tags,
    },
    'remove_tags': {
        'action_class': 'RemoveTagDatasetAction',
        'default': [],
        'arguments': _pja_tags,
    },
}

log = logging.getLogger(__name__)
76
77
def rename_arg(argument):
    """Return ``argument`` unchanged (identity placeholder)."""
    unchanged = argument
    return unchanged
80
81
def clean_connection(value):
    """Normalize a connection reference to the ``step/output`` form.

    When legacy connections are enabled (``SUPPORT_LEGACY_CONNECTIONS``) and
    ``value`` contains ``#``, a warning is logged and the first ``#`` is
    replaced with ``/``. In every other case ``value`` is returned unchanged
    (including ``None`` and empty values).

    The converted value is now returned; previously the legacy branch fell
    off the end of the function and yielded ``None``.
    """
    if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS:
        # Hope these are just used by Galaxy testing workflows and such, and not in production workflows.
        log.warning("Legacy workflow syntax for connections [%s] will not be supported in the future", value)
        value = value.replace("#", "/", 1)
    return value
89
90
class ImportOptions(object):
    """Options controlling how a Format 2 workflow is converted."""

    def __init__(self):
        # When true, ``$graph`` subworkflows are converted once and steps
        # refer to them by id instead of embedding a copy.
        self.deduplicate_subworkflows = False
95
96
def yaml_to_workflow(has_yaml, galaxy_interface, workflow_directory=None, import_options=None):
    """Convert a Format 2 workflow into standard Galaxy format from supplied stream.

    ``has_yaml`` is handed to ``ordered_load`` and the result is converted by
    ``python_to_workflow``. ``workflow_directory`` is now optional, matching
    ``python_to_workflow``, which substitutes the current directory for
    ``None``.
    """
    as_python = ordered_load(has_yaml)
    return python_to_workflow(as_python, galaxy_interface, workflow_directory, import_options=import_options)
101
102
def python_to_workflow(as_python, galaxy_interface, workflow_directory=None, import_options=None):
    """Convert a Format 2 workflow into standard Galaxy format from supplied dictionary.

    A dictionary holding a ``yaml_content`` key is first replaced by the
    parsed content of that key. ``workflow_directory`` defaults to the
    absolute path of the current directory.
    """
    if "yaml_content" in as_python:
        as_python = ordered_load(as_python["yaml_content"])

    if workflow_directory is None:
        workflow_directory = os.path.abspath(".")

    context = ConversionContext(galaxy_interface, workflow_directory, import_options)
    as_python = _preprocess_graphs(as_python, context)

    if not context.import_options.deduplicate_subworkflows:
        return _python_to_workflow(as_python, context)

    # TODO: import only required workflows...
    # TODO: dag sort these...
    converted_graphs = OrderedDict()
    for graph_id, graph_content in context.graph_ids.items():
        if graph_id != "main":
            graph_context = context.get_subworkflow_conversion_context_graph("#" + graph_id)
            # Convert a copy so the registered graph entry is left untouched.
            converted_graphs[graph_id] = _python_to_workflow(copy.deepcopy(graph_content), graph_context)

    native = _python_to_workflow(as_python, context)
    native["subworkflows"] = converted_graphs
    return native
131
132
133 # move to a utils file?
def steps_as_list(format2_workflow, add_label=True):
    """Return steps as a list, converting ID map to list representation if needed.

    When ``steps`` is a dictionary, each key is recorded on its step as
    ``label`` if ``add_label`` is true and as ``id`` otherwise. The flag is
    now forwarded; it used to be ignored in favour of a hard-coded ``True``.
    """
    steps = format2_workflow["steps"]
    return _convert_dict_to_id_list_if_needed(steps, add_label=add_label)
139
140
def ensure_step_position(step, order_index):
    """Give ``step`` a default ``position`` unless it already has one.

    The default places the step ``10 * order_index`` from both the left and
    the top.
    """
    if "position" in step:
        return
    offset = 10 * order_index
    step["position"] = {"left": offset, "top": offset}
148
149
def _python_to_workflow(as_python, conversion_context):
    """Convert one Format 2 workflow dictionary to native form, in place.

    The dictionary must have ``class: GalaxyWorkflow``. Inputs are turned
    into leading steps, steps are re-keyed by their string index, ``run``
    actions are resolved, each step is passed to its ``transform_<type>``
    function, and every declared output is attached to its source step as a
    ``workflow_outputs`` entry.

    Returns the same (mutated) ``as_python`` dictionary.
    Raises ``Exception`` for a missing/wrong class, a step that combines
    ``run`` with ``type``, an unknown step type, or an output defining both
    ``label`` and ``id``.
    """
    if "class" not in as_python:
        raise Exception("This is not a not a valid Galaxy workflow definition, must define a class.")

    if as_python["class"] != "GalaxyWorkflow":
        raise Exception("This is not a not a valid Galaxy workflow definition, 'class' must be 'GalaxyWorkflow'.")

    # .ga files don't have this, drop it so it isn't interpreted as a format 2 workflow.
    as_python.pop("class")

    _ensure_defaults(as_python, {
        "a_galaxy_workflow": "true",
        "format-version": "0.1",
        "name": "Workflow",
        "uuid": str(uuid.uuid4()),
    })
    _populate_annotation(as_python)

    steps = steps_as_list(as_python)

    convert_inputs_to_steps(as_python, steps)

    if isinstance(steps, list):
        steps_as_dict = OrderedDict()
        for i, step in enumerate(steps):
            steps_as_dict[str(i)] = step
            if "id" not in step:
                step["id"] = i

            if "label" in step:
                label = step["label"]
                conversion_context.labels[label] = i

            # TODO: this really should be optional in Galaxy API.
            ensure_step_position(step, i)

        as_python["steps"] = steps_as_dict
        steps = steps_as_dict

    # Converter functions are module-level and selected by name. Looking them
    # up in the module namespace avoids evaluating a string as code.
    module_namespace = globals()

    for step in steps.values():
        step_type = step.get("type", None)
        if "run" in step:
            if step_type is not None:
                raise Exception("Steps specified as run actions cannot specify a type.")
            run_action = step.get("run")
            run_action = conversion_context.get_runnable_description(run_action)
            if isinstance(run_action, dict):
                run_class = run_action["class"]
                run_to_step_function = module_namespace[RUN_ACTIONS_TO_STEPS[run_class]]

                run_to_step_function(conversion_context, step, run_action)
            else:
                # Not an inline definition: keep it as a reference.
                step["content_id"] = run_action
                step["type"] = "subworkflow"
            del step["run"]

    for step in steps.values():
        step_type = step.get("type", "tool")
        step_type = STEP_TYPE_ALIASES.get(step_type, step_type)
        if step_type not in STEP_TYPES:
            raise Exception("Unknown step type encountered %s" % step_type)
        step["type"] = step_type
        module_namespace["transform_%s" % step_type](conversion_context, step)

    outputs = as_python.get("outputs", [])
    outputs = _convert_dict_to_id_list_if_needed(outputs)

    for output in outputs:
        assert isinstance(output, dict), "Output definition must be dictionary"
        assert "source" in output or "outputSource" in output, "Output definition must specify source"

        if "label" in output and "id" in output:
            raise Exception("label and id are aliases for outputs, may only define one")

        # label and id are aliases; both keys are removed from the output.
        raw_label = output.pop("label", None)
        raw_id = output.pop("id", None)
        label = raw_label or raw_id
        if Labels.is_anonymous_output_label(label):
            label = None
        source = clean_connection(output.get("outputSource"))
        if source is None and SUPPORT_LEGACY_CONNECTIONS:
            source = output.get("source").replace("#", "/", 1)
        step_index, output_name = conversion_context.step_output(source)
        step = steps[str(step_index)]
        workflow_output = {
            "output_name": output_name,
            "label": label,
            "uuid": output.get("uuid", None)
        }
        if "workflow_outputs" not in step:
            step["workflow_outputs"] = []
        step["workflow_outputs"].append(workflow_output)

    return as_python
247
248
249 def _preprocess_graphs(as_python, conversion_context):
250 if not isinstance(as_python, dict):
251 raise Exception("This is not a not a valid Galaxy workflow definition.")
252
253 format_version = as_python.get("format-version", "v2.0")
254 assert format_version == "v2.0"
255
256 if "class" not in as_python and "$graph" in as_python:
257 for subworkflow in as_python["$graph"]:
258 if not isinstance(subworkflow, dict):
259 raise Exception("Malformed workflow content in $graph")
260 if "id" not in subworkflow:
261 raise Exception("No subworkflow ID found for entry in $graph.")
262 subworkflow_id = subworkflow["id"]
263 if subworkflow_id == "main":
264 as_python = subworkflow
265
266 conversion_context.register_runnable(subworkflow)
267
268 return as_python
269
270
def convert_inputs_to_steps(workflow_dict, steps):
    """Convert workflow inputs to a steps in array - like in native Galaxy.

    Each input becomes a step definition (a copy of the input with ``type``
    and ``label`` set) and the new steps are inserted, in order, at the
    front of ``steps``. Does nothing when ``workflow_dict`` has no
    ``inputs``.
    """
    if "inputs" not in workflow_dict:
        return

    data_types = ("File", "data", "data_input")
    collection_types = ("collection", "data_collection", "data_collection_input")
    parameter_types = ("text", "integer", "float", "color", "boolean")

    input_steps = []
    for raw_definition in _convert_dict_to_id_list_if_needed(workflow_dict["inputs"]):
        definition = raw_definition.copy()

        has_label = "label" in definition
        has_id = "id" in definition
        if has_label and has_id:
            raise Exception("label and id are aliases for inputs, may only define one")
        if not (has_label or has_id):
            raise Exception("Input must define a label.")

        raw_label = definition.pop("label", None)
        raw_id = definition.pop("id", None)
        label = raw_label or raw_id
        if not label:
            raise Exception("Input label must not be empty.")

        declared_type = definition.pop("type", "data")
        if declared_type in data_types:
            step_type = "data_input"
        elif declared_type in collection_types:
            step_type = "data_collection_input"
        elif declared_type in parameter_types:
            step_type = "parameter_input"
            definition["parameter_type"] = declared_type
        else:
            raise Exception("Input type must be a data file or collection.")

        definition["type"] = step_type
        definition["label"] = label
        input_steps.append(definition)

    # Inserting at increasing indexes keeps the inputs in declaration order
    # ahead of the existing steps.
    for position, input_step in enumerate(input_steps):
        steps.insert(position, input_step)
314
315
def run_workflow_to_step(conversion_context, step, run_action):
    """Turn a ``GalaxyWorkflow`` run action into a subworkflow step.

    With subworkflow deduplication enabled and ``run_action`` being a graph
    id reference, the step only records it as ``content_id``; otherwise a
    deep copy of ``run_action`` is converted and embedded as ``subworkflow``.
    """
    step["type"] = "subworkflow"
    deduplicate = conversion_context.import_options.deduplicate_subworkflows
    if deduplicate and _is_graph_id_reference(run_action):
        step["content_id"] = run_action
        return

    nested_context = conversion_context.get_subworkflow_conversion_context(step)
    step["subworkflow"] = _python_to_workflow(copy.deepcopy(run_action), nested_context)
326
327
328 def _is_graph_id_reference(run_action):
329 return run_action and not isinstance(run_action, dict)
330
331
def transform_data_input(context, step):
    """Transform a ``data_input`` step; its default name is "Input dataset"."""
    transform_input(context, step, "Input dataset")
334
335
def transform_data_collection_input(context, step):
    """Transform a ``data_collection_input`` step; default name "Input dataset collection"."""
    transform_input(context, step, "Input dataset collection")
338
339
def transform_parameter_input(context, step):
    """Transform a ``parameter_input`` step; its default name is "input_parameter"."""
    transform_input(context, step, "input_parameter")
342
343
def transform_input(context, step, default_name):
    """Fill in the native fields of an input step, in place.

    The step label, when present, takes precedence over ``default_name``.
    The first entry of ``inputs`` receives default ``name`` and
    ``description`` values, and ``tool_state`` is built from the name plus
    any of ``collection_type``, ``parameter_type``, ``optional`` and
    ``default`` set on the step. ``context`` is not used.
    """
    default_name = step.get("label", default_name)
    _populate_annotation(step)
    _ensure_inputs_connections(step)

    first_input = step.setdefault("inputs", [{}])[0]
    name = first_input["name"] if "name" in first_input else default_name
    _ensure_defaults(first_input, {
        "name": name,
        "description": "",
    })

    tool_state = {"name": name}
    tool_state.update(
        (attrib, step[attrib])
        for attrib in ("collection_type", "parameter_type", "optional", "default")
        if attrib in step
    )

    _populate_tool_state(step, tool_state)
370
371
def transform_pause(context, step, default_name="Pause for dataset review"):
    """Fill in the native fields of a pause step, in place.

    The step label, when present, takes precedence over ``default_name``.
    The first entry of ``inputs`` receives a default ``name``, connections
    are resolved, and ``tool_state`` holds only the name.
    """
    default_name = step.get("label", default_name)
    _populate_annotation(step)
    _ensure_inputs_connections(step)

    first_input = step.setdefault("inputs", [{}])[0]
    name = first_input["name"] if "name" in first_input else default_name
    _ensure_defaults(first_input, {"name": name})

    connections = _init_connect_dict(step)
    _populate_input_connections(context, step, connections)
    _populate_tool_state(step, {"name": name})
397
398
def transform_subworkflow(context, step):
    """Fill in the native fields of a subworkflow step, in place.

    Sets the annotation, resolves the step's connections and stores an
    empty ``tool_state``.
    """
    _populate_annotation(step)
    _ensure_inputs_connections(step)

    connections = _init_connect_dict(step)
    _populate_input_connections(context, step, connections)
    _populate_tool_state(step, {})
410
411
412 def _runtime_value():
413 return {"__class__": "RuntimeValue"}
414
415
def transform_tool(context, step):
    """Fill in the native fields of a tool step, in place.

    Requires ``tool_id`` on the step. Builds ``tool_state`` either from the
    Format 2 ``state`` / ``runtime_inputs`` keys (each value JSON encoded) or
    from an existing ``tool_state`` key, collects ``$link`` values found in
    ``state`` as connections, resolves all connections, and converts the
    per-output keys listed in ``POST_JOB_ACTIONS`` into ``post_job_actions``.

    Raises ``Exception`` when the step has no ``tool_id``.
    """
    if "tool_id" not in step:
        raise Exception("Tool steps must define a tool_id.")

    _ensure_defaults(step, {
        "name": step['tool_id'],
        "post_job_actions": {},
        "tool_version": None,
    })
    post_job_actions = step["post_job_actions"]
    _populate_annotation(step)

    tool_state = {
        # TODO: Galaxy should not require tool state actually specify a __page__.
        "__page__": 0,
    }

    connect = _init_connect_dict(step)

    def append_link(key, value):
        # Record the target of a {"$link": ...} value as a connection of
        # the input named ``key``.
        if key not in connect:
            connect[key] = []
        assert "$link" in value
        link_value = value["$link"]
        connect[key].append(clean_connection(link_value))

    def replace_links(value, key=""):
        # Return a copy of ``value`` with links replaced, registering each
        # link in ``connect`` under the flattened key of where it was found.
        if _is_link(value):
            append_link(key, value)
            # Filled in by the connection, so to force late
            # validation of the field just mark as RuntimeValue.
            # It would be better I guess if this were some other
            # value dedicated to this purpose (e.g. a fictitious
            # {"__class__": "ConnectedValue"}) that could be further
            # validated by Galaxy.
            return _runtime_value()
        if isinstance(value, dict):
            new_values = {}
            for k, v in value.items():
                new_key = _join_prefix(key, k)
                new_values[k] = replace_links(v, new_key)
            return new_values
        elif isinstance(value, list):
            new_values = []
            for i, v in enumerate(value):
                # If we are a repeat we need to modify the key
                # but not if values are actually $links.
                if _is_link(v):
                    # A link directly inside a list is registered under the
                    # list's own key and leaves None in its place.
                    append_link(key, v)
                    new_values.append(None)
                else:
                    new_key = "%s_%d" % (key, i)
                    new_values.append(replace_links(v, new_key))
            return new_values
        else:
            return value

    # TODO: handle runtime inputs and state together.
    runtime_inputs = step.get("runtime_inputs", [])
    if "state" in step or runtime_inputs:
        # ``state`` is removed from the step; ``runtime_inputs`` is left on it.
        step_state = step.pop("state", {})
        step_state = replace_links(step_state)

        # Each top-level state value is stored JSON encoded.
        for key, value in step_state.items():
            tool_state[key] = json.dumps(value)
        for runtime_input in runtime_inputs:
            tool_state[runtime_input] = json.dumps(_runtime_value())
    elif "tool_state" in step:
        tool_state.update(step.get("tool_state"))

    # Fill in input connections
    _populate_input_connections(context, step, connect)

    _populate_tool_state(step, tool_state)

    # Handle outputs.
    out = step.pop("out", None)
    if out is None:
        # Handle LEGACY 19.XX outputs key.
        out = step.pop("outputs", [])
    out = _convert_dict_to_id_list_if_needed(out)
    for output in out:
        name = output["id"]
        for action_key, action_dict in POST_JOB_ACTIONS.items():
            action_argument = output.get(action_key, action_dict['default'])
            # A falsy argument (including the default) produces no action.
            if action_argument:
                action_class = action_dict['action_class']
                action_name = action_class + name
                action = _action(
                    action_class,
                    name,
                    arguments=action_dict['arguments'](action_argument)
                )
                post_job_actions[action_name] = action
510
511
def run_tool_to_step(conversion_context, step, run_action):
    """Turn a ``GalaxyTool`` run action into a tool step.

    The action is imported through ``galaxy_interface.import_tool`` and the
    returned description supplies the step's tool id and version (required
    keys) plus hash and uuid (``None`` when absent).
    """
    imported = conversion_context.galaxy_interface.import_tool(run_action)
    step["type"] = "tool"
    for required_key in ("tool_id", "tool_version"):
        step[required_key] = imported[required_key]
    step["tool_hash"] = imported.get("tool_hash")
    step["tool_uuid"] = imported.get("uuid")
521
522
class BaseConversionContext(object):
    """State shared by workflow and subworkflow conversion contexts.

    Subclasses provide ``import_options``, ``graph_ids``,
    ``workflow_directory`` and ``get_subworkflow_conversion_context_graph``,
    which the methods below rely on.
    """

    def __init__(self):
        # Step label -> step index.
        self.labels = {}
        # Step id -> context of the subworkflow embedded in that step.
        self.subworkflow_conversion_contexts = {}

    def step_id(self, label_or_id):
        """Return the integer step id for a known label or a raw id."""
        resolved = self.labels.get(label_or_id, label_or_id)
        return int(resolved)

    def step_output(self, value):
        """Split ``step/output`` into ``(step id, output name)``.

        The output name defaults to ``output`` when ``value`` has no ``/``.
        """
        parts = str(value).split("/")
        if len(parts) == 1:
            parts.append("output")
        return self.step_id(parts[0]), parts[1]

    def get_subworkflow_conversion_context(self, step):
        """Return the conversion context for the subworkflow run by ``step``."""
        # TODO: sometimes this method takes format2 steps and some times converted native ones
        # (for input connections) - redo this so the type signature is stronger.
        step_id = step.get("id")
        run_action = step.get("run")
        if self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
            return self.get_subworkflow_conversion_context_graph(run_action)
        if "content_id" in step:
            return self.get_subworkflow_conversion_context_graph(step["content_id"])

        # Embedded subworkflow: one context per step id, created on demand.
        contexts = self.subworkflow_conversion_contexts
        if step_id not in contexts:
            contexts[step_id] = SubworkflowConversionContext(self)
        return contexts[step_id]

    def get_runnable_description(self, run_action):
        """Resolve ``@import`` and graph id references in a run action."""
        if "@import" in run_action:
            if len(run_action) > 1:
                raise Exception("@import must be only key if present.")

            import_path = os.path.join(self.workflow_directory, run_action["@import"])
            with open(import_path, "r") as stream:
                run_action = ordered_load(stream)

        inline_references = not self.import_options.deduplicate_subworkflows
        if inline_references and _is_graph_id_reference(run_action):
            # Drop the first character of the reference to get the graph id.
            run_action = self.graph_ids[run_action[1:]]

        return run_action
578
579
class ConversionContext(BaseConversionContext):
    """Top-level conversion context owning options and the ``$graph`` registry."""

    def __init__(self, galaxy_interface, workflow_directory, import_options=None):
        super(ConversionContext, self).__init__()
        self.galaxy_interface = galaxy_interface
        self.workflow_directory = workflow_directory
        self.import_options = import_options or ImportOptions()
        # Graph id -> registered runnable, in registration order.
        self.graph_ids = OrderedDict()
        # Graph reference -> context shared by every step using it.
        self.graph_id_subworkflow_conversion_contexts = {}

    def register_runnable(self, run_action):
        """Register ``run_action`` under its ``id``."""
        assert "id" in run_action
        self.graph_ids[run_action["id"]] = run_action

    def get_subworkflow_conversion_context_graph(self, graph_id):
        """Return the context for ``graph_id``, creating it on first use."""
        known = self.graph_id_subworkflow_conversion_contexts
        if graph_id not in known:
            known[graph_id] = SubworkflowConversionContext(self)
        return known[graph_id]
601
602
class SubworkflowConversionContext(BaseConversionContext):
    """Context for a nested workflow.

    It keeps its own labels and nested contexts; everything else is read
    from the parent context.
    """

    def __init__(self, parent_context):
        super(SubworkflowConversionContext, self).__init__()
        self.parent_context = parent_context

    @property
    def graph_ids(self):
        """Graph registry of the parent context."""
        parent = self.parent_context
        return parent.graph_ids

    @property
    def workflow_directory(self):
        """Workflow directory of the parent context."""
        parent = self.parent_context
        return parent.workflow_directory

    @property
    def import_options(self):
        """Import options of the parent context."""
        parent = self.parent_context
        return parent.import_options

    @property
    def galaxy_interface(self):
        """Galaxy interface of the parent context."""
        parent = self.parent_context
        return parent.galaxy_interface

    def get_subworkflow_conversion_context_graph(self, graph_id):
        """Delegate the graph context lookup to the parent context."""
        parent = self.parent_context
        return parent.get_subworkflow_conversion_context_graph(graph_id)
627
628
629 def _action(type, name, arguments={}):
630 return {
631 "action_arguments": arguments,
632 "action_type": type,
633 "output_name": name,
634 }
635
636
637 def _is_link(value):
638 return isinstance(value, dict) and "$link" in value
639
640
641 def _join_prefix(prefix, key):
642 if prefix:
643 new_key = "%s|%s" % (prefix, key)
644 else:
645 new_key = key
646 return new_key
647
648
649 def _init_connect_dict(step):
650 if "connect" not in step:
651 step["connect"] = {}
652
653 connect = step["connect"]
654 del step["connect"]
655
656 # handle CWL-style in dict connections.
657 if "in" in step:
658 step_in = step["in"]
659 assert isinstance(step_in, dict)
660 connection_keys = set()
661 for key, value in step_in.items():
662 # TODO: this can be a list right?
663 if isinstance(value, dict) and 'source' in value:
664 value = value["source"]
665 elif isinstance(value, dict) and 'default' in value:
666 continue
667 elif isinstance(value, dict):
668 raise KeyError('step input must define either source or default %s' % value)
669 connect[key] = [value]
670 connection_keys.add(key)
671
672 for key in connection_keys:
673 del step_in[key]
674
675 if len(step_in) == 0:
676 del step['in']
677
678 return connect
679
680
def _populate_input_connections(context, step, connect):
    """Write the entries of ``connect`` to ``step["input_connections"]``.

    ``connect`` maps an input name to one connection or a list of them.
    Values that are not dictionaries are resolved through
    ``context.step_output`` into ``{"id": ..., "output_name": ...}``;
    dictionary values are stored unchanged.
    """
    _ensure_inputs_connections(step)
    input_connections = step["input_connections"]
    is_subworkflow_step = step.get("type") == "subworkflow"

    for key, values in connect.items():
        input_connection_value = []
        if not isinstance(values, list):
            values = [values]
        for value in values:
            if not isinstance(value, dict):
                if key == "$step":
                    # The "$step" key gets a fixed placeholder output name.
                    value += "/__NO_INPUT_OUTPUT_NAME__"
                id, output_name = context.step_output(value)
                value = {"id": id, "output_name": output_name}
                if is_subworkflow_step:
                    # Resolve ``key`` to a step id inside the subworkflow.
                    subworkflow_conversion_context = context.get_subworkflow_conversion_context(step)
                    input_subworkflow_step_id = subworkflow_conversion_context.step_id(key)
                    value["input_subworkflow_step_id"] = input_subworkflow_step_id
            input_connection_value.append(value)
        if key == "$step":
            # Stored under the placeholder name instead of "$step".
            key = "__NO_INPUT_OUTPUT_NAME__"
        input_connections[key] = input_connection_value
704
705
706 def _populate_annotation(step):
707 if "annotation" not in step and "doc" in step:
708 annotation = step.pop("doc")
709 step["annotation"] = annotation
710 elif "annotation" not in step:
711 step["annotation"] = ""
712
713
714 def _ensure_inputs_connections(step):
715 if "input_connections" not in step:
716 step["input_connections"] = {}
717
718
719 def _ensure_defaults(in_dict, defaults):
720 for key, value in defaults.items():
721 if key not in in_dict:
722 in_dict[key] = value
723
724
725 def _populate_tool_state(step, tool_state):
726 step["tool_state"] = json.dumps(tool_state)
727
728
729 def _convert_dict_to_id_list_if_needed(dict_or_list, add_label=False):
730 rval = dict_or_list
731 if isinstance(dict_or_list, dict):
732 rval = []
733 for key, value in dict_or_list.items():
734 if not isinstance(value, dict):
735 value = {"type": value}
736 if add_label:
737 value["label"] = key
738 else:
739 value["id"] = key
740 rval.append(value)
741 return rval
742
743
def main(argv):
    """Convert the Format 2 workflow file at ``argv[0]`` and print it as JSON.

    ``argv`` holds the command line arguments without the program name. The
    file is opened and passed to ``yaml_to_workflow`` as a stream, with the
    file's directory as the workflow directory. No Galaxy interface is
    available from the command line, so ``None`` is passed for it.

    Raises ``SystemExit`` with a usage message when no path is given.
    """
    if not argv:
        raise SystemExit("usage: converter.py FORMAT2_WORKFLOW_PATH")

    format2_path = argv[0]
    workflow_directory = os.path.dirname(os.path.abspath(format2_path))
    with open(format2_path, "r") as f:
        converted = yaml_to_workflow(f, None, workflow_directory)
    print(json.dumps(converted))


if __name__ == "__main__":
    # Skip the program name so that argv[0] is the workflow path.
    main(sys.argv[1:])
750
751 __all__ = (
752 'yaml_to_workflow',
753 'python_to_workflow',
754 )