Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/executors.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 """Single and multi-threaded executors.""" | |
| 2 import datetime | |
| 3 import functools | |
| 4 import logging | |
| 5 import math | |
| 6 import os | |
| 7 import threading | |
| 8 from abc import ABCMeta, abstractmethod | |
| 9 from threading import Lock | |
| 10 from typing import ( | |
| 11 Dict, | |
| 12 Iterable, | |
| 13 List, | |
| 14 MutableSequence, | |
| 15 Optional, | |
| 16 Set, | |
| 17 Tuple, | |
| 18 Union, | |
| 19 cast, | |
| 20 ) | |
| 21 | |
| 22 import psutil | |
| 23 from schema_salad.exceptions import ValidationException | |
| 24 from schema_salad.sourceline import SourceLine | |
| 25 | |
| 26 from .command_line_tool import CallbackJob, ExpressionJob | |
| 27 from .context import RuntimeContext, getdefault | |
| 28 from .errors import WorkflowException | |
| 29 from .job import JobBase | |
| 30 from .loghandler import _logger | |
| 31 from .mutation import MutationManager | |
| 32 from .process import Process, cleanIntermediate, relocateOutputs | |
| 33 from .provenance_profile import ProvenanceProfile | |
| 34 from .task_queue import TaskQueue | |
| 35 from .utils import CWLObjectType, JobsType | |
| 36 from .workflow import Workflow | |
| 37 from .workflow_job import WorkflowJob, WorkflowJobStep | |
| 38 | |
| 39 TMPDIR_LOCK = Lock() | |
| 40 | |
| 41 | |
class JobExecutor(metaclass=ABCMeta):
    """Abstract base job executor.

    Subclasses implement :meth:`run_jobs`; :meth:`execute` performs the
    shared setup (context copy, output dir creation, requirement
    injection) and teardown (output relocation, cleanup, provenance).
    """

    def __init__(self) -> None:
        """Initialize."""
        # Appended to by output_callback(); entry 0 is the top-level result.
        self.final_output = []  # type: MutableSequence[Optional[CWLObjectType]]
        self.final_status = []  # type: List[str]
        # Every output directory created during the run, used later for
        # output relocation and intermediate-directory cleanup.
        self.output_dirs = set()  # type: Set[str]

    def __call__(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: logging.Logger = _logger,
    ) -> Tuple[Optional[CWLObjectType], str]:
        """Invoke the executor; convenience wrapper around execute()."""
        return self.execute(process, job_order_object, runtime_context, logger)

    def output_callback(
        self, out: Optional[CWLObjectType], process_status: str
    ) -> None:
        """Collect the final status and outputs."""
        self.final_status.append(process_status)
        self.final_output.append(out)

    @abstractmethod
    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Execute the jobs for the given Process."""

    def execute(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: logging.Logger = _logger,
    ) -> Tuple[Union[Optional[CWLObjectType]], str]:
        """Execute the process.

        :param process: the resolved CWL Process to run.
        :param job_order_object: the CWL input object.
        :param runtime_context: must have ``basedir`` set.
        :returns: tuple of (final output object or None, status string);
            ``(None, "permanentFail")`` when no output was produced.
        :raises WorkflowException: when ``basedir`` is missing, the workflow
            contains an unrunnable abstract Operation, or
            ``cwl:requirements`` is used with CWL v1.0.
        """
        if not runtime_context.basedir:
            raise WorkflowException("Must provide 'basedir' in runtimeContext")

        def check_for_abstract_op(tool: CWLObjectType) -> None:
            # Abstract Operations describe intent only; they cannot run.
            if tool["class"] == "Operation":
                raise SourceLine(tool, "class", WorkflowException).makeError(
                    "Workflow has unrunnable abstract Operation"
                )

        process.visit(check_for_abstract_op)

        finaloutdir = None  # type: Optional[str]
        original_outdir = runtime_context.outdir
        if isinstance(original_outdir, str):
            finaloutdir = os.path.abspath(original_outdir)
        # Work on a copy so the caller's context is not mutated; jobs run in
        # a fresh intermediate output directory, relocated to finaloutdir at
        # the end.
        runtime_context = runtime_context.copy()
        outdir = runtime_context.create_outdir()
        self.output_dirs.add(outdir)
        runtime_context.outdir = outdir
        runtime_context.mutation_manager = MutationManager()
        runtime_context.toplevel = True
        runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())

        # Extra process requirements may come from the input object itself or
        # from `cwl:defaults` in the process metadata (CWL v1.1+ only).
        job_reqs = None  # type: Optional[List[CWLObjectType]]
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if (
                process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                == "v1.0"
            ):
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1"
                )
            job_reqs = cast(
                List[CWLObjectType],
                job_order_object["https://w3id.org/cwl/cwl#requirements"],
            )
        elif (
            "cwl:defaults" in process.metadata
            and "https://w3id.org/cwl/cwl#requirements"
            in cast(CWLObjectType, process.metadata["cwl:defaults"])
        ):
            if (
                process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                == "v1.0"
            ):
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1"
                )
            job_reqs = cast(
                Optional[List[CWLObjectType]],
                cast(CWLObjectType, process.metadata["cwl:defaults"])[
                    "https://w3id.org/cwl/cwl#requirements"
                ],
            )
        if job_reqs is not None:
            for req in job_reqs:
                process.requirements.append(req)

        self.run_jobs(process, job_order_object, logger, runtime_context)

        # Relocate outputs from the intermediate directory to the requested
        # final output directory, when one was given.
        if (
            self.final_output
            and self.final_output[0] is not None
            and finaloutdir is not None
        ):
            self.final_output[0] = relocateOutputs(
                self.final_output[0],
                finaloutdir,
                self.output_dirs,
                runtime_context.move_outputs,
                runtime_context.make_fs_access(""),
                getdefault(runtime_context.compute_checksum, True),
                path_mapper=runtime_context.path_mapper,
            )

        if runtime_context.rm_tmpdir:
            # Never delete output directories that live inside the cache.
            if runtime_context.cachedir is None:
                output_dirs = self.output_dirs  # type: Iterable[str]
            else:
                output_dirs = filter(
                    lambda x: not x.startswith(runtime_context.cachedir),  # type: ignore
                    self.output_dirs,
                )
            cleanIntermediate(output_dirs)

        if self.final_output and self.final_status:
            # When building a research object and this process has a parent
            # workflow provenance profile, record the final outputs and close
            # out the provenance document.
            if (
                runtime_context.research_obj is not None
                and isinstance(
                    process, (JobBase, Process, WorkflowJobStep, WorkflowJob)
                )
                and process.parent_wf
            ):
                process_run_id = None  # type: Optional[str]
                name = "primary"
                process.parent_wf.generate_output_prov(
                    self.final_output[0], process_run_id, name
                )
                process.parent_wf.document.wasEndedBy(
                    process.parent_wf.workflow_run_uri,
                    None,
                    process.parent_wf.engine_uuid,
                    datetime.datetime.now(),
                )
                process.parent_wf.finalize_prov_profile(name=None)
            return (self.final_output[0], self.final_status[0])
        return (None, "permanentFail")
| 198 | |
| 199 | |
class SingleJobExecutor(JobExecutor):
    """Default single-threaded CWL reference executor."""

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Run each job produced by the process's job iterator, serially.

        :raises WorkflowException: re-raised validation/workflow errors, or
            wrapping any unexpected exception raised while running a job.
        """
        process_run_id = None  # type: Optional[str]

        # define provenance profile for single commandline tool
        if (
            not isinstance(process, Workflow)
            and runtime_context.research_obj is not None
        ):
            process.provenance_object = ProvenanceProfile(
                runtime_context.research_obj,
                full_name=runtime_context.cwl_full_name,
                host_provenance=False,
                user_provenance=False,
                orcid=runtime_context.orcid,
                # single tool execution, so RO UUID = wf UUID = tool UUID
                run_uuid=runtime_context.research_obj.ro_uuid,
                fsaccess=runtime_context.make_fs_access(""),
            )
            process.parent_wf = process.provenance_object
        jobiter = process.job(job_order_object, self.output_callback, runtime_context)

        try:
            for job in jobiter:
                if job is not None:
                    if runtime_context.builder is not None and hasattr(job, "builder"):
                        job.builder = runtime_context.builder  # type: ignore
                    if job.outdir is not None:
                        self.output_dirs.add(job.outdir)
                    if runtime_context.research_obj is not None:
                        # Provenance: a bare tool uses its own profile; a
                        # workflow step uses the profile attached to the job.
                        if not isinstance(process, Workflow):
                            prov_obj = process.provenance_object
                        else:
                            prov_obj = job.prov_obj
                        if prov_obj:
                            runtime_context.prov_obj = prov_obj
                            prov_obj.fsaccess = runtime_context.make_fs_access("")
                            prov_obj.evaluate(
                                process,
                                job,
                                job_order_object,
                                runtime_context.research_obj,
                            )
                            process_run_id = prov_obj.record_process_start(process, job)
                            # Copy so this job's run id does not leak into
                            # later iterations' shared context.
                            runtime_context = runtime_context.copy()
                        runtime_context.process_run_id = process_run_id
                    job.run(runtime_context)
                else:
                    # The iterator yields None while waiting on unfinished
                    # jobs; nothing else can complete in a serial executor.
                    logger.error("Workflow cannot make any more progress.")
                    break
        except (
            ValidationException,
            WorkflowException,
        ):  # pylint: disable=try-except-raise
            raise
        except Exception as err:
            logger.exception("Got workflow error")
            raise WorkflowException(str(err)) from err
| 267 | |
| 268 | |
class MultithreadedJobExecutor(JobExecutor):
    """
    Experimental multi-threaded CWL executor.

    Does simple resource accounting, will not start a job unless it
    has cores / ram available, but does not make any attempt to
    optimize usage.
    """

    def __init__(self) -> None:
        """Initialize."""
        super().__init__()
        self.exceptions = []  # type: List[WorkflowException]
        self.pending_jobs = []  # type: List[JobsType]
        self.pending_jobs_lock = threading.Lock()

        # Capacity ceilings: half of the RAM currently available (in MiB)
        # and every logical CPU; allocations are tracked against these.
        self.max_ram = int(psutil.virtual_memory().available / 2 ** 20)
        self.max_cores = float(psutil.cpu_count())
        self.allocated_ram = float(0)
        self.allocated_cores = float(0)

    def select_resources(
        self, request, runtime_context
    ):  # pylint: disable=unused-argument
        # type: (Dict[str, Union[int, float, str]], RuntimeContext) -> Dict[str, Union[int, float, str]]
        """Naïve check for available cpu cores and memory.

        :param request: mapping with ``coresMin``/``coresMax``,
            ``ramMin``/``ramMax``, ``tmpdirMin`` and ``outdirMin`` entries;
            string values (unevaluated expressions) are passed through.
        :raises WorkflowException: when a minimum request exceeds what this
            host can ever provide.
        """
        result = {}  # type: Dict[str, Union[int, float, str]]
        maxrsc = {"cores": self.max_cores, "ram": self.max_ram}
        for rsc in ("cores", "ram"):
            rsc_min = request[rsc + "Min"]
            if not isinstance(rsc_min, str) and rsc_min > maxrsc[rsc]:
                raise WorkflowException(
                    "Requested at least %d %s but only %d available"
                    % (rsc_min, rsc, maxrsc[rsc])
                )
            # Grant up to the requested maximum, capped at the host ceiling.
            rsc_max = request[rsc + "Max"]
            if not isinstance(rsc_max, str) and rsc_max < maxrsc[rsc]:
                result[rsc] = math.ceil(rsc_max)
            else:
                result[rsc] = maxrsc[rsc]

        result["tmpdirSize"] = (
            math.ceil(request["tmpdirMin"])
            if not isinstance(request["tmpdirMin"], str)
            else request["tmpdirMin"]
        )
        result["outdirSize"] = (
            math.ceil(request["outdirMin"])
            if not isinstance(request["outdirMin"], str)
            else request["outdirMin"]
        )

        return result

    def _runner(self, job, runtime_context, TMPDIR_LOCK):
        # type: (Union[JobBase, WorkflowJob, CallbackJob, ExpressionJob], RuntimeContext, threading.Lock) -> None
        """Job running thread."""
        try:
            _logger.debug(
                "job: {}, runtime_context: {}, TMPDIR_LOCK: {}".format(
                    job, runtime_context, TMPDIR_LOCK
                )
            )
            job.run(runtime_context, TMPDIR_LOCK)
        except WorkflowException as err:
            _logger.exception(f"Got workflow error: {err}")
            self.exceptions.append(err)
        except Exception as err:  # pylint: disable=broad-except
            _logger.exception(f"Got workflow error: {err}")
            self.exceptions.append(WorkflowException(str(err)))
        finally:
            # Return this job's resources to the pool and wake any thread
            # blocked in wait_for_next_completion().
            if runtime_context.workflow_eval_lock:
                with runtime_context.workflow_eval_lock:
                    if isinstance(job, JobBase):
                        ram = job.builder.resources["ram"]
                        if not isinstance(ram, str):
                            self.allocated_ram -= ram
                        cores = job.builder.resources["cores"]
                        if not isinstance(cores, str):
                            self.allocated_cores -= cores
                    # notify_all() replaces notifyAll(), a deprecated alias
                    # (deprecated since Python 3.10, removed in 3.13).
                    runtime_context.workflow_eval_lock.notify_all()

    def run_job(
        self,
        job: Optional[JobsType],
        runtime_context: RuntimeContext,
    ) -> None:
        """Execute a single Job in a separate thread.

        ``job=None`` means "just try to schedule whatever is pending".
        Jobs that do not currently fit stay pending for a later call.
        """
        if job is not None:
            with self.pending_jobs_lock:
                self.pending_jobs.append(job)

        with self.pending_jobs_lock:
            n = 0
            while (n + 1) <= len(self.pending_jobs):
                # Simple greedy resource allocation strategy. Go
                # through pending jobs in the order they were
                # generated and add them to the queue only if there
                # are resources available.
                job = self.pending_jobs[n]
                if isinstance(job, JobBase):
                    ram = job.builder.resources["ram"]
                    cores = job.builder.resources["cores"]
                    if (not isinstance(ram, str) and ram > self.max_ram) or (
                        not isinstance(cores, str) and cores > self.max_cores
                    ):
                        # This job can never fit on this host; drop it.
                        # (Format string fixed: it previously had four
                        # placeholders for six arguments, so the message
                        # never rendered.)
                        _logger.error(
                            'Job "%s" cannot be run, requests more resources (%s) '
                            "than available on this host (already allocated ram is "
                            "%d, allocated cores is %d, max ram %d, max cores %d)",
                            job.name,
                            job.builder.resources,
                            self.allocated_ram,
                            self.allocated_cores,
                            self.max_ram,
                            self.max_cores,
                        )
                        self.pending_jobs.remove(job)
                        return

                    if (
                        not isinstance(ram, str)
                        and self.allocated_ram + ram > self.max_ram
                    ) or (
                        not isinstance(cores, str)
                        and self.allocated_cores + cores > self.max_cores
                    ):
                        # Not enough free capacity right now; keep it
                        # pending and consider the next job.
                        _logger.debug(
                            'Job "%s" cannot run yet, resources (%s) are not '
                            "available (already allocated ram is %d, allocated cores is %d, "
                            "max ram %d, max cores %d",
                            job.name,
                            job.builder.resources,
                            self.allocated_ram,
                            self.allocated_cores,
                            self.max_ram,
                            self.max_cores,
                        )
                        n += 1
                        continue

                # Reserve resources before handing the job to the queue.
                if isinstance(job, JobBase):
                    ram = job.builder.resources["ram"]
                    if not isinstance(ram, str):
                        self.allocated_ram += ram
                    cores = job.builder.resources["cores"]
                    if not isinstance(cores, str):
                        self.allocated_cores += cores
                self.taskqueue.add(
                    functools.partial(self._runner, job, runtime_context, TMPDIR_LOCK),
                    runtime_context.workflow_eval_lock,
                )
                self.pending_jobs.remove(job)

    def wait_for_next_completion(self, runtime_context):
        # type: (RuntimeContext) -> None
        """Wait for jobs to finish.

        :raises WorkflowException: the first exception captured by a
            job-runner thread, if any.
        """
        if runtime_context.workflow_eval_lock is not None:
            # _runner() notifies this condition when a job completes.
            runtime_context.workflow_eval_lock.wait(timeout=3)
        if self.exceptions:
            raise self.exceptions[0]

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Concurrently execute the jobs for the given Process.

        :raises WorkflowException: when the runtime context lacks a
            ``workflow_eval_lock``, or re-raised from a failed job.
        """
        self.taskqueue = TaskQueue(
            threading.Lock(), psutil.cpu_count()
        )  # type: TaskQueue
        try:

            jobiter = process.job(
                job_order_object, self.output_callback, runtime_context
            )

            if runtime_context.workflow_eval_lock is None:
                raise WorkflowException(
                    "runtimeContext.workflow_eval_lock must not be None"
                )

            runtime_context.workflow_eval_lock.acquire()
            for job in jobiter:
                if job is not None:
                    if isinstance(job, JobBase):
                        job.builder = runtime_context.builder or job.builder
                        if job.outdir is not None:
                            self.output_dirs.add(job.outdir)

                    self.run_job(job, runtime_context)

                if job is None:
                    # The iterator is blocked on unfinished jobs; wait for a
                    # completion, unless nothing is in flight (deadlock).
                    if self.taskqueue.in_flight > 0:
                        self.wait_for_next_completion(runtime_context)
                    else:
                        logger.error("Workflow cannot make any more progress.")
                        break

            # Drain: keep scheduling pending jobs until all work finishes.
            self.run_job(None, runtime_context)
            while self.taskqueue.in_flight > 0:
                self.wait_for_next_completion(runtime_context)
                self.run_job(None, runtime_context)

            runtime_context.workflow_eval_lock.release()
        finally:
            self.taskqueue.drain()
            self.taskqueue.join()
| 478 | |
| 479 | |
class NoopJobExecutor(JobExecutor):
    """Do nothing executor, for testing purposes only."""

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Deliberately a no-op: no jobs are ever started."""

    def execute(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: Optional[logging.Logger] = None,
    ) -> Tuple[Optional[CWLObjectType], str]:
        """Report immediate success with an empty output object."""
        empty_outputs = {}  # type: CWLObjectType
        return empty_outputs, "success"
