Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/planemo/engine/interface.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author | shellac |
---|---|
date | Thu, 14 May 2020 14:56:58 -0400 |
parents | 26e78fe6e8c4 |
children |
comparison
equal
deleted
inserted
replaced
1:75ca89e9b81c | 2:6af9afd405e9 |
---|---|
1 """Module containing the :class:`Engine` abstraction.""" | |
2 | |
3 import abc | |
4 import json | |
5 import os | |
6 import tempfile | |
7 | |
8 from planemo.exit_codes import EXIT_CODE_UNSUPPORTED_FILE_TYPE | |
9 from planemo.io import error | |
10 from planemo.runnable import ( | |
11 cases, | |
12 for_path, | |
13 ) | |
14 from planemo.test.results import StructuredData | |
15 | |
16 | |
# A bare ``__metaclass__`` class attribute is honoured only by Python 2; Python 3
# silently ignores it, which leaves ``@abc.abstractmethod`` unenforced.
# Building the base class by calling the metaclass works on both.
_AbstractEngineBase = abc.ABCMeta("_AbstractEngineBase", (object,), {})


class Engine(_AbstractEngineBase):
    """Abstract description of an external process for running tools or workflows.

    Concrete engines must implement :meth:`run`, :meth:`cleanup` and
    :meth:`test`; a subclass that leaves any of them abstract cannot be
    instantiated (``TypeError``).
    """

    @abc.abstractmethod
    def run(self, path, job_path):
        """Run a job using a compatible artifact (workflow or tool).

        :param path: location of the artifact to run.
        :param job_path: location of the job description to run it with.
        """

    @abc.abstractmethod
    def cleanup(self):
        """Release any resources used to run/test with this engine."""

    @abc.abstractmethod
    def test(self, runnables):
        """Test runnable artifacts (workflow or tool).

        :param runnables: iterable of runnable artifacts to test.
        """
34 | |
35 | |
class BaseEngine(Engine):
    """Base class providing context and keywords for Engine implementations.

    Subclasses set ``handled_runnable_types`` and implement :meth:`_run`;
    :meth:`run` and :meth:`test` are built on top of those two pieces.
    """

    # Runnable types this engine accepts; ``can_run`` checks ``runnable.type``
    # against this list. Empty here, so the base class itself accepts nothing.
    handled_runnable_types = []

    def __init__(self, ctx, **kwds):
        """Store context and kwds."""
        self._ctx = ctx
        self._kwds = kwds

    def can_run(self, runnable):
        """Use subclass's ``handled_runnable_types`` variable to infer ``can_run``."""
        return runnable.type in self.handled_runnable_types

    def cleanup(self):
        """Default no-op cleanup method."""

    def run(self, path, job_path):
        """Run a job using a compatible artifact (workflow or tool).

        The artifact at ``path`` is resolved to a runnable with ``for_path``,
        checked against this engine, and then handed to :meth:`_run`, whose
        response is returned unchanged.
        """
        runnable = for_path(path)
        self._check_can_run(runnable)
        return self._run(runnable, job_path)

    @abc.abstractmethod
    def _run(self, runnable, job_path):
        """Run a job using a compatible artifact (workflow or tool) wrapped as a runnable."""

    def _check_can_run(self, runnable):
        """Report an error and exit through the context if ``runnable`` is unsupported."""
        if not self.can_run(runnable):
            template = "Engine type [%s] cannot execute [%s]s"
            message = template % (self.__class__, runnable.type)
            error(message)
            # NOTE(review): presumably ``ctx.exit`` terminates the command; its
            # implementation is not visible in this module.
            self._ctx.exit(EXIT_CODE_UNSUPPORTED_FILE_TYPE)

    def _check_can_run_all(self, runnables):
        """Apply :meth:`_check_can_run` to every runnable in ``runnables``."""
        for runnable in runnables:
            self._check_can_run(runnable)

    def test(self, runnables):
        """Test runnable artifacts (workflow or tool).

        Every test case of every runnable is executed, and each response is
        converted with the test case's ``structured_test_data``.

        :returns: a ``StructuredData`` wrapping ``{'version', 'tests'}`` with
            its summary data already calculated.
        """
        self._check_can_run_all(runnables)
        # Flatten the per-runnable lists of test cases into a single list.
        test_cases = [t for tl in map(cases, runnables) for t in tl]
        test_results = self._collect_test_results(test_cases)
        tests = [
            test_case.structured_test_data(run_response)
            for (test_case, run_response) in test_results
        ]
        test_data = {
            'version': '0.1',
            'tests': tests,
        }
        structured_results = StructuredData(data=test_data)
        structured_results.calculate_summary_data()
        return structured_results

    def _collect_test_results(self, test_cases):
        """Run each test case in order.

        :returns: list of ``(test_case, run_response)`` tuples.
        """
        test_results = []
        for test_case in test_cases:
            self._ctx.vlog(
                "Running tests %s" % test_case
            )
            run_response = self._run_test_case(test_case)
            self._ctx.vlog(
                "Test case [%s] resulted in run response [%s]",
                test_case,
                run_response,
            )
            test_results.append((test_case, run_response))
        return test_results

    def _run_test_case(self, test_case):
        """Run a single test case and return the response from :meth:`_run`.

        When the test case has no ``job_path``, its ``job`` is serialized as
        JSON into a temporary file inside ``test_case.tests_directory`` and
        that file is used as the job path. The temporary file is always closed
        and removed afterwards, including when writing it or running the job
        raises.
        """
        runnable = test_case.runnable
        job_path = test_case.job_path
        tmp_path = None
        try:
            if job_path is None:
                job = test_case.job
                # delete=False: the file must outlive the handle so the engine
                # can read it by path; it is removed in the ``finally`` below.
                with tempfile.NamedTemporaryFile(
                    dir=test_case.tests_directory,
                    suffix=".json",
                    prefix="plnmotmptestjob",
                    delete=False,
                    mode="w+",
                ) as f:
                    # Record the name first so a failed dump is still cleaned up.
                    tmp_path = f.name
                    json.dump(job, f)
                job_path = tmp_path
            return self._run(runnable, job_path)
        finally:
            if tmp_path:
                os.remove(tmp_path)

    def _process_test_results(self, test_results):
        """Iterate over ``(test_case, run_response)`` pairs; currently does nothing."""
        for (test_case, run_response) in test_results:
            pass
135 | |
136 __all__ = ( | |
137 "Engine", | |
138 "BaseEngine", | |
139 ) |