Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/planemo/engine/interface.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/planemo/engine/interface.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,136 @@ +"""Module contianing the :class:`Engine` abstraction.""" + +import abc +import json +import os +import tempfile +from typing import List + +from six import add_metaclass + +from planemo.exit_codes import EXIT_CODE_UNSUPPORTED_FILE_TYPE +from planemo.io import error +from planemo.runnable import ( + cases, + RunnableType, +) +from planemo.test.results import StructuredData + + +@add_metaclass(abc.ABCMeta) +class Engine(object): + """Abstract description of an external process for running tools or workflows. + """ + + @abc.abstractmethod + def run(self, path, job_path): + """Run a job using a compatible artifact (workflow or tool).""" + + @abc.abstractmethod + def cleanup(self): + """Release any resources used to run/test with this engine.""" + + @abc.abstractmethod + def test(self, runnables): + """Test runnable artifacts (workflow or tool).""" + + +class BaseEngine(Engine): + """Base class providing context and keywords for Engine implementations.""" + + handled_runnable_types = [] # type: List[RunnableType] + + def __init__(self, ctx, **kwds): + """Store context and kwds.""" + self._ctx = ctx + self._kwds = kwds + + def can_run(self, runnable): + """Use subclass's ``handled_runnable_types`` variable to infer ``can_run``.""" + return runnable.type in self.handled_runnable_types + + def cleanup(self): + """Default no-op cleanup method.""" + + def run(self, runnable, job_path): + """Run a job using a compatible artifact (workflow or tool).""" + self._check_can_run(runnable) + run_response = self._run(runnable, job_path) + return run_response + + @abc.abstractmethod + def _run(self, runnable, job_path): + """Run a job using a compatible artifact (workflow or tool) wrapped as a runnable.""" + + def _check_can_run(self, runnable): + if not self.can_run(runnable): + template = "Engine type [%s] cannot execute [%s]s" + message = template % (self.__class__, runnable.type) + error(message) + self._ctx.exit(EXIT_CODE_UNSUPPORTED_FILE_TYPE) + + def _check_can_run_all(self, runnables): + for runnable in runnables: + self._check_can_run(runnable) + + def test(self, runnables): + """Test runnable artifacts (workflow or tool).""" + self._check_can_run_all(runnables) + test_cases = [t for tl in map(cases, runnables) for t in tl] + test_results = self._collect_test_results(test_cases) + tests = [] + for (test_case, run_response) in test_results: + test_case_data = test_case.structured_test_data(run_response) + tests.append(test_case_data) + test_data = { + 'version': '0.1', + 'tests': tests, + } + structured_results = StructuredData(data=test_data) + structured_results.calculate_summary_data() + return structured_results + + def _collect_test_results(self, test_cases): + test_results = [] + for test_case in test_cases: + self._ctx.vlog( + "Running tests %s" % test_case + ) + run_response = self._run_test_case(test_case) + self._ctx.vlog( + "Test case [%s] resulted in run response [%s]", + test_case, + run_response, + ) + test_results.append((test_case, run_response)) + return test_results + + def _run_test_case(self, test_case): + runnable = test_case.runnable + job_path = test_case.job_path + tmp_path = None + if job_path is None: + job = test_case.job + f = tempfile.NamedTemporaryFile( + dir=test_case.tests_directory, + suffix=".json", + prefix="plnmotmptestjob", + delete=False, + mode="w+", + ) + tmp_path = f.name + job_path = tmp_path + json.dump(job, f) + f.close() + try: + run_response = self._run(runnable, job_path) + finally: + if tmp_path: + os.remove(tmp_path) + return run_response + + +__all__ = ( + "Engine", + "BaseEngine", +)