diff planemo/lib/python3.7/site-packages/galaxy/tool_util/parser/yaml.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400 (2020-07-31)
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/galaxy/tool_util/parser/yaml.py	Fri Jul 31 00:32:28 2020 -0400
@@ -0,0 +1,353 @@
+from collections import OrderedDict
+
+import packaging.version
+
+from galaxy.tool_util.deps import requirements
+from galaxy.tool_util.parser.util import (
+    DEFAULT_DELTA,
+    DEFAULT_DELTA_FRAC
+)
+from .interface import (
+    InputSource,
+    PageSource,
+    PagesSource,
+    ToolSource,
+)
+from .output_collection_def import dataset_collector_descriptions_from_output_dict
+from .output_objects import (
+    ToolOutput,
+    ToolOutputCollection,
+    ToolOutputCollectionStructure,
+)
+from .stdio import error_on_exit_code
+from .util import is_dict
+
+
+class YamlToolSource(ToolSource):
+
+    def __init__(self, root_dict, source_path=None):
+        self.root_dict = root_dict
+        self._source_path = source_path
+        self._macro_paths = []
+
+    @property
+    def source_path(self):
+        return self._source_path
+
+    def parse_id(self):
+        return self.root_dict.get("id")
+
+    def parse_version(self):
+        return self.root_dict.get("version")
+
+    def parse_name(self):
+        return self.root_dict.get("name")
+
+    def parse_description(self):
+        return self.root_dict.get("description", "")
+
+    def parse_edam_operations(self):
+        return self.root_dict.get("edam_operations", [])
+
+    def parse_edam_topics(self):
+        return self.root_dict.get("edam_topics", [])
+
+    def parse_xrefs(self):
+        xrefs = self.root_dict.get("xrefs", [])
+        return [dict(value=xref["value"], reftype=xref["type"]) for xref in xrefs if xref["type"]]
+
+    def parse_is_multi_byte(self):
+        return self.root_dict.get("is_multi_byte", self.default_is_multi_byte)
+
+    def parse_sanitize(self):
+        return self.root_dict.get("sanitize", True)
+
+    def parse_display_interface(self, default):
+        return self.root_dict.get('display_interface', default)
+
+    def parse_require_login(self, default):
+        return self.root_dict.get('require_login', default)
+
+    def parse_command(self):
+        return self.root_dict.get("command")
+
+    def parse_expression(self):
+        return self.root_dict.get("expression")
+
+    def parse_environment_variables(self):
+        return []
+
+    def parse_interpreter(self):
+        return self.root_dict.get("interpreter")
+
+    def parse_version_command(self):
+        return self.root_dict.get("runtime_version", {}).get("command", None)
+
+    def parse_version_command_interpreter(self):
+        return self.root_dict.get("runtime_version", {}).get("interpreter", None)
+
+    def parse_requirements_and_containers(self):
+        return requirements.parse_requirements_from_dict(self.root_dict)
+
+    def parse_input_pages(self):
+        # All YAML tools have only one page (feature is deprecated)
+        page_source = YamlPageSource(self.root_dict.get("inputs", {}))
+        return PagesSource([page_source])
+
+    def parse_strict_shell(self):
+        # TODO: Add ability to disable this.
+        return True
+
+    def parse_stdio(self):
+        return error_on_exit_code()
+
+    def parse_help(self):
+        return self.root_dict.get("help", None)
+
+    def parse_outputs(self, tool):
+        outputs = self.root_dict.get("outputs", {})
+        output_defs = []
+        output_collection_defs = []
+        for name, output_dict in outputs.items():
+            output_type = output_dict.get("type", "data")
+            if output_type == "data":
+                output_defs.append(self._parse_output(tool, name, output_dict))
+            elif output_type == "collection":
+                output_collection_defs.append(self._parse_output_collection(tool, name, output_dict))
+            else:
+                message = "Unknown output_type [%s] encountered." % output_type
+                raise Exception(message)
+        outputs = OrderedDict()
+        for output in output_defs:
+            outputs[output.name] = output
+        output_collections = OrderedDict()
+        for output in output_collection_defs:
+            output_collections[output.name] = output
+
+        return outputs, output_collections
+
+    def _parse_output(self, tool, name, output_dict):
+        output = ToolOutput.from_dict(name, output_dict, tool=tool)
+        return output
+
+    def _parse_output_collection(self, tool, name, output_dict):
+        name = output_dict.get("name")
+        label = output_dict.get("label")
+        default_format = output_dict.get("format", "data")
+        collection_type = output_dict.get("type", None)
+        collection_type_source = output_dict.get("type_source", None)
+        structured_like = output_dict.get("structured_like", None)
+        inherit_format = False
+        inherit_metadata = False
+        if structured_like:
+            inherit_format = output_dict.get("inherit_format", None)
+            inherit_metadata = output_dict.get("inherit_metadata", None)
+        default_format_source = output_dict.get("format_source", None)
+        default_metadata_source = output_dict.get("metadata_source", "")
+        filters = []
+        dataset_collector_descriptions = dataset_collector_descriptions_from_output_dict(output_dict)
+
+        structure = ToolOutputCollectionStructure(
+            collection_type=collection_type,
+            collection_type_source=collection_type_source,
+            structured_like=structured_like,
+            dataset_collector_descriptions=dataset_collector_descriptions,
+        )
+        output_collection = ToolOutputCollection(
+            name,
+            structure,
+            label=label,
+            filters=filters,
+            default_format=default_format,
+            inherit_format=inherit_format,
+            inherit_metadata=inherit_metadata,
+            default_format_source=default_format_source,
+            default_metadata_source=default_metadata_source,
+        )
+        return output_collection
+
+    def parse_tests_to_dict(self):
+        tests = []
+        rval = dict(
+            tests=tests
+        )
+
+        for i, test_dict in enumerate(self.root_dict.get("tests", [])):
+            tests.append(_parse_test(i, test_dict))
+
+        return rval
+
+    def parse_profile(self):
+        return self.root_dict.get("profile", "16.04")
+
+    def parse_interactivetool(self):
+        return self.root_dict.get("entry_points", [])
+
+    def parse_python_template_version(self):
+        python_template_version = self.root_dict.get("python_template_version", None)
+        if python_template_version is not None:
+            python_template_version = packaging.version.parse(python_template_version)
+        return python_template_version
+
+
+def _parse_test(i, test_dict):
+    inputs = test_dict["inputs"]
+    if is_dict(inputs):
+        new_inputs = []
+        for key, value in inputs.items():
+            new_inputs.append({"name": key, "value": value, "attributes": {}})
+        test_dict["inputs"] = new_inputs
+
+    outputs = test_dict["outputs"]
+
+    new_outputs = []
+    if is_dict(outputs):
+        for key, value in outputs.items():
+            if is_dict(value):
+                attributes = value
+                file = attributes.get("file")
+            else:
+                file = value
+                attributes = {}
+            new_outputs.append({
+                "name": key,
+                "value": file,
+                "attributes": attributes
+            })
+    else:
+        for output in outputs:
+            name = output["name"]
+            value = output.get("file", None)
+            attributes = output
+            new_outputs.append((name, value, attributes))
+
+    for output in new_outputs:
+        attributes = output["attributes"]
+        defaults = {
+            'compare': 'diff',
+            'lines_diff': 0,
+            'delta': DEFAULT_DELTA,
+            'delta_frac': DEFAULT_DELTA_FRAC,
+            'sort': False,
+        }
+        # TODO
+        attributes["extra_files"] = []
+        # TODO
+        attributes["metadata"] = {}
+        # TODO
+        assert_list = []
+        assert_list = __to_test_assert_list(attributes.get("asserts", []))
+        attributes["assert_list"] = assert_list
+        _ensure_has(attributes, defaults)
+
+    test_dict["outputs"] = new_outputs
+    # TODO: implement output collections for YAML tools.
+    test_dict["output_collections"] = []
+    test_dict["command"] = __to_test_assert_list(test_dict.get("command", []))
+    test_dict["stdout"] = __to_test_assert_list(test_dict.get("stdout", []))
+    test_dict["stderr"] = __to_test_assert_list(test_dict.get("stderr", []))
+    test_dict["expect_exit_code"] = test_dict.get("expect_exit_code", None)
+    test_dict["expect_failure"] = test_dict.get("expect_exit_code", False)
+    return test_dict
+
+
+def __to_test_assert_list(assertions):
+    def expand_dict_form(item):
+        key, value = item
+        new_value = value.copy()
+        new_value["that"] = key
+        return new_value
+
+    if is_dict(assertions):
+        assertions = map(expand_dict_form, assertions.items())
+
+    assert_list = []
+    for assertion in assertions:
+        # TODO: not handling nested assertions correctly,
+        # not sure these are used though.
+        children = []
+        if "children" in assertion:
+            children = assertion["children"]
+            del assertion["children"]
+        assert_dict = dict(
+            tag=assertion["that"],
+            attributes=assertion,
+            children=children,
+        )
+        assert_list.append(assert_dict)
+
+    return assert_list or None  # XML variant is None if no assertions made
+
+
+class YamlPageSource(PageSource):
+
+    def __init__(self, inputs_list):
+        self.inputs_list = inputs_list
+
+    def parse_input_sources(self):
+        return map(YamlInputSource, self.inputs_list)
+
+
+class YamlInputSource(InputSource):
+
+    def __init__(self, input_dict):
+        self.input_dict = input_dict
+
+    def get(self, key, default=None):
+        return self.input_dict.get(key, default)
+
+    def get_bool(self, key, default):
+        return self.input_dict.get(key, default)
+
+    def parse_input_type(self):
+        input_type = self.input_dict["type"]
+        if input_type == "repeat":
+            return "repeat"
+        elif input_type == "conditional":
+            return "conditional"
+        else:
+            return "param"
+
+    def parse_nested_inputs_source(self):
+        assert self.parse_input_type() == "repeat"
+        return YamlPageSource(self.input_dict["blocks"])
+
+    def parse_test_input_source(self):
+        test_dict = self.input_dict.get("test", None)
+        assert test_dict is not None, "conditional must contain a `test` definition"
+        return YamlInputSource(test_dict)
+
+    def parse_when_input_sources(self):
+        input_dict = self.input_dict
+
+        sources = []
+        for value, block in input_dict.get("when", {}).items():
+            if value is True:
+                value = "true"
+            elif value is False:
+                value = "false"
+            else:
+                value = str(value)
+
+            # str here to lose type information like XML, needed?
+            if not isinstance(block, list):
+                block = [block]
+            case_page_source = YamlPageSource(block)
+            sources.append((value, case_page_source))
+        return sources
+
+    def parse_static_options(self):
+        static_options = list()
+        input_dict = self.input_dict
+        for index, option in enumerate(input_dict.get("options", {})):
+            value = option.get("value")
+            label = option.get("label", value)
+            selected = option.get("selected", False)
+            static_options.append((label, value, selected))
+        return static_options
+
+
+def _ensure_has(dict, defaults):
+    for key, value in defaults.items():
+        if key not in dict:
+            dict[key] = value