Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/cwltool/executors.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:18:57 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d30785e31577 |
---|---|
1 # -*- coding: utf-8 -*- | |
2 """ Single and multi-threaded executors.""" | |
3 import datetime | |
4 import os | |
5 import tempfile | |
6 import threading | |
7 import logging | |
8 from threading import Lock | |
9 from abc import ABCMeta, abstractmethod | |
10 from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union | |
11 | |
12 import psutil | |
13 from six import string_types, with_metaclass | |
14 from typing_extensions import Text # pylint: disable=unused-import | |
15 from future.utils import raise_from | |
16 from schema_salad.validate import ValidationException | |
17 | |
18 from .builder import Builder # pylint: disable=unused-import | |
19 from .context import (RuntimeContext, # pylint: disable=unused-import | |
20 getdefault) | |
21 from .errors import WorkflowException | |
22 from .job import JobBase # pylint: disable=unused-import | |
23 from .loghandler import _logger | |
24 from .mutation import MutationManager | |
25 from .process import Process # pylint: disable=unused-import | |
26 from .process import cleanIntermediate, relocateOutputs | |
27 from .provenance import ProvenanceProfile | |
28 from .utils import DEFAULT_TMP_PREFIX | |
29 from .workflow import Workflow, WorkflowJob, WorkflowJobStep | |
30 from .command_line_tool import CallbackJob | |
31 | |
# Process-wide lock handed to every job thread (see
# MultithreadedJobExecutor._runner, which forwards it to job.run()).
# NOTE(review): presumably serializes temporary-directory creation across
# concurrent jobs — confirm against job.run()'s implementation.
TMPDIR_LOCK = Lock()
33 | |
34 | |
class JobExecutor(with_metaclass(ABCMeta, object)):
    """Abstract base job executor.

    Subclasses provide :meth:`run_jobs`; :meth:`execute` drives the whole
    run: output-directory setup, ``cwl:requirements`` injection, running the
    jobs, relocating outputs, cleaning intermediates, and finalizing
    provenance.
    """

    def __init__(self):
        # type: (...) -> None
        """Initialize."""
        # Outputs collected via output_callback(); index 0 is the final result.
        self.final_output = []  # type: List[Union[Dict[Text, Any], List[Dict[Text, Any]]]]
        # Status strings collected alongside final_output.
        self.final_status = []  # type: List[Text]
        # Every output directory created during the run (candidates for cleanup).
        self.output_dirs = set()  # type: Set[Text]

    def __call__(self, *args, **kwargs):  # type: (*Any, **Any) -> Any
        """Allow the executor instance to be called directly; alias for execute()."""
        return self.execute(*args, **kwargs)

    def output_callback(self, out, process_status):  # type: (Dict[Text, Any], Text) -> None
        """Collect the final status and outputs."""
        self.final_status.append(process_status)
        self.final_output.append(out)

    @abstractmethod
    def run_jobs(self,
                 process,           # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,            # type: logging.Logger
                 runtime_context    # type: RuntimeContext
                ):  # type: (...) -> None
        """Execute the jobs for the given Process."""

    def execute(self,
                process,           # type: Process
                job_order_object,  # type: Dict[Text, Any]
                runtime_context,   # type: RuntimeContext
                logger=_logger,    # type: logging.Logger
               ):  # type: (...) -> Tuple[Optional[Union[Dict[Text, Any], List[Dict[Text, Any]]]], Text]
        """Execute the process.

        :returns: a ``(final output object, status)`` tuple; ``(None,
            "permanentFail")`` when no job produced a result.
        :raises WorkflowException: if no ``basedir`` was provided, or if
            ``cwl:requirements`` is used with a CWL v1.0 document.
        """
        if not runtime_context.basedir:
            raise WorkflowException("Must provide 'basedir' in runtimeContext")

        def check_requirements_supported():
            # type: () -> None
            """Reject `cwl:requirements` for documents pinned to CWL v1.0."""
            if process.metadata.get(
                    "http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1")

        finaloutdir = None  # type: Optional[Text]
        original_outdir = runtime_context.outdir
        if isinstance(original_outdir, string_types):
            finaloutdir = os.path.abspath(original_outdir)
        # Work on a copy so the caller's context object is not mutated.
        runtime_context = runtime_context.copy()
        outdir = tempfile.mkdtemp(
            prefix=getdefault(runtime_context.tmp_outdir_prefix, DEFAULT_TMP_PREFIX))
        self.output_dirs.add(outdir)
        runtime_context.outdir = outdir
        runtime_context.mutation_manager = MutationManager()
        runtime_context.toplevel = True
        runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())

        # Extra requirements may come from the input object itself or from the
        # document's `cwl:defaults`; either way they are appended to the
        # process requirements (and are only legal for CWL >= v1.1).
        job_reqs = None
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            check_requirements_supported()
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
        elif ("cwl:defaults" in process.metadata
              and "https://w3id.org/cwl/cwl#requirements"
              in process.metadata["cwl:defaults"]):
            check_requirements_supported()
            job_reqs = process.metadata["cwl:defaults"]["https://w3id.org/cwl/cwl#requirements"]
        if job_reqs is not None:
            for req in job_reqs:
                process.requirements.append(req)

        self.run_jobs(process, job_order_object, logger, runtime_context)

        # Move results from the temporary outdir into the requested final
        # output directory, when one was given.
        if self.final_output and self.final_output[0] is not None and finaloutdir is not None:
            self.final_output[0] = relocateOutputs(
                self.final_output[0], finaloutdir, self.output_dirs,
                runtime_context.move_outputs, runtime_context.make_fs_access(""),
                getdefault(runtime_context.compute_checksum, True),
                path_mapper=runtime_context.path_mapper)

        if runtime_context.rm_tmpdir:
            if runtime_context.cachedir is None:
                output_dirs = self.output_dirs  # type: Iterable[Any]
            else:
                # Never delete directories that live inside the cache.
                output_dirs = filter(lambda x: not x.startswith(
                    runtime_context.cachedir), self.output_dirs)
            cleanIntermediate(output_dirs)

        if self.final_output and self.final_status:
            # Record output provenance and close the provenance profile
            # before handing back the result.
            if runtime_context.research_obj is not None and \
                    isinstance(process, (JobBase, Process, WorkflowJobStep,
                                         WorkflowJob)) and process.parent_wf:
                process_run_id = None
                name = "primary"
                process.parent_wf.generate_output_prov(self.final_output[0],
                                                       process_run_id, name)
                process.parent_wf.document.wasEndedBy(
                    process.parent_wf.workflow_run_uri, None, process.parent_wf.engine_uuid,
                    datetime.datetime.now())
                process.parent_wf.finalize_prov_profile(name=None)
            return (self.final_output[0], self.final_status[0])
        return (None, "permanentFail")
138 | |
139 | |
class SingleJobExecutor(JobExecutor):
    """Default single-threaded CWL reference executor."""

    def run_jobs(self,
                 process,           # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,            # type: logging.Logger
                 runtime_context    # type: RuntimeContext
                ):  # type: (...) -> None
        """Run every job yielded by *process*, one at a time, in this thread."""
        process_run_id = None  # type: Optional[str]

        research_obj = runtime_context.research_obj
        # A bare command line tool (anything that is not a Workflow) gets its
        # own provenance profile when provenance capture is enabled.
        if research_obj is not None and not isinstance(process, Workflow):
            process.provenance_object = ProvenanceProfile(
                research_obj,
                full_name=runtime_context.cwl_full_name,
                host_provenance=False,
                user_provenance=False,
                orcid=runtime_context.orcid,
                # single tool execution, so RO UUID = wf UUID = tool UUID
                run_uuid=research_obj.ro_uuid,
                fsaccess=runtime_context.make_fs_access(''))
            process.parent_wf = process.provenance_object

        job_iterator = process.job(job_order_object, self.output_callback,
                                   runtime_context)

        try:
            for job in job_iterator:
                # A None job means the workflow is blocked with nothing
                # running — in a single-threaded executor that is a dead end.
                if job is None:
                    logger.error("Workflow cannot make any more progress.")
                    break
                if runtime_context.builder is not None:
                    job.builder = runtime_context.builder
                if job.outdir is not None:
                    self.output_dirs.add(job.outdir)
                if runtime_context.research_obj is not None:
                    if isinstance(process, Workflow):
                        prov_obj = job.prov_obj
                    else:
                        prov_obj = process.provenance_object
                    if prov_obj:
                        runtime_context.prov_obj = prov_obj
                        prov_obj.fsaccess = runtime_context.make_fs_access('')
                        prov_obj.evaluate(
                            process, job, job_order_object,
                            runtime_context.research_obj)
                        process_run_id = prov_obj.record_process_start(
                            process, job)
                        # Give the job its own context carrying the run id.
                        runtime_context = runtime_context.copy()
                        runtime_context.process_run_id = process_run_id
                job.run(runtime_context)
        except (ValidationException, WorkflowException):  # pylint: disable=try-except-raise
            raise
        except Exception as err:
            logger.exception("Got workflow error")
            raise_from(WorkflowException(Text(err)), err)
199 | |
200 | |
class MultithreadedJobExecutor(JobExecutor):
    """
    Experimental multi-threaded CWL executor.

    Does simple resource accounting, will not start a job unless it
    has cores / ram available, but does not make any attempt to
    optimize usage.
    """

    def __init__(self):  # type: () -> None
        """Initialize."""
        super(MultithreadedJobExecutor, self).__init__()
        self.threads = set()  # type: Set[threading.Thread]
        self.exceptions = []  # type: List[WorkflowException]
        self.pending_jobs = []  # type: List[Union[JobBase, WorkflowJob]]
        self.pending_jobs_lock = threading.Lock()

        # Resource budget: half of the currently available RAM (in MiB) and
        # every CPU core reported by psutil.
        self.max_ram = int(psutil.virtual_memory().available / 2**20)
        self.max_cores = psutil.cpu_count()
        # Amounts currently reserved by running jobs.
        self.allocated_ram = 0
        self.allocated_cores = 0

    def select_resources(self, request, runtime_context):  # pylint: disable=unused-argument
        # type: (Dict[str, int], RuntimeContext) -> Dict[str, int]
        """Naïve check for available cpu cores and memory.

        :raises WorkflowException: when a minimum request exceeds host capacity.
        """
        result = {}  # type: Dict[str, int]
        maxrsc = {
            "cores": self.max_cores,
            "ram": self.max_ram
        }
        for rsc in ("cores", "ram"):
            # A minimum above what the host has can never be satisfied.
            if request[rsc+"Min"] > maxrsc[rsc]:
                raise WorkflowException(
                    "Requested at least %d %s but only %d available" %
                    (request[rsc+"Min"], rsc, maxrsc[rsc]))
            # Grant the requested maximum, capped at host capacity.
            if request[rsc+"Max"] < maxrsc[rsc]:
                result[rsc] = request[rsc+"Max"]
            else:
                result[rsc] = maxrsc[rsc]

        return result

    def _runner(self, job, runtime_context, TMPDIR_LOCK):
        # type: (Union[JobBase, WorkflowJob, CallbackJob], RuntimeContext, threading.Lock) -> None
        """Job running thread."""
        try:
            _logger.debug("job: {}, runtime_context: {}, TMPDIR_LOCK: {}".format(job, runtime_context, TMPDIR_LOCK))
            job.run(runtime_context, TMPDIR_LOCK)
        except WorkflowException as err:
            _logger.exception("Got workflow error")
            self.exceptions.append(err)
        except Exception as err:  # pylint: disable=broad-except
            _logger.exception("Got workflow error")
            self.exceptions.append(WorkflowException(Text(err)))
        finally:
            # Release this job's resource reservation and wake up any waiter
            # (run_jobs) so it can schedule more pending work.
            if runtime_context.workflow_eval_lock:
                with runtime_context.workflow_eval_lock:
                    self.threads.remove(threading.current_thread())
                    if isinstance(job, JobBase):
                        self.allocated_ram -= job.builder.resources["ram"]
                        self.allocated_cores -= job.builder.resources["cores"]
                    runtime_context.workflow_eval_lock.notifyAll()

    def run_job(self,
                job,             # type: Union[JobBase, WorkflowJob, None]
                runtime_context  # type: RuntimeContext
               ):  # type: (...) -> None
        """Queue a single Job to execute in a separate thread.

        Appends *job* (if not None) to the pending list, then starts a thread
        for every pending job that fits in the remaining resource budget.
        """
        if job is not None:
            with self.pending_jobs_lock:
                self.pending_jobs.append(job)

        with self.pending_jobs_lock:
            n = 0
            while (n+1) <= len(self.pending_jobs):
                job = self.pending_jobs[n]
                if isinstance(job, JobBase):
                    # Requests beyond total host capacity can never run:
                    # report and drop the job.
                    if ((job.builder.resources["ram"])
                            > self.max_ram
                            or (job.builder.resources["cores"])
                            > self.max_cores):
                        # BUGFIX: format string previously had only 4
                        # placeholders for 6 arguments and lacked the
                        # closing parenthesis.
                        _logger.error(
                            'Job "%s" cannot be run, requests more resources (%s) '
                            'than available on this host (already allocated ram is %d, '
                            'allocated cores is %d, max ram %d, max cores %d)',
                            job.name, job.builder.resources,
                            self.allocated_ram,
                            self.allocated_cores,
                            self.max_ram,
                            self.max_cores)
                        self.pending_jobs.remove(job)
                        return

                    # Not enough free resources right now; leave the job
                    # pending and look at the next one.
                    if ((self.allocated_ram + job.builder.resources["ram"])
                            > self.max_ram
                            or (self.allocated_cores + job.builder.resources["cores"])
                            > self.max_cores):
                        _logger.debug(
                            'Job "%s" cannot run yet, resources (%s) are not '
                            'available (already allocated ram is %d, allocated cores is %d, '
                            'max ram %d, max cores %d)',
                            job.name, job.builder.resources,
                            self.allocated_ram,
                            self.allocated_cores,
                            self.max_ram,
                            self.max_cores)
                        n += 1
                        continue

                # Reserve resources and launch the job on a daemon thread.
                thread = threading.Thread(target=self._runner, args=(job, runtime_context, TMPDIR_LOCK))
                thread.daemon = True
                self.threads.add(thread)
                if isinstance(job, JobBase):
                    self.allocated_ram += job.builder.resources["ram"]
                    self.allocated_cores += job.builder.resources["cores"]
                thread.start()
                self.pending_jobs.remove(job)

    def wait_for_next_completion(self, runtime_context):
        # type: (RuntimeContext) -> None
        """Wait for jobs to finish.

        :raises WorkflowException: the first error recorded by a worker thread.
        """
        if runtime_context.workflow_eval_lock is not None:
            runtime_context.workflow_eval_lock.wait()
        if self.exceptions:
            raise self.exceptions[0]

    def run_jobs(self,
                 process,           # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,            # type: logging.Logger
                 runtime_context    # type: RuntimeContext
                ):  # type: (...) -> None
        """Concurrently execute the jobs for the given Process."""
        jobiter = process.job(job_order_object, self.output_callback,
                              runtime_context)

        if runtime_context.workflow_eval_lock is None:
            raise WorkflowException(
                "runtimeContext.workflow_eval_lock must not be None")

        runtime_context.workflow_eval_lock.acquire()
        for job in jobiter:
            if job is not None:
                if isinstance(job, JobBase):
                    job.builder = runtime_context.builder or job.builder
                if job.outdir is not None:
                    self.output_dirs.add(job.outdir)

            self.run_job(job, runtime_context)

            if job is None:
                # The iterator is blocked on unfinished jobs: wait for one to
                # complete, or give up if nothing is running.
                if self.threads:
                    self.wait_for_next_completion(runtime_context)
                else:
                    logger.error("Workflow cannot make any more progress.")
                    break

        # Drain: keep scheduling pending jobs until all threads are done.
        self.run_job(None, runtime_context)
        while self.threads:
            self.wait_for_next_completion(runtime_context)
            self.run_job(None, runtime_context)

        runtime_context.workflow_eval_lock.release()