Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/planemo/engine/interface.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """Module contianing the :class:`Engine` abstraction.""" | |
2 | |
3 import abc | |
4 import json | |
5 import os | |
6 import tempfile | |
7 from typing import List | |
8 | |
9 from six import add_metaclass | |
10 | |
11 from planemo.exit_codes import EXIT_CODE_UNSUPPORTED_FILE_TYPE | |
12 from planemo.io import error | |
13 from planemo.runnable import ( | |
14 cases, | |
15 RunnableType, | |
16 ) | |
17 from planemo.test.results import StructuredData | |
18 | |
19 | |
20 @add_metaclass(abc.ABCMeta) | |
21 class Engine(object): | |
22 """Abstract description of an external process for running tools or workflows. | |
23 """ | |
24 | |
25 @abc.abstractmethod | |
26 def run(self, path, job_path): | |
27 """Run a job using a compatible artifact (workflow or tool).""" | |
28 | |
29 @abc.abstractmethod | |
30 def cleanup(self): | |
31 """Release any resources used to run/test with this engine.""" | |
32 | |
33 @abc.abstractmethod | |
34 def test(self, runnables): | |
35 """Test runnable artifacts (workflow or tool).""" | |
36 | |
37 | |
38 class BaseEngine(Engine): | |
39 """Base class providing context and keywords for Engine implementations.""" | |
40 | |
41 handled_runnable_types = [] # type: List[RunnableType] | |
42 | |
43 def __init__(self, ctx, **kwds): | |
44 """Store context and kwds.""" | |
45 self._ctx = ctx | |
46 self._kwds = kwds | |
47 | |
48 def can_run(self, runnable): | |
49 """Use subclass's ``handled_runnable_types`` variable to infer ``can_run``.""" | |
50 return runnable.type in self.handled_runnable_types | |
51 | |
52 def cleanup(self): | |
53 """Default no-op cleanup method.""" | |
54 | |
55 def run(self, runnable, job_path): | |
56 """Run a job using a compatible artifact (workflow or tool).""" | |
57 self._check_can_run(runnable) | |
58 run_response = self._run(runnable, job_path) | |
59 return run_response | |
60 | |
61 @abc.abstractmethod | |
62 def _run(self, runnable, job_path): | |
63 """Run a job using a compatible artifact (workflow or tool) wrapped as a runnable.""" | |
64 | |
65 def _check_can_run(self, runnable): | |
66 if not self.can_run(runnable): | |
67 template = "Engine type [%s] cannot execute [%s]s" | |
68 message = template % (self.__class__, runnable.type) | |
69 error(message) | |
70 self._ctx.exit(EXIT_CODE_UNSUPPORTED_FILE_TYPE) | |
71 | |
72 def _check_can_run_all(self, runnables): | |
73 for runnable in runnables: | |
74 self._check_can_run(runnable) | |
75 | |
76 def test(self, runnables): | |
77 """Test runnable artifacts (workflow or tool).""" | |
78 self._check_can_run_all(runnables) | |
79 test_cases = [t for tl in map(cases, runnables) for t in tl] | |
80 test_results = self._collect_test_results(test_cases) | |
81 tests = [] | |
82 for (test_case, run_response) in test_results: | |
83 test_case_data = test_case.structured_test_data(run_response) | |
84 tests.append(test_case_data) | |
85 test_data = { | |
86 'version': '0.1', | |
87 'tests': tests, | |
88 } | |
89 structured_results = StructuredData(data=test_data) | |
90 structured_results.calculate_summary_data() | |
91 return structured_results | |
92 | |
93 def _collect_test_results(self, test_cases): | |
94 test_results = [] | |
95 for test_case in test_cases: | |
96 self._ctx.vlog( | |
97 "Running tests %s" % test_case | |
98 ) | |
99 run_response = self._run_test_case(test_case) | |
100 self._ctx.vlog( | |
101 "Test case [%s] resulted in run response [%s]", | |
102 test_case, | |
103 run_response, | |
104 ) | |
105 test_results.append((test_case, run_response)) | |
106 return test_results | |
107 | |
108 def _run_test_case(self, test_case): | |
109 runnable = test_case.runnable | |
110 job_path = test_case.job_path | |
111 tmp_path = None | |
112 if job_path is None: | |
113 job = test_case.job | |
114 f = tempfile.NamedTemporaryFile( | |
115 dir=test_case.tests_directory, | |
116 suffix=".json", | |
117 prefix="plnmotmptestjob", | |
118 delete=False, | |
119 mode="w+", | |
120 ) | |
121 tmp_path = f.name | |
122 job_path = tmp_path | |
123 json.dump(job, f) | |
124 f.close() | |
125 try: | |
126 run_response = self._run(runnable, job_path) | |
127 finally: | |
128 if tmp_path: | |
129 os.remove(tmp_path) | |
130 return run_response | |
131 | |
132 | |
133 __all__ = ( | |
134 "Engine", | |
135 "BaseEngine", | |
136 ) |