Mercurial > repos > fubar > tool_factory_2
changeset 92:6ce360759c28 draft
Uploaded
author | fubar |
---|---|
date | Thu, 19 Nov 2020 23:59:50 +0000 |
parents | 7176af503cdd |
children | df1205dcf676 |
files | toolfactory/galaxyxml/__init__.py toolfactory/galaxyxml/tool/__init__.py toolfactory/galaxyxml/tool/__pycache__/__init__.cpython-36.pyc toolfactory/galaxyxml/tool/__pycache__/import_xml.cpython-36.pyc toolfactory/galaxyxml/tool/import_xml.py toolfactory/galaxyxml/tool/parameters/__init__.py toolfactory/galaxyxml/tool/parameters/__pycache__/__init__.cpython-36.pyc |
diffstat | 7 files changed, 1700 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/toolfactory/galaxyxml/__init__.py Thu Nov 19 23:59:50 2020 +0000 @@ -0,0 +1,64 @@ +from builtins import object +from builtins import str + +from lxml import etree + + +class GalaxyXML(object): + def __init__(self): + self.root = etree.Element("root") + + def export(self): + return etree.tostring(self.root, pretty_print=True, encoding="unicode") + + +class Util(object): + @classmethod + def coerce(cls, data, kill_lists=False): + """Recursive data sanitisation + """ + if isinstance(data, dict): + return {k: cls.coerce(v, kill_lists=kill_lists) for k, v in list(data.items()) if v is not None} + elif isinstance(data, list): + if kill_lists: + return cls.coerce(data[0]) + else: + return [cls.coerce(v, kill_lists=kill_lists) for v in data] + else: + return cls.coerce_value(data) + + @classmethod + def coerce_value(cls, obj): + """Make everything a string! + """ + if isinstance(obj, bool): + if obj: + return "true" + else: + return "false" + elif isinstance(obj, str): + return obj + else: + return str(obj) + + @classmethod + def clean_kwargs(cls, params, final=False): + if "kwargs" in params: + kwargs = params["kwargs"] + for k in kwargs: + params[k] = kwargs[k] + del params["kwargs"] + if "self" in params: + del params["self"] + + if "__class__" in params: + del params["__class__"] + + # There will be more params, it would be NICE to use a whitelist + # instead of a blacklist, but until we have more data let's just + # blacklist stuff we see commonly. + if final: + for blacklist in ("positional",): + if blacklist in params: + del params[blacklist] + return params
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/toolfactory/galaxyxml/tool/__init__.py Thu Nov 19 23:59:50 2020 +0000 @@ -0,0 +1,184 @@ +import copy +import logging + +from galaxyxml import GalaxyXML, Util +from galaxyxml.tool.parameters import XMLParam + +from lxml import etree + +VALID_TOOL_TYPES = ("data_source", "data_source_async") +VALID_URL_METHODS = ("get", "post") + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class Tool(GalaxyXML): + + def __init__( + self, + name, + id, + version, + description, + executable, + hidden=False, + tool_type=None, + URL_method=None, + workflow_compatible=True, + interpreter=None, + version_command="interpreter filename.exe --version", + command_override=None, + ): + + self.executable = executable + self.interpreter = interpreter + self.command_override = command_override + kwargs = { + "name": name, + "id": id, + "version": version, + "hidden": hidden, + "workflow_compatible": workflow_compatible, + } + self.version_command = version_command + + # Remove some of the default values to make tools look a bit nicer + if not hidden: + del kwargs["hidden"] + if workflow_compatible: + del kwargs["workflow_compatible"] + + kwargs = Util.coerce(kwargs) + self.root = etree.Element("tool", **kwargs) + + if tool_type is not None: + if tool_type not in VALID_TOOL_TYPES: + raise Exception("Tool type must be one of %s" % ",".join(VALID_TOOL_TYPES)) + else: + kwargs["tool_type"] = tool_type + + if URL_method is not None: + if URL_method in VALID_URL_METHODS: + kwargs["URL_method"] = URL_method + else: + raise Exception("URL_method must be one of %s" % ",".join(VALID_URL_METHODS)) + + description_node = etree.SubElement(self.root, "description") + description_node.text = description + + def add_comment(self, comment_txt): + comment = etree.Comment(comment_txt) + self.root.insert(0, comment) + + def append_version_command(self): + version_command = etree.SubElement(self.root, "version_command") + try: + version_command.text = etree.CDATA(self.version_command) + except Exception: + pass + + def append(self, sub_node): + if issubclass(type(sub_node), XMLParam): + self.root.append(sub_node.node) + else: + self.root.append(sub_node) + + def clean_command_string(self, command_line): + clean = [] + for x in command_line: + if x is not [] and x is not [""]: + clean.append(x) + + return "\n".join(clean) + + def export(self, keep_old_command=False): # noqa + + export_xml = copy.deepcopy(self) + + try: + export_xml.append(export_xml.edam_operations) + except Exception: + pass + + try: + export_xml.append(export_xml.edam_topics) + except Exception: + pass + + try: + export_xml.append(export_xml.requirements) + except Exception: + pass + + try: + export_xml.append(export_xml.configfiles) + except Exception: + pass + + if self.command_override: + command_line = self.command_override + else: + command_line = [] + try: + command_line.append(export_xml.inputs.cli()) + except Exception as e: + logger.warning(str(e)) + + try: + command_line.append(export_xml.outputs.cli()) + except Exception: + pass + + # Add stdio section + stdio = etree.SubElement(export_xml.root, "stdio") + etree.SubElement(stdio, "exit_code", range="1:", level="fatal") + + # Append version command + export_xml.append_version_command() + + # Steal interpreter from kwargs + command_kwargs = {} + if export_xml.interpreter is not None: + command_kwargs["interpreter"] = export_xml.interpreter + + # Add command section + command_node = etree.SubElement(export_xml.root, "command", **command_kwargs) + + if keep_old_command: + if getattr(self, "command", None): + command_node.text = etree.CDATA(export_xml.command) + else: + logger.warning("The tool does not have any old command stored. " + "Only the command line is written.") + command_node.text = export_xml.executable + else: + if self.command_override: + actual_cli = export_xml.clean_command_string(command_line) + else: + actual_cli = "%s %s" % (export_xml.executable, export_xml.clean_command_string(command_line)) + command_node.text = etree.CDATA(actual_cli.strip()) + + try: + export_xml.append(export_xml.inputs) + except Exception: + pass + + try: + export_xml.append(export_xml.outputs) + except Exception: + pass + + try: + export_xml.append(export_xml.tests) + except Exception: + pass + + help_element = etree.SubElement(export_xml.root, "help") + help_element.text = etree.CDATA(export_xml.help) + + try: + export_xml.append(export_xml.citations) + except Exception: + pass + + return super(Tool, export_xml).export()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/toolfactory/galaxyxml/tool/import_xml.py Thu Nov 19 23:59:50 2020 +0000 @@ -0,0 +1,713 @@ +import logging +import xml.etree.ElementTree as ET + +import galaxyxml.tool as gxt +import galaxyxml.tool.parameters as gxtp + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class GalaxyXmlParser(object): + """ + Class to import content from an existing Galaxy XML wrapper. + """ + + def _init_tool(self, xml_root): + """ + Init tool from existing xml tool. + + :param xml_root: root of the galaxy xml file. + :type xml_root: :class:`xml.etree._Element` + """ + version_cmd = None + description = None + for child in xml_root: + if child.tag == "description": + description = child.text + elif child.tag == "command": + executable = child.text.split()[0] + command = child.text + elif child.tag == "version_command": + version_cmd = child.text + + tool = gxt.Tool( + xml_root.attrib["name"], + xml_root.attrib["id"], + xml_root.attrib.get("version", None), + description, + executable, + hidden=xml_root.attrib.get("hidden", False), + tool_type=xml_root.attrib.get("tool_type", None), + URL_method=xml_root.attrib.get("URL_method", None), + workflow_compatible=xml_root.attrib.get("workflow_compatible", True), + version_command=version_cmd, + ) + tool.command = command + return tool + + def _load_description(self, tool, desc_root): + """ + <description> is already loaded during initiation. + + :param tool: Tool object from galaxyxml. + :type tool: :class:`galaxyxml.tool.Tool` + :param desc_root: root of <description> tag. + :type desc_root: :class:`xml.etree._Element` + """ + logger.info("<description> is loaded during initiation of the object.") + + def _load_version_command(self, tool, vers_root): + """ + <version_command> is already loaded during initiation. + + :param tool: Tool object from galaxyxml. + :type tool: :class:`galaxyxml.tool.Tool` + :param vers_root: root of <version_command> tag. + :type vers_root: :class:`xml.etree._Element` + """ + logger.info("<version_command> is loaded during initiation of the object.") + + def _load_stdio(self, tool, stdio_root): + """ + So far, <stdio> is automatically generated by galaxyxml. + + :param tool: Tool object from galaxyxml. + :type tool: :class:`galaxyxml.tool.Tool` + :param desc_root: root of <stdio> tag. + :type desc_root: :class:`xml.etree._Element` + """ + logger.info("<stdio> is not loaded but automatically generated by galaxyxml.") + + def _load_command(self, tool, desc_root): + """ + <command> is already loaded during initiation. + + :param tool: Tool object from galaxyxml. + :type tool: :class:`galaxyxml.tool.Tool` + :param desc_root: root of <command> tag. + :type desc_root: :class:`xml.etree._Element` + """ + logger.info("<command> is loaded during initiation of the object.") + + def _load_help(self, tool, help_root): + """ + Load the content of the <help> into the tool. + + :param tool: Tool object from galaxyxml. + :type tool: :class:`galaxyxml.tool.Tool` + :param requirements_root: root of <help> tag. + :type requirements_root: :class:`xml.etree._Element` + """ + tool.help = help_root.text + + def _load_requirements(self, tool, requirements_root): + """ + Add <requirements> to the tool. + + :param tool: Tool object from galaxyxml. + :type tool: :class:`galaxyxml.tool.Tool` + :param requirements_root: root of <requirements> tag. + :type requirements_root: :class:`xml.etree._Element` + """ + tool.requirements = gxtp.Requirements() + for req in requirements_root: + req_type = req.attrib["type"] + value = req.text + if req.tag == "requirement": + version = req.attrib.get("version", None) + tool.requirements.append(gxtp.Requirement(req_type, value, version=version)) + elif req.tag == "container": + tool.requirements.append(gxtp.Container(req_type, value)) + else: + logger.warning(req.tag + " is not a valid tag for requirements child") + + def _load_edam_topics(self, tool, topics_root): + """ + Add <edam_topics> to the tool. + + :param tool: Tool object from galaxyxml. + :type tool: :class:`galaxyxml.tool.Tool` + :param topics_root: root of <edam_topics> tag. + :type topics_root: :class:`xml.etree._Element` + """ + tool.edam_topics = gxtp.EdamTopics() + for edam_topic in topics_root: + tool.edam_topics.append(gxtp.EdamTopic(edam_topic.text)) + + def _load_edam_operations(self, tool, operations_root): + """ + Add <edam_operations> to the tool. + + :param tool: Tool object from galaxyxml. + :type tool: :class:`galaxyxml.tool.Tool` + :param operations_root: root of <edam_operations> tag. + :type operations_root: :class:`xml.etree._Element` + """ + tool.edam_operations = gxtp.EdamOperations() + for edam_op in operations_root: + tool.edam_operations.append(gxtp.EdamOperation(edam_op.text)) + + def _load_configfiles(self, tool, configfiles_root): + """ + Add <configfiles> to the tool. + + :param tool: Tool object from galaxyxml. + :type tool: :class:`galaxyxml.tool.Tool` + :param configfiles_root: root of <configfiles> tag. + :type configfiles_root: :class:`xml.etree._Element` + """ + tool.configfiles = gxtp.Configfiles() + for conf in configfiles_root: + name = conf.attrib["name"] + value = conf.text + tool.configfiles.append(gxtp.Configfile(name, value)) + + def _load_citations(self, tool, citations_root): + """ + Add <citations> to the tool. + + :param tool: Tool object from galaxyxml. + :type tool: :class:`galaxyxml.tool.Tool` + :param citations_root: root of <citations> tag. + :type citations_root: :class:`xml.etree._Element` + """ + tool.citations = gxtp.Citations() + for cit in citations_root: + cit_type = cit.attrib["type"] + value = cit.text + tool.citations.append(gxtp.Citation(cit_type, value)) + + def _load_inputs(self, tool, inputs_root): + """ + Add <inputs> to the tool using the :class:`galaxyxml.tool.import_xml.InputsParser` object. + + :param tool: Tool object from galaxyxml. + :type tool: :class:`galaxyxml.tool.Tool` + :param inputs_root: root of <inputs> tag. + :type inputs_root: :class:`xml.etree._Element` + """ + tool.inputs = gxtp.Inputs() + inp_parser = InputsParser() + inp_parser.load_inputs(tool.inputs, inputs_root) + + def _load_outputs(self, tool, outputs_root): + """ + Add <outputs> to the tool using the :class:`galaxyxml.tool.import_xml.OutputsParser` object. + + :param tool: Tool object from galaxyxml. + :type tool: :class:`galaxyxml.tool.Tool` + :param outputs_root: root of <outputs> tag. + :type outputs_root: :class:`xml.etree._Element` + """ + tool.outputs = gxtp.Outputs() + out_parser = OutputsParser() + out_parser.load_outputs(tool.outputs, outputs_root) + + def _load_tests(self, tool, tests_root): + """ + Add <tests> to the tool using the :class:`galaxyxml.tool.import_xml.TestsParser` object. + + :param tool: Tool object from galaxyxml. + :type tool: :class:`galaxyxml.tool.Tool` + :param tests_root: root of <tests> tag. + :type tests_root: :class:`xml.etree._Element` + """ + tool.tests = gxtp.Tests() + tests_parser = TestsParser() + tests_parser.load_tests(tool.tests, tests_root) + + def import_xml(self, xml_path): + """ + Load existing xml into the :class:`galaxyxml.tool.Tool` object. + + :param xml_path: Path of the XML to be loaded. + :type xml_path: STRING + :return: XML content in the galaxyxml model. + :rtype: :class:`galaxyxml.tool.Tool` + """ + xml_root = ET.parse(xml_path).getroot() + tool = self._init_tool(xml_root) + # Now we import each tag's field + for child in xml_root: + try: + getattr(self, "_load_{}".format(child.tag))(tool, child) + except AttributeError: + logger.warning(child.tag + " tag is not processed.") + return tool + + +class InputsParser(object): + """ + Class to parse content of the <inputs> tag from a Galaxy XML wrapper. + """ + + def _load_text_param(self, root, text_param): + """ + Add <param type="text" /> to the root. + + :param root: root to append the param to. + :param text_param: root of <param> tag. + :type text_param: :class:`xml.etree._Element` + """ + root.append( + gxtp.TextParam( + text_param.attrib["name"], + optional=text_param.get("optional", None), + label=text_param.get("label", None), + help=text_param.get("help", None), + value=text_param.get("value", None), + ) + ) + + def _load_data_param(self, root, data_param): + """ + Add <param type="data" /> to the root. + + :param root: root to append the param to. + :param data_param: root of <param> tag. + :type data_param: :class:`xml.etree._Element` + """ + root.append( + gxtp.DataParam( + data_param.attrib["name"], + optional=data_param.attrib.get("optional", None), + label=data_param.attrib.get("label", None), + help=data_param.attrib.get("help", None), + format=data_param.attrib.get("format", None), + multiple=data_param.attrib.get("multiple", None), + ) + ) + + def _load_boolean_param(self, root, bool_param): + """ + Add <param type="boolean" /> to the root. + + :param root: root to append the param to. + :param bool_param: root of <param> tag. + :type bool_param: :class:`xml.etree._Element` + """ + root.append( + gxtp.BooleanParam( + bool_param.attrib["name"], + optional=bool_param.attrib.get("optional", None), + label=bool_param.attrib.get("label", None), + help=bool_param.attrib.get("help", None), + checked=bool_param.attrib.get("checked", False), + truevalue=bool_param.attrib.get("truevalue", None), + falsevalue=bool_param.attrib.get("falsevalue", None), + ) + ) + + def _load_integer_param(self, root, int_param): + """ + Add <param type="integer" /> to the root. + + :param root: root to append the param to. + :param int_param: root of <param> tag. + :type int_param: :class:`xml.etree._Element` + """ + root.append( + gxtp.IntegerParam( + int_param.attrib["name"], + int_param.attrib.get("value", None), + optional=int_param.attrib.get("optional", None), + label=int_param.attrib.get("label", None), + help=int_param.attrib.get("help", None), + min=int_param.attrib.get("min", None), + max=int_param.attrib.get("max", None), + ) + ) + + def _load_float_param(self, root, float_param): + """ + Add <param type="float" /> to the root. + + :param root: root to append the param to. + :param float_param: root of <param> tag. + :type float_param: :class:`xml.etree._Element` + """ + root.append( + gxtp.FloatParam( + float_param.attrib["name"], + float_param.attrib.get("value", None), + optional=float_param.attrib.get("optional", None), + label=float_param.attrib.get("label", None), + help=float_param.attrib.get("help", None), + min=float_param.attrib.get("min", None), + max=float_param.attrib.get("max", None), + ) + ) + + def _load_option_select(self, root, option): + """ + Add <option> to the root (usually <param type="select" />). + + :param root: root to append the param to. + :param option: root of <option> tag. + :type float_param: :class:`xml.etree._Element` + """ + root.append( + gxtp.SelectOption( + option.attrib.get("value", None), option.text, selected=option.attrib.get("selected", False) + ) + ) + + def _load_column_options(self, root, column): + """ + Add <column> to the root (usually <options>). + + :param root: root to append the param to. + :param option: root of <column> tag. + :type float_param: :class:`xml.etree._Element` + """ + root.append(gxtp.Column(column.attrib["name"], column.attrib["index"])) + + def _load_filter_options(self, root, filter): + """ + Add <filter> to the root (usually <options>). + + :param root: root to append the param to. + :param option: root of <filter> tag. + :type float_param: :class:`xml.etree._Element` + """ + root.append( + gxtp.Filter( + filter.attrib["type"], + column=filter.attrib.get("column", None), + name=filter.attrib.get("name", None), + ref=filter.attrib.get("ref", None), + key=filter.attrib.get("key", None), + multiple=filter.attrib.get("multiple", None), + separator=filter.attrib.get("separator", None), + keep=filter.attrib.get("keep", None), + value=filter.attrib.get("value", None), + ref_attribute=filter.attrib.get("ref_attribute", None), + index=filter.attrib.get("index", None), + ) + ) + + def _load_options_select(self, root, options): + """ + Add <options> to the root (usually <param type="select" />). + + :param root: root to append the param to. + :param option: root of <options> tag. + :type float_param: :class:`xml.etree._Element` + """ + opts = gxtp.Options( + from_dataset=options.attrib.get("from_dataset", None), + from_file=options.attrib.get("from_file", None), + from_data_table=options.attrib.get("from_data_table", None), + from_parameter=options.attrib.get("from_parameter", None), + ) + # Deal with child nodes (usually filter and column) + for opt_child in options: + try: + getattr(self, "_load_{}_options".format(opt_child.tag))(opts, opt_child) + except AttributeError: + logger.warning(opt_child.tag + " tag is not processed for <options>.") + root.append(opts) + + def _load_select_param(self, root, sel_param): + """ + Add <param type="select" /> to the root. + + :param root: root to append the param to. + :param sel_param: root of <param> tag. + :type sel_param: :class:`xml.etree._Element` + """ + select_param = gxtp.SelectParam( + sel_param.attrib["name"], + optional=sel_param.attrib.get("optional", None), + label=sel_param.attrib.get("label", None), + help=sel_param.attrib.get("help", None), + data_ref=sel_param.attrib.get("data_ref", None), + display=sel_param.attrib.get("display", None), + multiple=sel_param.attrib.get("multiple", None), + ) + # Deal with child nodes (usually option and options) + for sel_child in sel_param: + try: + getattr(self, "_load_{}_select".format(sel_child.tag))(select_param, sel_child) + except AttributeError: + logger.warning(sel_child.tag + " tag is not processed for <param type='select'>.") + root.append(select_param) + + def _load_param(self, root, param_root): + """ + Method to select which type of <param> is being added to the root. + + :param root: root to attach param to. + :param param_root: root of <param> tag. + :type param_root: :class:`xml.etree._Element` + """ + param_type = param_root.attrib["type"] + try: + getattr(self, "_load_{}_param".format(param_type))(root, param_root) + except AttributeError: + logger.warning(param_type + " tag is not processed for <param>.") + + def _load_when(self, root, when_root): + """ + Add <when> to the root (usually <conditional>). + + :param root: root to append when to. + :param when_root: root of <when> tag. + :type when_root: :class:`xml.etree._Element` + """ + when = gxtp.When(when_root.attrib["value"]) + # Deal with child nodes + self.load_inputs(when, when_root) + root.append(when) + + def _load_conditional(self, root, conditional_root): + """ + Add <conditional> to the root. + + :param root: root to append conditional to. + :param conditional_root: root of <conditional> tag. + :type conditional_root: :class:`xml.etree._Element` + """ + value_ref_in_group = conditional_root.attrib.get("value_ref_in_group", None) + # Other optional parameters need to be added to conditional object + conditional = gxtp.Conditional( + conditional_root.attrib["name"], + value_from=conditional_root.attrib.get("value_from", None), + value_ref=conditional_root.attrib.get("value_ref", None), + value_ref_in_group=value_ref_in_group, + label=conditional_root.attrib.get("label", None), + ) + # Deal with child nodes + self.load_inputs(conditional, conditional_root) + root.append(conditional) + + def _load_section(self, root, section_root): + """ + Add <section> to the root. + + :param root: root to append conditional to. + :param section_root: root of <section> tag. + :type section_root: :class:`xml.etree._Element` + """ + section = gxtp.Section( + section_root.attrib["name"], + section_root.attrib["title"], + expanded=section_root.attrib.get("expanded", None), + help=section_root.attrib.get("help", None), + ) + # Deal with child nodes + self.load_inputs(section, section_root) + root.append(section) + + def _load_repeat(self, root, repeat_root): + """ + Add <repeat> to the root. + + :param root: root to append repeat to. + :param repeat_root: root of <repeat> tag. + :param repeat_root: :class:`xml.etree._Element` + """ + repeat = gxtp.Repeat( + repeat_root.attrib["name"], + repeat_root.attrib["title"], + min=repeat_root.attrib.get("min", None), + max=repeat_root.attrib.get("max", None), + default=repeat_root.attrib.get("default", None), + ) + # Deal with child nodes + self.load_inputs(repeat, repeat_root) + root.append(repeat) + + def load_inputs(self, root, inputs_root): + """ + Add <inputs.tag> to the root (it can be any tags with children such as + <inputs>, <repeat>, <section> ...) + + :param root: root to attach inputs to (either <inputs> or <when>). + :param inputs_root: root of <inputs> tag. + :type inputs_root: :class:`xml.etree._Element` + """ + for inp_child in inputs_root: + try: + getattr(self, "_load_{}".format(inp_child.tag))(root, inp_child) + except AttributeError: + logger.warning(inp_child.tag + " tag is not processed for <" + inputs_root.tag + "> tag.") + + +class OutputsParser(object): + """ + Class to parse content of the <outputs> tag from a Galaxy XML wrapper. + """ + + def _load_data(self, outputs_root, data_root): + """ + Add <data> to <outputs>. + + :param outputs_root: <outputs> root to append <data> to. + :param data_root: root of <data> tag. + :param data_root: :class:`xml.etree._Element` + """ + data = gxtp.OutputData( + data_root.attrib.get("name", None), + data_root.attrib.get("format", None), + format_source=data_root.attrib.get("format_source", None), + metadata_source=data_root.attrib.get("metadata_source", None), + label=data_root.attrib.get("label", None), + from_work_dir=data_root.attrib.get("from_work_dir", None), + hidden=data_root.attrib.get("hidden", False), + ) + # Deal with child nodes + for data_child in data_root: + try: + getattr(self, "_load_{}".format(data_child.tag))(data, data_child) + except AttributeError: + logger.warning(data_child.tag + " tag is not processed for <data>.") + outputs_root.append(data) + + def _load_change_format(self, root, chfmt_root): + """ + Add <change_format> to root (<data>). + + :param root: root to append <change_format> to. + :param chfm_root: root of <change_format> tag. + :param chfm_root: :class:`xml.etree._Element` + """ + change_format = gxtp.ChangeFormat() + for chfmt_child in chfmt_root: + change_format.append( + gxtp.ChangeFormatWhen( + chfmt_child.attrib["input"], chfmt_child.attrib["format"], chfmt_child.attrib["value"] + ) + ) + root.append(change_format) + + def _load_collection(self, outputs_root, coll_root): + """ + Add <collection> to <outputs>. + + :param outputs_root: <outputs> root to append <collection> to. + :param coll_root: root of <collection> tag. + :param coll_root: :class:`xml.etree._Element` + """ + collection = gxtp.OutputCollection( + coll_root.attrib["name"], + type=coll_root.attrib.get("type", None), + label=coll_root.attrib.get("label", None), + format_source=coll_root.attrib.get("format_source", None), + type_source=coll_root.attrib.get("type_source", None), + structured_like=coll_root.attrib.get("structured_like", None), + inherit_format=coll_root.attrib.get("inherit_format", None), + ) + # Deal with child nodes + for coll_child in coll_root: + try: + getattr(self, "_load_{}".format(coll_child.tag))(collection, coll_child) + except AttributeError: + logger.warning(coll_child.tag + " tag is not processed for <collection>.") + outputs_root.append(collection) + + def _load_discover_datasets(self, root, disc_root): + """ + Add <discover_datasets> to root (<collection>). + + :param root: root to append <collection> to. + :param disc_root: root of <discover_datasets> tag. + :param disc_root: :class:`xml.etree._Element` + """ + root.append( + gxtp.DiscoverDatasets( + disc_root.attrib["pattern"], + directory=disc_root.attrib.get("directory", None), + format=disc_root.attrib.get("format", None), + ext=disc_root.attrib.get("ext", None), + visible=disc_root.attrib.get("visible", None), + ) + ) + + def _load_filter(self, root, filter_root): + """ + Add <filter> to root (<collection> or <data>). + + :param root: root to append <collection> to. + :param coll_root: root of <filter> tag. + :param coll_root: :class:`xml.etree._Element` + """ + root.append(gxtp.OutputFilter(filter_root.text)) + + def load_outputs(self, root, outputs_root): + """ + Add <outputs> to the root. + + :param root: root to attach <outputs> to (<tool>). + :param tests_root: root of <outputs> tag. + :type tests_root: :class:`xml.etree._Element` + """ + for out_child in outputs_root: + try: + getattr(self, "_load_{}".format(out_child.tag))(root, out_child) + except AttributeError: + logger.warning(out_child.tag + " tag is not processed for <outputs>.") + + +class TestsParser(object): + """ + Class to parse content of the <tests> tag from a Galaxy XML wrapper. + """ + + def _load_param(self, test_root, param_root): + """ + Add <param> to the <test>. + + :param root: <test> root to append <param> to. + :param repeat_root: root of <param> tag. + :param repeat_root: :class:`xml.etree._Element` + """ + test_root.append( + gxtp.TestParam( + param_root.attrib["name"], + value=param_root.attrib.get("value", None), + ftype=param_root.attrib.get("ftype", None), + dbkey=param_root.attrib.get("dbkey", None), + ) + ) + + def _load_output(self, test_root, output_root): + """ + Add <output> to the <test>. + + :param root: <test> root to append <output> to. + :param repeat_root: root of <output> tag. + :param repeat_root: :class:`xml.etree._Element` + """ + test_root.append( + gxtp.TestOutput( + name=output_root.attrib.get("name", None), + file=output_root.attrib.get("file", None), + ftype=output_root.attrib.get("ftype", None), + sort=output_root.attrib.get("sort", None), + value=output_root.attrib.get("value", None), + md5=output_root.attrib.get("md5", None), + checksum=output_root.attrib.get("checksum", None), + compare=output_root.attrib.get("compare", None), + lines_diff=output_root.attrib.get("lines_diff", None), + delta=output_root.attrib.get("delta", None), + ) + ) + + def load_tests(self, root, tests_root): + """ + Add <tests> to the root. + + :param root: root to attach <tests> to (<tool>). + :param tests_root: root of <tests> tag. + :type tests_root: :class:`xml.etree._Element` + """ + for test_root in tests_root: + test = gxtp.Test() + for test_child in test_root: + try: + getattr(self, "_load_{}".format(test_child.tag))(test, test_child) + except AttributeError: + logger.warning(test_child.tag + " tag is not processed within <test>.") + root.append(test)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/toolfactory/galaxyxml/tool/parameters/__init__.py Thu Nov 19 23:59:50 2020 +0000 @@ -0,0 +1,739 @@ +from builtins import object +from builtins import str + +from galaxyxml import Util + +from lxml import etree + + + +class XMLParam(object): + name = "node" + + def __init__(self, *args, **kwargs): + # http://stackoverflow.com/a/12118700 + self.children = [] + kwargs = {k: v for k, v in list(kwargs.items()) if v is not None} + kwargs = Util.coerce(kwargs, kill_lists=True) + kwargs = Util.clean_kwargs(kwargs, final=True) + self.node = etree.Element(self.name, **kwargs) + + def append(self, sub_node): + if self.acceptable_child(sub_node): + # If one of ours, they aren't etree nodes, they're custom objects + if issubclass(type(sub_node), XMLParam): + self.node.append(sub_node.node) + self.children.append(sub_node) + else: + raise Exception( + "Child was unacceptable to parent (%s is not appropriate for %s)" % (type(self), type(sub_node)) + ) + else: + raise Exception( + "Child was unacceptable to parent (%s is not appropriate for %s)" % (type(self), type(sub_node)) + ) + + def validate(self): + # Very few need validation, but some nodes we may want to have + # validation routines on. Should only be called when DONE. + for child in self.children: + # If any child fails to validate return false. + if not child.validate(): + return False + return True + + def cli(self): + lines = [] + for child in self.children: + lines.append(child.command_line()) + # lines += child.command_line() + return "\n".join(lines) + + def command_line(self): + return None + + +class RequestParamTranslation(XMLParam): + name = "request_param_translation" + + def __init__(self, **kwargs): + self.node = etree.Element(self.name) + + def acceptable_child(self, child): + return isinstance(child, RequestParamTranslation) + + +class RequestParam(XMLParam): + name = "request_param" + + def __init__(self, galaxy_name, remote_name, missing, **kwargs): + # TODO: bulk copy locals into self.attr? + self.galaxy_name = galaxy_name + # http://stackoverflow.com/a/1408860 + params = Util.clean_kwargs(locals().copy()) + super(RequestParam, self).__init__(**params) + + def acceptable_child(self, child): + return isinstance(child, AppendParam) and self.galaxy_name == "URL" + + +class AppendParam(XMLParam): + name = "append_param" + + def __init__(self, separator="&", first_separator="?", join="=", **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(AppendParam, self).__init__(**params) + + def acceptable_child(self, child): + return isinstance(child, AppendParamValue) + + +class AppendParamValue(XMLParam): + name = "value" + + def __init__(self, name="_export", missing="1", **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(AppendParamValue, self).__init__(**params) + + def acceptable_child(self, child): + return False + + +class EdamOperations(XMLParam): + name = "edam_operations" + + def acceptable_child(self, child): + return issubclass(type(child), EdamOperation) + + def has_operation(self, edam_operation): + """ + Check the presence of a given edam_operation. + + :type edam_operation: STRING + """ + for operation in self.children: + if operation.node.text == edam_operation: + return True + return False + + +class EdamOperation(XMLParam): + name = "edam_operation" + + def __init__(self, value): + super(EdamOperation, self).__init__() + self.node.text = str(value) + + +class EdamTopics(XMLParam): + name = "edam_topics" + + def acceptable_child(self, child): + return issubclass(type(child), EdamTopic) + + def has_topic(self, edam_topic): + """ + Check the presence of a given edam_topic. + + :type edam_topic: STRING + """ + for topic in self.children: + if topic.node.text == edam_topic: + return True + return False + + +class EdamTopic(XMLParam): + name = "edam_topic" + + def __init__(self, value): + super(EdamTopic, self).__init__() + self.node.text = str(value) + + +class Requirements(XMLParam): + name = "requirements" + # This bodes to be an issue -__- + + def acceptable_child(self, child): + return issubclass(type(child), Requirement) or issubclass(type(child), Container) + + +class Requirement(XMLParam): + name = "requirement" + + def __init__(self, type, value, version=None, **kwargs): + params = Util.clean_kwargs(locals().copy()) + passed_kwargs = {} + passed_kwargs["version"] = params["version"] + passed_kwargs["type"] = params["type"] + super(Requirement, self).__init__(**passed_kwargs) + self.node.text = str(value) + + +class Container(XMLParam): + name = "container" + + def __init__(self, type, value, **kwargs): + params = Util.clean_kwargs(locals().copy()) + passed_kwargs = {} + passed_kwargs["type"] = params["type"] + super(Container, self).__init__(**passed_kwargs) + self.node.text = str(value) + + +class Configfiles(XMLParam): + name = "configfiles" + + def acceptable_child(self, child): + return issubclass(type(child), Configfile) or issubclass(type(child), ConfigfileDefaultInputs) + + +class Configfile(XMLParam): + name = "configfile" + + def __init__(self, name, text, **kwargs): + params = Util.clean_kwargs(locals().copy()) + passed_kwargs = {} + passed_kwargs["name"] = params["name"] + super(Configfile, self).__init__(**passed_kwargs) + self.node.text = etree.CDATA(str(text)) + + +class ConfigfileDefaultInputs(XMLParam): + name = "inputs" + + def __init__(self, name, **kwargs): + params = Util.clean_kwargs(locals().copy()) + passed_kwargs = {} + passed_kwargs["name"] = params["name"] + super(ConfigfileDefaultInputs, self).__init__(**passed_kwargs) + + +class Inputs(XMLParam): + name = "inputs" + # This bodes to be an issue -__- + + def __init__(self, action=None, check_value=None, method=None, target=None, nginx_upload=None, **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(Inputs, self).__init__(**params) + + def acceptable_child(self, child): + return issubclass(type(child), InputParameter) + + +class InputParameter(XMLParam): + def __init__(self, name, **kwargs): + # TODO: look at + self.mako_identifier = name + # We use kwargs instead of the usual locals(), so manually copy the + # name to kwargs + if name is not None: + kwargs["name"] = name + + # Handle positional parameters + if "positional" in kwargs and kwargs["positional"]: + self.positional = True + else: + self.positional = False + + if "num_dashes" in kwargs: + self.num_dashes = kwargs["num_dashes"] + del kwargs["num_dashes"] + else: + self.num_dashes = 0 + + self.space_between_arg = " " + + # Not sure about this :( + # https://wiki.galaxyproject.org/Tools/BestPractices#Parameter_help + if "label" in kwargs: + # TODO: replace with positional attribute + if len(self.flag()) > 0: + if kwargs["label"] is None: + kwargs["label"] = "Author did not provide help for this parameter... " + if not self.positional: + kwargs["argument"] = self.flag() + + super(InputParameter, self).__init__(**kwargs) + + def command_line(self): + before = self.command_line_before() + cli = self.command_line_actual() + after = self.command_line_after() + + complete = [x for x in (before, cli, after) if x is not None] + return "\n".join(complete) + + def command_line_before(self): + try: + return self.command_line_before_override + except Exception: + return None + + def command_line_after(self): + try: + return self.command_line_after_override + except Exception: + return None + + def command_line_actual(self): + try: + return self.command_line_override + except Exception: + if self.positional: + return self.mako_name() + else: + return "%s%s%s" % (self.flag(), self.space_between_arg, self.mako_name()) + + def mako_name(self): + # TODO: enhance logic to check up parents for things like + # repeat>condotion>param + return "$" + self.mako_identifier + + def flag(self): + flag = "-" * self.num_dashes + return flag + self.mako_identifier + + +class Section(InputParameter): + name = "section" + + def __init__(self, name, title, expanded=None, help=None, **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(Section, self).__init__(**params) + + def acceptable_child(self, child): + return issubclass(type(child), InputParameter) + + +class Repeat(InputParameter): + name = "repeat" + + def __init__(self, name, title, min=None, max=None, default=None, **kwargs): + params = Util.clean_kwargs(locals().copy()) + # Allow overriding + self.command_line_before_override = "#for $i in $%s:" % name + self.command_line_after_override = "#end for" + # self.command_line_override + super(Repeat, self).__init__(**params) + + def acceptable_child(self, child): + return issubclass(type(child), InputParameter) + + def command_line_actual(self): + if hasattr(self, "command_line_override"): + return self.command_line_override + else: + return "%s" % self.mako_name() + + +class Conditional(InputParameter): + name = "conditional" + + def __init__(self, name, **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(Conditional, self).__init__(**params) + + def acceptable_child(self, child): + return issubclass(type(child), InputParameter) and not isinstance(child, Conditional) + + def validate(self): + # Find a way to check if one of the kids is a WHEN + pass + + +class When(InputParameter): + name = "when" + + def __init__(self, value): + params = Util.clean_kwargs(locals().copy()) + super(When, self).__init__(None, **params) + + def acceptable_child(self, child): + return issubclass(type(child), InputParameter) + + +class Param(InputParameter): + name = "param" + + # This...isn't really valid as-is, and shouldn't be used. + def __init__(self, name, optional=None, label=None, help=None, **kwargs): + params = Util.clean_kwargs(locals().copy()) + params["type"] = self.type + super(Param, self).__init__(**params) + + if type(self) == Param: + raise Exception("Param class is not an actual parameter type, use a subclass of Param") + + def acceptable_child(self, child): + return issubclass(type(child, InputParameter) or isinstance(child), ValidatorParam) + + +class TextParam(Param): + type = "text" + + def __init__(self, name, optional=None, label=None, help=None, value=None, **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(TextParam, self).__init__(**params) + + def command_line_actual(self): + try: + return self.command_line_override + except Exception: + if self.positional: + return self.mako_name() + else: + return f"{self.flag}{self.space_between_arg}'{self.mako_name()}'" + + +class _NumericParam(Param): + def __init__(self, name, value, optional=None, label=None, help=None, min=None, max=None, **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(_NumericParam, self).__init__(**params) + + +class IntegerParam(_NumericParam): + type = "integer" + + +class FloatParam(_NumericParam): + type = "float" + + +class BooleanParam(Param): + type = "boolean" + + def __init__( + self, name, optional=None, label=None, help=None, checked=False, truevalue=None, falsevalue=None, **kwargs + ): + params = Util.clean_kwargs(locals().copy()) + + super(BooleanParam, self).__init__(**params) + if truevalue is None: + # If truevalue and falsevalue are None, then we use "auto", the IUC + # recommended default. + # + # truevalue is set to the parameter's value, and falsevalue is not. + # + # Unfortunately, mako_identifier is set as a result of the super + # call, which we shouldn't call TWICE, so we'll just hack around this :( + # params['truevalue'] = '%s%s' % (self.) + self.node.attrib["truevalue"] = self.flag() + + if falsevalue is None: + self.node.attrib["falsevalue"] = "" + + def command_line_actual(self): + if hasattr(self, "command_line_override"): + return self.command_line_override + else: + return "%s" % self.mako_name() + + +class DataParam(Param): + type = "data" + + def __init__(self, name, optional=None, label=None, help=None, format=None, multiple=None, **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(DataParam, self).__init__(**params) + + +class SelectParam(Param): + type = "select" + + def __init__( + self, + name, + optional=None, + label=None, + help=None, + data_ref=None, + display=None, + multiple=None, + options=None, + default=None, + **kwargs + ): + params = Util.clean_kwargs(locals().copy()) + del params["options"] + del params["default"] + + super(SelectParam, self).__init__(**params) + + if options is not None and default is not None: + if default not in options: + raise Exception("Specified a default that isn't in options") + + if options: + for k, v in list(sorted(options.items())): + selected = k == default + self.append(SelectOption(k, v, selected=selected)) + + def acceptable_child(self, child): + return issubclass(type(child), SelectOption) or issubclass(type(child), Options) + + +class SelectOption(InputParameter): + name = "option" + + def __init__(self, value, text, selected=False, **kwargs): + params = Util.clean_kwargs(locals().copy()) + + passed_kwargs = {} + if selected: + passed_kwargs["selected"] = "true" + passed_kwargs["value"] = params["value"] + + super(SelectOption, self).__init__(None, **passed_kwargs) + self.node.text = str(text) + + +class Options(InputParameter): + name = "options" + + def __init__(self, from_dataset=None, from_file=None, from_data_table=None, from_parameter=None, **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(Options, self).__init__(None, **params) + + def acceptable_child(self, child): + return issubclass(type(child), Column) or issubclass(type(child), Filter) + + +class Column(InputParameter): + name = "column" + + def __init__(self, name, index, **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(Column, self).__init__(**params) + + +class Filter(InputParameter): + name = "filter" + + def __init__( + self, + type, + column=None, + name=None, + ref=None, + key=None, + multiple=None, + separator=None, + keep=None, + value=None, + ref_attribute=None, + index=None, + **kwargs + ): + params = Util.clean_kwargs(locals().copy()) + super(Filter, self).__init__(**params) + + +class ValidatorParam(InputParameter): + name = "validator" + + def __init__( + self, + type, + message=None, + filename=None, + metadata_name=None, + metadata_column=None, + line_startswith=None, + min=None, + max=None, + **kwargs + ): + params = Util.clean_kwargs(locals().copy()) + super(ValidatorParam, self).__init__(**params) + + +class Outputs(XMLParam): + name = "outputs" + + def acceptable_child(self, child): + return isinstance(child, OutputData) or isinstance(child, OutputCollection) + + +class OutputData(XMLParam): + """Copypasta of InputParameter, needs work + """ + + name = "data" + + def __init__( + self, + name, + format, + format_source=None, + metadata_source=None, + label=None, + from_work_dir=None, + hidden=False, + **kwargs + ): + # TODO: validate format_source&metadata_source against something in the + # XMLParam children tree. + self.mako_identifier = name + if "num_dashes" in kwargs: + self.num_dashes = kwargs["num_dashes"] + del kwargs["num_dashes"] + else: + self.num_dashes = 0 + self.space_between_arg = " " + params = Util.clean_kwargs(locals().copy()) + + super(OutputData, self).__init__(**params) + + def command_line(self): + if hasattr(self, "command_line_override"): + return self.command_line_override + else: + return "%s%s%s" % (self.flag(), self.space_between_arg, self.mako_name()) + + def mako_name(self): + return "$" + self.mako_identifier + + def flag(self): + flag = "-" * self.num_dashes + return flag + self.mako_identifier + + def acceptable_child(self, child): + return isinstance(child, OutputFilter) or isinstance(child, ChangeFormat) or isinstance(child, DiscoverDatasets) + + +class OutputFilter(XMLParam): + name = "filter" + + def __init__(self, text, **kwargs): + params = Util.clean_kwargs(locals().copy()) + del params["text"] + super(OutputFilter, self).__init__(**params) + self.node.text = text + + def acceptable_child(self, child): + return False + + +class ChangeFormat(XMLParam): + name = "change_format" + + def __init__(self, **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(ChangeFormat, self).__init__(**params) + + def acceptable_child(self, child): + return isinstance(child, ChangeFormatWhen) + + +class ChangeFormatWhen(XMLParam): + name = "when" + + def __init__(self, input, format, value, **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(ChangeFormatWhen, self).__init__(**params) + + def acceptable_child(self, child): + return False + + +class OutputCollection(XMLParam): + name = "collection" + + def __init__( + self, + name, + type=None, + label=None, + format_source=None, + type_source=None, + structured_like=None, + inherit_format=None, + **kwargs + ): + params = Util.clean_kwargs(locals().copy()) + super(OutputCollection, self).__init__(**params) + + def acceptable_child(self, child): + return isinstance(child, OutputData) or isinstance(child, OutputFilter) or isinstance(child, DiscoverDatasets) + + +class DiscoverDatasets(XMLParam): + name = "discover_datasets" + + def __init__(self, pattern, directory=None, format=None, ext=None, visible=None, **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(DiscoverDatasets, self).__init__(**params) + + +class Tests(XMLParam): + name = "tests" + + def acceptable_child(self, child): + return issubclass(type(child), Test) + + +class Test(XMLParam): + name = "test" + + def acceptable_child(self, child): + return isinstance(child, TestParam) or isinstance(child, TestOutput) + + +class TestParam(XMLParam): + name = "param" + + def __init__(self, name, value=None, ftype=None, dbkey=None, **kwargs): + params = Util.clean_kwargs(locals().copy()) + super(TestParam, self).__init__(**params) + + +class TestOutput(XMLParam): + name = "output" + + def __init__( + self, + name=None, + file=None, + ftype=None, + sort=None, + value=None, + md5=None, + checksum=None, + compare=None, + lines_diff=None, + delta=None, + **kwargs + ): + params = Util.clean_kwargs(locals().copy()) + super(TestOutput, self).__init__(**params) + + +class Citations(XMLParam): + name = "citations" + + def acceptable_child(self, child): + return issubclass(type(child), Citation) + + def has_citation(self, type, value): + """ + Check the presence of a given citation. + + :type type: STRING + :type value: STRING + """ + for citation in self.children: + if citation.node.attrib["type"] == type and citation.node.text == value: + return True + return False + + +class Citation(XMLParam): + name = "citation" + + def __init__(self, type, value): + passed_kwargs = {} + passed_kwargs["type"] = type + super(Citation, self).__init__(**passed_kwargs) + self.node.text = str(value)