view env/lib/python3.7/site-packages/gxformat2/converter.py @ 4:79f47841a781 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:47:39 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

"""Functionality for converting a Format 2 workflow into a standard Galaxy workflow."""
from __future__ import print_function

import copy
import json
import logging
import os
import sys
import uuid
from collections import OrderedDict

from ._labels import Labels
from ._yaml import ordered_load

# Legacy connection syntax used "#" as the step/output separator
# (``source: step#output`` and ``$link: step#output``) instead of the current
# "/" form (``outputSource: step/output`` and ``$link: step/output``).
# Legacy handling is enabled only when the environment variable
# GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS is set to "1".
SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1"
# Step types accepted after alias resolution; each one has a matching
# module-level ``transform_<type>`` function.
STEP_TYPES = [
    "subworkflow",
    "data_input",
    "data_collection_input",
    "tool",
    "pause",
    "parameter_input",
]

# Short step type names accepted in Format 2 and the step type they resolve to.
STEP_TYPE_ALIASES = {
    'input': 'data_input',
    'input_collection': 'data_collection_input',
    'parameter': 'parameter_input',
}

# Maps the ``class`` of a step's ``run`` action to the name of the
# module-level function that turns that action into a step.
RUN_ACTIONS_TO_STEPS = {
    'GalaxyWorkflow': 'run_workflow_to_step',
    'GalaxyTool': 'run_tool_to_step',
}

# Maps keys allowed on a tool step output to the post job action they create:
#   action_class - class name of the post job action
#   default      - value assumed when the key is absent (falsy, so no action is created)
#   arguments    - callable converting the Format 2 value into action arguments
POST_JOB_ACTIONS = {
    'hide': {
        'action_class': "HideDatasetAction",
        'default': False,
        'arguments': lambda x: x,
    },
    'rename': {
        'action_class': 'RenameDatasetAction',
        'default': {},
        'arguments': lambda x: {'newname': x},
    },
    'delete_intermediate_datasets': {
        'action_class': 'DeleteIntermediatesAction',
        'default': False,
        'arguments': lambda x: x,
    },
    'change_datatype': {
        'action_class': 'ChangeDatatypeAction',
        'default': {},
        'arguments': lambda x: {'newtype': x},
    },
    'set_columns': {
        'action_class': 'ColumnSetAction',
        'default': {},
        'arguments': lambda x: x,
    },
    'add_tags': {
        'action_class': 'TagDatasetAction',
        'default': [],
        # Tags are supplied as a list and passed on as one comma-separated string.
        'arguments': lambda x: {'tags': ",".join(x)},
    },
    'remove_tags': {
        'action_class': 'RemoveTagDatasetAction',
        'default': [],
        'arguments': lambda x: {'tags': ",".join(x)},
    },
}

# Module-level logger.
log = logging.getLogger(__name__)


def rename_arg(argument):
    """Return ``argument`` unchanged (identity mapping)."""
    return argument


def clean_connection(value):
    """Normalize a connection reference to the ``step/output`` form.

    When legacy connection support is enabled and ``value`` contains the
    legacy ``#`` separator, the first ``#`` is replaced with ``/`` and a
    warning is logged. Any other value (including ``None`` and the empty
    string) is returned unchanged.
    """
    if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS:
        # Hope these are just used by Galaxy testing workflows and such, and not in production workflows.
        log.warning("Legacy workflow syntax for connections [%s] will not be supported in the future", value)
        value = value.replace("#", "/", 1)
    # Return on every path: the legacy branch previously fell through and
    # returned None, discarding the converted connection.
    return value


class ImportOptions(object):
    """Options controlling how a Format 2 workflow is converted."""

    def __init__(self):
        # When true, subworkflows registered from ``$graph`` are converted
        # once into a top-level "subworkflows" mapping and steps refer to
        # them by ``content_id`` instead of embedding a copy.
        self.deduplicate_subworkflows = False


def yaml_to_workflow(has_yaml, galaxy_interface, workflow_directory, import_options=None):
    """Convert a Format 2 workflow into standard Galaxy format from supplied stream.

    ``has_yaml`` is parsed with ``ordered_load`` and the result is handed to
    :func:`python_to_workflow` together with the remaining arguments.
    """
    format2_workflow = ordered_load(has_yaml)
    native_workflow = python_to_workflow(
        format2_workflow,
        galaxy_interface,
        workflow_directory,
        import_options=import_options,
    )
    return native_workflow


def python_to_workflow(as_python, galaxy_interface, workflow_directory=None, import_options=None):
    """Convert a Format 2 workflow into standard Galaxy format from supplied dictionary.

    If ``as_python`` carries a ``yaml_content`` key, that content is parsed
    and used as the workflow. ``workflow_directory`` defaults to the current
    working directory. When subworkflow deduplication is enabled, every
    registered graph entry other than ``main`` is converted separately and
    attached to the result under ``"subworkflows"``.
    """
    if "yaml_content" in as_python:
        as_python = ordered_load(as_python["yaml_content"])

    context = ConversionContext(
        galaxy_interface,
        os.path.abspath(".") if workflow_directory is None else workflow_directory,
        import_options,
    )
    main_workflow = _preprocess_graphs(as_python, context)

    deduplicate = context.import_options.deduplicate_subworkflows
    converted_subworkflows = OrderedDict()
    if deduplicate:
        # TODO: import only required workflows...
        # TODO: dag sort these...
        for graph_id, graph_content in context.graph_ids.items():
            if graph_id != "main":
                graph_context = context.get_subworkflow_conversion_context_graph("#" + graph_id)
                converted_subworkflows[graph_id] = _python_to_workflow(
                    copy.deepcopy(graph_content),
                    graph_context,
                )

    converted = _python_to_workflow(main_workflow, context)
    if deduplicate:
        converted["subworkflows"] = converted_subworkflows
    return converted


# move to a utils file?
def steps_as_list(format2_workflow, add_label=True):
    """Return steps as a list, converting ID map to list representation if needed.

    When ``format2_workflow["steps"]`` is a dictionary, each key is recorded
    on its step as ``label`` if ``add_label`` is true and as ``id`` otherwise.
    A list of steps is returned as is.
    """
    steps = format2_workflow["steps"]
    # Forward the caller's choice; it used to be hard-coded to True, which
    # made the ``add_label`` parameter a no-op.
    return _convert_dict_to_id_list_if_needed(steps, add_label=add_label)


def ensure_step_position(step, order_index):
    """Ensure step contains a position definition.

    A step without a ``position`` gets one placed on a diagonal, 10 units
    per ``order_index`` in both directions. An existing position is kept.
    """
    if "position" in step:
        return
    offset = 10 * order_index
    step["position"] = {"left": offset, "top": offset}


def _python_to_workflow(as_python, conversion_context):
    """Convert a single Format 2 workflow dictionary into native Galaxy format.

    ``as_python`` is modified in place and returned. The conversion:

    1. checks and removes the ``class`` marker and fills in workflow defaults,
    2. turns ``steps`` (and workflow ``inputs``) into an ordered mapping keyed
       by the step's position as a string, recording labels on
       ``conversion_context``,
    3. resolves ``run`` actions into tool or subworkflow steps,
    4. applies the ``transform_<type>`` function for every step,
    5. attaches workflow ``outputs`` to the steps that produce them.

    Raises ``Exception`` if the class marker is missing or wrong, if a step
    combines ``run`` with ``type``, if a step type is unknown, or if an
    output defines both ``label`` and ``id``.
    """
    if "class" not in as_python:
        raise Exception("This is not a not a valid Galaxy workflow definition, must define a class.")

    if as_python["class"] != "GalaxyWorkflow":
        raise Exception("This is not a not a valid Galaxy workflow definition, 'class' must be 'GalaxyWorkflow'.")

    # .ga files don't have this, drop it so it isn't interpreted as a format 2 workflow.
    as_python.pop("class")

    _ensure_defaults(as_python, {
        "a_galaxy_workflow": "true",
        "format-version": "0.1",
        "name": "Workflow",
        "uuid": str(uuid.uuid4()),
    })
    _populate_annotation(as_python)

    steps = steps_as_list(as_python)

    convert_inputs_to_steps(as_python, steps)

    if isinstance(steps, list):
        steps_as_dict = OrderedDict()
        for i, step in enumerate(steps):
            steps_as_dict[str(i)] = step
            if "id" not in step:
                step["id"] = i

            if "label" in step:
                label = step["label"]
                conversion_context.labels[label] = i

            # TODO: this really should be optional in Galaxy API.
            ensure_step_position(step, i)

        as_python["steps"] = steps_as_dict
        steps = steps_as_dict

    # Converter functions are module-level; look them up by name in the
    # module namespace instead of evaluating the name with eval().
    module_namespace = globals()

    for step in steps.values():
        step_type = step.get("type", None)
        if "run" in step:
            if step_type is not None:
                raise Exception("Steps specified as run actions cannot specify a type.")
            run_action = step.get("run")
            run_action = conversion_context.get_runnable_description(run_action)
            if isinstance(run_action, dict):
                run_class = run_action["class"]
                run_to_step_function = module_namespace[RUN_ACTIONS_TO_STEPS[run_class]]

                run_to_step_function(conversion_context, step, run_action)
            else:
                # Not a dictionary: treat it as a reference to a subworkflow.
                step["content_id"] = run_action
                step["type"] = "subworkflow"
            del step["run"]

    for step in steps.values():
        step_type = step.get("type", "tool")
        step_type = STEP_TYPE_ALIASES.get(step_type, step_type)
        if step_type not in STEP_TYPES:
            raise Exception("Unknown step type encountered %s" % step_type)
        step["type"] = step_type
        module_namespace["transform_%s" % step_type](conversion_context, step)

    outputs = as_python.get("outputs", [])
    outputs = _convert_dict_to_id_list_if_needed(outputs)

    for output in outputs:
        assert isinstance(output, dict), "Output definition must be dictionary"
        assert "source" in output or "outputSource" in output, "Output definition must specify source"

        if "label" in output and "id" in output:
            raise Exception("label and id are aliases for outputs, may only define one")

        # label and id are aliases; whichever is present names the output.
        raw_label = output.pop("label", None)
        raw_id = output.pop("id", None)
        label = raw_label or raw_id
        if Labels.is_anonymous_output_label(label):
            label = None
        source = clean_connection(output.get("outputSource"))
        if source is None and SUPPORT_LEGACY_CONNECTIONS:
            source = output.get("source").replace("#", "/", 1)
        source_step_id, output_name = conversion_context.step_output(source)
        step = steps[str(source_step_id)]
        workflow_output = {
            "output_name": output_name,
            "label": label,
            "uuid": output.get("uuid", None)
        }
        if "workflow_outputs" not in step:
            step["workflow_outputs"] = []
        step["workflow_outputs"].append(workflow_output)

    return as_python


def _preprocess_graphs(as_python, conversion_context):
    """Register ``$graph`` entries and return the workflow to convert.

    A document without ``class`` but with ``$graph`` is treated as a
    collection of workflows: each entry is registered on
    ``conversion_context`` and the entry whose id is ``main`` is returned
    (the document itself if there is none). Any other document is returned
    unchanged.

    Raises ``Exception`` for a non-dictionary document or a malformed
    ``$graph`` entry.
    """
    if not isinstance(as_python, dict):
        raise Exception("This is not a not a valid Galaxy workflow definition.")

    assert as_python.get("format-version", "v2.0") == "v2.0"

    is_graph_document = "class" not in as_python and "$graph" in as_python
    if not is_graph_document:
        return as_python

    selected = as_python
    for graph_entry in as_python["$graph"]:
        if not isinstance(graph_entry, dict):
            raise Exception("Malformed workflow content in $graph")
        if "id" not in graph_entry:
            raise Exception("No subworkflow ID found for entry in $graph.")
        if graph_entry["id"] == "main":
            selected = graph_entry

        conversion_context.register_runnable(graph_entry)

    return selected


def convert_inputs_to_steps(workflow_dict, steps):
    """Convert workflow inputs to a steps in array - like in native Galaxy.

    Each entry of ``workflow_dict["inputs"]`` becomes an input step (data,
    data collection or parameter) and the new steps are placed, in order, at
    the front of ``steps``. Nothing happens when the workflow has no
    ``inputs`` key.

    Raises ``Exception`` when an input defines both or neither of ``label``
    and ``id``, has an empty label, or has an unsupported type.
    """
    if "inputs" not in workflow_dict:
        return

    data_types = ("File", "data", "data_input")
    collection_types = ("collection", "data_collection", "data_collection_input")
    parameter_types = ("text", "integer", "float", "color", "boolean")

    input_steps = []
    for raw_definition in _convert_dict_to_id_list_if_needed(workflow_dict["inputs"]):
        # Work on a shallow copy so the caller's definition keeps its keys.
        definition = raw_definition.copy()

        has_label = "label" in definition
        has_id = "id" in definition
        if has_label and has_id:
            raise Exception("label and id are aliases for inputs, may only define one")
        if not (has_label or has_id):
            raise Exception("Input must define a label.")

        raw_label = definition.pop("label", None)
        raw_id = definition.pop("id", None)
        label = raw_label or raw_id
        if not label:
            raise Exception("Input label must not be empty.")

        input_type = definition.pop("type", "data")
        if input_type in data_types:
            step_type = "data_input"
        elif input_type in collection_types:
            step_type = "data_collection_input"
        elif input_type in parameter_types:
            step_type = "parameter_input"
            definition["parameter_type"] = input_type
        else:
            raise Exception("Input type must be a data file or collection.")

        definition.update({
            "type": step_type,
            "label": label,
        })
        input_steps.append(definition)

    # Input steps go first, keeping their declaration order.
    steps[0:0] = input_steps


def run_workflow_to_step(conversion_context, step, run_action):
    """Turn a step whose ``run`` action is a workflow into a subworkflow step.

    With subworkflow deduplication enabled and a graph ID reference as
    ``run_action``, the step only records the reference as ``content_id``.
    Otherwise a deep copy of ``run_action`` is converted and embedded under
    ``step["subworkflow"]``.
    """
    step["type"] = "subworkflow"
    deduplicate = conversion_context.import_options.deduplicate_subworkflows
    if deduplicate and _is_graph_id_reference(run_action):
        step["content_id"] = run_action
        return

    nested_context = conversion_context.get_subworkflow_conversion_context(step)
    step["subworkflow"] = _python_to_workflow(copy.deepcopy(run_action), nested_context)


def _is_graph_id_reference(run_action):
    """Tell whether ``run_action`` refers to a graph entry rather than defining one.

    A falsy ``run_action`` is returned as is; otherwise the result is
    ``True`` for anything that is not a dictionary.
    """
    if not run_action:
        return run_action
    return not isinstance(run_action, dict)


def transform_data_input(context, step):
    """Convert a data input step, naming it "Input dataset" by default."""
    transform_input(context, step, "Input dataset")


def transform_data_collection_input(context, step):
    """Convert a collection input step, naming it "Input dataset collection" by default."""
    transform_input(context, step, "Input dataset collection")


def transform_parameter_input(context, step):
    """Convert a parameter input step, naming it "input_parameter" by default."""
    transform_input(context, step, "input_parameter")


def transform_input(context, step, default_name):
    """Fill in the native fields of an input step in place.

    The step's ``label`` (falling back to ``default_name``) is used as the
    input name unless the first entry of ``step["inputs"]`` already has one.
    The serialized tool state holds that name plus any of
    ``collection_type``, ``parameter_type``, ``optional`` and ``default``
    defined on the step.
    """
    fallback_name = step.get("label", default_name)
    _populate_annotation(step)
    _ensure_inputs_connections(step)

    first_input = step.setdefault("inputs", [{}])[0]
    name = first_input["name"] if "name" in first_input else fallback_name

    _ensure_defaults(first_input, {
        "name": name,
        "description": "",
    })

    state = {"name": name}
    for attribute in ("collection_type", "parameter_type", "optional", "default"):
        if attribute in step:
            state[attribute] = step[attribute]

    _populate_tool_state(step, state)


def transform_pause(context, step, default_name="Pause for dataset review"):
    """Fill in the native fields of a pause step in place.

    The step's ``label`` (falling back to ``default_name``) is used as the
    name unless the first entry of ``step["inputs"]`` already has one. The
    step's connections are resolved and its tool state is set to that name.
    """
    fallback_name = step.get("label", default_name)
    _populate_annotation(step)

    _ensure_inputs_connections(step)

    first_input = step.setdefault("inputs", [{}])[0]
    name = first_input["name"] if "name" in first_input else fallback_name

    _ensure_defaults(first_input, {"name": name})

    connections = _init_connect_dict(step)
    _populate_input_connections(context, step, connections)
    _populate_tool_state(step, {"name": name})


def transform_subworkflow(context, step):
    """Fill in the native fields of a subworkflow step in place.

    The annotation and input connections are populated and the step gets an
    empty serialized tool state.
    """
    _populate_annotation(step)

    _ensure_inputs_connections(step)

    connections = _init_connect_dict(step)
    _populate_input_connections(context, step, connections)
    _populate_tool_state(step, {})


def _runtime_value():
    """Return a fresh marker dictionary for a value supplied at runtime."""
    marker = {"__class__": "RuntimeValue"}
    return marker


def transform_tool(context, step):
    """Fill in the native fields of a tool step in place.

    The Format 2 ``state`` is serialized into the tool state, with every
    ``$link`` entry replaced by a runtime value and recorded as an input
    connection. Entries in ``runtime_inputs`` are marked as runtime values. A
    pre-built ``tool_state`` is merged in only when the step has neither
    ``state`` nor runtime inputs. The step's ``out`` entries (or the legacy
    ``outputs`` key) are turned into post job actions.

    Raises ``Exception`` if the step does not define ``tool_id``.
    """
    if "tool_id" not in step:
        raise Exception("Tool steps must define a tool_id.")

    _ensure_defaults(step, {
        "name": step['tool_id'],
        "post_job_actions": {},
        "tool_version": None,
    })
    actions_by_name = step["post_job_actions"]
    _populate_annotation(step)

    # TODO: Galaxy should not require tool state actually specify a __page__.
    native_state = {"__page__": 0}

    connections = _init_connect_dict(step)

    def record_link(state_key, link):
        # Remember that ``state_key`` is fed by the output named in ``$link``.
        targets = connections.setdefault(state_key, [])
        assert "$link" in link
        targets.append(clean_connection(link["$link"]))

    def strip_links(node, state_key=""):
        # Return a copy of ``node`` in which every $link has been recorded
        # as a connection and removed from the state.
        if _is_link(node):
            record_link(state_key, node)
            # Filled in by the connection, so to force late
            # validation of the field just mark as RuntimeValue.
            # It would be better I guess if this were some other
            # value dedicated to this purpose (e.g. a fictitious
            # {"__class__": "ConnectedValue"}) that could be further
            # validated by Galaxy.
            return _runtime_value()
        if isinstance(node, dict):
            return {
                child_key: strip_links(child, _join_prefix(state_key, child_key))
                for child_key, child in node.items()
            }
        if isinstance(node, list):
            stripped = []
            for index, item in enumerate(node):
                # If we are a repeat we need to modify the key
                # but not if values are actually $links.
                if _is_link(item):
                    record_link(state_key, item)
                    stripped.append(None)
                else:
                    stripped.append(strip_links(item, "%s_%d" % (state_key, index)))
            return stripped
        return node

    # TODO: handle runtime inputs and state together.
    runtime_inputs = step.get("runtime_inputs", [])
    if "state" in step or runtime_inputs:
        format2_state = strip_links(step.pop("state", {}))

        for state_key, state_value in format2_state.items():
            native_state[state_key] = json.dumps(state_value)
        for runtime_input in runtime_inputs:
            native_state[runtime_input] = json.dumps(_runtime_value())
    elif "tool_state" in step:
        native_state.update(step.get("tool_state"))

    # Fill in input connections
    _populate_input_connections(context, step, connections)

    _populate_tool_state(step, native_state)

    # Handle outputs.
    declared_outputs = step.pop("out", None)
    if declared_outputs is None:
        # Handle LEGACY 19.XX outputs key.
        declared_outputs = step.pop("outputs", [])
    for output in _convert_dict_to_id_list_if_needed(declared_outputs):
        output_name = output["id"]
        for action_key, action_spec in POST_JOB_ACTIONS.items():
            requested = output.get(action_key, action_spec['default'])
            if not requested:
                continue
            action_class = action_spec['action_class']
            action_name = action_class + output_name
            actions_by_name[action_name] = _action(
                action_class,
                output_name,
                arguments=action_spec['arguments'](requested),
            )


def run_tool_to_step(conversion_context, step, run_action):
    """Turn a step whose ``run`` action is a tool definition into a tool step.

    The tool is imported through the context's Galaxy interface and the
    returned description supplies the step's tool id, version, hash and
    uuid. The id and version are required; hash and uuid default to ``None``.
    """
    description = conversion_context.galaxy_interface.import_tool(run_action)
    step["type"] = "tool"
    for required_key in ("tool_id", "tool_version"):
        step[required_key] = description[required_key]
    for step_key, description_key in (("tool_hash", "tool_hash"), ("tool_uuid", "uuid")):
        step[step_key] = description.get(description_key)


class BaseConversionContext(object):
    """State shared by workflow and subworkflow conversions.

    Tracks the mapping from step labels to step indexes and the conversion
    contexts created for embedded subworkflow steps. Subclasses provide
    ``import_options``, ``graph_ids``, ``workflow_directory`` and
    ``get_subworkflow_conversion_context_graph``.
    """

    def __init__(self):
        self.labels = {}
        self.subworkflow_conversion_contexts = {}

    def step_id(self, label_or_id):
        """Return the integer step id for a step label or id."""
        return int(self.labels.get(label_or_id, label_or_id))

    def step_output(self, value):
        """Split ``step/output`` into ``(step id, output name)``.

        The output name defaults to ``"output"`` when ``value`` has no ``/``.
        """
        pieces = str(value).split("/")
        step_reference = pieces[0]
        output_name = pieces[1] if len(pieces) > 1 else "output"
        return self.step_id(step_reference), output_name

    def get_subworkflow_conversion_context(self, step):
        """Return the conversion context for the subworkflow run by ``step``.

        Steps referring to a graph entry share that entry's context; other
        steps get one context per step id, created on first use.
        """
        # TODO: sometimes this method takes format2 steps and some times converted native ones
        # (for input connections) - redo this so the type signature is stronger.
        step_id = step.get("id")
        run_action = step.get("run")
        if self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
            return self.get_subworkflow_conversion_context_graph(run_action)
        if "content_id" in step:
            return self.get_subworkflow_conversion_context_graph(step["content_id"])

        contexts = self.subworkflow_conversion_contexts
        if step_id not in contexts:
            contexts[step_id] = SubworkflowConversionContext(self)
        return contexts[step_id]

    def get_runnable_description(self, run_action):
        """Resolve a step's ``run`` action into the description to convert.

        An ``@import`` action is replaced by the content of the referenced
        file, resolved relative to the workflow directory. Unless
        subworkflows are deduplicated, a graph ID reference is replaced by
        the registered graph entry (the reference's first character is
        dropped to obtain the entry's id).
        """
        if "@import" in run_action:
            if len(run_action) > 1:
                raise Exception("@import must be only key if present.")

            imported_path = os.path.join(self.workflow_directory, run_action["@import"])
            with open(imported_path, "r") as f:
                run_action = ordered_load(f)

        resolve_references = not self.import_options.deduplicate_subworkflows
        if resolve_references and _is_graph_id_reference(run_action):
            run_action = self.graph_ids[run_action[1:]]

        return run_action


class ConversionContext(BaseConversionContext):
    """Conversion context for a top-level workflow.

    Owns the import options, the registered ``$graph`` entries and the
    conversion contexts shared by steps that refer to those entries.
    """

    def __init__(self, galaxy_interface, workflow_directory, import_options=None):
        super(ConversionContext, self).__init__()
        self.galaxy_interface = galaxy_interface
        self.workflow_directory = workflow_directory
        self.import_options = import_options if import_options else ImportOptions()
        self.graph_ids = OrderedDict()
        self.graph_id_subworkflow_conversion_contexts = {}

    def register_runnable(self, run_action):
        """Register a ``$graph`` entry under its ``id``."""
        assert "id" in run_action
        graph_id = run_action["id"]
        self.graph_ids[graph_id] = run_action

    def get_subworkflow_conversion_context_graph(self, graph_id):
        """Return the context for ``graph_id``, creating it on first use."""
        contexts = self.graph_id_subworkflow_conversion_contexts
        if graph_id not in contexts:
            contexts[graph_id] = SubworkflowConversionContext(self)
        return contexts[graph_id]


class SubworkflowConversionContext(BaseConversionContext):
    """Conversion context for a subworkflow.

    Keeps its own labels and nested contexts; everything else is delegated
    to the parent context.
    """

    def __init__(self, parent_context):
        super(SubworkflowConversionContext, self).__init__()
        self.parent_context = parent_context

    @property
    def graph_ids(self):
        """Graph entries registered on the parent context."""
        parent = self.parent_context
        return parent.graph_ids

    @property
    def workflow_directory(self):
        """Workflow directory of the parent context."""
        parent = self.parent_context
        return parent.workflow_directory

    @property
    def import_options(self):
        """Import options of the parent context."""
        parent = self.parent_context
        return parent.import_options

    @property
    def galaxy_interface(self):
        """Galaxy interface of the parent context."""
        parent = self.parent_context
        return parent.galaxy_interface

    def get_subworkflow_conversion_context_graph(self, graph_id):
        """Look up the context for ``graph_id`` through the parent context."""
        parent = self.parent_context
        return parent.get_subworkflow_conversion_context_graph(graph_id)


def _action(type, name, arguments=None):
    """Build the dictionary describing a post job action.

    ``type`` is the action class name, ``name`` the output the action
    applies to and ``arguments`` the action arguments. When ``arguments`` is
    omitted, every call gets its own empty dictionary rather than a default
    shared between calls.
    """
    if arguments is None:
        arguments = {}
    return {
        "action_arguments": arguments,
        "action_type": type,
        "output_name": name,
    }


def _is_link(value):
    """Return whether ``value`` is a dictionary containing a ``$link`` key."""
    if not isinstance(value, dict):
        return False
    return "$link" in value


def _join_prefix(prefix, key):
    """Return ``key`` qualified as ``prefix|key``, or ``key`` alone if ``prefix`` is empty."""
    return "%s|%s" % (prefix, key) if prefix else key


def _init_connect_dict(step):
    """Remove and return the connections declared on ``step``.

    The step's ``connect`` mapping (empty if absent) is taken off the step
    and extended with the CWL-style ``in`` entries that name a source.
    Those entries are removed from ``in``; entries that only give a
    ``default`` stay, and ``in`` itself is dropped once it is empty.

    Raises ``KeyError`` for a dictionary ``in`` entry with neither ``source``
    nor ``default``.
    """
    connect = step.pop("connect", {})

    # handle CWL-style in dict connections.
    if "in" not in step:
        return connect

    step_in = step["in"]
    assert isinstance(step_in, dict)

    consumed = set()
    for input_name, spec in step_in.items():
        # TODO: this can be a list right?
        if isinstance(spec, dict):
            if 'source' in spec:
                spec = spec["source"]
            elif 'default' in spec:
                continue
            else:
                raise KeyError('step input must define either source or default %s' % spec)
        connect[input_name] = [spec]
        consumed.add(input_name)

    for input_name in consumed:
        del step_in[input_name]

    if not step_in:
        del step['in']

    return connect


def _populate_input_connections(context, step, connect):
    """Translate Format 2 connections into the step's ``input_connections``.

    Each source in ``connect`` that is not already a dictionary is resolved
    through ``context`` into ``{"id": ..., "output_name": ...}``. For
    subworkflow steps the id of the subworkflow step receiving the input is
    added. The special ``$step`` key is stored under
    ``__NO_INPUT_OUTPUT_NAME__``.
    """
    input_connections = step.setdefault("input_connections", {})
    for_subworkflow = step.get("type") == "subworkflow"

    for input_name, sources in connect.items():
        is_step_connection = input_name == "$step"
        source_list = sources if isinstance(sources, list) else [sources]

        resolved = []
        for source in source_list:
            if isinstance(source, dict):
                # Already in native form.
                resolved.append(source)
                continue
            if is_step_connection:
                source += "/__NO_INPUT_OUTPUT_NAME__"
            source_step_id, output_name = context.step_output(source)
            connection = {"id": source_step_id, "output_name": output_name}
            if for_subworkflow:
                nested_context = context.get_subworkflow_conversion_context(step)
                connection["input_subworkflow_step_id"] = nested_context.step_id(input_name)
            resolved.append(connection)

        target_name = "__NO_INPUT_OUTPUT_NAME__" if is_step_connection else input_name
        input_connections[target_name] = resolved


def _populate_annotation(step):
    """Ensure ``step`` has an ``annotation``.

    A missing annotation is taken from ``doc`` (which is removed) or set to
    the empty string. An existing annotation is left alone.
    """
    if "annotation" in step:
        return
    step["annotation"] = step.pop("doc") if "doc" in step else ""


def _ensure_inputs_connections(step):
    """Give ``step`` an empty ``input_connections`` mapping if it has none."""
    step.setdefault("input_connections", {})


def _ensure_defaults(in_dict, defaults):
    """Copy into ``in_dict`` every entry of ``defaults`` whose key it lacks."""
    for name, default in defaults.items():
        in_dict.setdefault(name, default)


def _populate_tool_state(step, tool_state):
    """Store ``tool_state`` on ``step`` serialized as a JSON string."""
    serialized = json.dumps(tool_state)
    step["tool_state"] = serialized


def _convert_dict_to_id_list_if_needed(dict_or_list, add_label=False):
    """Return the entries of an ID map as a list; pass anything else through.

    For a dictionary, each key is recorded on its value as ``label`` when
    ``add_label`` is true and as ``id`` otherwise. A value that is not a
    dictionary is wrapped as ``{"type": value}`` first. Dictionary values
    are updated in place.
    """
    if not isinstance(dict_or_list, dict):
        return dict_or_list

    identifier_key = "label" if add_label else "id"
    entries = []
    for identifier, entry in dict_or_list.items():
        if not isinstance(entry, dict):
            entry = {"type": entry}
        entry[identifier_key] = identifier
        entries.append(entry)
    return entries


def main(argv):
    """Convert a Format 2 workflow file and print the native workflow as JSON.

    ``argv`` is a ``sys.argv``-style list: ``argv[0]`` is the program name
    and ``argv[1]`` the path of the Format 2 workflow to convert. Relative
    ``@import`` paths are resolved against the directory of that file.

    Exits with a usage message when no path is given.
    """
    if len(argv) < 2:
        raise SystemExit("Usage: %s FORMAT2_WORKFLOW_PATH" % (argv[0] if argv else "converter"))

    workflow_path = argv[1]
    workflow_directory = os.path.dirname(os.path.abspath(workflow_path))
    with open(workflow_path, "r") as f:
        # No Galaxy interface is available from the command line; it is
        # only used by steps whose run action is a GalaxyTool definition.
        converted = yaml_to_workflow(f, None, workflow_directory)
    print(json.dumps(converted))


# Run the converter when this module is executed as a script.
if __name__ == "__main__":
    main(sys.argv)

# Names exported by ``from ... import *``.
__all__ = (
    'yaml_to_workflow',
    'python_to_workflow',
)