view env/lib/python3.7/site-packages/planemo/runnable.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line source

"""Describe artifacts that can be run, tested, and linted."""

from __future__ import absolute_import

import abc
import collections
import os
from distutils.dir_util import copy_tree

import aenum
import six
import yaml
from galaxy.tool_util.cwl.parser import workflow_proxy
from galaxy.tool_util.loader_directory import (
    is_a_yaml_with_class,
    looks_like_a_cwl_artifact,
    looks_like_a_data_manager_xml,
    looks_like_a_tool_cwl,
    looks_like_a_tool_xml,
)
from galaxy.tool_util.parser import get_tool_source

from planemo.exit_codes import EXIT_CODE_UNKNOWN_FILE_TYPE, ExitCodeException
from planemo.galaxy.workflows import describe_outputs
from planemo.io import error
from planemo.test import check_output

TEST_SUFFIXES = [
    "-tests", "_tests", "-test", "_test"
]
TEST_EXTENSIONS = [".yml", ".yaml", ".json"]

TEST_FILE_NOT_LIST_MESSAGE = ("Invalid test definition file [%s] - file must "
                              "contain a list of tests")
TEST_FIELD_MISSING_MESSAGE = ("Invalid test definition [test #%d in %s] -"
                              "defintion must field [%s].")


RunnableType = aenum.Enum(
    "RunnableType", 'galaxy_tool galaxy_datamanager galaxy_workflow cwl_tool cwl_workflow directory'
)


@property
def _runnable_type_has_tools(runnable_type):
    return runnable_type.name in ["galaxy_tool", "galaxy_datamanager", "cwl_tool", "directory"]


@property
def _runnable_type_is_single_artifact(runnable_type):
    return runnable_type.name not in ["directory"]


@property
def _runnable_type_test_data_in_parent_dir(runnable_type):
    return runnable_type.name in ["galaxy_datamanager"]


RunnableType.has_tools = _runnable_type_has_tools
RunnableType.is_single_artifact = _runnable_type_is_single_artifact
RunnableType.test_data_in_parent_dir = _runnable_type_test_data_in_parent_dir

_Runnable = collections.namedtuple("Runnable", ["path", "type"])


class Runnable(_Runnable):

    @property
    def test_data_search_path(self):
        if self.type.name in ['galaxy_datamanager']:
            return os.path.join(os.path.dirname(self.path), os.path.pardir)
        else:
            return self.path

    @property
    def tool_data_search_path(self):
        return self.test_data_search_path

    @property
    def data_manager_conf_path(self):
        if self.type.name in ['galaxy_datamanager']:
            return os.path.join(os.path.dirname(self.path), os.pardir, 'data_manager_conf.xml')

    @property
    def has_tools(self):
        return _runnable_delegate_attribute('has_tools')

    @property
    def is_single_artifact(self):
        return _runnable_delegate_attribute('is_single_artifact')


def _runnable_delegate_attribute(attribute):

    @property
    def getter(runnable):
        return getattr(runnable.type, attribute)

    return getter


def _copy_runnable_tree(path, runnable_type, temp_path):
    dir_to_copy = None
    if runnable_type in {RunnableType.galaxy_tool, RunnableType.cwl_tool}:
        dir_to_copy = os.path.dirname(path)
        path = os.path.join(temp_path, os.path.basename(path))
    elif runnable_type == RunnableType.directory:
        dir_to_copy = path
        path = temp_path
    elif runnable_type == RunnableType.galaxy_datamanager:
        dir_to_copy = os.path.join(os.path.dirname(path), os.pardir)
        path_to_data_manager_tool = os.path.relpath(path, dir_to_copy)
        path = os.path.join(temp_path, path_to_data_manager_tool)
    if dir_to_copy:
        copy_tree(dir_to_copy, temp_path)
    return path


def for_path(path, temp_path=None):
    """Produce a class:`Runnable` for supplied path."""
    runnable_type = None
    if os.path.isdir(path):
        runnable_type = RunnableType.directory
    elif looks_like_a_tool_cwl(path):
        runnable_type = RunnableType.cwl_tool
    elif looks_like_a_data_manager_xml(path):
        runnable_type = RunnableType.galaxy_datamanager
    elif looks_like_a_tool_xml(path):
        runnable_type = RunnableType.galaxy_tool
    elif is_a_yaml_with_class(path, ["GalaxyWorkflow"]):
        runnable_type = RunnableType.galaxy_workflow
    elif path.endswith(".ga"):
        runnable_type = RunnableType.galaxy_workflow
    elif looks_like_a_cwl_artifact(path, ["Workflow"]):
        runnable_type = RunnableType.cwl_workflow

    if runnable_type is None:
        error("Unable to determine runnable type for path [%s]" % path)
        raise ExitCodeException(EXIT_CODE_UNKNOWN_FILE_TYPE)

    if temp_path:
        path = _copy_runnable_tree(path, runnable_type, temp_path)

    return Runnable(path, runnable_type)


def for_paths(paths, temp_path=None):
    """Return a specialized list of Runnable objects for paths."""
    return [for_path(path, temp_path=temp_path) for path in paths]


def cases(runnable):
    """Build a `list` of :class:`TestCase` objects for specified runnable."""
    cases = []

    tests_path = _tests_path(runnable)
    if tests_path is None:
        if runnable.type == RunnableType.galaxy_tool:
            tool_source = get_tool_source(runnable.path)
            test_dicts = tool_source.parse_tests_to_dict()
            tool_id = tool_source.parse_id()
            tool_version = tool_source.parse_version()
            for i, test_dict in enumerate(test_dicts.get("tests", [])):
                cases.append(ExternalGalaxyToolTestCase(runnable, tool_id, tool_version, i, test_dict))
        return cases

    tests_directory = os.path.abspath(os.path.dirname(tests_path))

    def normalize_to_tests_path(path):
        if not os.path.isabs(path):
            absolute_path = os.path.join(tests_directory, path)
        else:
            absolute_path = path
        return os.path.normpath(absolute_path)

    with open(tests_path, "r") as f:
        tests_def = yaml.safe_load(f)

    if not isinstance(tests_def, list):
        message = TEST_FILE_NOT_LIST_MESSAGE % tests_path
        raise Exception(message)

    for i, test_def in enumerate(tests_def):
        if "job" not in test_def:
            message = TEST_FIELD_MISSING_MESSAGE % (
                i + 1, tests_path, "job"
            )
            raise Exception(message)
        job_def = test_def["job"]
        if isinstance(job_def, dict):
            job_path = None
            job = job_def
        else:
            job_path = normalize_to_tests_path(job_def)
            job = None

        doc = test_def.get("doc", None)
        output_expectations = test_def.get("outputs", {})
        case = TestCase(
            runnable=runnable,
            tests_directory=tests_directory,
            output_expectations=output_expectations,
            index=i,
            job_path=job_path,
            job=job,
            doc=doc,
        )
        cases.append(case)

    return cases


class AbstractTestCase(object):
    """Description of a test case for a runnable.
    """

    __metaclass__ = abc.ABCMeta

    def structured_test_data(self, run_response):
        """Result of executing this test case - a "structured_data" dict.

        :rtype: dict
        :return:
                 For example::

                   {
                       "id": "",
                       "has_data": true,
                       "data": {
                           "status": "success", // error, skip,
                           "job": {
                               "command_line": "cat moo",
                               "stdout": "",
                               "stderr": ""
                           },
                           "output_problems": [],
                           "execution_problem": "",
                           "inputs" = {},
                           "problem_log": ""
                       }
                   }
        """


class TestCase(AbstractTestCase):
    """Describe an abstract test case for a specified runnable."""

    def __init__(self, runnable, tests_directory, output_expectations, job_path, job, index, doc):
        """Construct TestCase object from required attributes."""
        self.runnable = runnable
        self.job_path = job_path
        self.job = job
        self.output_expectations = output_expectations
        self.tests_directory = tests_directory
        self.index = index
        self.doc = doc

    def __repr__(self):
        return 'TestCase (%s) for runnable (%s) with job (%s) and expected outputs (%s) in directory (%s) with id (%s)' % \
            (self.doc, self.runnable, self.job, self.output_expectations, self.tests_directory, self.index)

    def structured_test_data(self, run_response):
        """Check a test case against outputs dictionary.
        """
        output_problems = []
        if run_response.was_successful:
            outputs_dict = run_response.outputs_dict
            execution_problem = None
            for output_id, output_test in self.output_expectations.items():
                if output_id not in outputs_dict:
                    message = "Expected output [%s] not found in results." % output_id
                    output_problems.append(message)
                    continue

                output_value = outputs_dict[output_id]
                output_problems.extend(
                    self._check_output(output_id, output_value, output_test)
                )
            if output_problems:
                status = "failure"
            else:
                status = "success"
        else:
            execution_problem = run_response.error_message
            status = "error"
        data_dict = dict(
            status=status
        )
        if status != "success":
            data_dict["output_problems"] = output_problems
            data_dict["execution_problem"] = execution_problem
        log = run_response.log
        if log is not None:
            data_dict["problem_log"] = log
        job_info = run_response.job_info
        if job_info is not None:
            data_dict["job"] = job_info
        data_dict["inputs"] = self._job
        return dict(
            id=("%s_%s" % (self._test_id, self.index)),
            has_data=True,
            data=data_dict,
        )

    @property
    def _job(self):
        if self.job_path is not None:
            with open(self.job_path, "r") as f:
                return f.read()
        else:
            return self.job

    def _check_output(self, output_id, output_value, output_test):
        output_problems = []
        if not isinstance(output_test, dict):
            if output_test != output_value:
                template = "Output [%s] value [%s] does not match expected value [%s]."
                message = template % (output_id, output_value, output_test)
                output_problems.append(message)
        else:
            if not isinstance(output_value, dict):
                output_problems.append("Expected file properties for output [%s]" % output_id)
                return
            if "path" not in output_value and "location" in output_value:
                assert output_value["location"].startswith("file://")
                output_value["path"] = output_value["location"][len("file://"):]
            if "path" not in output_value:
                output_problems.append("No path specified for expected output file [%s]" % output_id)
                return

            output_problems.extend(
                check_output(
                    self.runnable,
                    output_value,
                    output_test,
                    # TODO: needs kwds in here...
                )
            )

        return output_problems

    @property
    def _test_id(self):
        if self.runnable.type in [
            RunnableType.cwl_tool,
            RunnableType.galaxy_tool,
        ]:
            return get_tool_source(self.runnable.path).parse_id()
        else:
            return os.path.basename(self.runnable.path)


class ExternalGalaxyToolTestCase(AbstractTestCase):
    """Special class of AbstractCase that doesn't use job_path but uses test data from a Galaxy server.
    """

    def __init__(self, runnable, tool_id, tool_version, test_index, test_dict):
        """Construct TestCase object from required attributes."""
        self.runnable = runnable
        self.tool_id = tool_id
        self.tool_version = tool_version
        self.test_index = test_index
        self.test_dict = test_dict

    def structured_test_data(self, run_response):
        """Just return the structured_test_data generated from galaxy-tool-util for this test variant.
        """
        return run_response


def _tests_path(runnable):
    if not runnable.is_single_artifact:
        raise NotImplementedError("Tests for directories are not yet implemented.")

    runnable_path = runnable.path
    base, _ = os.path.splitext(runnable_path)

    for test_suffix in TEST_SUFFIXES:
        for test_extension in TEST_EXTENSIONS:
            test_path = base + test_suffix + test_extension
            if os.path.exists(test_path):
                return test_path

    return None


def get_outputs(runnable):
    """Return a list of :class:`RunnableOutput` objects for this runnable."""
    if not runnable.is_single_artifact:
        raise NotImplementedError("Cannot generate outputs for a directory.")
    if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
        tool_source = get_tool_source(runnable.path)
        # TODO: do something with collections at some point
        output_datasets, _ = tool_source.parse_outputs(None)
        outputs = [ToolOutput(o) for o in output_datasets.values()]
        return outputs
    elif runnable.type == RunnableType.galaxy_workflow:
        workflow_outputs = describe_outputs(runnable.path)
        return [GalaxyWorkflowOutput(o) for o in workflow_outputs]
    elif runnable.type == RunnableType.cwl_workflow:
        workflow = workflow_proxy(runnable.path, strict_cwl_validation=False)
        return [CwlWorkflowOutput(label) for label in workflow.output_labels]
    else:
        raise NotImplementedError("Getting outputs for this artifact type is not yet supported.")


class RunnableOutput(object):
    """Description of a single output of an execution of a Runnable."""

    __metaclass__ = abc.ABCMeta

    @abc.abstractproperty
    def get_id(self):
        """An identifier that describes this output."""


class ToolOutput(RunnableOutput):

    def __init__(self, tool_output):
        self._tool_output = tool_output

    def get_id(self):
        return self._tool_output.name


class GalaxyWorkflowOutput(RunnableOutput):

    def __init__(self, workflow_output):
        self._workflow_output = workflow_output

    def get_id(self):
        return self._workflow_output.label

    @property
    def workflow_output(self):
        return self._workflow_output


class CwlWorkflowOutput(RunnableOutput):

    def __init__(self, label):
        self._label = label

    def get_id(self):
        return self._label


class RunResponse(object):
    """Description of an attempt for an engine to execute a Runnable."""

    __metaclass__ = abc.ABCMeta

    @abc.abstractproperty
    def was_successful(self):
        """Indicate whether an error was encountered while executing this runnble.

        If successful, response should conform to the SuccessfulRunResponse interface,
        otherwise it will conform to the ErrorRunResponse interface.
        """

    @abc.abstractproperty
    def job_info(self):
        """If job information is available, return as dictionary."""

    @abc.abstractproperty
    def log(self):
        """If engine related log is available, return as text data."""


class SuccessfulRunResponse(RunResponse):
    """Description of the results of an engine executing a Runnable."""

    __metaclass__ = abc.ABCMeta

    def was_successful(self):
        """Return `True` to indicate this run was successful."""
        return True

    @abc.abstractproperty
    def outputs_dict(self):
        """Return a dict of output descriptions."""


@six.python_2_unicode_compatible
class ErrorRunResponse(RunResponse):
    """Description of an error while attempting to execute a Runnable."""

    def __init__(self, error_message, job_info=None, log=None):
        """Create an ErrorRunResponse with specified error message."""
        self._error_message = error_message
        self._job_info = job_info
        self._log = log

    @property
    def error_message(self):
        """Error message describing the problem with execution of the runnable."""
        return self._error_message

    @property
    def was_successful(self):
        """Return `False` to indicate this run was successful."""
        return False

    @property
    def job_info(self):
        """Return potentially null stored `job_info` dict."""
        return self._job_info

    @property
    def log(self):
        """Return potentially null stored `log` text."""
        return self._log

    def __str__(self):
        """Print a helpful error description of run."""
        message = "Run failed with message [%s]" % self.error_message
        log = self.log
        if log:
            message += " and log [%s]" % log
        return message


__all__ = (
    "cases",
    "ErrorRunResponse",
    "for_path",
    "for_paths",
    "get_outputs",
    "Runnable",
    "RunnableType",
    "RunResponse",
    "RunnableOutput",
    "SuccessfulRunResponse",
    "TestCase",
)