comparison env/lib/python3.9/site-packages/planemo/engine/interface.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Module containing the :class:`Engine` abstraction."""
2
3 import abc
4 import json
5 import os
6 import tempfile
7 from typing import List
8
9 from six import add_metaclass
10
11 from planemo.exit_codes import EXIT_CODE_UNSUPPORTED_FILE_TYPE
12 from planemo.io import error
13 from planemo.runnable import (
14 cases,
15 RunnableType,
16 )
17 from planemo.test.results import StructuredData
18
19
@add_metaclass(abc.ABCMeta)
class Engine(object):
    """Abstract interface for an external process that runs tools or workflows."""

    @abc.abstractmethod
    def run(self, path, job_path):
        """Run a job against a compatible artifact (workflow or tool)."""

    @abc.abstractmethod
    def cleanup(self):
        """Release whatever resources this engine used to run or test."""

    @abc.abstractmethod
    def test(self, runnables):
        """Test the supplied runnable artifacts (workflows or tools)."""
36
37
class BaseEngine(Engine):
    """Base class providing context and keywords for Engine implementations.

    Subclasses declare the runnable types they support through
    ``handled_runnable_types`` and implement :meth:`_run`.
    """

    # Runnable types this engine can execute; empty here, set by subclasses.
    handled_runnable_types = []  # type: List[RunnableType]

    def __init__(self, ctx, **kwds):
        """Store context and kwds."""
        self._ctx = ctx
        self._kwds = kwds

    def can_run(self, runnable):
        """Use subclass's ``handled_runnable_types`` variable to infer ``can_run``."""
        return runnable.type in self.handled_runnable_types

    def cleanup(self):
        """Default no-op cleanup method."""

    def run(self, runnable, job_path):
        """Run a job using a compatible artifact (workflow or tool).

        The runnable is first checked against ``handled_runnable_types``;
        the actual execution is delegated to :meth:`_run` and its response
        is returned unchanged.
        """
        self._check_can_run(runnable)
        return self._run(runnable, job_path)

    @abc.abstractmethod
    def _run(self, runnable, job_path):
        """Run a job using a compatible artifact (workflow or tool) wrapped as a runnable."""

    def _check_can_run(self, runnable):
        """Report an error and exit via the context if ``runnable`` is unsupported.

        When :meth:`can_run` is false, an error message is emitted and
        ``self._ctx.exit`` is called with ``EXIT_CODE_UNSUPPORTED_FILE_TYPE``.
        """
        if not self.can_run(runnable):
            template = "Engine type [%s] cannot execute [%s]s"
            message = template % (self.__class__, runnable.type)
            error(message)
            self._ctx.exit(EXIT_CODE_UNSUPPORTED_FILE_TYPE)

    def _check_can_run_all(self, runnables):
        """Apply :meth:`_check_can_run` to every runnable in ``runnables``."""
        for runnable in runnables:
            self._check_can_run(runnable)

    def test(self, runnables):
        """Test runnable artifacts (workflow or tool).

        Every test case produced by ``cases`` for each runnable is executed,
        and the per-case structured data is gathered into a
        ``StructuredData`` object whose summary has been calculated.
        """
        self._check_can_run_all(runnables)
        # Flatten the per-runnable collections of test cases into one list.
        test_cases = [t for tl in map(cases, runnables) for t in tl]
        test_results = self._collect_test_results(test_cases)
        tests = [
            test_case.structured_test_data(run_response)
            for (test_case, run_response) in test_results
        ]
        test_data = {
            'version': '0.1',
            'tests': tests,
        }
        structured_results = StructuredData(data=test_data)
        structured_results.calculate_summary_data()
        return structured_results

    def _collect_test_results(self, test_cases):
        """Run each test case and return a list of ``(test_case, run_response)`` pairs."""
        test_results = []
        for test_case in test_cases:
            # Pass the test case as a log argument, like the call below,
            # instead of %-formatting eagerly (which breaks on tuple values).
            self._ctx.vlog("Running tests %s", test_case)
            run_response = self._run_test_case(test_case)
            self._ctx.vlog(
                "Test case [%s] resulted in run response [%s]",
                test_case,
                run_response,
            )
            test_results.append((test_case, run_response))
        return test_results

    def _run_test_case(self, test_case):
        """Run a single test case through :meth:`_run` and return its response.

        If the test case has no ``job_path``, its ``job`` is serialized as
        JSON to a temporary file inside ``test_case.tests_directory`` and
        that file is used as the job path. The temporary file is always
        closed and removed afterwards, including when writing it or running
        the job raises.
        """
        runnable = test_case.runnable
        job_path = test_case.job_path
        tmp_path = None
        try:
            if job_path is None:
                job = test_case.job
                # delete=False: the file must outlive the handle so the
                # engine can read it by path; it is removed in ``finally``.
                with tempfile.NamedTemporaryFile(
                    dir=test_case.tests_directory,
                    suffix=".json",
                    prefix="plnmotmptestjob",
                    delete=False,
                    mode="w+",
                ) as f:
                    # Record the path before dumping so that a failed dump
                    # still gets its partial file cleaned up.
                    tmp_path = f.name
                    json.dump(job, f)
                job_path = tmp_path
            return self._run(runnable, job_path)
        finally:
            if tmp_path is not None:
                os.remove(tmp_path)
131
132
# Public names exported by this module.
__all__ = ("Engine", "BaseEngine")