comparison env/lib/python3.9/site-packages/cwltool/executors.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Single and multi-threaded executors."""
2 import datetime
3 import functools
4 import logging
5 import math
6 import os
7 import threading
8 from abc import ABCMeta, abstractmethod
9 from threading import Lock
10 from typing import (
11 Dict,
12 Iterable,
13 List,
14 MutableSequence,
15 Optional,
16 Set,
17 Tuple,
18 Union,
19 cast,
20 )
21
22 import psutil
23 from schema_salad.exceptions import ValidationException
24 from schema_salad.sourceline import SourceLine
25
26 from .command_line_tool import CallbackJob, ExpressionJob
27 from .context import RuntimeContext, getdefault
28 from .errors import WorkflowException
29 from .job import JobBase
30 from .loghandler import _logger
31 from .mutation import MutationManager
32 from .process import Process, cleanIntermediate, relocateOutputs
33 from .provenance_profile import ProvenanceProfile
34 from .task_queue import TaskQueue
35 from .utils import CWLObjectType, JobsType
36 from .workflow import Workflow
37 from .workflow_job import WorkflowJob, WorkflowJobStep
38
# Module-level lock shared by all job threads: MultithreadedJobExecutor._runner
# forwards it to job.run(runtime_context, TMPDIR_LOCK).  Presumably serializes
# temporary-directory setup across threads -- TODO confirm against JobBase.run.
TMPDIR_LOCK = Lock()

41
class JobExecutor(metaclass=ABCMeta):
    """Abstract base job executor."""

    def __init__(self) -> None:
        """Initialize."""
        self.final_output = []  # type: MutableSequence[Optional[CWLObjectType]]
        self.final_status = []  # type: List[str]
        self.output_dirs = set()  # type: Set[str]

    def __call__(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: logging.Logger = _logger,
    ) -> Tuple[Optional[CWLObjectType], str]:
        """Alias for :meth:`execute`."""
        return self.execute(process, job_order_object, runtime_context, logger)

    def output_callback(
        self, out: Optional[CWLObjectType], process_status: str
    ) -> None:
        """Collect the final status and outputs."""
        self.final_status.append(process_status)
        self.final_output.append(out)

    @abstractmethod
    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Execute the jobs for the given Process."""

    def execute(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: logging.Logger = _logger,
    ) -> Tuple[Optional[CWLObjectType], str]:
        """Execute the process.

        :param process: the Process to run.
        :param job_order_object: the input object for the run.
        :param runtime_context: runtime options; ``basedir`` must be set.
        :param logger: destination for progress and error messages.
        :returns: tuple of (final output object or None, final status string).
        :raises WorkflowException: if ``basedir`` is missing, the workflow
            contains an abstract Operation, or ``cwl:requirements`` is used
            with CWL v1.0.
        """
        if not runtime_context.basedir:
            raise WorkflowException("Must provide 'basedir' in runtimeContext")

        def check_for_abstract_op(tool: CWLObjectType) -> None:
            # Abstract Operations describe a step but have no runnable body.
            if tool["class"] == "Operation":
                raise SourceLine(tool, "class", WorkflowException).makeError(
                    "Workflow has unrunnable abstract Operation"
                )

        process.visit(check_for_abstract_op)

        finaloutdir = None  # type: Optional[str]
        original_outdir = runtime_context.outdir
        if isinstance(original_outdir, str):
            finaloutdir = os.path.abspath(original_outdir)
        # Work on a copy so the caller's context object is left untouched.
        runtime_context = runtime_context.copy()
        outdir = runtime_context.create_outdir()
        self.output_dirs.add(outdir)
        runtime_context.outdir = outdir
        runtime_context.mutation_manager = MutationManager()
        runtime_context.toplevel = True
        runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())

        def reject_requirements_pre_v1_1() -> None:
            # `cwl:requirements` in the input object only became legal in
            # CWL v1.1; reject it for documents originally written as v1.0.
            if (
                process.metadata.get(
                    "http://commonwl.org/cwltool#original_cwlVersion"
                )
                == "v1.0"
            ):
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1"
                )

        job_reqs = None  # type: Optional[List[CWLObjectType]]
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            reject_requirements_pre_v1_1()
            job_reqs = cast(
                List[CWLObjectType],
                job_order_object["https://w3id.org/cwl/cwl#requirements"],
            )
        elif (
            "cwl:defaults" in process.metadata
            and "https://w3id.org/cwl/cwl#requirements"
            in cast(CWLObjectType, process.metadata["cwl:defaults"])
        ):
            reject_requirements_pre_v1_1()
            job_reqs = cast(
                Optional[List[CWLObjectType]],
                cast(CWLObjectType, process.metadata["cwl:defaults"])[
                    "https://w3id.org/cwl/cwl#requirements"
                ],
            )
        if job_reqs is not None:
            for req in job_reqs:
                process.requirements.append(req)

        self.run_jobs(process, job_order_object, logger, runtime_context)

        if (
            self.final_output
            and self.final_output[0] is not None
            and finaloutdir is not None
        ):
            self.final_output[0] = relocateOutputs(
                self.final_output[0],
                finaloutdir,
                self.output_dirs,
                runtime_context.move_outputs,
                runtime_context.make_fs_access(""),
                getdefault(runtime_context.compute_checksum, True),
                path_mapper=runtime_context.path_mapper,
            )

        if runtime_context.rm_tmpdir:
            if runtime_context.cachedir is None:
                output_dirs = self.output_dirs  # type: Iterable[str]
            else:
                # Never delete intermediate directories inside the cache.
                output_dirs = filter(
                    lambda x: not x.startswith(runtime_context.cachedir),  # type: ignore
                    self.output_dirs,
                )
            cleanIntermediate(output_dirs)

        if self.final_output and self.final_status:
            if (
                runtime_context.research_obj is not None
                and isinstance(
                    process, (JobBase, Process, WorkflowJobStep, WorkflowJob)
                )
                and process.parent_wf
            ):
                # Close out the provenance profile for the whole run.
                process_run_id = None  # type: Optional[str]
                name = "primary"
                process.parent_wf.generate_output_prov(
                    self.final_output[0], process_run_id, name
                )
                process.parent_wf.document.wasEndedBy(
                    process.parent_wf.workflow_run_uri,
                    None,
                    process.parent_wf.engine_uuid,
                    datetime.datetime.now(),
                )
                process.parent_wf.finalize_prov_profile(name=None)
            return (self.final_output[0], self.final_status[0])
        return (None, "permanentFail")
198
199
class SingleJobExecutor(JobExecutor):
    """Default single-threaded CWL reference executor."""

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Run each job produced by the process, one at a time, in order."""
        process_run_id = None  # type: Optional[str]

        # When provenance capture is active and we are running a bare tool
        # (not a Workflow), give the tool its own provenance profile.
        research_obj = runtime_context.research_obj
        if research_obj is not None and not isinstance(process, Workflow):
            process.provenance_object = ProvenanceProfile(
                research_obj,
                full_name=runtime_context.cwl_full_name,
                host_provenance=False,
                user_provenance=False,
                orcid=runtime_context.orcid,
                # single tool execution, so RO UUID = wf UUID = tool UUID
                run_uuid=research_obj.ro_uuid,
                fsaccess=runtime_context.make_fs_access(""),
            )
            process.parent_wf = process.provenance_object

        jobiter = process.job(job_order_object, self.output_callback, runtime_context)

        try:
            for job in jobiter:
                if job is None:
                    logger.error("Workflow cannot make any more progress.")
                    break
                if runtime_context.builder is not None and hasattr(job, "builder"):
                    job.builder = runtime_context.builder  # type: ignore
                if job.outdir is not None:
                    self.output_dirs.add(job.outdir)
                if runtime_context.research_obj is not None:
                    prov_obj = (
                        job.prov_obj
                        if isinstance(process, Workflow)
                        else process.provenance_object
                    )
                    if prov_obj:
                        runtime_context.prov_obj = prov_obj
                        prov_obj.fsaccess = runtime_context.make_fs_access("")
                        prov_obj.evaluate(
                            process,
                            job,
                            job_order_object,
                            runtime_context.research_obj,
                        )
                        process_run_id = prov_obj.record_process_start(process, job)
                        # Copy the context so the recorded run id does not
                        # leak into the caller's context object.
                        runtime_context = runtime_context.copy()
                    runtime_context.process_run_id = process_run_id
                job.run(runtime_context)
        except (
            ValidationException,
            WorkflowException,
        ):  # pylint: disable=try-except-raise
            raise
        except Exception as err:
            logger.exception("Got workflow error")
            raise WorkflowException(str(err)) from err
267
268
class MultithreadedJobExecutor(JobExecutor):
    """
    Experimental multi-threaded CWL executor.

    Does simple resource accounting, will not start a job unless it
    has cores / ram available, but does not make any attempt to
    optimize usage.
    """

    def __init__(self) -> None:
        """Initialize."""
        super().__init__()
        self.exceptions = []  # type: List[WorkflowException]
        self.pending_jobs = []  # type: List[JobsType]
        self.pending_jobs_lock = threading.Lock()

        # Resource ceilings: half of currently-available RAM (in MiB) and
        # every CPU core reported by psutil.
        self.max_ram = int(psutil.virtual_memory().available / 2 ** 20)
        self.max_cores = float(psutil.cpu_count())
        self.allocated_ram = float(0)
        self.allocated_cores = float(0)

    def select_resources(
        self, request, runtime_context
    ):  # pylint: disable=unused-argument
        # type: (Dict[str, Union[int, float, str]], RuntimeContext) -> Dict[str, Union[int, float, str]]
        """Naïve check for available cpu cores and memory.

        :param request: resource request with ``coresMin``/``coresMax``,
            ``ramMin``/``ramMax``, ``tmpdirMin`` and ``outdirMin`` entries;
            string entries (unevaluated expressions) are passed through.
        :returns: the granted resources.
        :raises WorkflowException: when a minimum exceeds what this host has.
        """
        result = {}  # type: Dict[str, Union[int, float, str]]
        maxrsc = {"cores": self.max_cores, "ram": self.max_ram}
        for rsc in ("cores", "ram"):
            rsc_min = request[rsc + "Min"]
            if not isinstance(rsc_min, str) and rsc_min > maxrsc[rsc]:
                raise WorkflowException(
                    "Requested at least %d %s but only %d available"
                    % (rsc_min, rsc, maxrsc[rsc])
                )
            rsc_max = request[rsc + "Max"]
            # Grant the requested maximum if it fits, otherwise all we have.
            if not isinstance(rsc_max, str) and rsc_max < maxrsc[rsc]:
                result[rsc] = math.ceil(rsc_max)
            else:
                result[rsc] = maxrsc[rsc]

        result["tmpdirSize"] = (
            math.ceil(request["tmpdirMin"])
            if not isinstance(request["tmpdirMin"], str)
            else request["tmpdirMin"]
        )
        result["outdirSize"] = (
            math.ceil(request["outdirMin"])
            if not isinstance(request["outdirMin"], str)
            else request["outdirMin"]
        )

        return result

    def _runner(self, job, runtime_context, TMPDIR_LOCK):
        # type: (Union[JobBase, WorkflowJob, CallbackJob, ExpressionJob], RuntimeContext, threading.Lock) -> None
        """Job running thread: run one job, then release its resources."""
        try:
            _logger.debug(
                "job: {}, runtime_context: {}, TMPDIR_LOCK: {}".format(
                    job, runtime_context, TMPDIR_LOCK
                )
            )
            job.run(runtime_context, TMPDIR_LOCK)
        except WorkflowException as err:
            _logger.exception(f"Got workflow error: {err}")
            self.exceptions.append(err)
        except Exception as err:  # pylint: disable=broad-except
            _logger.exception(f"Got workflow error: {err}")
            self.exceptions.append(WorkflowException(str(err)))
        finally:
            if runtime_context.workflow_eval_lock:
                with runtime_context.workflow_eval_lock:
                    # Return this job's resources to the pool and wake up
                    # any thread waiting in wait_for_next_completion().
                    if isinstance(job, JobBase):
                        ram = job.builder.resources["ram"]
                        if not isinstance(ram, str):
                            self.allocated_ram -= ram
                        cores = job.builder.resources["cores"]
                        if not isinstance(cores, str):
                            self.allocated_cores -= cores
                    # notify_all(): modern name for the deprecated notifyAll().
                    runtime_context.workflow_eval_lock.notify_all()

    def run_job(
        self,
        job: Optional[JobsType],
        runtime_context: RuntimeContext,
    ) -> None:
        """Queue a single Job to execute in a separate thread.

        Passing ``None`` just re-scans the pending list, scheduling any
        jobs whose resource requirements can now be satisfied.
        """
        if job is not None:
            with self.pending_jobs_lock:
                self.pending_jobs.append(job)

        with self.pending_jobs_lock:
            n = 0
            while n < len(self.pending_jobs):
                # Simple greedy resource allocation strategy. Go
                # through pending jobs in the order they were
                # generated and add them to the queue only if there
                # are resources available.
                job = self.pending_jobs[n]
                if isinstance(job, JobBase):
                    ram = job.builder.resources["ram"]
                    cores = job.builder.resources["cores"]
                    if (not isinstance(ram, str) and ram > self.max_ram) or (
                        not isinstance(cores, str) and cores > self.max_cores
                    ):
                        # This job can never fit on this host: drop it.
                        # Format string fixed: it previously had four
                        # placeholders but six arguments and no closing ")".
                        _logger.error(
                            'Job "%s" cannot be run, requests more resources (%s) '
                            "than available on this host (max ram %d, max cores %d)",
                            job.name,
                            job.builder.resources,
                            self.max_ram,
                            self.max_cores,
                        )
                        self.pending_jobs.remove(job)
                        return

                    if (
                        not isinstance(ram, str)
                        and self.allocated_ram + ram > self.max_ram
                    ) or (
                        not isinstance(cores, str)
                        and self.allocated_cores + cores > self.max_cores
                    ):
                        _logger.debug(
                            'Job "%s" cannot run yet, resources (%s) are not '
                            "available (already allocated ram is %d, allocated cores is %d, "
                            "max ram %d, max cores %d",
                            job.name,
                            job.builder.resources,
                            self.allocated_ram,
                            self.allocated_cores,
                            self.max_ram,
                            self.max_cores,
                        )
                        n += 1
                        continue

                    # Reserve this job's resources before handing it off.
                    if not isinstance(ram, str):
                        self.allocated_ram += ram
                    if not isinstance(cores, str):
                        self.allocated_cores += cores
                self.taskqueue.add(
                    functools.partial(self._runner, job, runtime_context, TMPDIR_LOCK),
                    runtime_context.workflow_eval_lock,
                )
                self.pending_jobs.remove(job)

    def wait_for_next_completion(self, runtime_context):
        # type: (RuntimeContext) -> None
        """Wait for jobs to finish.

        Blocks up to 3 seconds on the workflow evaluation lock, then
        re-raises the first error recorded by any worker thread.
        """
        if runtime_context.workflow_eval_lock is not None:
            runtime_context.workflow_eval_lock.wait(timeout=3)
        if self.exceptions:
            raise self.exceptions[0]

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Concurrently execute the jobs for the given Process."""
        self.taskqueue = TaskQueue(
            threading.Lock(), psutil.cpu_count()
        )  # type: TaskQueue
        try:
            jobiter = process.job(
                job_order_object, self.output_callback, runtime_context
            )

            if runtime_context.workflow_eval_lock is None:
                raise WorkflowException(
                    "runtimeContext.workflow_eval_lock must not be None"
                )

            # NOTE(review): the lock is not released if an exception escapes
            # the loop below; preserved as-is to avoid changing error paths.
            runtime_context.workflow_eval_lock.acquire()
            for job in jobiter:
                if job is not None:
                    if isinstance(job, JobBase):
                        job.builder = runtime_context.builder or job.builder
                        if job.outdir is not None:
                            self.output_dirs.add(job.outdir)

                self.run_job(job, runtime_context)

                if job is None:
                    if self.taskqueue.in_flight > 0:
                        # Iterator stalled but work is still running; wait
                        # for a completion so it can make progress.
                        self.wait_for_next_completion(runtime_context)
                    else:
                        logger.error("Workflow cannot make any more progress.")
                        break

            # Drain: keep re-scanning pending jobs as resources free up.
            self.run_job(None, runtime_context)
            while self.taskqueue.in_flight > 0:
                self.wait_for_next_completion(runtime_context)
                self.run_job(None, runtime_context)

            runtime_context.workflow_eval_lock.release()
        finally:
            self.taskqueue.drain()
            self.taskqueue.join()
478
479
class NoopJobExecutor(JobExecutor):
    """Do nothing executor, for testing purposes only."""

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Intentionally a no-op: no jobs are ever started."""

    def execute(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: Optional[logging.Logger] = None,
    ) -> Tuple[Optional[CWLObjectType], str]:
        """Pretend the process ran: empty outputs and a "success" status."""
        return {}, "success"