Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/planemo/engine/interface.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
"""Module contianing the :class:`Engine` abstraction.""" import abc import json import os import tempfile from typing import List from six import add_metaclass from planemo.exit_codes import EXIT_CODE_UNSUPPORTED_FILE_TYPE from planemo.io import error from planemo.runnable import ( cases, RunnableType, ) from planemo.test.results import StructuredData @add_metaclass(abc.ABCMeta) class Engine(object): """Abstract description of an external process for running tools or workflows. """ @abc.abstractmethod def run(self, path, job_path): """Run a job using a compatible artifact (workflow or tool).""" @abc.abstractmethod def cleanup(self): """Release any resources used to run/test with this engine.""" @abc.abstractmethod def test(self, runnables): """Test runnable artifacts (workflow or tool).""" class BaseEngine(Engine): """Base class providing context and keywords for Engine implementations.""" handled_runnable_types = [] # type: List[RunnableType] def __init__(self, ctx, **kwds): """Store context and kwds.""" self._ctx = ctx self._kwds = kwds def can_run(self, runnable): """Use subclass's ``handled_runnable_types`` variable to infer ``can_run``.""" return runnable.type in self.handled_runnable_types def cleanup(self): """Default no-op cleanup method.""" def run(self, runnable, job_path): """Run a job using a compatible artifact (workflow or tool).""" self._check_can_run(runnable) run_response = self._run(runnable, job_path) return run_response @abc.abstractmethod def _run(self, runnable, job_path): """Run a job using a compatible artifact (workflow or tool) wrapped as a runnable.""" def _check_can_run(self, runnable): if not self.can_run(runnable): template = "Engine type [%s] cannot execute [%s]s" message = template % (self.__class__, runnable.type) error(message) self._ctx.exit(EXIT_CODE_UNSUPPORTED_FILE_TYPE) def _check_can_run_all(self, runnables): for runnable in runnables: self._check_can_run(runnable) def test(self, runnables): """Test runnable artifacts (workflow or tool).""" self._check_can_run_all(runnables) test_cases = [t for tl in map(cases, runnables) for t in tl] test_results = self._collect_test_results(test_cases) tests = [] for (test_case, run_response) in test_results: test_case_data = test_case.structured_test_data(run_response) tests.append(test_case_data) test_data = { 'version': '0.1', 'tests': tests, } structured_results = StructuredData(data=test_data) structured_results.calculate_summary_data() return structured_results def _collect_test_results(self, test_cases): test_results = [] for test_case in test_cases: self._ctx.vlog( "Running tests %s" % test_case ) run_response = self._run_test_case(test_case) self._ctx.vlog( "Test case [%s] resulted in run response [%s]", test_case, run_response, ) test_results.append((test_case, run_response)) return test_results def _run_test_case(self, test_case): runnable = test_case.runnable job_path = test_case.job_path tmp_path = None if job_path is None: job = test_case.job f = tempfile.NamedTemporaryFile( dir=test_case.tests_directory, suffix=".json", prefix="plnmotmptestjob", delete=False, mode="w+", ) tmp_path = f.name job_path = tmp_path json.dump(job, f) f.close() try: run_response = self._run(runnable, job_path) finally: if tmp_path: os.remove(tmp_path) return run_response __all__ = ( "Engine", "BaseEngine", )