Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/planemo/tool_builder.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/planemo/tool_builder.py Sat May 02 07:14:21 2020 -0400 @@ -0,0 +1,909 @@ +"""This module contains :func:`build` to build tool descriptions. + +This class is used by the `tool_init` command and can be used to build +Galaxy and CWL tool descriptions. +""" + +import os +import re +import shlex +import shutil +import subprocess +from collections import namedtuple + +from planemo import io +from planemo import templates + +REUSING_MACROS_MESSAGE = ("Macros file macros.xml already exists, assuming " + " it has relevant planemo-generated definitions.") +DEFAULT_CWL_VERSION = "v1.0" + + +TOOL_TEMPLATE = """<tool id="{{id}}" name="{{name}}" version="{{version}}" python_template_version="3.5"> +{%- if description %} + <description>{{ description }}</description> +{%- endif %} +{%- if macros %} + <macros> + <import>macros.xml</import> + </macros> + <expand macro="requirements" /> +{%- if version_command %} + <expand macro="version_command" /> +{%- endif %} +{%- else %} + <requirements> +{%- for requirement in requirements %} + {{ requirement }} +{%- endfor %} +{%- for container in containers %} + {{ container }} +{%- endfor %} + </requirements> +{%- if version_command %} + <version_command>{{ version_command }}</version_command> +{%- endif %} +{%- endif %} + <command detect_errors="exit_code"><![CDATA[ +{%- if command %} + {{ command }} +{%- else %} + TODO: Fill in command template. +{%- endif %} + ]]></command> + <inputs> +{%- for input in inputs %} + {{ input }} +{%- endfor %} + </inputs> + <outputs> +{%- for output in outputs %} + {{ output }} +{%- endfor %} + </outputs> +{%- if tests %} + <tests> +{%- for test in tests %} + <test> +{%- for param in test.params %} + <param name="{{ param[0]}}" value="{{ param[1] }}"/> +{%- endfor %} +{%- for output in test.outputs %} + <output name="{{ output[0] }}" file="{{ output[1] }}"/> +{%- endfor %} + </test> +{%- endfor %} + </tests> +{%- endif %} + <help><![CDATA[ +{%- if help %} + {{ help }} +{%- else %} + TODO: Fill in help. +{%- endif %} + ]]></help> +{%- if macros %} + <expand macro="citations" /> +{%- else %} +{%- if doi or bibtex_citations %} + <citations> +{%- for single_doi in doi %} + <citation type="doi">{{ single_doi }}</citation> +{%- endfor %} +{%- for bibtex_citation in bibtex_citations %} + <citation type="bibtex">{{ bibtex_citation }}</citation> +{%- endfor %} + </citations> +{%- endif %} +{%- endif %} +</tool> +""" + +MACROS_TEMPLATE = """<macros> + <xml name="requirements"> + <requirements> +{%- for requirement in requirements %} + {{ requirement }} +{%- endfor %} + <yield/> +{%- for container in containers %} + {{ container }} +{%- endfor %} + </requirements> + </xml> + <xml name="citations"> + <citations> +{%- for single_doi in doi %} + <citation type="doi">{{ single_doi }}</citation> +{%- endfor %} +{%- for bibtex_citation in bibtex_citations %} + <citation type="bibtex">{{ bibtex_citation }}</citation> +{%- endfor %} + <yield /> + </citations> + </xml> +{%- if version_command %} + <xml name="version_command"> + <version_command>{{ version_command }}</version_command> + </xml> +{%- endif %} +</macros> +""" + +CWL_TEMPLATE = """#!/usr/bin/env cwl-runner +cwlVersion: '{{cwl_version}}' +class: CommandLineTool +id: "{{id}}" +label: "{{label}}" +{%- if containers or requirements %} +hints: +{%- for container in containers %} + DockerRequirement: + dockerPull: {{ container.image_id }} +{%- endfor %} +{%- if requirements %} + SoftwareRequirement: + packages: +{%- for requirement in requirements %} + - package: {{ requirement.name }} +{%- if requirement.version %} + version: + - "{{ requirement.version }}" +{%- else %} + version: [] +{%- endif %} +{%- endfor %} +{%- endif %} +{%- endif %} +{%- if inputs or outputs %} +inputs: +{%- for input in inputs %} + {{ input.id }}: + type: {{ input.type }} + doc: | + TODO + inputBinding: + position: {{ input.position }} +{%- if input.prefix %} + prefix: "{{input.prefix.prefix}}" +{%- if not input.prefix.separated %} + separate: false +{%- endif %} +{%- endif %} +{%- endfor %} +{%- for output in outputs %} +{%- if output.require_filename %} + {{ output.id }}: + type: string + doc: | + Filename for output {{ output.id }} + inputBinding: + position: {{ output.position }} +{%- if output.prefix %} + prefix: "{{output.prefix.prefix}}" +{%- if not output.prefix.separated %} + separate: false +{%- endif %} +{%- endif %} +{%- endif %} +{%- endfor %} +{%- else %} +inputs: [] # TODO +{%- endif %} +{%- if outputs %} +outputs: +{%- for output in outputs %} + {{ output.id }}: + type: File + outputBinding: + glob: {{ output.glob }} +{%- endfor %} +{%- else %} +outputs: [] # TODO +{%- endif %} +{%- if base_command %} +baseCommand: +{%- for base_command_part in base_command %} + - "{{ base_command_part}}" +{%- endfor %} +{%- else %} +baseCommand: [] +{%- endif %} +{%- if arguments %} +arguments: +{%- for argument in arguments %} + - valueFrom: "{{ argument.value }}" + position: {{ argument.position }} +{%- if argument.prefix %} + prefix: "{{argument.prefix.prefix}}" +{%- if not argument.prefix.separated %} + separate: false +{%- endif %} +{%- endif %} +{%- endfor %} +{%- else %} +arguments: [] +{%- endif %} +{%- if stdout %} +stdout: {{ stdout }} +{%- endif %} +doc: | +{%- if help %} + {{ help|indent(2) }} +{%- else %} + TODO: Fill in description. +{%- endif %} +""" + +CWL_TEST_TEMPLATE = """ +- doc: test generated from example command + job: {{ job_filename }} +{%- if outputs %} + outputs: +{%- for output in outputs %} + {{ output.id }}: + path: test-data/{{ output.example_value }} +{%- endfor %} +{%- else %} + outputs: TODO +{%- endif %} +""" + +CWL_JOB_TEMPLATE = """ +{%- if inputs %} +{%- for input in inputs %} +{%- if input.type == "File" %} +{{ input.id }}: + class: File + path: test-data/{{ input.example_value }} +{%- else %} + {{ input.id }}: {{ input.example_value }} +{%- endif %} +{%- endfor %} +{%- else %} +# TODO: Specify job input. +{} +{%- endif %} +""" + + +def build(**kwds): + """Build up a :func:`ToolDescription` from supplid arguments.""" + if kwds.get("cwl"): + builder = _build_cwl + else: + builder = _build_galaxy + return builder(**kwds) + + +def _build_cwl(**kwds): + _handle_help(kwds) + _handle_requirements(kwds) + assert len(kwds["containers"]) <= 1, kwds + command_io = CommandIO(**kwds) + render_kwds = { + "cwl_version": DEFAULT_CWL_VERSION, + "help": kwds.get("help", ""), + "containers": kwds.get("containers", []), + "requirements": kwds.get("requirements", []), + "id": kwds.get("id"), + "label": kwds.get("name"), + } + render_kwds.update(command_io.cwl_properties()) + + contents = _render(render_kwds, template_str=CWL_TEMPLATE) + tool_files = [] + test_files = [] + if kwds["test_case"]: + sep = "-" if "-" in kwds.get("id") else "_" + tests_path = "%s%stests.yml" % (kwds.get("id"), sep) + job_path = "%s%sjob.yml" % (kwds.get("id"), sep) + render_kwds["job_filename"] = job_path + test_contents = _render(render_kwds, template_str=CWL_TEST_TEMPLATE) + job_contents = _render(render_kwds, template_str=CWL_JOB_TEMPLATE) + tool_files.append(ToolFile(tests_path, test_contents, "test")) + tool_files.append(ToolFile(job_path, job_contents, "job")) + for cwl_input in render_kwds["inputs"] or []: + if cwl_input.type == "File" and cwl_input.example_value: + test_files.append(cwl_input.example_value) + + for cwl_output in render_kwds["outputs"] or []: + if cwl_output.example_value: + test_files.append(cwl_output.example_value) + + return ToolDescription( + contents, + tool_files=tool_files, + test_files=test_files + ) + + +def _build_galaxy(**kwds): + # Test case to build up from supplied inputs and outputs, ultimately + # ignored unless kwds["test_case"] is truthy. + + _handle_help(kwds) + + # process raw cite urls + cite_urls = kwds.get("cite_url", []) + del kwds["cite_url"] + citations = list(map(UrlCitation, cite_urls)) + kwds["bibtex_citations"] = citations + + # handle requirements and containers + _handle_requirements(kwds) + + command_io = CommandIO(**kwds) + kwds["inputs"] = command_io.inputs + kwds["outputs"] = command_io.outputs + kwds["command"] = command_io.cheetah_template + + test_case = command_io.test_case() + + # finally wrap up tests + tests, test_files = _handle_tests(kwds, test_case) + kwds["tests"] = tests + + # Render tool content from template. + contents = _render(kwds) + + tool_files = [] + append_macro_file(tool_files, kwds) + + return ToolDescription( + contents, + tool_files=tool_files, + test_files=test_files + ) + + +def append_macro_file(tool_files, kwds): + macro_contents = None + if kwds["macros"]: + macro_contents = _render(kwds, MACROS_TEMPLATE) + + macros_file = "macros.xml" + if not os.path.exists(macros_file): + tool_files.append(ToolFile(macros_file, macro_contents, "macros")) + + io.info(REUSING_MACROS_MESSAGE) + + +class CommandIO(object): + + def __init__(self, **kwds): + command = _find_command(kwds) + cheetah_template = command + + # process raw inputs + inputs = kwds.pop("input", []) + inputs = list(map(Input, inputs or [])) + + # alternatively process example inputs + example_inputs = kwds.pop("example_input", []) + for i, input_file in enumerate(example_inputs or []): + name = "input%d" % (i + 1) + inputs.append(Input(input_file, name=name, example=True)) + cheetah_template = _replace_file_in_command(cheetah_template, input_file, name) + + # handle raw outputs (from_work_dir ones) as well as named_outputs + outputs = kwds.pop("output", []) + outputs = list(map(Output, outputs or [])) + + named_outputs = kwds.pop("named_output", []) + for named_output in (named_outputs or []): + outputs.append(Output(name=named_output, example=False)) + + # handle example outputs + example_outputs = kwds.pop("example_output", []) + for i, output_file in enumerate(example_outputs or []): + name = "output%d" % (i + 1) + from_path = output_file + use_from_path = True + if output_file in cheetah_template: + # Actually found the file in the command, assume it can + # be specified directly and skip from_work_dir. + use_from_path = False + output = Output(name=name, from_path=from_path, + use_from_path=use_from_path, example=True) + outputs.append(output) + cheetah_template = _replace_file_in_command(cheetah_template, output_file, output.name) + + self.inputs = inputs + self.outputs = outputs + self.command = command + self.cheetah_template = cheetah_template + + def example_input_names(self): + for input in self.inputs: + if input.example: + yield input.input_description + + def example_output_names(self): + for output in self.outputs: + if output.example: + yield output.example_path + + def cwl_lex_list(self): + if not self.command: + return [] + + command_parts = shlex.split(self.command) + parse_list = [] + + input_count = 0 + output_count = 0 + + index = 0 + + prefixed_parts = [] + while index < len(command_parts): + value = command_parts[index] + eq_split = value.split("=") + + prefix = None + if not _looks_like_start_of_prefix(index, command_parts): + index += 1 + elif len(eq_split) == 2: + prefix = Prefix(eq_split[0] + "=", False) + value = eq_split[1] + index += 1 + else: + prefix = Prefix(value, True) + value = command_parts[index + 1] + index += 2 + prefixed_parts.append((prefix, value)) + + for position, (prefix, value) in enumerate(prefixed_parts): + if value in self.example_input_names(): + input_count += 1 + input = CwlInput( + "input%d" % input_count, + position, + prefix, + value, + ) + parse_list.append(input) + elif value in self.example_output_names(): + output_count += 1 + output = CwlOutput( + "output%d" % output_count, + position, + prefix, + value, + ) + parse_list.append(output) + elif prefix: + param_id = prefix.prefix.lower().rstrip("=") + type_ = param_type(value) + input = CwlInput( + param_id, + position, + prefix, + value, + type_=type_, + ) + parse_list.append(input) + else: + part = CwlCommandPart(value, position, prefix) + parse_list.append(part) + return parse_list + + def cwl_properties(self): + base_command = [] + arguments = [] + inputs = [] + outputs = [] + + lex_list = self.cwl_lex_list() + + index = 0 + while index < len(lex_list): + token = lex_list[index] + if isinstance(token, CwlCommandPart): + base_command.append(token.value) + else: + break + index += 1 + + while index < len(lex_list): + token = lex_list[index] + if token.is_token(">"): + break + token.position = index - len(base_command) + 1 + if isinstance(token, CwlCommandPart): + arguments.append(token) + elif isinstance(token, CwlInput): + inputs.append(token) + elif isinstance(token, CwlOutput): + token.glob = "$(inputs.%s)" % token.id + outputs.append(token) + + index += 1 + + stdout = None + if index < len(lex_list): + token = lex_list[index] + if token.is_token(">") and (index + 1) < len(lex_list): + output_token = lex_list[index + 1] + if not isinstance(output_token, CwlOutput): + output_token = CwlOutput("std_out", None) + + output_token.glob = "out" + output_token.require_filename = False + outputs.append(output_token) + stdout = "out" + index += 2 + else: + io.warn("Example command too complex, you will need to build it up manually.") + + return { + "inputs": inputs, + "outputs": outputs, + "arguments": arguments, + "base_command": base_command, + "stdout": stdout, + } + + def test_case(self): + test_case = TestCase() + for input in self.inputs: + if input.example: + test_case.params.append((input.name, input.input_description)) + + for output in self.outputs: + if output.example: + test_case.outputs.append((output.name, output.example_path)) + + return test_case + + +def _looks_like_start_of_prefix(index, parts): + value = parts[index] + if len(value.split("=")) == 2: + return True + if index + 1 == len(parts): + return False + next_value = parts[index + 1] + next_value_is_not_start = (len(value.split("=")) != 2) and next_value[0] not in ["-", ">", "<", "|"] + return value.startswith("-") and next_value_is_not_start + + +Prefix = namedtuple("Prefix", ["prefix", "separated"]) + + +class CwlCommandPart(object): + + def __init__(self, value, position, prefix): + self.value = value + self.position = position + self.prefix = prefix + + def is_token(self, value): + return self.value == value + + +class CwlInput(object): + + def __init__(self, id, position, prefix, example_value, type_="File"): + self.id = id + self.position = position + self.prefix = prefix + self.example_value = example_value + self.type = type_ + + def is_token(self, value): + return False + + +class CwlOutput(object): + + def __init__(self, id, position, prefix, example_value): + self.id = id + self.position = position + self.prefix = prefix + self.glob = None + self.example_value = example_value + self.require_filename = True + + def is_token(self, value): + return False + + +def _render(kwds, template_str=TOOL_TEMPLATE): + """ Apply supplied template variables to TOOL_TEMPLATE to generate + the final tool. + """ + return templates.render(template_str, **kwds) + + +def _replace_file_in_command(command, specified_file, name): + """ Replace example file with cheetah variable name in supplied command + or command template. Be sure to single quote the name. + """ + # TODO: check if the supplied variant was single quoted already. + if '"%s"' % specified_file in command: + # Sample command already wrapped filename in double quotes + command = command.replace('"%s"' % specified_file, "'$%s'" % name) + elif (" %s " % specified_file) in (" " + command + " "): + # In case of spaces, best to wrap filename in double quotes + command = command.replace(specified_file, "'$%s'" % name) + else: + command = command.replace(specified_file, '$%s' % name) + return command + + +def _handle_help(kwds): + """ Convert supplied help parameters into a help variable for template. + If help_text is supplied, use as is. If help is specified from a command, + run the command and use that help text. + """ + help_text = kwds.get("help_text") + if not help_text: + help_from_command = kwds.get("help_from_command") + if help_from_command: + p = subprocess.Popen( + help_from_command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True + ) + help_text = p.communicate()[0] + + del kwds["help_text"] + del kwds["help_from_command"] + + kwds["help"] = help_text + + +def _handle_tests(kwds, test_case): + """ Given state built up from handling rest of arguments (test_case) and + supplied kwds - build tests for template and corresponding test files. + """ + test_files = [] + if kwds["test_case"]: + tests = [test_case] + test_files.extend(map(lambda x: x[1], test_case.params)) + test_files.extend(map(lambda x: x[1], test_case.outputs)) + else: + tests = [] + return tests, test_files + + +def _handle_requirements(kwds): + """ Convert requirements and containers specified from the command-line + into abstract format for consumption by the template. + """ + requirements = kwds["requirement"] + del kwds["requirement"] + requirements = list(map(Requirement, requirements or [])) + + container = kwds["container"] + del kwds["container"] + containers = list(map(Container, container or [])) + + kwds["requirements"] = requirements + kwds["containers"] = containers + + +def _find_command(kwds): + """Find base command from supplied arguments or just return None. + + If no such command was supplied (template will just replace this + with a TODO item). + """ + command = kwds.get("command") + if not command: + command = kwds.get("example_command", None) + if command: + del kwds["example_command"] + return command + + +class UrlCitation(object): + + def __init__(self, url): + self.url = url + + def __str__(self): + if "github.com" in self.url: + return self._github_str() + else: + return self._url_str() + + def _github_str(self): + url = self.url + title = url.split("/")[-1] + return ''' +@misc{github%s, + author = {LastTODO, FirstTODO}, + year = {TODO}, + title = {%s}, + publisher = {GitHub}, + journal = {GitHub repository}, + url = {%s}, +}''' % (title, title, url) + + def _url_str(self): + url = self.url + return ''' +@misc{renameTODO, + author = {LastTODO, FirstTODO}, + year = {TODO}, + title = {TODO}, + url = {%s}, +}''' % (url) + + +class ToolDescription(object): + """An description of the tool and related files to create.""" + + def __init__(self, contents, tool_files=None, test_files=[]): + self.contents = contents + self.tool_files = tool_files or [] + self.test_files = test_files + + +class ToolFile(object): + + def __init__(self, filename, contents, description): + self.filename = filename + self.contents = contents + self.description = description + + +class Input(object): + + def __init__(self, input_description, name=None, example=False): + parts = input_description.split(".") + name = name or parts[0] + if len(parts) > 0: + datatype = ".".join(parts[1:]) + else: + datatype = "data" + + self.input_description = input_description + self.example = example + self.name = name + self.datatype = datatype + + def __str__(self): + template = '<param type="data" name="{0}" format="{1}" />' + self.datatype = self.datatype.split(".")[-1] + return template.format(self.name, self.datatype) + + +class Output(object): + + def __init__(self, from_path=None, name=None, use_from_path=False, example=False): + if from_path: + parts = from_path.split(".") + name = name or parts[0] + if len(parts) > 1: + datatype = ".".join(parts[1:]) + else: + datatype = "data" + else: + name = name + datatype = "data" + + self.name = name + self.datatype = datatype + if use_from_path: + self.from_path = from_path + else: + self.from_path = None + self.example = example + if example: + self.example_path = from_path + + def __str__(self): + if self.from_path: + return self._from_path_str() + else: + return self._named_str() + + def _from_path_str(self): + template = '<data name="{0}" format="{1}" from_work_dir="{2}" />' + return template.format(self.name, self.datatype, self.from_path) + + def _named_str(self): + template = '<data name="{0}" format="{1}" />' + return template.format(self.name, self.datatype) + + +class Requirement(object): + + def __init__(self, requirement): + parts = requirement.split("@", 1) + if len(parts) > 1: + name = parts[0] + version = "@".join(parts[1:]) + else: + name = parts[0] + version = None + self.name = name + self.version = version + + def __str__(self): + base = '<requirement type="package"{0}>{1}</requirement>' + if self.version is not None: + attrs = ' version="{0}"'.format(self.version) + else: + attrs = '' + return base.format(attrs, self.name) + + +def param_type(value): + if re.match(r"^\d+$", value): + return "int" + elif re.match(r"^\d+?\.\d+?$", value): + return "float" + else: + return "string" + + +class Container(object): + + def __init__(self, image_id): + self.type = "docker" + self.image_id = image_id + + def __str__(self): + template = '<container type="{0}">{1}</container>' + return template.format(self.type, self.image_id) + + +class TestCase(object): + + def __init__(self): + self.params = [] + self.outputs = [] + + +def write_tool_description(ctx, tool_description, **kwds): + """Write a tool description to the file system guided by supplied CLI kwds.""" + tool_id = kwds.get("id") + output = kwds.get("tool") + if not output: + extension = "cwl" if kwds.get("cwl") else "xml" + output = "%s.%s" % (tool_id, extension) + if not io.can_write_to_path(output, **kwds): + ctx.exit(1) + + io.write_file(output, tool_description.contents) + io.info("Tool written to %s" % output) + for tool_file in tool_description.tool_files: + if tool_file.contents is None: + continue + + path = tool_file.filename + if not io.can_write_to_path(path, **kwds): + ctx.exit(1) + io.write_file(path, tool_file.contents) + io.info("Tool %s written to %s" % (tool_file.description, path)) + + macros = kwds["macros"] + macros_file = "macros.xml" + if macros and not os.path.exists(macros_file): + io.write_file(macros_file, tool_description.macro_contents) + elif macros: + io.info(REUSING_MACROS_MESSAGE) + if tool_description.test_files: + if not os.path.exists("test-data"): + io.info("No test-data directory, creating one.") + os.makedirs('test-data') + for test_file in tool_description.test_files: + io.info("Copying test-file %s" % test_file) + try: + shutil.copy(test_file, 'test-data') + except Exception as e: + io.info("Copy of %s failed: %s" % (test_file, e)) + + +__all__ = ( + "build", + "ToolDescription", + "write_tool_description", +)