diff env/lib/python3.7/site-packages/gxformat2/converter.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
--- a/env/lib/python3.7/site-packages/gxformat2/converter.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,754 +0,0 @@
-"""Functionality for converting a Format 2 workflow into a standard Galaxy workflow."""
-from __future__ import print_function
-
-import copy
-import json
-import logging
-import os
-import sys
-import uuid
-from collections import OrderedDict
-
-from ._labels import Labels
-from ._yaml import ordered_load
-
-# source: step#output and $link: step#output instead of outputSource: step/output and $link: step/output
-SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1"
-STEP_TYPES = [
-    "subworkflow",
-    "data_input",
-    "data_collection_input",
-    "tool",
-    "pause",
-    "parameter_input",
-]
-
-STEP_TYPE_ALIASES = {
-    'input': 'data_input',
-    'input_collection': 'data_collection_input',
-    'parameter': 'parameter_input',
-}
-
-RUN_ACTIONS_TO_STEPS = {
-    'GalaxyWorkflow': 'run_workflow_to_step',
-    'GalaxyTool': 'run_tool_to_step',
-}
-
-POST_JOB_ACTIONS = {
-    'hide': {
-        'action_class': "HideDatasetAction",
-        'default': False,
-        'arguments': lambda x: x,
-    },
-    'rename': {
-        'action_class': 'RenameDatasetAction',
-        'default': {},
-        'arguments': lambda x: {'newname': x},
-    },
-    'delete_intermediate_datasets': {
-        'action_class': 'DeleteIntermediatesAction',
-        'default': False,
-        'arguments': lambda x: x,
-    },
-    'change_datatype': {
-        'action_class': 'ChangeDatatypeAction',
-        'default': {},
-        'arguments': lambda x: {'newtype': x},
-    },
-    'set_columns': {
-        'action_class': 'ColumnSetAction',
-        'default': {},
-        'arguments': lambda x: x,
-    },
-    'add_tags': {
-        'action_class': 'TagDatasetAction',
-        'default': [],
-        'arguments': lambda x: {'tags': ",".join(x)},
-    },
-    'remove_tags': {
-        'action_class': 'RemoveTagDatasetAction',
-        'default': [],
-        'arguments': lambda x: {'tags': ",".join(x)},
-    },
-}
-
-log = logging.getLogger(__name__)
-
-
-def rename_arg(argument):
-    return argument
-
-
-def clean_connection(value):
-    if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS:
-        # Hope these are just used by Galaxy testing workflows and such, and not in production workflows.
-        log.warning("Legacy workflow syntax for connections [%s] will not be supported in the future" % value)
-        value = value.replace("#", "/", 1)
-    return value
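-# Illustrative only (not part of the original module): with
-# GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS=1 set in the environment,
-#     clean_connection("cat1#out_file1")  # -> "cat1/out_file1", with a warning
-#     clean_connection("cat1/out_file1")  # -> returned unchanged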
-
-
-class ImportOptions(object):
-
-    def __init__(self):
-        self.deduplicate_subworkflows = False
-
-
-def yaml_to_workflow(has_yaml, galaxy_interface, workflow_directory, import_options=None):
-    """Convert a Format 2 workflow into standard Galaxy format from supplied stream."""
-    as_python = ordered_load(has_yaml)
-    return python_to_workflow(as_python, galaxy_interface, workflow_directory, import_options=import_options)
-
-
-def python_to_workflow(as_python, galaxy_interface, workflow_directory=None, import_options=None):
-    """Convert a Format 2 workflow into standard Galaxy format from supplied dictionary."""
-    if "yaml_content" in as_python:
-        as_python = ordered_load(as_python["yaml_content"])
-
-    if workflow_directory is None:
-        workflow_directory = os.path.abspath(".")
-
-    conversion_context = ConversionContext(
-        galaxy_interface,
-        workflow_directory,
-        import_options,
-    )
-    as_python = _preprocess_graphs(as_python, conversion_context)
-    subworkflows = None
-    if conversion_context.import_options.deduplicate_subworkflows:
-        # TODO: import only required workflows...
-        # TODO: dag sort these...
-        subworkflows = OrderedDict()
-        for graph_id, subworkflow_content in conversion_context.graph_ids.items():
-            if graph_id == "main":
-                continue
-            subworkflow_conversion_context = conversion_context.get_subworkflow_conversion_context_graph("#" + graph_id)
-            subworkflows[graph_id] = _python_to_workflow(copy.deepcopy(subworkflow_content), subworkflow_conversion_context)
-    converted = _python_to_workflow(as_python, conversion_context)
-    if subworkflows is not None:
-        converted["subworkflows"] = subworkflows
-    return converted
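-# Illustrative usage of the two entry points above (a sketch; galaxy_interface
-# is only needed by steps that import tools, so None can suffice for simple
-# workflows; "workflow.gxwf.yml" is a hypothetical path):
-#
-#     with open("workflow.gxwf.yml") as f:
-#         ga_dict = yaml_to_workflow(f, None, ".")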
-
-
-# move to a utils file?
-def steps_as_list(format2_workflow, add_label=True):
-    """Return steps as a list, converting ID map to list representation if needed."""
-    steps = format2_workflow["steps"]
-    steps = _convert_dict_to_id_list_if_needed(steps, add_label=add_label)
-    return steps
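-# For example (illustrative), an ID-map style ``steps`` mapping such as
-#     {"cat1": {"tool_id": "cat1"}}
-# is normalized by steps_as_list to
-#     [{"tool_id": "cat1", "label": "cat1"}]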
-
-
-def ensure_step_position(step, order_index):
-    """Ensure step contains a position definition."""
-    if "position" not in step:
-        step["position"] = {
-            "left": 10 * order_index,
-            "top": 10 * order_index
-        }
-
-
-def _python_to_workflow(as_python, conversion_context):
-
-    if "class" not in as_python:
-        raise Exception("This is not a not a valid Galaxy workflow definition, must define a class.")
-
-    if as_python["class"] != "GalaxyWorkflow":
-        raise Exception("This is not a not a valid Galaxy workflow definition, 'class' must be 'GalaxyWorkflow'.")
-
-    # .ga files don't have this, drop it so it isn't interpreted as a format 2 workflow.
-    as_python.pop("class")
-
-    _ensure_defaults(as_python, {
-        "a_galaxy_workflow": "true",
-        "format-version": "0.1",
-        "name": "Workflow",
-        "uuid": str(uuid.uuid4()),
-    })
-    _populate_annotation(as_python)
-
-    steps = steps_as_list(as_python)
-
-    convert_inputs_to_steps(as_python, steps)
-
-    if isinstance(steps, list):
-        steps_as_dict = OrderedDict()
-        for i, step in enumerate(steps):
-            steps_as_dict[str(i)] = step
-            if "id" not in step:
-                step["id"] = i
-
-            if "label" in step:
-                label = step["label"]
-                conversion_context.labels[label] = i
-
-            # TODO: this really should be optional in Galaxy API.
-            ensure_step_position(step, i)
-
-        as_python["steps"] = steps_as_dict
-        steps = steps_as_dict
-
-    for step in steps.values():
-        step_type = step.get("type", None)
-        if "run" in step:
-            if step_type is not None:
-                raise Exception("Steps specified as run actions cannot specify a type.")
-            run_action = step.get("run")
-            run_action = conversion_context.get_runnable_description(run_action)
-            if isinstance(run_action, dict):
-                run_class = run_action["class"]
-                run_to_step_function = eval(RUN_ACTIONS_TO_STEPS[run_class])
-
-                run_to_step_function(conversion_context, step, run_action)
-            else:
-                step["content_id"] = run_action
-                step["type"] = "subworkflow"
-            del step["run"]
-
-    for step in steps.values():
-        step_type = step.get("type", "tool")
-        step_type = STEP_TYPE_ALIASES.get(step_type, step_type)
-        if step_type not in STEP_TYPES:
-            raise Exception("Unknown step type encountered %s" % step_type)
-        step["type"] = step_type
-        eval("transform_%s" % step_type)(conversion_context, step)
-
-    outputs = as_python.get("outputs", [])
-    outputs = _convert_dict_to_id_list_if_needed(outputs)
-
-    for output in outputs:
-        assert isinstance(output, dict), "Output definition must be dictionary"
-        assert "source" in output or "outputSource" in output, "Output definition must specify source"
-
-        if "label" in output and "id" in output:
-            raise Exception("label and id are aliases for outputs, may only define one")
-        if "label" not in output and "id" not in output:
-            label = ""
-
-        raw_label = output.pop("label", None)
-        raw_id = output.pop("id", None)
-        label = raw_label or raw_id
-        if Labels.is_anonymous_output_label(label):
-            label = None
-        source = clean_connection(output.get("outputSource"))
-        if source is None and SUPPORT_LEGACY_CONNECTIONS:
-            source = output.get("source").replace("#", "/", 1)
-        id, output_name = conversion_context.step_output(source)
-        step = steps[str(id)]
-        workflow_output = {
-            "output_name": output_name,
-            "label": label,
-            "uuid": output.get("uuid", None)
-        }
-        if "workflow_outputs" not in step:
-            step["workflow_outputs"] = []
-        step["workflow_outputs"].append(workflow_output)
-
-    return as_python
-
-
-def _preprocess_graphs(as_python, conversion_context):
-    if not isinstance(as_python, dict):
-        raise Exception("This is not a valid Galaxy workflow definition.")
-
-    format_version = as_python.get("format-version", "v2.0")
-    assert format_version == "v2.0"
-
-    if "class" not in as_python and "$graph" in as_python:
-        for subworkflow in as_python["$graph"]:
-            if not isinstance(subworkflow, dict):
-                raise Exception("Malformed workflow content in $graph")
-            if "id" not in subworkflow:
-                raise Exception("No subworkflow ID found for entry in $graph.")
-            subworkflow_id = subworkflow["id"]
-            if subworkflow_id == "main":
-                as_python = subworkflow
-
-            conversion_context.register_runnable(subworkflow)
-
-    return as_python
-
-
-def convert_inputs_to_steps(workflow_dict, steps):
-    """Convert workflow inputs to a steps in array - like in native Galaxy."""
-    if "inputs" not in workflow_dict:
-        return
-
-    inputs = workflow_dict["inputs"]
-    new_steps = []
-    inputs = _convert_dict_to_id_list_if_needed(inputs)
-    for input_def_raw in inputs:
-        input_def = input_def_raw.copy()
-
-        if "label" in input_def and "id" in input_def:
-            raise Exception("label and id are aliases for inputs, may only define one")
-        if "label" not in input_def and "id" not in input_def:
-            raise Exception("Input must define a label.")
-
-        raw_label = input_def.pop("label", None)
-        raw_id = input_def.pop("id", None)
-        label = raw_label or raw_id
-
-        if not label:
-            raise Exception("Input label must not be empty.")
-
-        input_type = input_def.pop("type", "data")
-        if input_type in ["File", "data", "data_input"]:
-            step_type = "data_input"
-        elif input_type in ["collection", "data_collection", "data_collection_input"]:
-            step_type = "data_collection_input"
-        elif input_type in ["text", "integer", "float", "color", "boolean"]:
-            step_type = "parameter_input"
-            input_def["parameter_type"] = input_type
-        else:
-            raise Exception("Input type must be a data file, collection, or simple parameter.")
-
-        step_def = input_def
-        step_def.update({
-            "type": step_type,
-            "label": label,
-        })
-        new_steps.append(step_def)
-
-    del workflow_dict["inputs"]
-
-    for i, new_step in enumerate(new_steps):
-        steps.insert(i, new_step)
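-# Illustrative: an ``inputs`` block such as
-#     inputs:
-#       fastq: data
-#       threshold:
-#         type: integer
-# is prepended to the step list roughly as
-#     {"type": "data_input", "label": "fastq"}
-#     {"type": "parameter_input", "parameter_type": "integer", "label": "threshold"}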
-
-
-def run_workflow_to_step(conversion_context, step, run_action):
-    step["type"] = "subworkflow"
-    if conversion_context.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
-        step["content_id"] = run_action
-    else:
-        subworkflow_conversion_context = conversion_context.get_subworkflow_conversion_context(step)
-        step["subworkflow"] = _python_to_workflow(
-            copy.deepcopy(run_action),
-            subworkflow_conversion_context,
-        )
-
-
-def _is_graph_id_reference(run_action):
-    return run_action and not isinstance(run_action, dict)
-
-
-def transform_data_input(context, step):
-    transform_input(context, step, default_name="Input dataset")
-
-
-def transform_data_collection_input(context, step):
-    transform_input(context, step, default_name="Input dataset collection")
-
-
-def transform_parameter_input(context, step):
-    transform_input(context, step, default_name="input_parameter")
-
-
-def transform_input(context, step, default_name):
-    default_name = step.get("label", default_name)
-    _populate_annotation(step)
-    _ensure_inputs_connections(step)
-
-    if "inputs" not in step:
-        step["inputs"] = [{}]
-
-    step_inputs = step["inputs"][0]
-    if "name" in step_inputs:
-        name = step_inputs["name"]
-    else:
-        name = default_name
-
-    _ensure_defaults(step_inputs, {
-        "name": name,
-        "description": "",
-    })
-    tool_state = {
-        "name": name
-    }
-    for attrib in ["collection_type", "parameter_type", "optional", "default"]:
-        if attrib in step:
-            tool_state[attrib] = step[attrib]
-
-    _populate_tool_state(step, tool_state)
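-# Illustrative: a step {"type": "data_input", "label": "fastq", "optional": True}
-# passes through transform_input with name "fastq" and ends up with
-#     tool_state == json.dumps({"name": "fastq", "optional": True})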
-
-
-def transform_pause(context, step, default_name="Pause for dataset review"):
-    default_name = step.get("label", default_name)
-    _populate_annotation(step)
-
-    _ensure_inputs_connections(step)
-
-    if "inputs" not in step:
-        step["inputs"] = [{}]
-
-    step_inputs = step["inputs"][0]
-    if "name" in step_inputs:
-        name = step_inputs["name"]
-    else:
-        name = default_name
-
-    _ensure_defaults(step_inputs, {
-        "name": name,
-    })
-    tool_state = {
-        "name": name
-    }
-
-    connect = _init_connect_dict(step)
-    _populate_input_connections(context, step, connect)
-    _populate_tool_state(step, tool_state)
-
-
-def transform_subworkflow(context, step):
-    _populate_annotation(step)
-
-    _ensure_inputs_connections(step)
-
-    tool_state = {
-    }
-
-    connect = _init_connect_dict(step)
-    _populate_input_connections(context, step, connect)
-    _populate_tool_state(step, tool_state)
-
-
-def _runtime_value():
-    return {"__class__": "RuntimeValue"}
-
-
-def transform_tool(context, step):
-    if "tool_id" not in step:
-        raise Exception("Tool steps must define a tool_id.")
-
-    _ensure_defaults(step, {
-        "name": step['tool_id'],
-        "post_job_actions": {},
-        "tool_version": None,
-    })
-    post_job_actions = step["post_job_actions"]
-    _populate_annotation(step)
-
-    tool_state = {
-        # TODO: Galaxy should not require tool state actually specify a __page__.
-        "__page__": 0,
-    }
-
-    connect = _init_connect_dict(step)
-
-    def append_link(key, value):
-        if key not in connect:
-            connect[key] = []
-        assert "$link" in value
-        link_value = value["$link"]
-        connect[key].append(clean_connection(link_value))
-
-    def replace_links(value, key=""):
-        if _is_link(value):
-            append_link(key, value)
-            # Filled in by the connection, so to force late
-            # validation of the field just mark it as a RuntimeValue.
-            # It would arguably be better if this were some other
-            # value dedicated to this purpose (e.g. a fictitious
-            # {"__class__": "ConnectedValue"}) that could be further
-            # validated by Galaxy.
-            return _runtime_value()
-        if isinstance(value, dict):
-            new_values = {}
-            for k, v in value.items():
-                new_key = _join_prefix(key, k)
-                new_values[k] = replace_links(v, new_key)
-            return new_values
-        elif isinstance(value, list):
-            new_values = []
-            for i, v in enumerate(value):
-                # If we are inside a repeat we need to modify the key,
-                # but not when the values are actually $links.
-                if _is_link(v):
-                    append_link(key, v)
-                    new_values.append(None)
-                else:
-                    new_key = "%s_%d" % (key, i)
-                    new_values.append(replace_links(v, new_key))
-            return new_values
-        else:
-            return value
-
-    # TODO: handle runtime inputs and state together.
-    runtime_inputs = step.get("runtime_inputs", [])
-    if "state" in step or runtime_inputs:
-        step_state = step.pop("state", {})
-        step_state = replace_links(step_state)
-
-        for key, value in step_state.items():
-            tool_state[key] = json.dumps(value)
-        for runtime_input in runtime_inputs:
-            tool_state[runtime_input] = json.dumps(_runtime_value())
-    elif "tool_state" in step:
-        tool_state.update(step.get("tool_state"))
-
-    # Fill in input connections
-    _populate_input_connections(context, step, connect)
-
-    _populate_tool_state(step, tool_state)
-
-    # Handle outputs.
-    out = step.pop("out", None)
-    if out is None:
-        # Handle LEGACY 19.XX outputs key.
-        out = step.pop("outputs", [])
-    out = _convert_dict_to_id_list_if_needed(out)
-    for output in out:
-        name = output["id"]
-        for action_key, action_dict in POST_JOB_ACTIONS.items():
-            action_argument = output.get(action_key, action_dict['default'])
-            if action_argument:
-                action_class = action_dict['action_class']
-                action_name = action_class + name
-                action = _action(
-                    action_class,
-                    name,
-                    arguments=action_dict['arguments'](action_argument)
-                )
-                post_job_actions[action_name] = action
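-# Illustrative end-to-end sketch of transform_tool: a Format 2 step such as
-#     {"tool_id": "cat1",
-#      "state": {"input1": {"$link": "data_in"}},
-#      "out": {"out_file1": {"rename": "cleaned"}}}
-# produces an input_connections entry for "input1", a tool_state in which
-# input1 is a RuntimeValue placeholder, and a post-job action keyed
-# "RenameDatasetActionout_file1" with action_arguments {"newname": "cleaned"}.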
-
-
-def run_tool_to_step(conversion_context, step, run_action):
-    tool_description = conversion_context.galaxy_interface.import_tool(
-        run_action
-    )
-    step["type"] = "tool"
-    step["tool_id"] = tool_description["tool_id"]
-    step["tool_version"] = tool_description["tool_version"]
-    step["tool_hash"] = tool_description.get("tool_hash")
-    step["tool_uuid"] = tool_description.get("uuid")
-
-
-class BaseConversionContext(object):
-
-    def __init__(self):
-        self.labels = {}
-        self.subworkflow_conversion_contexts = {}
-
-    def step_id(self, label_or_id):
-        if label_or_id in self.labels:
-            id = self.labels[label_or_id]
-        else:
-            id = label_or_id
-        return int(id)
-
-    def step_output(self, value):
-        value_parts = str(value).split("/")
-        if len(value_parts) == 1:
-            value_parts.append("output")
-        id = self.step_id(value_parts[0])
-        return id, value_parts[1]
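-        # Illustrative: step_output("trim/out1") -> (<id of step "trim">, "out1");
-        # a bare "trim" implies the default output name, giving (<id>, "output").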
-
-    def get_subworkflow_conversion_context(self, step):
-        # TODO: sometimes this method takes format2 steps and sometimes converted
-        # native ones (for input connections) - redo this so the type signature is stronger.
-        step_id = step.get("id")
-        run_action = step.get("run")
-        if self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
-            subworkflow_conversion_context = self.get_subworkflow_conversion_context_graph(run_action)
-            return subworkflow_conversion_context
-        if "content_id" in step:
-            subworkflow_conversion_context = self.get_subworkflow_conversion_context_graph(step["content_id"])
-            return subworkflow_conversion_context
-
-        if step_id not in self.subworkflow_conversion_contexts:
-
-            subworkflow_conversion_context = SubworkflowConversionContext(
-                self
-            )
-            self.subworkflow_conversion_contexts[step_id] = subworkflow_conversion_context
-        return self.subworkflow_conversion_contexts[step_id]
-
-    def get_runnable_description(self, run_action):
-        if "@import" in run_action:
-            if len(run_action) > 1:
-                raise Exception("@import must be the only key if present.")
-
-            run_action_path = run_action["@import"]
-            runnable_path = os.path.join(self.workflow_directory, run_action_path)
-            with open(runnable_path, "r") as f:
-                runnable_description = ordered_load(f)
-                run_action = runnable_description
-
-        if not self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
-            run_action = self.graph_ids[run_action[1:]]
-
-        return run_action
-
-
-class ConversionContext(BaseConversionContext):
-
-    def __init__(self, galaxy_interface, workflow_directory, import_options=None):
-        super(ConversionContext, self).__init__()
-        self.import_options = import_options or ImportOptions()
-        self.graph_ids = OrderedDict()
-        self.graph_id_subworkflow_conversion_contexts = {}
-        self.workflow_directory = workflow_directory
-        self.galaxy_interface = galaxy_interface
-
-    def register_runnable(self, run_action):
-        assert "id" in run_action
-        self.graph_ids[run_action["id"]] = run_action
-
-    def get_subworkflow_conversion_context_graph(self, graph_id):
-        if graph_id not in self.graph_id_subworkflow_conversion_contexts:
-            subworkflow_conversion_context = SubworkflowConversionContext(
-                self
-            )
-            self.graph_id_subworkflow_conversion_contexts[graph_id] = subworkflow_conversion_context
-        return self.graph_id_subworkflow_conversion_contexts[graph_id]
-
-
-class SubworkflowConversionContext(BaseConversionContext):
-
-    def __init__(self, parent_context):
-        super(SubworkflowConversionContext, self).__init__()
-        self.parent_context = parent_context
-
-    @property
-    def graph_ids(self):
-        return self.parent_context.graph_ids
-
-    @property
-    def workflow_directory(self):
-        return self.parent_context.workflow_directory
-
-    @property
-    def import_options(self):
-        return self.parent_context.import_options
-
-    @property
-    def galaxy_interface(self):
-        return self.parent_context.galaxy_interface
-
-    def get_subworkflow_conversion_context_graph(self, graph_id):
-        return self.parent_context.get_subworkflow_conversion_context_graph(graph_id)
-
-
-def _action(type, name, arguments=None):
-    return {
-        "action_arguments": arguments or {},
-        "action_type": type,
-        "output_name": name,
-    }
-
-
-def _is_link(value):
-    return isinstance(value, dict) and "$link" in value
-
-
-def _join_prefix(prefix, key):
-    if prefix:
-        new_key = "%s|%s" % (prefix, key)
-    else:
-        new_key = key
-    return new_key
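-# Illustrative: _join_prefix("", "input1") -> "input1" and
-# _join_prefix("cond", "param") -> "cond|param", matching Galaxy's
-# flattened "|"-separated syntax for nested tool parameters.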
-
-
-def _init_connect_dict(step):
-    if "connect" not in step:
-        step["connect"] = {}
-
-    connect = step["connect"]
-    del step["connect"]
-
-    # handle CWL-style in dict connections.
-    if "in" in step:
-        step_in = step["in"]
-        assert isinstance(step_in, dict)
-        connection_keys = set()
-        for key, value in step_in.items():
-            # TODO: this can be a list right?
-            if isinstance(value, dict) and 'source' in value:
-                value = value["source"]
-            elif isinstance(value, dict) and 'default' in value:
-                continue
-            elif isinstance(value, dict):
-                raise KeyError('step input must define either source or default %s' % value)
-            connect[key] = [value]
-            connection_keys.add(key)
-
-        for key in connection_keys:
-            del step_in[key]
-
-        if len(step_in) == 0:
-            del step['in']
-
-    return connect
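-# Illustrative: a CWL-style step with
-#     {"in": {"input1": {"source": "cat1/out_file1"}, "seed": {"default": 4}}}
-# yields connect == {"input1": ["cat1/out_file1"]}, while the "seed" default
-# is left behind under step["in"].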
-
-
-def _populate_input_connections(context, step, connect):
-    _ensure_inputs_connections(step)
-    input_connections = step["input_connections"]
-    is_subworkflow_step = step.get("type") == "subworkflow"
-
-    for key, values in connect.items():
-        input_connection_value = []
-        if not isinstance(values, list):
-            values = [values]
-        for value in values:
-            if not isinstance(value, dict):
-                if key == "$step":
-                    value += "/__NO_INPUT_OUTPUT_NAME__"
-                id, output_name = context.step_output(value)
-                value = {"id": id, "output_name": output_name}
-                if is_subworkflow_step:
-                    subworkflow_conversion_context = context.get_subworkflow_conversion_context(step)
-                    input_subworkflow_step_id = subworkflow_conversion_context.step_id(key)
-                    value["input_subworkflow_step_id"] = input_subworkflow_step_id
-            input_connection_value.append(value)
-        if key == "$step":
-            key = "__NO_INPUT_OUTPUT_NAME__"
-        input_connections[key] = input_connection_value
-
-
-def _populate_annotation(step):
-    if "annotation" not in step and "doc" in step:
-        annotation = step.pop("doc")
-        step["annotation"] = annotation
-    elif "annotation" not in step:
-        step["annotation"] = ""
-
-
-def _ensure_inputs_connections(step):
-    if "input_connections" not in step:
-        step["input_connections"] = {}
-
-
-def _ensure_defaults(in_dict, defaults):
-    for key, value in defaults.items():
-        if key not in in_dict:
-            in_dict[key] = value
-
-
-def _populate_tool_state(step, tool_state):
-    step["tool_state"] = json.dumps(tool_state)
-
-
-def _convert_dict_to_id_list_if_needed(dict_or_list, add_label=False):
-    rval = dict_or_list
-    if isinstance(dict_or_list, dict):
-        rval = []
-        for key, value in dict_or_list.items():
-            if not isinstance(value, dict):
-                value = {"type": value}
-            if add_label:
-                value["label"] = key
-            else:
-                value["id"] = key
-            rval.append(value)
-    return rval
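-# Illustrative behavior of the normalization above:
-#     _convert_dict_to_id_list_if_needed({"out1": {}}) -> [{"id": "out1"}]
-#     _convert_dict_to_id_list_if_needed({"in1": "data"}, add_label=True) -> [{"type": "data", "label": "in1"}]
-#     _convert_dict_to_id_list_if_needed([{"id": "out1"}]) -> returned as-is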
-
-
-def main(argv):
-    # No live Galaxy interface is available from the command line, so steps
-    # that import tools will not work through this simple entry point.
-    with open(argv[0]) as f:
-        print(json.dumps(yaml_to_workflow(f, None, None)))
-
-
-if __name__ == "__main__":
-    main(sys.argv[1:])
-
-__all__ = (
-    'yaml_to_workflow',
-    'python_to_workflow',
-)