Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/gxformat2/converter.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 """Functionality for converting a Format 2 workflow into a standard Galaxy workflow.""" | |
| 2 from __future__ import print_function | |
| 3 | |
| 4 import argparse | |
| 5 import copy | |
| 6 import json | |
| 7 import logging | |
| 8 import os | |
| 9 import sys | |
| 10 import uuid | |
| 11 from collections import OrderedDict | |
| 12 from typing import Dict, Optional | |
| 13 | |
| 14 from ._labels import Labels | |
| 15 from .model import ( | |
| 16 convert_dict_to_id_list_if_needed, | |
| 17 ensure_step_position, | |
| 18 inputs_as_native_steps, | |
| 19 with_step_ids, | |
| 20 ) | |
| 21 from .yaml import ordered_load | |
| 22 | |
# Help text for the command-line conversion script.
SCRIPT_DESCRIPTION = """
Convert a Format 2 Galaxy workflow description into a native format.
"""

# source: step#output and $link: step#output instead of outputSource: step/output and $link: step/output
SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1"
# Valid native Galaxy step types (after alias expansion via STEP_TYPE_ALIASES).
STEP_TYPES = [
    "subworkflow",
    "data_input",
    "data_collection_input",
    "tool",
    "pause",
    "parameter_input",
]

# Format 2 shorthand step type -> native Galaxy step type.
STEP_TYPE_ALIASES = {
    'input': 'data_input',
    'input_collection': 'data_collection_input',
    'parameter': 'parameter_input',
}

# "run" action class -> name of the module-level conversion function
# (resolved via eval in _python_to_workflow).
RUN_ACTIONS_TO_STEPS = {
    'GalaxyWorkflow': 'run_workflow_to_step',
    'GalaxyTool': 'run_tool_to_step',
}

# Format 2 output shorthand key -> native post-job-action description:
# 'action_class' is the native action type, 'default' is used when the key
# is absent, and 'arguments' maps the shorthand value to the native
# action_arguments dict.
POST_JOB_ACTIONS = {
    'hide': {
        'action_class': "HideDatasetAction",
        'default': False,
        'arguments': lambda x: x,
    },
    'rename': {
        'action_class': 'RenameDatasetAction',
        'default': {},
        'arguments': lambda x: {'newname': x},
    },
    'delete_intermediate_datasets': {
        'action_class': 'DeleteIntermediatesAction',
        'default': False,
        'arguments': lambda x: x,
    },
    'change_datatype': {
        'action_class': 'ChangeDatatypeAction',
        'default': {},
        'arguments': lambda x: {'newtype': x},
    },
    'set_columns': {
        'action_class': 'ColumnSetAction',
        'default': {},
        'arguments': lambda x: x,
    },
    'add_tags': {
        'action_class': 'TagDatasetAction',
        'default': [],
        'arguments': lambda x: {'tags': ",".join(x)},
    },
    'remove_tags': {
        'action_class': 'RemoveTagDatasetAction',
        'default': [],
        'arguments': lambda x: {'tags': ",".join(x)},
    },
}

# Module-level logger, per stdlib logging convention.
log = logging.getLogger(__name__)
| 88 | |
| 89 | |
def rename_arg(argument):
    """Return ``argument`` unchanged (identity placeholder for argument renaming)."""
    return argument
| 92 | |
| 93 | |
def clean_connection(value):
    """Normalize a connection reference, translating legacy ``#`` to ``/``.

    When ``SUPPORT_LEGACY_CONNECTIONS`` is enabled and ``value`` contains a
    legacy ``step#output`` separator, the first ``#`` is rewritten to ``/``.
    Otherwise ``value`` is returned unchanged (including ``None``).
    """
    if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS:
        # Hope these are just used by Galaxy testing workflows and such, and not in production workflows.
        log.warning("Legacy workflow syntax for connections [%s] will not be supported in the future", value)
        value = value.replace("#", "/", 1)
    # BUG FIX: previously the rewritten value was only returned on the
    # non-legacy path (the legacy branch fell through and returned None).
    return value
| 101 | |
| 102 | |
class ImportOptions(object):
    """Options controlling how Format 2 workflows are imported/converted."""

    def __init__(self):
        # When True, subworkflow $graph entries are emitted once under a
        # top-level "subworkflows" key and referenced by content_id.
        self.deduplicate_subworkflows = False
| 107 | |
| 108 | |
def yaml_to_workflow(has_yaml, galaxy_interface, workflow_directory, import_options=None):
    """Convert a Format 2 workflow into standard Galaxy format from supplied stream."""
    workflow_python = ordered_load(has_yaml)
    return python_to_workflow(
        workflow_python,
        galaxy_interface,
        workflow_directory,
        import_options=import_options,
    )
| 113 | |
| 114 | |
def python_to_workflow(as_python, galaxy_interface, workflow_directory=None, import_options=None):
    """Convert a Format 2 workflow into standard Galaxy format from supplied dictionary.

    :param as_python: parsed Format 2 workflow dict (or a dict carrying raw
        YAML under a ``yaml_content`` key, which is parsed first).
    :param galaxy_interface: interface used to import tools for ``GalaxyTool``
        run actions.
    :param workflow_directory: base directory for resolving ``@import`` paths;
        defaults to the current working directory.
    :param import_options: optional ``ImportOptions`` instance.
    """
    if "yaml_content" in as_python:
        as_python = ordered_load(as_python["yaml_content"])

    if workflow_directory is None:
        workflow_directory = os.path.abspath(".")

    conversion_context = ConversionContext(
        galaxy_interface,
        workflow_directory,
    import_options,
    )
    # Register any $graph entries and reduce as_python to the "main" workflow.
    as_python = _preprocess_graphs(as_python, conversion_context)
    subworkflows = None
    if conversion_context.import_options.deduplicate_subworkflows:
        # TODO: import only required workflows...
        # TODO: dag sort these...
        subworkflows = OrderedDict()
        # Convert every non-main graph entry once; steps then reference them
        # by content_id instead of embedding copies.
        for graph_id, subworkflow_content in conversion_context.graph_ids.items():
            if graph_id == "main":
                continue
            subworkflow_conversion_context = conversion_context.get_subworkflow_conversion_context_graph("#" + graph_id)
            subworkflows[graph_id] = _python_to_workflow(copy.deepcopy(subworkflow_content), subworkflow_conversion_context)
    converted = _python_to_workflow(as_python, conversion_context)
    if subworkflows is not None:
        converted["subworkflows"] = subworkflows
    return converted
| 143 | |
| 144 | |
| 145 # move to a utils file? | |
def steps_as_list(format2_workflow: dict, add_ids: bool = False, inputs_offset: int = 0, mutate: bool = False):
    """Return steps as a list, converting ID map to list representation if needed.

    This method does mutate the supplied steps when ``mutate`` is True; try to
    make progress toward not doing this.

    Add keys as labels instead of IDs. Why am I doing this?
    """
    if "steps" not in format2_workflow:
        raise Exception("No 'steps' key in dict, keys are %s" % format2_workflow.keys())
    normalized = convert_dict_to_id_list_if_needed(format2_workflow["steps"], add_label=True, mutate=mutate)
    if not add_ids:
        return normalized
    if mutate:
        _append_step_id_to_step_list_elements(normalized, inputs_offset=inputs_offset)
        return normalized
    return with_step_ids(normalized, inputs_offset=inputs_offset)
| 163 | |
| 164 | |
| 165 def _append_step_id_to_step_list_elements(steps: list, inputs_offset: int = 0): | |
| 166 assert isinstance(steps, list) | |
| 167 for i, step in enumerate(steps): | |
| 168 if "id" not in step: | |
| 169 step["id"] = i + inputs_offset | |
| 170 assert step["id"] is not None | |
| 171 | |
| 172 | |
def _python_to_workflow(as_python, conversion_context):
    """Convert one (sub)workflow dict in place to native Galaxy format.

    :param as_python: Format 2 workflow dict; mutated and returned.
    :param conversion_context: context tracking labels and subworkflows.
    :raises Exception: if ``class`` is missing or not ``GalaxyWorkflow``.
    """
    if "class" not in as_python:
        raise Exception("This is not a not a valid Galaxy workflow definition, must define a class.")

    if as_python["class"] != "GalaxyWorkflow":
        raise Exception("This is not a not a valid Galaxy workflow definition, 'class' must be 'GalaxyWorkflow'.")

    # .ga files don't have this, drop it so it isn't interpreted as a format 2 workflow.
    as_python.pop("class")

    _ensure_defaults(as_python, {
        "a_galaxy_workflow": "true",
        "format-version": "0.1",
        "name": "Workflow",
        "uuid": str(uuid.uuid4()),
    })
    _populate_annotation(as_python)

    steps = steps_as_list(as_python, mutate=True)

    convert_inputs_to_steps(as_python, steps)

    if isinstance(steps, list):
        # Native format keys steps by stringified index; record label -> index
        # mappings so connections can later be resolved by label.
        _append_step_id_to_step_list_elements(steps)
        steps_as_dict = OrderedDict()
        for i, step in enumerate(steps):
            steps_as_dict[str(i)] = step
            if "label" in step:
                label = step["label"]
                conversion_context.labels[label] = i

            # TODO: this really should be optional in Galaxy API.
            ensure_step_position(step, i)

        as_python["steps"] = steps_as_dict
        steps = steps_as_dict

    # First pass: expand "run" actions into concrete step types.
    for step in steps.values():
        step_type = step.get("type", None)
        if "run" in step:
            if step_type is not None:
                raise Exception("Steps specified as run actions cannot specify a type.")
            run_action = step.get("run")
            run_action = conversion_context.get_runnable_description(run_action)
            if isinstance(run_action, dict):
                # Dispatch by class name via eval over the fixed
                # RUN_ACTIONS_TO_STEPS mapping; values are constrained to
                # module-level function names, but a dict of callables
                # would be clearer.
                run_class = run_action["class"]
                run_to_step_function = eval(RUN_ACTIONS_TO_STEPS[run_class])

                run_to_step_function(conversion_context, step, run_action)
            else:
                # Non-dict run action is a graph-id reference to a
                # deduplicated subworkflow.
                step["content_id"] = run_action
                step["type"] = "subworkflow"
            del step["run"]

    # Second pass: normalize each step via its type-specific transform_* function.
    for step in steps.values():
        step_type = step.get("type", "tool")
        step_type = STEP_TYPE_ALIASES.get(step_type, step_type)
        if step_type not in STEP_TYPES:
            raise Exception("Unknown step type encountered %s" % step_type)
        step["type"] = step_type
        eval("transform_%s" % step_type)(conversion_context, step)

    # Attach workflow outputs to their source steps.
    outputs = as_python.pop("outputs", [])
    outputs = convert_dict_to_id_list_if_needed(outputs)

    for output in outputs:
        assert isinstance(output, dict), "Output definition must be dictionary"
        assert "source" in output or "outputSource" in output, "Output definition must specify source"

        if "label" in output and "id" in output:
            raise Exception("label and id are aliases for outputs, may only define one")
        if "label" not in output and "id" not in output:
            # NOTE(review): this assignment is dead — label is unconditionally
            # recomputed from raw_label/raw_id just below.
            label = ""

        raw_label = output.pop("label", None)
        raw_id = output.pop("id", None)
        label = raw_label or raw_id
        if Labels.is_anonymous_output_label(label):
            label = None
        source = clean_connection(output.get("outputSource"))
        if source is None and SUPPORT_LEGACY_CONNECTIONS:
            source = output.get("source").replace("#", "/", 1)
        id, output_name = conversion_context.step_output(source)
        step = steps[str(id)]
        workflow_output = {
            "output_name": output_name,
            "label": label,
            "uuid": output.get("uuid", None)
        }
        if "workflow_outputs" not in step:
            step["workflow_outputs"] = []
        step["workflow_outputs"].append(workflow_output)

    return as_python
| 268 | |
| 269 | |
| 270 def _preprocess_graphs(as_python, conversion_context): | |
| 271 if not isinstance(as_python, dict): | |
| 272 raise Exception("This is not a not a valid Galaxy workflow definition.") | |
| 273 | |
| 274 format_version = as_python.get("format-version", "v2.0") | |
| 275 assert format_version == "v2.0" | |
| 276 | |
| 277 if "class" not in as_python and "$graph" in as_python: | |
| 278 for subworkflow in as_python["$graph"]: | |
| 279 if not isinstance(subworkflow, dict): | |
| 280 raise Exception("Malformed workflow content in $graph") | |
| 281 if "id" not in subworkflow: | |
| 282 raise Exception("No subworkflow ID found for entry in $graph.") | |
| 283 subworkflow_id = subworkflow["id"] | |
| 284 if subworkflow_id == "main": | |
| 285 as_python = subworkflow | |
| 286 | |
| 287 conversion_context.register_runnable(subworkflow) | |
| 288 | |
| 289 return as_python | |
| 290 | |
| 291 | |
def convert_inputs_to_steps(workflow_dict: dict, steps: list):
    """Convert workflow inputs to a steps in array - like in native Galaxy.

    workflow_dict is a Format 2 representation of a workflow and steps is a
    list of steps. This method will prepend all the inputs as as steps to the
    steps list. This method modifies both workflow_dict and steps.
    """
    if "inputs" not in workflow_dict:
        return

    new_steps = inputs_as_native_steps(workflow_dict)
    del workflow_dict["inputs"]
    # Prepend while preserving the original input order.
    steps[0:0] = new_steps
| 306 | |
| 307 | |
def run_workflow_to_step(conversion_context, step, run_action):
    """Convert a step whose ``run`` action is a workflow into a native subworkflow step."""
    step["type"] = "subworkflow"
    deduplicating = conversion_context.import_options.deduplicate_subworkflows
    if deduplicating and _is_graph_id_reference(run_action):
        # Reference the shared subworkflow by graph id rather than embedding it.
        step["content_id"] = run_action
        return
    sub_context = conversion_context.get_subworkflow_conversion_context(step)
    step["subworkflow"] = _python_to_workflow(copy.deepcopy(run_action), sub_context)
| 318 | |
| 319 | |
| 320 def _is_graph_id_reference(run_action): | |
| 321 return run_action and not isinstance(run_action, dict) | |
| 322 | |
| 323 | |
def transform_data_input(context, step):
    """Fill in native boilerplate for a dataset input step."""
    transform_input(context, step, "Input dataset")
| 326 | |
| 327 | |
def transform_data_collection_input(context, step):
    """Fill in native boilerplate for a dataset collection input step."""
    transform_input(context, step, "Input dataset collection")
| 330 | |
| 331 | |
def transform_parameter_input(context, step):
    """Fill in native boilerplate for a simple parameter input step."""
    transform_input(context, step, "input_parameter")
| 334 | |
| 335 | |
def transform_input(context, step, default_name):
    """Populate native tool-state and inputs boilerplate for an input-type step."""
    default_name = step.get("label", default_name)
    _populate_annotation(step)
    _ensure_inputs_connections(step)

    step.setdefault("inputs", [{}])
    step_inputs = step["inputs"][0]
    name = step_inputs.get("name", default_name)

    _ensure_defaults(step_inputs, {
        "name": name,
        "description": "",
    })

    tool_state = {"name": name}
    # Copy through optional input attributes recognized by Galaxy.
    for attrib in ("collection_type", "parameter_type", "optional", "default", "format", "restrictions", "restrictOnConnections", "suggestions"):
        if attrib in step:
            tool_state[attrib] = step[attrib]

    _populate_tool_state(step, tool_state)
| 362 | |
| 363 | |
def transform_pause(context, step, default_name="Pause for dataset review"):
    """Populate native boilerplate for a pause step."""
    default_name = step.get("label", default_name)
    _populate_annotation(step)
    _ensure_inputs_connections(step)

    step.setdefault("inputs", [{}])
    step_inputs = step["inputs"][0]
    name = step_inputs.get("name", default_name)

    _ensure_defaults(step_inputs, {"name": name})
    tool_state = {"name": name}

    connect = _init_connect_dict(step)
    _populate_input_connections(context, step, connect)
    _populate_tool_state(step, tool_state)
| 389 | |
| 390 | |
def transform_subworkflow(context, step):
    """Populate native boilerplate for a subworkflow step."""
    _populate_annotation(step)
    _ensure_inputs_connections(step)

    connect = _init_connect_dict(step)
    _populate_input_connections(context, step, connect)
    # Subworkflow steps carry no tool state of their own.
    _populate_tool_state(step, {})
| 402 | |
| 403 | |
| 404 def _runtime_value(): | |
| 405 return {"__class__": "RuntimeValue"} | |
| 406 | |
| 407 | |
def transform_tool(context, step):
    """Convert a Format 2 tool step into native Galaxy tool-step form.

    Serializes ``state``/``runtime_inputs`` into JSON tool state, resolves
    ``$link`` values into input connections, and maps the ``out`` (or legacy
    ``outputs``) shorthand into post-job actions.

    :raises Exception: if the step lacks a ``tool_id``.
    """
    if "tool_id" not in step:
        raise Exception("Tool steps must define a tool_id.")

    _ensure_defaults(step, {
        "name": step['tool_id'],
        "post_job_actions": {},
        "tool_version": None,
    })
    post_job_actions = step["post_job_actions"]
    _populate_annotation(step)

    tool_state = {
        # TODO: Galaxy should not require tool state actually specify a __page__.
        "__page__": 0,
    }

    connect = _init_connect_dict(step)

    def append_link(key, value):
        # Record a $link connection under ``key`` for later resolution.
        if key not in connect:
            connect[key] = []
        assert "$link" in value
        link_value = value["$link"]
        connect[key].append(clean_connection(link_value))

    def replace_links(value, key=""):
        # Recursively walk the state tree, registering $link dicts as
        # connections and replacing them with RuntimeValue placeholders.
        if _is_link(value):
            append_link(key, value)
            # Filled in by the connection, so to force late
            # validation of the field just mark as RuntimeValue.
            # It would be better I guess if this were some other
            # value dedicated to this purpose (e.g. a ficitious
            # {"__class__": "ConnectedValue"}) that could be further
            # validated by Galaxy.
            return _runtime_value()
        if isinstance(value, dict):
            new_values = {}
            for k, v in value.items():
                new_key = _join_prefix(key, k)
                new_values[k] = replace_links(v, new_key)
            return new_values
        elif isinstance(value, list):
            new_values = []
            for i, v in enumerate(value):
                # If we are a repeat we need to modify the key
                # but not if values are actually $links.
                if _is_link(v):
                    append_link(key, v)
                    new_values.append(None)
                else:
                    new_key = "%s_%d" % (key, i)
                    new_values.append(replace_links(v, new_key))
            return new_values
        else:
            return value

    # TODO: handle runtime inputs and state together.
    runtime_inputs = step.get("runtime_inputs", [])
    if "state" in step or runtime_inputs:
        step_state = step.pop("state", {})
        step_state = replace_links(step_state)

        # Native tool state stores each top-level value JSON-encoded.
        for key, value in step_state.items():
            tool_state[key] = json.dumps(value)
        for runtime_input in runtime_inputs:
            tool_state[runtime_input] = json.dumps(_runtime_value())
    elif "tool_state" in step:
        tool_state.update(step.get("tool_state"))

    # Fill in input connections
    _populate_input_connections(context, step, connect)

    _populate_tool_state(step, tool_state)

    # Handle outputs.
    out = step.pop("out", None)
    if out is None:
        # Handle LEGACY 19.XX outputs key.
        out = step.pop("outputs", [])
    out = convert_dict_to_id_list_if_needed(out)
    for output in out:
        name = output["id"]
        for action_key, action_dict in POST_JOB_ACTIONS.items():
            action_argument = output.get(action_key, action_dict['default'])
            if action_argument:
                action_class = action_dict['action_class']
                action_name = action_class + name
                action = _action(
                    action_class,
                    name,
                    arguments=action_dict['arguments'](action_argument)
                )
                post_job_actions[action_name] = action
| 502 | |
| 503 | |
def run_tool_to_step(conversion_context, step, run_action):
    """Convert a step whose ``run`` action is a tool definition into a native tool step."""
    tool_description = conversion_context.galaxy_interface.import_tool(run_action)
    step.update({
        "type": "tool",
        "tool_id": tool_description["tool_id"],
        "tool_version": tool_description["tool_version"],
        "tool_hash": tool_description.get("tool_hash"),
        "tool_uuid": tool_description.get("uuid"),
    })
| 513 | |
| 514 | |
class BaseConversionContext(object):
    """Shared state and helpers for converting one (sub)workflow's connections."""

    def __init__(self):
        # Maps step label -> integer step index within this workflow.
        self.labels = {}
        # Maps step id -> conversion context for that step's nested workflow.
        self.subworkflow_conversion_contexts = {}

    def step_id(self, label_or_id):
        """Resolve a step label (or raw integer-like id) to an integer step index."""
        if label_or_id in self.labels:
            id = self.labels[label_or_id]
        else:
            id = label_or_id
        return int(id)

    def step_output(self, value):
        """Split a connection like ``step/output`` into (step index, output name).

        A bare step reference defaults the output name to "output".
        """
        value_parts = str(value).split("/")
        if len(value_parts) == 1:
            value_parts.append("output")
        id = self.step_id(value_parts[0])
        return id, value_parts[1]

    def get_subworkflow_conversion_context(self, step):
        """Return (creating if needed) the conversion context for a subworkflow step."""
        # TODO: sometimes this method takes format2 steps and some times converted native ones
        # (for input connections) - redo this so the type signature is stronger.
        step_id = step.get("id")
        run_action = step.get("run")
        if self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
            subworkflow_conversion_context = self.get_subworkflow_conversion_context_graph(run_action)
            return subworkflow_conversion_context
        if "content_id" in step:
            subworkflow_conversion_context = self.get_subworkflow_conversion_context_graph(step["content_id"])
            return subworkflow_conversion_context

        if step_id not in self.subworkflow_conversion_contexts:
            subworkflow_conversion_context = SubworkflowConversionContext(
                self
            )
            self.subworkflow_conversion_contexts[step_id] = subworkflow_conversion_context
        return self.subworkflow_conversion_contexts[step_id]

    def get_runnable_description(self, run_action):
        """Resolve a step ``run`` action, loading ``@import`` files and inline graph ids."""
        if "@import" in run_action:
            if len(run_action) > 1:
                raise Exception("@import must be only key if present.")

            run_action_path = run_action["@import"]
            runnable_path = os.path.join(self.workflow_directory, run_action_path)
            with open(runnable_path, "r") as f:
                runnable_description = ordered_load(f)
                run_action = runnable_description

        if not self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
            # Strip the leading "#" to look up the registered graph content.
            run_action = self.graph_ids[run_action[1:]]

        return run_action
| 570 | |
| 571 | |
class ConversionContext(BaseConversionContext):
    """Top-level conversion context owning the graph registry and Galaxy interface."""

    def __init__(self, galaxy_interface, workflow_directory, import_options: Optional[ImportOptions] = None):
        super(ConversionContext, self).__init__()
        self.import_options = import_options or ImportOptions()
        # Maps $graph entry id -> raw (unconverted) workflow content.
        self.graph_ids: Dict = OrderedDict()
        # Maps graph id -> SubworkflowConversionContext, one per referenced graph.
        self.graph_id_subworkflow_conversion_contexts: Dict = {}
        self.workflow_directory = workflow_directory
        self.galaxy_interface = galaxy_interface

    def register_runnable(self, run_action):
        """Record a $graph entry under its (required) ``id``."""
        assert "id" in run_action
        self.graph_ids[run_action["id"]] = run_action

    def get_subworkflow_conversion_context_graph(self, graph_id):
        """Return (creating on first use) the conversion context for ``graph_id``."""
        if graph_id not in self.graph_id_subworkflow_conversion_contexts:
            subworkflow_conversion_context = SubworkflowConversionContext(
                self
            )
            self.graph_id_subworkflow_conversion_contexts[graph_id] = subworkflow_conversion_context
        return self.graph_id_subworkflow_conversion_contexts[graph_id]
| 593 | |
| 594 | |
class SubworkflowConversionContext(BaseConversionContext):
    """Conversion context for a nested workflow; shared state is delegated to the parent."""

    def __init__(self, parent_context):
        super(SubworkflowConversionContext, self).__init__()
        self.parent_context = parent_context

    @property
    def graph_ids(self):
        # The $graph registry lives on the root context.
        return self.parent_context.graph_ids

    @property
    def workflow_directory(self):
        return self.parent_context.workflow_directory

    @property
    def import_options(self):
        return self.parent_context.import_options

    @property
    def galaxy_interface(self):
        return self.parent_context.galaxy_interface

    def get_subworkflow_conversion_context_graph(self, graph_id):
        # Graph-id contexts are always managed by the root context.
        return self.parent_context.get_subworkflow_conversion_context_graph(graph_id)
| 619 | |
| 620 | |
| 621 def _action(type, name, arguments={}): | |
| 622 return { | |
| 623 "action_arguments": arguments, | |
| 624 "action_type": type, | |
| 625 "output_name": name, | |
| 626 } | |
| 627 | |
| 628 | |
| 629 def _is_link(value): | |
| 630 return isinstance(value, dict) and "$link" in value | |
| 631 | |
| 632 | |
| 633 def _join_prefix(prefix, key): | |
| 634 if prefix: | |
| 635 new_key = "%s|%s" % (prefix, key) | |
| 636 else: | |
| 637 new_key = key | |
| 638 return new_key | |
| 639 | |
| 640 | |
| 641 def _init_connect_dict(step): | |
| 642 if "connect" not in step: | |
| 643 step["connect"] = {} | |
| 644 | |
| 645 connect = step["connect"] | |
| 646 del step["connect"] | |
| 647 | |
| 648 # handle CWL-style in dict connections. | |
| 649 if "in" in step: | |
| 650 step_in = step["in"] | |
| 651 assert isinstance(step_in, dict) | |
| 652 connection_keys = set() | |
| 653 for key, value in step_in.items(): | |
| 654 # TODO: this can be a list right? | |
| 655 if isinstance(value, dict) and 'source' in value: | |
| 656 value = value["source"] | |
| 657 elif isinstance(value, dict) and 'default' in value: | |
| 658 continue | |
| 659 elif isinstance(value, dict): | |
| 660 raise KeyError('step input must define either source or default %s' % value) | |
| 661 connect[key] = [value] | |
| 662 connection_keys.add(key) | |
| 663 | |
| 664 for key in connection_keys: | |
| 665 del step_in[key] | |
| 666 | |
| 667 if len(step_in) == 0: | |
| 668 del step['in'] | |
| 669 | |
| 670 return connect | |
| 671 | |
| 672 | |
def _populate_input_connections(context, step, connect):
    """Translate a step's ``connect`` mapping into native ``input_connections``.

    Each connection value is resolved to ``{"id": step_index, "output_name": name}``.
    Subworkflow steps additionally get ``input_subworkflow_step_id`` resolved
    against the subworkflow's own conversion context.
    """
    _ensure_inputs_connections(step)
    input_connections = step["input_connections"]
    is_subworkflow_step = step.get("type") == "subworkflow"

    for key, values in connect.items():
        input_connection_value = []
        if not isinstance(values, list):
            values = [values]
        for value in values:
            if not isinstance(value, dict):
                if key == "$step":
                    # Step-order (non-data) connection: Galaxy marks these
                    # with a sentinel output name.
                    value += "/__NO_INPUT_OUTPUT_NAME__"
                id, output_name = context.step_output(value)
                value = {"id": id, "output_name": output_name}
                if is_subworkflow_step:
                    subworkflow_conversion_context = context.get_subworkflow_conversion_context(step)
                    input_subworkflow_step_id = subworkflow_conversion_context.step_id(key)
                    value["input_subworkflow_step_id"] = input_subworkflow_step_id
            input_connection_value.append(value)
        if key == "$step":
            key = "__NO_INPUT_OUTPUT_NAME__"
        input_connections[key] = input_connection_value
| 696 | |
| 697 | |
| 698 def _populate_annotation(step): | |
| 699 if "annotation" not in step and "doc" in step: | |
| 700 annotation = step.pop("doc") | |
| 701 step["annotation"] = annotation | |
| 702 elif "annotation" not in step: | |
| 703 step["annotation"] = "" | |
| 704 | |
| 705 | |
| 706 def _ensure_inputs_connections(step): | |
| 707 if "input_connections" not in step: | |
| 708 step["input_connections"] = {} | |
| 709 | |
| 710 | |
| 711 def _ensure_defaults(in_dict, defaults): | |
| 712 for key, value in defaults.items(): | |
| 713 if key not in in_dict: | |
| 714 in_dict[key] = value | |
| 715 | |
| 716 | |
| 717 def _populate_tool_state(step, tool_state): | |
| 718 step["tool_state"] = json.dumps(tool_state) | |
| 719 | |
| 720 | |
def main(argv=None):
    """Entry point for script to conversion from Format 2 interface.

    :param argv: argument list WITHOUT the program name; defaults to
        ``sys.argv[1:]``.
    """
    if argv is None:
        argv = sys.argv[1:]

    args = _parser().parse_args(argv)

    format2_path = args.input_path
    output_path = args.output_path or (format2_path + ".gxwf.yml")

    # NOTE(review): this is the abspath of the input *file*, not its parent
    # directory — confirm @import resolution expects this.
    workflow_directory = os.path.abspath(format2_path)
    galaxy_interface = None

    with open(format2_path, "r") as f:
        has_workflow = ordered_load(f)

    output = python_to_workflow(has_workflow, galaxy_interface=galaxy_interface, workflow_directory=workflow_directory)
    with open(output_path, "w") as f:
        # Output is JSON (native .ga content) despite the .gxwf.yml default suffix.
        json.dump(output, f, indent=4)
| 740 | |
| 741 | |
def _parser():
    """Build the command-line argument parser for the conversion script."""
    parser = argparse.ArgumentParser(description=SCRIPT_DESCRIPTION)
    parser.add_argument(
        'input_path', metavar='INPUT', type=str,
        help='input workflow path (.ga)',
    )
    parser.add_argument(
        'output_path', metavar='OUTPUT', type=str, nargs="?",
        help='output workflow path (.gxfw.yml)',
    )
    return parser
| 749 | |
| 750 | |
if __name__ == "__main__":
    # BUG FIX: previously called main(sys.argv), which included the program
    # name (sys.argv[0]) as the first positional argument and so argparse
    # treated it as the input path; main() already defaults to sys.argv[1:].
    main()
| 753 | |
| 754 | |
# Explicit public API of this module.
__all__ = (
    'main',
    'python_to_workflow',
    'yaml_to_workflow',
)
