Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/gxformat2/abstract.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 (2021-03-22) |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/gxformat2/abstract.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,186 @@ +"""Module for exporting Galaxy workflows to CWL abstract interface.""" +import argparse +import sys +from typing import Any, Dict + +from gxformat2._scripts import ensure_format2 +from gxformat2.converter import steps_as_list +from gxformat2.normalize import NormalizedWorkflow, walk_id_list_or_dict +from gxformat2.yaml import ordered_dump_to_path, ordered_load + +CWL_VERSION = "v1.2" + +SCRIPT_DESCRIPTION = """ +This script converts the an executable Galaxy workflow (in either format - +Format 2 or native .ga) into an abstract CWL representation. + +In order to represent Galaxy tool executions in the Common Workflow Language +workflow language, they are serialized as v1.2+ abstract 'Operation' classes. +Because abstract 'Operation' classes are used, the resulting CWL workflow is +not executable - either in Galaxy or by CWL implementations. The resulting CWL +file should be thought of more as a common metadata specification describing +the workflow structure. +""" + + +def from_dict(workflow_dict: dict, subworkflow=False): + """Convert dictified Galaxy workflow into abstract CWL representation.""" + # TODO: pass some sort of flag to ensure_format2 to make sure information + # about step outputs that may be present in native format is not lost when + # converting to Format2. + workflow_dict = ensure_format2(workflow_dict) + normalized_workflow = NormalizedWorkflow(workflow_dict) + workflow_dict = normalized_workflow.normalized_workflow_dict + + requirements = {} # type: Dict[str, Any] + abstract_dict = { + 'class': 'Workflow', + } # type: Dict[str, Any] + if not subworkflow: + abstract_dict["cwlVersion"] = CWL_VERSION + # inputs and outputs already mostly in CWL format... + + # TODO: add test case where format2 input without inputs declaration is used + abstract_dict["inputs"] = _format2_inputs_to_abstract(workflow_dict.get("inputs", {})) + abstract_dict["outputs"] = _format2_outputs_to_abstract(workflow_dict.get("outputs", {})) + steps = {} + for format2_step in steps_as_list(workflow_dict, add_ids=True, inputs_offset=len(abstract_dict["inputs"]), mutate=False): + label = format2_step.get("label") or format2_step.get("id") + assert label is not None + label = str(label) + steps[label] = _format2_step_to_abstract(format2_step, requirements=requirements) + + abstract_dict["steps"] = steps + if requirements: + abstract_dict['requirements'] = requirements + return abstract_dict + + +def _format2_step_to_abstract(format2_step, requirements): + """Convert Format2 step CWL 1.2+ abstract operation.""" + abstract_step = {} + if "run" in format2_step: + # probably encountered in subworkflow. + format2_run = format2_step["run"] + format2_run_class = format2_run["class"] + requirements["SubworkflowFeatureRequirement"] = {} + if format2_run_class == "GalaxyWorkflow": + # preprocess to ensure it has outs - should the original call be recursive? + step_run = from_dict(format2_run, subworkflow=True) + abstract_step["run"] = step_run + else: + raise NotImplementedError("Unknown runnabled type encountered [%s]" % format2_run_class) + else: + step_run = { + "class": "Operation", + "doc": format2_step.get("doc", ""), + "inputs": {}, # TODO + "outputs": {}, # TODO + } + abstract_step["run"] = step_run + abstract_step["in"] = _format2_in_to_abstract(format2_step.get("in", [])) + abstract_step["out"] = _format2_out_to_abstract(format2_step) + return abstract_step + + +def _format2_in_to_abstract(in_dict): + """Convert Format2 'in' dict for step into CWL abstract 'in' dict.""" + return in_dict + + +def _format2_out_to_abstract(format2_step, run=None): + """Convert Format2 'out' list for step into CWL abstract 'out' list.""" + cwl_out = [] + if "out" in format2_step: + out = format2_step.get("out") + if isinstance(out, dict): + for out_name, out_def in out.items(): + # discard PJA info when converting to abstract CWL + cwl_out.append(out_name) + else: + cwl_out = out + + return cwl_out + + +def _format2_inputs_to_abstract(inputs): + """Strip Galaxy extensions or namespace them.""" + abstract_inputs = {} + + for input_name, input_def in walk_id_list_or_dict(inputs): + if isinstance(input_def, dict): + input_type = input_def.get("type") + else: + input_type = input_def + input_def = {"type": input_type} + + if input_type == "data": + input_def["type"] = "File" + + _format2_type_to_abstract(input_def) + + # Strip off Galaxy extensions + input_def.pop("position", None) + abstract_inputs[input_name] = input_def + + return abstract_inputs + + +def _format2_type_to_abstract(has_type): + format2_type = has_type.pop("type") + if format2_type == "data": + cwl_type = "File" + elif format2_type == "collection": + # TODO: handled nested collections, pairs, etc... + cwl_type = "File[]" + else: + cwl_type = format2_type + optional = has_type.pop("optional", False) + if optional: + cwl_type += "?" + has_type["type"] = cwl_type + + +def _format2_outputs_to_abstract(outputs): + """Strip Galaxy extensions or namespace them.""" + for output_name, output in walk_id_list_or_dict(outputs): + if "type" not in output: + output["type"] = "File" + return outputs + + +def main(argv=None): + """Entry point for script to export abstract interface.""" + if argv is None: + argv = sys.argv[1:] + + args = _parser().parse_args(argv) + + workflow_path = args.input_path + output_path = args.output_path or (workflow_path + ".abstract.cwl") + + if workflow_path == "-": + workflow_dict = ordered_load(sys.stdin) + else: + with open(workflow_path, "r") as f: + workflow_dict = ordered_load(f) + + abstract_dict = from_dict(workflow_dict) + ordered_dump_to_path(abstract_dict, output_path) + return 0 + + +def _parser(): + parser = argparse.ArgumentParser(description=SCRIPT_DESCRIPTION) + parser.add_argument('input_path', metavar='INPUT', type=str, + help='input workflow path (.ga/gxwf.yml)') + parser.add_argument('output_path', metavar='OUTPUT', type=str, nargs="?", + help='output workflow path (.cwl)') + return parser + + +if __name__ == "__main__": + sys.exit(main()) + + +__all__ = ('main', 'from_dict')