Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/planemo/runnable.py @ 3:758bc20232e8 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:20:52 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
"""Describe artifacts that can be run, tested, and linted.""" from __future__ import absolute_import import abc import collections import os from distutils.dir_util import copy_tree import aenum import six import yaml from galaxy.tool_util.cwl.parser import workflow_proxy from galaxy.tool_util.loader_directory import ( is_a_yaml_with_class, looks_like_a_cwl_artifact, looks_like_a_data_manager_xml, looks_like_a_tool_cwl, looks_like_a_tool_xml, ) from galaxy.tool_util.parser import get_tool_source from planemo.exit_codes import EXIT_CODE_UNKNOWN_FILE_TYPE, ExitCodeException from planemo.galaxy.workflows import describe_outputs from planemo.io import error from planemo.test import check_output TEST_SUFFIXES = [ "-tests", "_tests", "-test", "_test" ] TEST_EXTENSIONS = [".yml", ".yaml", ".json"] TEST_FILE_NOT_LIST_MESSAGE = ("Invalid test definition file [%s] - file must " "contain a list of tests") TEST_FIELD_MISSING_MESSAGE = ("Invalid test definition [test #%d in %s] -" "defintion must field [%s].") RunnableType = aenum.Enum( "RunnableType", 'galaxy_tool galaxy_datamanager galaxy_workflow cwl_tool cwl_workflow directory' ) @property def _runnable_type_has_tools(runnable_type): return runnable_type.name in ["galaxy_tool", "galaxy_datamanager", "cwl_tool", "directory"] @property def _runnable_type_is_single_artifact(runnable_type): return runnable_type.name not in ["directory"] @property def _runnable_type_test_data_in_parent_dir(runnable_type): return runnable_type.name in ["galaxy_datamanager"] RunnableType.has_tools = _runnable_type_has_tools RunnableType.is_single_artifact = _runnable_type_is_single_artifact RunnableType.test_data_in_parent_dir = _runnable_type_test_data_in_parent_dir _Runnable = collections.namedtuple("Runnable", ["path", "type"]) class Runnable(_Runnable): @property def test_data_search_path(self): if self.type.name in ['galaxy_datamanager']: return os.path.join(os.path.dirname(self.path), os.path.pardir) else: return self.path @property def tool_data_search_path(self): return self.test_data_search_path @property def data_manager_conf_path(self): if self.type.name in ['galaxy_datamanager']: return os.path.join(os.path.dirname(self.path), os.pardir, 'data_manager_conf.xml') @property def has_tools(self): return _runnable_delegate_attribute('has_tools') @property def is_single_artifact(self): return _runnable_delegate_attribute('is_single_artifact') def _runnable_delegate_attribute(attribute): @property def getter(runnable): return getattr(runnable.type, attribute) return getter def _copy_runnable_tree(path, runnable_type, temp_path): dir_to_copy = None if runnable_type in {RunnableType.galaxy_tool, RunnableType.cwl_tool}: dir_to_copy = os.path.dirname(path) path = os.path.join(temp_path, os.path.basename(path)) elif runnable_type == RunnableType.directory: dir_to_copy = path path = temp_path elif runnable_type == RunnableType.galaxy_datamanager: dir_to_copy = os.path.join(os.path.dirname(path), os.pardir) path_to_data_manager_tool = os.path.relpath(path, dir_to_copy) path = os.path.join(temp_path, path_to_data_manager_tool) if dir_to_copy: copy_tree(dir_to_copy, temp_path) return path def for_path(path, temp_path=None): """Produce a class:`Runnable` for supplied path.""" runnable_type = None if os.path.isdir(path): runnable_type = RunnableType.directory elif looks_like_a_tool_cwl(path): runnable_type = RunnableType.cwl_tool elif looks_like_a_data_manager_xml(path): runnable_type = RunnableType.galaxy_datamanager elif looks_like_a_tool_xml(path): runnable_type = RunnableType.galaxy_tool elif is_a_yaml_with_class(path, ["GalaxyWorkflow"]): runnable_type = RunnableType.galaxy_workflow elif path.endswith(".ga"): runnable_type = RunnableType.galaxy_workflow elif looks_like_a_cwl_artifact(path, ["Workflow"]): runnable_type = RunnableType.cwl_workflow if runnable_type is None: error("Unable to determine runnable type for path [%s]" % path) raise ExitCodeException(EXIT_CODE_UNKNOWN_FILE_TYPE) if temp_path: path = _copy_runnable_tree(path, runnable_type, temp_path) return Runnable(path, runnable_type) def for_paths(paths, temp_path=None): """Return a specialized list of Runnable objects for paths.""" return [for_path(path, temp_path=temp_path) for path in paths] def cases(runnable): """Build a `list` of :class:`TestCase` objects for specified runnable.""" cases = [] tests_path = _tests_path(runnable) if tests_path is None: if runnable.type == RunnableType.galaxy_tool: tool_source = get_tool_source(runnable.path) test_dicts = tool_source.parse_tests_to_dict() tool_id = tool_source.parse_id() tool_version = tool_source.parse_version() for i, test_dict in enumerate(test_dicts.get("tests", [])): cases.append(ExternalGalaxyToolTestCase(runnable, tool_id, tool_version, i, test_dict)) return cases tests_directory = os.path.abspath(os.path.dirname(tests_path)) def normalize_to_tests_path(path): if not os.path.isabs(path): absolute_path = os.path.join(tests_directory, path) else: absolute_path = path return os.path.normpath(absolute_path) with open(tests_path, "r") as f: tests_def = yaml.safe_load(f) if not isinstance(tests_def, list): message = TEST_FILE_NOT_LIST_MESSAGE % tests_path raise Exception(message) for i, test_def in enumerate(tests_def): if "job" not in test_def: message = TEST_FIELD_MISSING_MESSAGE % ( i + 1, tests_path, "job" ) raise Exception(message) job_def = test_def["job"] if isinstance(job_def, dict): job_path = None job = job_def else: job_path = normalize_to_tests_path(job_def) job = None doc = test_def.get("doc", None) output_expectations = test_def.get("outputs", {}) case = TestCase( runnable=runnable, tests_directory=tests_directory, output_expectations=output_expectations, index=i, job_path=job_path, job=job, doc=doc, ) cases.append(case) return cases class AbstractTestCase(object): """Description of a test case for a runnable. """ __metaclass__ = abc.ABCMeta def structured_test_data(self, run_response): """Result of executing this test case - a "structured_data" dict. :rtype: dict :return: For example:: { "id": "", "has_data": true, "data": { "status": "success", // error, skip, "job": { "command_line": "cat moo", "stdout": "", "stderr": "" }, "output_problems": [], "execution_problem": "", "inputs" = {}, "problem_log": "" } } """ class TestCase(AbstractTestCase): """Describe an abstract test case for a specified runnable.""" def __init__(self, runnable, tests_directory, output_expectations, job_path, job, index, doc): """Construct TestCase object from required attributes.""" self.runnable = runnable self.job_path = job_path self.job = job self.output_expectations = output_expectations self.tests_directory = tests_directory self.index = index self.doc = doc def __repr__(self): return 'TestCase (%s) for runnable (%s) with job (%s) and expected outputs (%s) in directory (%s) with id (%s)' % \ (self.doc, self.runnable, self.job, self.output_expectations, self.tests_directory, self.index) def structured_test_data(self, run_response): """Check a test case against outputs dictionary. """ output_problems = [] if run_response.was_successful: outputs_dict = run_response.outputs_dict execution_problem = None for output_id, output_test in self.output_expectations.items(): if output_id not in outputs_dict: message = "Expected output [%s] not found in results." % output_id output_problems.append(message) continue output_value = outputs_dict[output_id] output_problems.extend( self._check_output(output_id, output_value, output_test) ) if output_problems: status = "failure" else: status = "success" else: execution_problem = run_response.error_message status = "error" data_dict = dict( status=status ) if status != "success": data_dict["output_problems"] = output_problems data_dict["execution_problem"] = execution_problem log = run_response.log if log is not None: data_dict["problem_log"] = log job_info = run_response.job_info if job_info is not None: data_dict["job"] = job_info data_dict["inputs"] = self._job return dict( id=("%s_%s" % (self._test_id, self.index)), has_data=True, data=data_dict, ) @property def _job(self): if self.job_path is not None: with open(self.job_path, "r") as f: return f.read() else: return self.job def _check_output(self, output_id, output_value, output_test): output_problems = [] if not isinstance(output_test, dict): if output_test != output_value: template = "Output [%s] value [%s] does not match expected value [%s]." message = template % (output_id, output_value, output_test) output_problems.append(message) else: if not isinstance(output_value, dict): output_problems.append("Expected file properties for output [%s]" % output_id) return if "path" not in output_value and "location" in output_value: assert output_value["location"].startswith("file://") output_value["path"] = output_value["location"][len("file://"):] if "path" not in output_value: output_problems.append("No path specified for expected output file [%s]" % output_id) return output_problems.extend( check_output( self.runnable, output_value, output_test, # TODO: needs kwds in here... ) ) return output_problems @property def _test_id(self): if self.runnable.type in [ RunnableType.cwl_tool, RunnableType.galaxy_tool, ]: return get_tool_source(self.runnable.path).parse_id() else: return os.path.basename(self.runnable.path) class ExternalGalaxyToolTestCase(AbstractTestCase): """Special class of AbstractCase that doesn't use job_path but uses test data from a Galaxy server. """ def __init__(self, runnable, tool_id, tool_version, test_index, test_dict): """Construct TestCase object from required attributes.""" self.runnable = runnable self.tool_id = tool_id self.tool_version = tool_version self.test_index = test_index self.test_dict = test_dict def structured_test_data(self, run_response): """Just return the structured_test_data generated from galaxy-tool-util for this test variant. """ return run_response def _tests_path(runnable): if not runnable.is_single_artifact: raise NotImplementedError("Tests for directories are not yet implemented.") runnable_path = runnable.path base, _ = os.path.splitext(runnable_path) for test_suffix in TEST_SUFFIXES: for test_extension in TEST_EXTENSIONS: test_path = base + test_suffix + test_extension if os.path.exists(test_path): return test_path return None def get_outputs(runnable): """Return a list of :class:`RunnableOutput` objects for this runnable.""" if not runnable.is_single_artifact: raise NotImplementedError("Cannot generate outputs for a directory.") if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]: tool_source = get_tool_source(runnable.path) # TODO: do something with collections at some point output_datasets, _ = tool_source.parse_outputs(None) outputs = [ToolOutput(o) for o in output_datasets.values()] return outputs elif runnable.type == RunnableType.galaxy_workflow: workflow_outputs = describe_outputs(runnable.path) return [GalaxyWorkflowOutput(o) for o in workflow_outputs] elif runnable.type == RunnableType.cwl_workflow: workflow = workflow_proxy(runnable.path, strict_cwl_validation=False) return [CwlWorkflowOutput(label) for label in workflow.output_labels] else: raise NotImplementedError("Getting outputs for this artifact type is not yet supported.") class RunnableOutput(object): """Description of a single output of an execution of a Runnable.""" __metaclass__ = abc.ABCMeta @abc.abstractproperty def get_id(self): """An identifier that describes this output.""" class ToolOutput(RunnableOutput): def __init__(self, tool_output): self._tool_output = tool_output def get_id(self): return self._tool_output.name class GalaxyWorkflowOutput(RunnableOutput): def __init__(self, workflow_output): self._workflow_output = workflow_output def get_id(self): return self._workflow_output.label @property def workflow_output(self): return self._workflow_output class CwlWorkflowOutput(RunnableOutput): def __init__(self, label): self._label = label def get_id(self): return self._label class RunResponse(object): """Description of an attempt for an engine to execute a Runnable.""" __metaclass__ = abc.ABCMeta @abc.abstractproperty def was_successful(self): """Indicate whether an error was encountered while executing this runnble. If successful, response should conform to the SuccessfulRunResponse interface, otherwise it will conform to the ErrorRunResponse interface. """ @abc.abstractproperty def job_info(self): """If job information is available, return as dictionary.""" @abc.abstractproperty def log(self): """If engine related log is available, return as text data.""" class SuccessfulRunResponse(RunResponse): """Description of the results of an engine executing a Runnable.""" __metaclass__ = abc.ABCMeta def was_successful(self): """Return `True` to indicate this run was successful.""" return True @abc.abstractproperty def outputs_dict(self): """Return a dict of output descriptions.""" @six.python_2_unicode_compatible class ErrorRunResponse(RunResponse): """Description of an error while attempting to execute a Runnable.""" def __init__(self, error_message, job_info=None, log=None): """Create an ErrorRunResponse with specified error message.""" self._error_message = error_message self._job_info = job_info self._log = log @property def error_message(self): """Error message describing the problem with execution of the runnable.""" return self._error_message @property def was_successful(self): """Return `False` to indicate this run was successful.""" return False @property def job_info(self): """Return potentially null stored `job_info` dict.""" return self._job_info @property def log(self): """Return potentially null stored `log` text.""" return self._log def __str__(self): """Print a helpful error description of run.""" message = "Run failed with message [%s]" % self.error_message log = self.log if log: message += " and log [%s]" % log return message __all__ = ( "cases", "ErrorRunResponse", "for_path", "for_paths", "get_outputs", "Runnable", "RunnableType", "RunResponse", "RunnableOutput", "SuccessfulRunResponse", "TestCase", )