diff env/lib/python3.7/site-packages/gxformat2/converter.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
| --- | --- |
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children | |
--- a/env/lib/python3.7/site-packages/gxformat2/converter.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,754 +0,0 @@
-"""Functionality for converting a Format 2 workflow into a standard Galaxy workflow."""
-from __future__ import print_function
-
-import copy
-import json
-import logging
-import os
-import sys
-import uuid
-from collections import OrderedDict
-
-from ._labels import Labels
-from ._yaml import ordered_load
-
-# source: step#output and $link: step#output instead of outputSource: step/output and $link: step/output
-SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1"
-STEP_TYPES = [
-    "subworkflow",
-    "data_input",
-    "data_collection_input",
-    "tool",
-    "pause",
-    "parameter_input",
-]
-
-STEP_TYPE_ALIASES = {
-    'input': 'data_input',
-    'input_collection': 'data_collection_input',
-    'parameter': 'parameter_input',
-}
-
-RUN_ACTIONS_TO_STEPS = {
-    'GalaxyWorkflow': 'run_workflow_to_step',
-    'GalaxyTool': 'run_tool_to_step',
-}
-
-POST_JOB_ACTIONS = {
-    'hide': {
-        'action_class': "HideDatasetAction",
-        'default': False,
-        'arguments': lambda x: x,
-    },
-    'rename': {
-        'action_class': 'RenameDatasetAction',
-        'default': {},
-        'arguments': lambda x: {'newname': x},
-    },
-    'delete_intermediate_datasets': {
-        'action_class': 'DeleteIntermediatesAction',
-        'default': False,
-        'arguments': lambda x: x,
-    },
-    'change_datatype': {
-        'action_class': 'ChangeDatatypeAction',
-        'default': {},
-        'arguments': lambda x: {'newtype': x},
-    },
-    'set_columns': {
-        'action_class': 'ColumnSetAction',
-        'default': {},
-        'arguments': lambda x: x,
-    },
-    'add_tags': {
-        'action_class': 'TagDatasetAction',
-        'default': [],
-        'arguments': lambda x: {'tags': ",".join(x)},
-    },
-    'remove_tags': {
-        'action_class': 'RemoveTagDatasetAction',
-        'default': [],
-        'arguments': lambda x: {'tags': ",".join(x)},
-    },
-}
-
-log = logging.getLogger(__name__)
-
-
-def rename_arg(argument):
-    return argument
-
-
-def clean_connection(value):
-    if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS:
-        # Hope these are just used by Galaxy testing workflows and such, and not in production workflows.
-        log.warn("Legacy workflow syntax for connections [%s] will not be supported in the future" % value)
-        value = value.replace("#", "/", 1)
-    else:
-        return value
-
-
-class ImportOptions(object):
-
-    def __init__(self):
-        self.deduplicate_subworkflows = False
-
-
-def yaml_to_workflow(has_yaml, galaxy_interface, workflow_directory, import_options=None):
-    """Convert a Format 2 workflow into standard Galaxy format from supplied stream."""
-    as_python = ordered_load(has_yaml)
-    return python_to_workflow(as_python, galaxy_interface, workflow_directory, import_options=import_options)
-
-
-def python_to_workflow(as_python, galaxy_interface, workflow_directory=None, import_options=None):
-    """Convert a Format 2 workflow into standard Galaxy format from supplied dictionary."""
-    if "yaml_content" in as_python:
-        as_python = ordered_load(as_python["yaml_content"])
-
-    if workflow_directory is None:
-        workflow_directory = os.path.abspath(".")
-
-    conversion_context = ConversionContext(
-        galaxy_interface,
-        workflow_directory,
-        import_options,
-    )
-    as_python = _preprocess_graphs(as_python, conversion_context)
-    subworkflows = None
-    if conversion_context.import_options.deduplicate_subworkflows:
-        # TODO: import only required workflows...
-        # TODO: dag sort these...
-        subworkflows = OrderedDict()
-        for graph_id, subworkflow_content in conversion_context.graph_ids.items():
-            if graph_id == "main":
-                continue
-            subworkflow_conversion_context = conversion_context.get_subworkflow_conversion_context_graph("#" + graph_id)
-            subworkflows[graph_id] = _python_to_workflow(copy.deepcopy(subworkflow_content), subworkflow_conversion_context)
-    converted = _python_to_workflow(as_python, conversion_context)
-    if subworkflows is not None:
-        converted["subworkflows"] = subworkflows
-    return converted
-
-
-# move to a utils file?
-def steps_as_list(format2_workflow, add_label=True):
-    """Return steps as a list, converting ID map to list representation if needed."""
-    steps = format2_workflow["steps"]
-    steps = _convert_dict_to_id_list_if_needed(steps, add_label=True)
-    return steps
-
-
-def ensure_step_position(step, order_index):
-    """Ensure step contains a position definition."""
-    if "position" not in step:
-        step["position"] = {
-            "left": 10 * order_index,
-            "top": 10 * order_index
-        }
-
-
-def _python_to_workflow(as_python, conversion_context):
-
-    if "class" not in as_python:
-        raise Exception("This is not a not a valid Galaxy workflow definition, must define a class.")
-
-    if as_python["class"] != "GalaxyWorkflow":
-        raise Exception("This is not a not a valid Galaxy workflow definition, 'class' must be 'GalaxyWorkflow'.")
-
-    # .ga files don't have this, drop it so it isn't interpreted as a format 2 workflow.
-    as_python.pop("class")
-
-    _ensure_defaults(as_python, {
-        "a_galaxy_workflow": "true",
-        "format-version": "0.1",
-        "name": "Workflow",
-        "uuid": str(uuid.uuid4()),
-    })
-    _populate_annotation(as_python)
-
-    steps = steps_as_list(as_python)
-
-    convert_inputs_to_steps(as_python, steps)
-
-    if isinstance(steps, list):
-        steps_as_dict = OrderedDict()
-        for i, step in enumerate(steps):
-            steps_as_dict[str(i)] = step
-            if "id" not in step:
-                step["id"] = i
-
-            if "label" in step:
-                label = step["label"]
-                conversion_context.labels[label] = i
-
-            # TODO: this really should be optional in Galaxy API.
-            ensure_step_position(step, i)
-
-        as_python["steps"] = steps_as_dict
-        steps = steps_as_dict
-
-    for step in steps.values():
-        step_type = step.get("type", None)
-        if "run" in step:
-            if step_type is not None:
-                raise Exception("Steps specified as run actions cannot specify a type.")
-            run_action = step.get("run")
-            run_action = conversion_context.get_runnable_description(run_action)
-            if isinstance(run_action, dict):
-                run_class = run_action["class"]
-                run_to_step_function = eval(RUN_ACTIONS_TO_STEPS[run_class])
-
-                run_to_step_function(conversion_context, step, run_action)
-            else:
-                step["content_id"] = run_action
-                step["type"] = "subworkflow"
-            del step["run"]
-
-    for step in steps.values():
-        step_type = step.get("type", "tool")
-        step_type = STEP_TYPE_ALIASES.get(step_type, step_type)
-        if step_type not in STEP_TYPES:
-            raise Exception("Unknown step type encountered %s" % step_type)
-        step["type"] = step_type
-        eval("transform_%s" % step_type)(conversion_context, step)
-
-    outputs = as_python.get("outputs", [])
-    outputs = _convert_dict_to_id_list_if_needed(outputs)
-
-    for output in outputs:
-        assert isinstance(output, dict), "Output definition must be dictionary"
-        assert "source" in output or "outputSource" in output, "Output definition must specify source"
-
-        if "label" in output and "id" in output:
-            raise Exception("label and id are aliases for outputs, may only define one")
-        if "label" not in output and "id" not in output:
-            label = ""
-
-        raw_label = output.pop("label", None)
-        raw_id = output.pop("id", None)
-        label = raw_label or raw_id
-        if Labels.is_anonymous_output_label(label):
-            label = None
-        source = clean_connection(output.get("outputSource"))
-        if source is None and SUPPORT_LEGACY_CONNECTIONS:
-            source = output.get("source").replace("#", "/", 1)
-        id, output_name = conversion_context.step_output(source)
-        step = steps[str(id)]
-        workflow_output = {
-            "output_name": output_name,
-            "label": label,
-            "uuid": output.get("uuid", None)
-        }
-        if "workflow_outputs" not in step:
-            step["workflow_outputs"] = []
-        step["workflow_outputs"].append(workflow_output)
-
-    return as_python
-
-
-def _preprocess_graphs(as_python, conversion_context):
-    if not isinstance(as_python, dict):
-        raise Exception("This is not a not a valid Galaxy workflow definition.")
-
-    format_version = as_python.get("format-version", "v2.0")
-    assert format_version == "v2.0"
-
-    if "class" not in as_python and "$graph" in as_python:
-        for subworkflow in as_python["$graph"]:
-            if not isinstance(subworkflow, dict):
-                raise Exception("Malformed workflow content in $graph")
-            if "id" not in subworkflow:
-                raise Exception("No subworkflow ID found for entry in $graph.")
-            subworkflow_id = subworkflow["id"]
-            if subworkflow_id == "main":
-                as_python = subworkflow
-
-            conversion_context.register_runnable(subworkflow)
-
-    return as_python
-
-
-def convert_inputs_to_steps(workflow_dict, steps):
-    """Convert workflow inputs to a steps in array - like in native Galaxy."""
-    if "inputs" not in workflow_dict:
-        return
-
-    inputs = workflow_dict["inputs"]
-    new_steps = []
-    inputs = _convert_dict_to_id_list_if_needed(inputs)
-    for input_def_raw in inputs:
-        input_def = input_def_raw.copy()
-
-        if "label" in input_def and "id" in input_def:
-            raise Exception("label and id are aliases for inputs, may only define one")
-        if "label" not in input_def and "id" not in input_def:
-            raise Exception("Input must define a label.")
-
-        raw_label = input_def.pop("label", None)
-        raw_id = input_def.pop("id", None)
-        label = raw_label or raw_id
-
-        if not label:
-            raise Exception("Input label must not be empty.")
-
-        input_type = input_def.pop("type", "data")
-        if input_type in ["File", "data", "data_input"]:
-            step_type = "data_input"
-        elif input_type in ["collection", "data_collection", "data_collection_input"]:
-            step_type = "data_collection_input"
-        elif input_type in ["text", "integer", "float", "color", "boolean"]:
-            step_type = "parameter_input"
-            input_def["parameter_type"] = input_type
-        else:
-            raise Exception("Input type must be a data file or collection.")
-
-        step_def = input_def
-        step_def.update({
-            "type": step_type,
-            "label": label,
-        })
-        new_steps.append(step_def)
-
-    for i, new_step in enumerate(new_steps):
-        steps.insert(i, new_step)
-
-
-def run_workflow_to_step(conversion_context, step, run_action):
-    step["type"] = "subworkflow"
-    if conversion_context.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
-        step["content_id"] = run_action
-    else:
-        subworkflow_conversion_context = conversion_context.get_subworkflow_conversion_context(step)
-        step["subworkflow"] = _python_to_workflow(
-            copy.deepcopy(run_action),
-            subworkflow_conversion_context,
-        )
-
-
-def _is_graph_id_reference(run_action):
-    return run_action and not isinstance(run_action, dict)
-
-
-def transform_data_input(context, step):
-    transform_input(context, step, default_name="Input dataset")
-
-
-def transform_data_collection_input(context, step):
-    transform_input(context, step, default_name="Input dataset collection")
-
-
-def transform_parameter_input(context, step):
-    transform_input(context, step, default_name="input_parameter")
-
-
-def transform_input(context, step, default_name):
-    default_name = step.get("label", default_name)
-    _populate_annotation(step)
-    _ensure_inputs_connections(step)
-
-    if "inputs" not in step:
-        step["inputs"] = [{}]
-
-    step_inputs = step["inputs"][0]
-    if "name" in step_inputs:
-        name = step_inputs["name"]
-    else:
-        name = default_name
-
-    _ensure_defaults(step_inputs, {
-        "name": name,
-        "description": "",
-    })
-    tool_state = {
-        "name": name
-    }
-    for attrib in ["collection_type", "parameter_type", "optional", "default"]:
-        if attrib in step:
-            tool_state[attrib] = step[attrib]
-
-    _populate_tool_state(step, tool_state)
-
-
-def transform_pause(context, step, default_name="Pause for dataset review"):
-    default_name = step.get("label", default_name)
-    _populate_annotation(step)
-
-    _ensure_inputs_connections(step)
-
-    if "inputs" not in step:
-        step["inputs"] = [{}]
-
-    step_inputs = step["inputs"][0]
-    if "name" in step_inputs:
-        name = step_inputs["name"]
-    else:
-        name = default_name
-
-    _ensure_defaults(step_inputs, {
-        "name": name,
-    })
-    tool_state = {
-        "name": name
-    }
-
-    connect = _init_connect_dict(step)
-    _populate_input_connections(context, step, connect)
-    _populate_tool_state(step, tool_state)
-
-
-def transform_subworkflow(context, step):
-    _populate_annotation(step)
-
-    _ensure_inputs_connections(step)
-
-    tool_state = {
-    }
-
-    connect = _init_connect_dict(step)
-    _populate_input_connections(context, step, connect)
-    _populate_tool_state(step, tool_state)
-
-
-def _runtime_value():
-    return {"__class__": "RuntimeValue"}
-
-
-def transform_tool(context, step):
-    if "tool_id" not in step:
-        raise Exception("Tool steps must define a tool_id.")
-
-    _ensure_defaults(step, {
-        "name": step['tool_id'],
-        "post_job_actions": {},
-        "tool_version": None,
-    })
-    post_job_actions = step["post_job_actions"]
-    _populate_annotation(step)
-
-    tool_state = {
-        # TODO: Galaxy should not require tool state actually specify a __page__.
-        "__page__": 0,
-    }
-
-    connect = _init_connect_dict(step)
-
-    def append_link(key, value):
-        if key not in connect:
-            connect[key] = []
-        assert "$link" in value
-        link_value = value["$link"]
-        connect[key].append(clean_connection(link_value))
-
-    def replace_links(value, key=""):
-        if _is_link(value):
-            append_link(key, value)
-            # Filled in by the connection, so to force late
-            # validation of the field just mark as RuntimeValue.
-            # It would be better I guess if this were some other
-            # value dedicated to this purpose (e.g. a ficitious
-            # {"__class__": "ConnectedValue"}) that could be further
-            # validated by Galaxy.
-            return _runtime_value()
-        if isinstance(value, dict):
-            new_values = {}
-            for k, v in value.items():
-                new_key = _join_prefix(key, k)
-                new_values[k] = replace_links(v, new_key)
-            return new_values
-        elif isinstance(value, list):
-            new_values = []
-            for i, v in enumerate(value):
-                # If we are a repeat we need to modify the key
-                # but not if values are actually $links.
-                if _is_link(v):
-                    append_link(key, v)
-                    new_values.append(None)
-                else:
-                    new_key = "%s_%d" % (key, i)
-                    new_values.append(replace_links(v, new_key))
-            return new_values
-        else:
-            return value
-
-    # TODO: handle runtime inputs and state together.
-    runtime_inputs = step.get("runtime_inputs", [])
-    if "state" in step or runtime_inputs:
-        step_state = step.pop("state", {})
-        step_state = replace_links(step_state)
-
-        for key, value in step_state.items():
-            tool_state[key] = json.dumps(value)
-        for runtime_input in runtime_inputs:
-            tool_state[runtime_input] = json.dumps(_runtime_value())
-    elif "tool_state" in step:
-        tool_state.update(step.get("tool_state"))
-
-    # Fill in input connections
-    _populate_input_connections(context, step, connect)
-
-    _populate_tool_state(step, tool_state)
-
-    # Handle outputs.
-    out = step.pop("out", None)
-    if out is None:
-        # Handle LEGACY 19.XX outputs key.
-        out = step.pop("outputs", [])
-    out = _convert_dict_to_id_list_if_needed(out)
-    for output in out:
-        name = output["id"]
-        for action_key, action_dict in POST_JOB_ACTIONS.items():
-            action_argument = output.get(action_key, action_dict['default'])
-            if action_argument:
-                action_class = action_dict['action_class']
-                action_name = action_class + name
-                action = _action(
-                    action_class,
-                    name,
-                    arguments=action_dict['arguments'](action_argument)
-                )
-                post_job_actions[action_name] = action
-
-
-def run_tool_to_step(conversion_context, step, run_action):
-    tool_description = conversion_context.galaxy_interface.import_tool(
-        run_action
-    )
-    step["type"] = "tool"
-    step["tool_id"] = tool_description["tool_id"]
-    step["tool_version"] = tool_description["tool_version"]
-    step["tool_hash"] = tool_description.get("tool_hash")
-    step["tool_uuid"] = tool_description.get("uuid")
-
-
-class BaseConversionContext(object):
-
-    def __init__(self):
-        self.labels = {}
-        self.subworkflow_conversion_contexts = {}
-
-    def step_id(self, label_or_id):
-        if label_or_id in self.labels:
-            id = self.labels[label_or_id]
-        else:
-            id = label_or_id
-        return int(id)
-
-    def step_output(self, value):
-        value_parts = str(value).split("/")
-        if len(value_parts) == 1:
-            value_parts.append("output")
-        id = self.step_id(value_parts[0])
-        return id, value_parts[1]
-
-    def get_subworkflow_conversion_context(self, step):
-        # TODO: sometimes this method takes format2 steps and some times converted native ones
-        # (for input connections) - redo this so the type signature is stronger.
-        step_id = step.get("id")
-        run_action = step.get("run")
-        if self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
-            subworkflow_conversion_context = self.get_subworkflow_conversion_context_graph(run_action)
-            return subworkflow_conversion_context
-        if "content_id" in step:
-            subworkflow_conversion_context = self.get_subworkflow_conversion_context_graph(step["content_id"])
-            return subworkflow_conversion_context
-
-        if step_id not in self.subworkflow_conversion_contexts:
-
-            subworkflow_conversion_context = SubworkflowConversionContext(
-                self
-            )
-            self.subworkflow_conversion_contexts[step_id] = subworkflow_conversion_context
-        return self.subworkflow_conversion_contexts[step_id]
-
-    def get_runnable_description(self, run_action):
-        if "@import" in run_action:
-            if len(run_action) > 1:
-                raise Exception("@import must be only key if present.")
-
-            run_action_path = run_action["@import"]
-            runnable_path = os.path.join(self.workflow_directory, run_action_path)
-            with open(runnable_path, "r") as f:
-                runnable_description = ordered_load(f)
-                run_action = runnable_description
-
-        if not self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
-            run_action = self.graph_ids[run_action[1:]]
-
-        return run_action
-
-
-class ConversionContext(BaseConversionContext):
-
-    def __init__(self, galaxy_interface, workflow_directory, import_options=None):
-        super(ConversionContext, self).__init__()
-        self.import_options = import_options or ImportOptions()
-        self.graph_ids = OrderedDict()
-        self.graph_id_subworkflow_conversion_contexts = {}
-        self.workflow_directory = workflow_directory
-        self.galaxy_interface = galaxy_interface
-
-    def register_runnable(self, run_action):
-        assert "id" in run_action
-        self.graph_ids[run_action["id"]] = run_action
-
-    def get_subworkflow_conversion_context_graph(self, graph_id):
-        if graph_id not in self.graph_id_subworkflow_conversion_contexts:
-            subworkflow_conversion_context = SubworkflowConversionContext(
-                self
-            )
-            self.graph_id_subworkflow_conversion_contexts[graph_id] = subworkflow_conversion_context
-        return self.graph_id_subworkflow_conversion_contexts[graph_id]
-
-
-class SubworkflowConversionContext(BaseConversionContext):
-
-    def __init__(self, parent_context):
-        super(SubworkflowConversionContext, self).__init__()
-        self.parent_context = parent_context
-
-    @property
-    def graph_ids(self):
-        return self.parent_context.graph_ids
-
-    @property
-    def workflow_directory(self):
-        return self.parent_context.workflow_directory
-
-    @property
-    def import_options(self):
-        return self.parent_context.import_options
-
-    @property
-    def galaxy_interface(self):
-        return self.parent_context.galaxy_interface
-
-    def get_subworkflow_conversion_context_graph(self, graph_id):
-        return self.parent_context.get_subworkflow_conversion_context_graph(graph_id)
-
-
-def _action(type, name, arguments={}):
-    return {
-        "action_arguments": arguments,
-        "action_type": type,
-        "output_name": name,
-    }
-
-
-def _is_link(value):
-    return isinstance(value, dict) and "$link" in value
-
-
-def _join_prefix(prefix, key):
-    if prefix:
-        new_key = "%s|%s" % (prefix, key)
-    else:
-        new_key = key
-    return new_key
-
-
-def _init_connect_dict(step):
-    if "connect" not in step:
-        step["connect"] = {}
-
-    connect = step["connect"]
-    del step["connect"]
-
-    # handle CWL-style in dict connections.
-    if "in" in step:
-        step_in = step["in"]
-        assert isinstance(step_in, dict)
-        connection_keys = set()
-        for key, value in step_in.items():
-            # TODO: this can be a list right?
-            if isinstance(value, dict) and 'source' in value:
-                value = value["source"]
-            elif isinstance(value, dict) and 'default' in value:
-                continue
-            elif isinstance(value, dict):
-                raise KeyError('step input must define either source or default %s' % value)
-            connect[key] = [value]
-            connection_keys.add(key)
-
-        for key in connection_keys:
-            del step_in[key]
-
-        if len(step_in) == 0:
-            del step['in']
-
-    return connect
-
-
-def _populate_input_connections(context, step, connect):
-    _ensure_inputs_connections(step)
-    input_connections = step["input_connections"]
-    is_subworkflow_step = step.get("type") == "subworkflow"
-
-    for key, values in connect.items():
-        input_connection_value = []
-        if not isinstance(values, list):
-            values = [values]
-        for value in values:
-            if not isinstance(value, dict):
-                if key == "$step":
-                    value += "/__NO_INPUT_OUTPUT_NAME__"
-                id, output_name = context.step_output(value)
-                value = {"id": id, "output_name": output_name}
-                if is_subworkflow_step:
-                    subworkflow_conversion_context = context.get_subworkflow_conversion_context(step)
-                    input_subworkflow_step_id = subworkflow_conversion_context.step_id(key)
-                    value["input_subworkflow_step_id"] = input_subworkflow_step_id
-            input_connection_value.append(value)
-        if key == "$step":
-            key = "__NO_INPUT_OUTPUT_NAME__"
-        input_connections[key] = input_connection_value
-
-
-def _populate_annotation(step):
-    if "annotation" not in step and "doc" in step:
-        annotation = step.pop("doc")
-        step["annotation"] = annotation
-    elif "annotation" not in step:
-        step["annotation"] = ""
-
-
-def _ensure_inputs_connections(step):
-    if "input_connections" not in step:
-        step["input_connections"] = {}
-
-
-def _ensure_defaults(in_dict, defaults):
-    for key, value in defaults.items():
-        if key not in in_dict:
-            in_dict[key] = value
-
-
-def _populate_tool_state(step, tool_state):
-    step["tool_state"] = json.dumps(tool_state)
-
-
-def _convert_dict_to_id_list_if_needed(dict_or_list, add_label=False):
-    rval = dict_or_list
-    if isinstance(dict_or_list, dict):
-        rval = []
-        for key, value in dict_or_list.items():
-            if not isinstance(value, dict):
-                value = {"type": value}
-            if add_label:
-                value["label"] = key
-            else:
-                value["id"] = key
-            rval.append(value)
-    return rval
-
-
-def main(argv):
-    print(json.dumps(yaml_to_workflow(argv[0])))
-
-
-if __name__ == "__main__":
-    main(sys.argv)
-
-__all__ = (
-    'yaml_to_workflow',
-    'python_to_workflow',
-)
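For reference, the deleted module's public surface was `yaml_to_workflow` and `python_to_workflow` (per its `__all__`). Below is a minimal sketch of how a caller might have driven the conversion with this vintage of gxformat2; the workflow YAML, the `None` interface argument, and the tagging output are illustrative assumptions, not taken from this repository.

```python
import json

from gxformat2.converter import yaml_to_workflow

# Hypothetical Format 2 workflow. The `out`/`add_tags` block exercises
# the POST_JOB_ACTIONS table above (it maps to a TagDatasetAction).
FORMAT2_WORKFLOW = """
class: GalaxyWorkflow
inputs:
  the_input:
    type: data
steps:
  cat:
    tool_id: cat1
    in:
      input1: the_input
    out:
      out_file1:
        add_tags: ["#processed"]
"""

# galaxy_interface is only consulted when a step embeds a tool via
# `run:` (run_tool_to_step), so None is assumed safe for plain tool steps.
native = yaml_to_workflow(FORMAT2_WORKFLOW, None, ".")
print(json.dumps(native, indent=2))
```

The result is a native `.ga`-style dictionary: the `in:` mapping becomes `input_connections` entries resolved through step labels, and the `add_tags` argument surfaces as a `TagDatasetAction` entry under the tool step's `post_job_actions`.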