comparison env/lib/python3.9/site-packages/gxformat2/converter.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Functionality for converting a Format 2 workflow into a standard Galaxy workflow."""
2 from __future__ import print_function
3
4 import argparse
5 import copy
6 import json
7 import logging
8 import os
9 import sys
10 import uuid
11 from collections import OrderedDict
12 from typing import Dict, Optional
13
14 from ._labels import Labels
15 from .model import (
16 convert_dict_to_id_list_if_needed,
17 ensure_step_position,
18 inputs_as_native_steps,
19 with_step_ids,
20 )
21 from .yaml import ordered_load
22
23 SCRIPT_DESCRIPTION = """
24 Convert a Format 2 Galaxy workflow description into a native format.
25 """
26
27 # source: step#output and $link: step#output instead of outputSource: step/output and $link: step/output
28 SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1"
29 STEP_TYPES = [
30 "subworkflow",
31 "data_input",
32 "data_collection_input",
33 "tool",
34 "pause",
35 "parameter_input",
36 ]
37
38 STEP_TYPE_ALIASES = {
39 'input': 'data_input',
40 'input_collection': 'data_collection_input',
41 'parameter': 'parameter_input',
42 }
43
44 RUN_ACTIONS_TO_STEPS = {
45 'GalaxyWorkflow': 'run_workflow_to_step',
46 'GalaxyTool': 'run_tool_to_step',
47 }
48
49 POST_JOB_ACTIONS = {
50 'hide': {
51 'action_class': "HideDatasetAction",
52 'default': False,
53 'arguments': lambda x: x,
54 },
55 'rename': {
56 'action_class': 'RenameDatasetAction',
57 'default': {},
58 'arguments': lambda x: {'newname': x},
59 },
60 'delete_intermediate_datasets': {
61 'action_class': 'DeleteIntermediatesAction',
62 'default': False,
63 'arguments': lambda x: x,
64 },
65 'change_datatype': {
66 'action_class': 'ChangeDatatypeAction',
67 'default': {},
68 'arguments': lambda x: {'newtype': x},
69 },
70 'set_columns': {
71 'action_class': 'ColumnSetAction',
72 'default': {},
73 'arguments': lambda x: x,
74 },
75 'add_tags': {
76 'action_class': 'TagDatasetAction',
77 'default': [],
78 'arguments': lambda x: {'tags': ",".join(x)},
79 },
80 'remove_tags': {
81 'action_class': 'RemoveTagDatasetAction',
82 'default': [],
83 'arguments': lambda x: {'tags': ",".join(x)},
84 },
85 }
86
87 log = logging.getLogger(__name__)
88
89
90 def rename_arg(argument):
91 return argument
92
93
94 def clean_connection(value):
95 if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS:
96 # Hope these are just used by Galaxy testing workflows and such, and not in production workflows.
97 log.warn("Legacy workflow syntax for connections [%s] will not be supported in the future" % value)
98 value = value.replace("#", "/", 1)
99 else:
100 return value
101
102
103 class ImportOptions(object):
104
105 def __init__(self):
106 self.deduplicate_subworkflows = False
107
108
109 def yaml_to_workflow(has_yaml, galaxy_interface, workflow_directory, import_options=None):
110 """Convert a Format 2 workflow into standard Galaxy format from supplied stream."""
111 as_python = ordered_load(has_yaml)
112 return python_to_workflow(as_python, galaxy_interface, workflow_directory, import_options=import_options)
113
114
115 def python_to_workflow(as_python, galaxy_interface, workflow_directory=None, import_options=None):
116 """Convert a Format 2 workflow into standard Galaxy format from supplied dictionary."""
117 if "yaml_content" in as_python:
118 as_python = ordered_load(as_python["yaml_content"])
119
120 if workflow_directory is None:
121 workflow_directory = os.path.abspath(".")
122
123 conversion_context = ConversionContext(
124 galaxy_interface,
125 workflow_directory,
126 import_options,
127 )
128 as_python = _preprocess_graphs(as_python, conversion_context)
129 subworkflows = None
130 if conversion_context.import_options.deduplicate_subworkflows:
131 # TODO: import only required workflows...
132 # TODO: dag sort these...
133 subworkflows = OrderedDict()
134 for graph_id, subworkflow_content in conversion_context.graph_ids.items():
135 if graph_id == "main":
136 continue
137 subworkflow_conversion_context = conversion_context.get_subworkflow_conversion_context_graph("#" + graph_id)
138 subworkflows[graph_id] = _python_to_workflow(copy.deepcopy(subworkflow_content), subworkflow_conversion_context)
139 converted = _python_to_workflow(as_python, conversion_context)
140 if subworkflows is not None:
141 converted["subworkflows"] = subworkflows
142 return converted
143
144
145 # move to a utils file?
146 def steps_as_list(format2_workflow: dict, add_ids: bool = False, inputs_offset: int = 0, mutate: bool = False):
147 """Return steps as a list, converting ID map to list representation if needed.
148
149 This method does mutate the supplied steps, try to make progress toward not doing this.
150
151 Add keys as labels instead of IDs. Why am I doing this?
152 """
153 if "steps" not in format2_workflow:
154 raise Exception("No 'steps' key in dict, keys are %s" % format2_workflow.keys())
155 steps = format2_workflow["steps"]
156 steps = convert_dict_to_id_list_if_needed(steps, add_label=True, mutate=mutate)
157 if add_ids:
158 if mutate:
159 _append_step_id_to_step_list_elements(steps, inputs_offset=inputs_offset)
160 else:
161 steps = with_step_ids(steps, inputs_offset=inputs_offset)
162 return steps
163
164
165 def _append_step_id_to_step_list_elements(steps: list, inputs_offset: int = 0):
166 assert isinstance(steps, list)
167 for i, step in enumerate(steps):
168 if "id" not in step:
169 step["id"] = i + inputs_offset
170 assert step["id"] is not None
171
172
173 def _python_to_workflow(as_python, conversion_context):
174
175 if "class" not in as_python:
176 raise Exception("This is not a not a valid Galaxy workflow definition, must define a class.")
177
178 if as_python["class"] != "GalaxyWorkflow":
179 raise Exception("This is not a not a valid Galaxy workflow definition, 'class' must be 'GalaxyWorkflow'.")
180
181 # .ga files don't have this, drop it so it isn't interpreted as a format 2 workflow.
182 as_python.pop("class")
183
184 _ensure_defaults(as_python, {
185 "a_galaxy_workflow": "true",
186 "format-version": "0.1",
187 "name": "Workflow",
188 "uuid": str(uuid.uuid4()),
189 })
190 _populate_annotation(as_python)
191
192 steps = steps_as_list(as_python, mutate=True)
193
194 convert_inputs_to_steps(as_python, steps)
195
196 if isinstance(steps, list):
197 _append_step_id_to_step_list_elements(steps)
198 steps_as_dict = OrderedDict()
199 for i, step in enumerate(steps):
200 steps_as_dict[str(i)] = step
201 if "label" in step:
202 label = step["label"]
203 conversion_context.labels[label] = i
204
205 # TODO: this really should be optional in Galaxy API.
206 ensure_step_position(step, i)
207
208 as_python["steps"] = steps_as_dict
209 steps = steps_as_dict
210
211 for step in steps.values():
212 step_type = step.get("type", None)
213 if "run" in step:
214 if step_type is not None:
215 raise Exception("Steps specified as run actions cannot specify a type.")
216 run_action = step.get("run")
217 run_action = conversion_context.get_runnable_description(run_action)
218 if isinstance(run_action, dict):
219 run_class = run_action["class"]
220 run_to_step_function = eval(RUN_ACTIONS_TO_STEPS[run_class])
221
222 run_to_step_function(conversion_context, step, run_action)
223 else:
224 step["content_id"] = run_action
225 step["type"] = "subworkflow"
226 del step["run"]
227
228 for step in steps.values():
229 step_type = step.get("type", "tool")
230 step_type = STEP_TYPE_ALIASES.get(step_type, step_type)
231 if step_type not in STEP_TYPES:
232 raise Exception("Unknown step type encountered %s" % step_type)
233 step["type"] = step_type
234 eval("transform_%s" % step_type)(conversion_context, step)
235
236 outputs = as_python.pop("outputs", [])
237 outputs = convert_dict_to_id_list_if_needed(outputs)
238
239 for output in outputs:
240 assert isinstance(output, dict), "Output definition must be dictionary"
241 assert "source" in output or "outputSource" in output, "Output definition must specify source"
242
243 if "label" in output and "id" in output:
244 raise Exception("label and id are aliases for outputs, may only define one")
245 if "label" not in output and "id" not in output:
246 label = ""
247
248 raw_label = output.pop("label", None)
249 raw_id = output.pop("id", None)
250 label = raw_label or raw_id
251 if Labels.is_anonymous_output_label(label):
252 label = None
253 source = clean_connection(output.get("outputSource"))
254 if source is None and SUPPORT_LEGACY_CONNECTIONS:
255 source = output.get("source").replace("#", "/", 1)
256 id, output_name = conversion_context.step_output(source)
257 step = steps[str(id)]
258 workflow_output = {
259 "output_name": output_name,
260 "label": label,
261 "uuid": output.get("uuid", None)
262 }
263 if "workflow_outputs" not in step:
264 step["workflow_outputs"] = []
265 step["workflow_outputs"].append(workflow_output)
266
267 return as_python
268
269
270 def _preprocess_graphs(as_python, conversion_context):
271 if not isinstance(as_python, dict):
272 raise Exception("This is not a not a valid Galaxy workflow definition.")
273
274 format_version = as_python.get("format-version", "v2.0")
275 assert format_version == "v2.0"
276
277 if "class" not in as_python and "$graph" in as_python:
278 for subworkflow in as_python["$graph"]:
279 if not isinstance(subworkflow, dict):
280 raise Exception("Malformed workflow content in $graph")
281 if "id" not in subworkflow:
282 raise Exception("No subworkflow ID found for entry in $graph.")
283 subworkflow_id = subworkflow["id"]
284 if subworkflow_id == "main":
285 as_python = subworkflow
286
287 conversion_context.register_runnable(subworkflow)
288
289 return as_python
290
291
292 def convert_inputs_to_steps(workflow_dict: dict, steps: list):
293 """Convert workflow inputs to a steps in array - like in native Galaxy.
294
295 workflow_dict is a Format 2 representation of a workflow and steps is a
296 list of steps. This method will prepend all the inputs as as steps to the
297 steps list. This method modifies both workflow_dict and steps.
298 """
299 if "inputs" not in workflow_dict:
300 return
301
302 input_steps = inputs_as_native_steps(workflow_dict)
303 workflow_dict.pop("inputs")
304 for i, new_step in enumerate(input_steps):
305 steps.insert(i, new_step)
306
307
308 def run_workflow_to_step(conversion_context, step, run_action):
309 step["type"] = "subworkflow"
310 if conversion_context.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
311 step["content_id"] = run_action
312 else:
313 subworkflow_conversion_context = conversion_context.get_subworkflow_conversion_context(step)
314 step["subworkflow"] = _python_to_workflow(
315 copy.deepcopy(run_action),
316 subworkflow_conversion_context,
317 )
318
319
320 def _is_graph_id_reference(run_action):
321 return run_action and not isinstance(run_action, dict)
322
323
324 def transform_data_input(context, step):
325 transform_input(context, step, default_name="Input dataset")
326
327
328 def transform_data_collection_input(context, step):
329 transform_input(context, step, default_name="Input dataset collection")
330
331
332 def transform_parameter_input(context, step):
333 transform_input(context, step, default_name="input_parameter")
334
335
336 def transform_input(context, step, default_name):
337 default_name = step.get("label", default_name)
338 _populate_annotation(step)
339 _ensure_inputs_connections(step)
340
341 if "inputs" not in step:
342 step["inputs"] = [{}]
343
344 step_inputs = step["inputs"][0]
345 if "name" in step_inputs:
346 name = step_inputs["name"]
347 else:
348 name = default_name
349
350 _ensure_defaults(step_inputs, {
351 "name": name,
352 "description": "",
353 })
354 tool_state = {
355 "name": name
356 }
357 for attrib in ["collection_type", "parameter_type", "optional", "default", "format", "restrictions", "restrictOnConnections", "suggestions"]:
358 if attrib in step:
359 tool_state[attrib] = step[attrib]
360
361 _populate_tool_state(step, tool_state)
362
363
364 def transform_pause(context, step, default_name="Pause for dataset review"):
365 default_name = step.get("label", default_name)
366 _populate_annotation(step)
367
368 _ensure_inputs_connections(step)
369
370 if "inputs" not in step:
371 step["inputs"] = [{}]
372
373 step_inputs = step["inputs"][0]
374 if "name" in step_inputs:
375 name = step_inputs["name"]
376 else:
377 name = default_name
378
379 _ensure_defaults(step_inputs, {
380 "name": name,
381 })
382 tool_state = {
383 "name": name
384 }
385
386 connect = _init_connect_dict(step)
387 _populate_input_connections(context, step, connect)
388 _populate_tool_state(step, tool_state)
389
390
391 def transform_subworkflow(context, step):
392 _populate_annotation(step)
393
394 _ensure_inputs_connections(step)
395
396 tool_state = {
397 }
398
399 connect = _init_connect_dict(step)
400 _populate_input_connections(context, step, connect)
401 _populate_tool_state(step, tool_state)
402
403
404 def _runtime_value():
405 return {"__class__": "RuntimeValue"}
406
407
408 def transform_tool(context, step):
409 if "tool_id" not in step:
410 raise Exception("Tool steps must define a tool_id.")
411
412 _ensure_defaults(step, {
413 "name": step['tool_id'],
414 "post_job_actions": {},
415 "tool_version": None,
416 })
417 post_job_actions = step["post_job_actions"]
418 _populate_annotation(step)
419
420 tool_state = {
421 # TODO: Galaxy should not require tool state actually specify a __page__.
422 "__page__": 0,
423 }
424
425 connect = _init_connect_dict(step)
426
427 def append_link(key, value):
428 if key not in connect:
429 connect[key] = []
430 assert "$link" in value
431 link_value = value["$link"]
432 connect[key].append(clean_connection(link_value))
433
434 def replace_links(value, key=""):
435 if _is_link(value):
436 append_link(key, value)
437 # Filled in by the connection, so to force late
438 # validation of the field just mark as RuntimeValue.
439 # It would be better I guess if this were some other
440 # value dedicated to this purpose (e.g. a ficitious
441 # {"__class__": "ConnectedValue"}) that could be further
442 # validated by Galaxy.
443 return _runtime_value()
444 if isinstance(value, dict):
445 new_values = {}
446 for k, v in value.items():
447 new_key = _join_prefix(key, k)
448 new_values[k] = replace_links(v, new_key)
449 return new_values
450 elif isinstance(value, list):
451 new_values = []
452 for i, v in enumerate(value):
453 # If we are a repeat we need to modify the key
454 # but not if values are actually $links.
455 if _is_link(v):
456 append_link(key, v)
457 new_values.append(None)
458 else:
459 new_key = "%s_%d" % (key, i)
460 new_values.append(replace_links(v, new_key))
461 return new_values
462 else:
463 return value
464
465 # TODO: handle runtime inputs and state together.
466 runtime_inputs = step.get("runtime_inputs", [])
467 if "state" in step or runtime_inputs:
468 step_state = step.pop("state", {})
469 step_state = replace_links(step_state)
470
471 for key, value in step_state.items():
472 tool_state[key] = json.dumps(value)
473 for runtime_input in runtime_inputs:
474 tool_state[runtime_input] = json.dumps(_runtime_value())
475 elif "tool_state" in step:
476 tool_state.update(step.get("tool_state"))
477
478 # Fill in input connections
479 _populate_input_connections(context, step, connect)
480
481 _populate_tool_state(step, tool_state)
482
483 # Handle outputs.
484 out = step.pop("out", None)
485 if out is None:
486 # Handle LEGACY 19.XX outputs key.
487 out = step.pop("outputs", [])
488 out = convert_dict_to_id_list_if_needed(out)
489 for output in out:
490 name = output["id"]
491 for action_key, action_dict in POST_JOB_ACTIONS.items():
492 action_argument = output.get(action_key, action_dict['default'])
493 if action_argument:
494 action_class = action_dict['action_class']
495 action_name = action_class + name
496 action = _action(
497 action_class,
498 name,
499 arguments=action_dict['arguments'](action_argument)
500 )
501 post_job_actions[action_name] = action
502
503
504 def run_tool_to_step(conversion_context, step, run_action):
505 tool_description = conversion_context.galaxy_interface.import_tool(
506 run_action
507 )
508 step["type"] = "tool"
509 step["tool_id"] = tool_description["tool_id"]
510 step["tool_version"] = tool_description["tool_version"]
511 step["tool_hash"] = tool_description.get("tool_hash")
512 step["tool_uuid"] = tool_description.get("uuid")
513
514
515 class BaseConversionContext(object):
516
517 def __init__(self):
518 self.labels = {}
519 self.subworkflow_conversion_contexts = {}
520
521 def step_id(self, label_or_id):
522 if label_or_id in self.labels:
523 id = self.labels[label_or_id]
524 else:
525 id = label_or_id
526 return int(id)
527
528 def step_output(self, value):
529 value_parts = str(value).split("/")
530 if len(value_parts) == 1:
531 value_parts.append("output")
532 id = self.step_id(value_parts[0])
533 return id, value_parts[1]
534
535 def get_subworkflow_conversion_context(self, step):
536 # TODO: sometimes this method takes format2 steps and some times converted native ones
537 # (for input connections) - redo this so the type signature is stronger.
538 step_id = step.get("id")
539 run_action = step.get("run")
540 if self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
541 subworkflow_conversion_context = self.get_subworkflow_conversion_context_graph(run_action)
542 return subworkflow_conversion_context
543 if "content_id" in step:
544 subworkflow_conversion_context = self.get_subworkflow_conversion_context_graph(step["content_id"])
545 return subworkflow_conversion_context
546
547 if step_id not in self.subworkflow_conversion_contexts:
548
549 subworkflow_conversion_context = SubworkflowConversionContext(
550 self
551 )
552 self.subworkflow_conversion_contexts[step_id] = subworkflow_conversion_context
553 return self.subworkflow_conversion_contexts[step_id]
554
555 def get_runnable_description(self, run_action):
556 if "@import" in run_action:
557 if len(run_action) > 1:
558 raise Exception("@import must be only key if present.")
559
560 run_action_path = run_action["@import"]
561 runnable_path = os.path.join(self.workflow_directory, run_action_path)
562 with open(runnable_path, "r") as f:
563 runnable_description = ordered_load(f)
564 run_action = runnable_description
565
566 if not self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
567 run_action = self.graph_ids[run_action[1:]]
568
569 return run_action
570
571
572 class ConversionContext(BaseConversionContext):
573
574 def __init__(self, galaxy_interface, workflow_directory, import_options: Optional[ImportOptions] = None):
575 super(ConversionContext, self).__init__()
576 self.import_options = import_options or ImportOptions()
577 self.graph_ids = OrderedDict() # type: Dict
578 self.graph_id_subworkflow_conversion_contexts = {} # type: Dict
579 self.workflow_directory = workflow_directory
580 self.galaxy_interface = galaxy_interface
581
582 def register_runnable(self, run_action):
583 assert "id" in run_action
584 self.graph_ids[run_action["id"]] = run_action
585
586 def get_subworkflow_conversion_context_graph(self, graph_id):
587 if graph_id not in self.graph_id_subworkflow_conversion_contexts:
588 subworkflow_conversion_context = SubworkflowConversionContext(
589 self
590 )
591 self.graph_id_subworkflow_conversion_contexts[graph_id] = subworkflow_conversion_context
592 return self.graph_id_subworkflow_conversion_contexts[graph_id]
593
594
595 class SubworkflowConversionContext(BaseConversionContext):
596
597 def __init__(self, parent_context):
598 super(SubworkflowConversionContext, self).__init__()
599 self.parent_context = parent_context
600
601 @property
602 def graph_ids(self):
603 return self.parent_context.graph_ids
604
605 @property
606 def workflow_directory(self):
607 return self.parent_context.workflow_directory
608
609 @property
610 def import_options(self):
611 return self.parent_context.import_options
612
613 @property
614 def galaxy_interface(self):
615 return self.parent_context.galaxy_interface
616
617 def get_subworkflow_conversion_context_graph(self, graph_id):
618 return self.parent_context.get_subworkflow_conversion_context_graph(graph_id)
619
620
621 def _action(type, name, arguments={}):
622 return {
623 "action_arguments": arguments,
624 "action_type": type,
625 "output_name": name,
626 }
627
628
629 def _is_link(value):
630 return isinstance(value, dict) and "$link" in value
631
632
633 def _join_prefix(prefix, key):
634 if prefix:
635 new_key = "%s|%s" % (prefix, key)
636 else:
637 new_key = key
638 return new_key
639
640
641 def _init_connect_dict(step):
642 if "connect" not in step:
643 step["connect"] = {}
644
645 connect = step["connect"]
646 del step["connect"]
647
648 # handle CWL-style in dict connections.
649 if "in" in step:
650 step_in = step["in"]
651 assert isinstance(step_in, dict)
652 connection_keys = set()
653 for key, value in step_in.items():
654 # TODO: this can be a list right?
655 if isinstance(value, dict) and 'source' in value:
656 value = value["source"]
657 elif isinstance(value, dict) and 'default' in value:
658 continue
659 elif isinstance(value, dict):
660 raise KeyError('step input must define either source or default %s' % value)
661 connect[key] = [value]
662 connection_keys.add(key)
663
664 for key in connection_keys:
665 del step_in[key]
666
667 if len(step_in) == 0:
668 del step['in']
669
670 return connect
671
672
673 def _populate_input_connections(context, step, connect):
674 _ensure_inputs_connections(step)
675 input_connections = step["input_connections"]
676 is_subworkflow_step = step.get("type") == "subworkflow"
677
678 for key, values in connect.items():
679 input_connection_value = []
680 if not isinstance(values, list):
681 values = [values]
682 for value in values:
683 if not isinstance(value, dict):
684 if key == "$step":
685 value += "/__NO_INPUT_OUTPUT_NAME__"
686 id, output_name = context.step_output(value)
687 value = {"id": id, "output_name": output_name}
688 if is_subworkflow_step:
689 subworkflow_conversion_context = context.get_subworkflow_conversion_context(step)
690 input_subworkflow_step_id = subworkflow_conversion_context.step_id(key)
691 value["input_subworkflow_step_id"] = input_subworkflow_step_id
692 input_connection_value.append(value)
693 if key == "$step":
694 key = "__NO_INPUT_OUTPUT_NAME__"
695 input_connections[key] = input_connection_value
696
697
698 def _populate_annotation(step):
699 if "annotation" not in step and "doc" in step:
700 annotation = step.pop("doc")
701 step["annotation"] = annotation
702 elif "annotation" not in step:
703 step["annotation"] = ""
704
705
706 def _ensure_inputs_connections(step):
707 if "input_connections" not in step:
708 step["input_connections"] = {}
709
710
711 def _ensure_defaults(in_dict, defaults):
712 for key, value in defaults.items():
713 if key not in in_dict:
714 in_dict[key] = value
715
716
717 def _populate_tool_state(step, tool_state):
718 step["tool_state"] = json.dumps(tool_state)
719
720
721 def main(argv=None):
722 """Entry point for script to conversion from Format 2 interface."""
723 if argv is None:
724 argv = sys.argv[1:]
725
726 args = _parser().parse_args(argv)
727
728 format2_path = args.input_path
729 output_path = args.output_path or (format2_path + ".gxwf.yml")
730
731 workflow_directory = os.path.abspath(format2_path)
732 galaxy_interface = None
733
734 with open(format2_path, "r") as f:
735 has_workflow = ordered_load(f)
736
737 output = python_to_workflow(has_workflow, galaxy_interface=galaxy_interface, workflow_directory=workflow_directory)
738 with open(output_path, "w") as f:
739 json.dump(output, f, indent=4)
740
741
742 def _parser():
743 parser = argparse.ArgumentParser(description=SCRIPT_DESCRIPTION)
744 parser.add_argument('input_path', metavar='INPUT', type=str,
745 help='input workflow path (.ga)')
746 parser.add_argument('output_path', metavar='OUTPUT', type=str, nargs="?",
747 help='output workflow path (.gxfw.yml)')
748 return parser
749
750
751 if __name__ == "__main__":
752 main(sys.argv)
753
754
755 __all__ = (
756 'main',
757 'python_to_workflow',
758 'yaml_to_workflow',
759 )