comparison planemo/lib/python3.7/site-packages/cwltool/executors.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d30785e31577
1 # -*- coding: utf-8 -*-
2 """ Single and multi-threaded executors."""
3 import datetime
4 import os
5 import tempfile
6 import threading
7 import logging
8 from threading import Lock
9 from abc import ABCMeta, abstractmethod
10 from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
11
12 import psutil
13 from six import string_types, with_metaclass
14 from typing_extensions import Text # pylint: disable=unused-import
15 from future.utils import raise_from
16 from schema_salad.validate import ValidationException
17
18 from .builder import Builder # pylint: disable=unused-import
19 from .context import (RuntimeContext, # pylint: disable=unused-import
20 getdefault)
21 from .errors import WorkflowException
22 from .job import JobBase # pylint: disable=unused-import
23 from .loghandler import _logger
24 from .mutation import MutationManager
25 from .process import Process # pylint: disable=unused-import
26 from .process import cleanIntermediate, relocateOutputs
27 from .provenance import ProvenanceProfile
28 from .utils import DEFAULT_TMP_PREFIX
29 from .workflow import Workflow, WorkflowJob, WorkflowJobStep
30 from .command_line_tool import CallbackJob
31
# Module-wide lock shared by every job thread: MultithreadedJobExecutor.run_job
# passes it to each worker thread, which forwards it to job.run().
# NOTE(review): presumably serializes temporary-directory creation inside
# job.run() -- confirm against the JobBase.run implementation.
32 TMPDIR_LOCK = Lock()
33
34
class JobExecutor(with_metaclass(ABCMeta, object)):
    """Abstract base class shared by the job executors in this module.

    Subclasses provide :meth:`run_jobs`.  :meth:`execute` wraps that call
    with output-directory setup, injection of extra requirements,
    relocation of the final outputs, temporary-directory cleanup and
    provenance finalisation.
    """

    def __init__(self):
        # type: (...) -> None
        """Start with no recorded outputs, statuses or output directories."""
        self.output_dirs = set()  # type: Set[Text]
        self.final_status = []  # type: List[Text]
        self.final_output = []  # type: List[Union[Dict[Text, Any], List[Dict[Text, Any]]]]

    def __call__(self, *args, **kwargs):  # type: (*Any, **Any) -> Any
        """Make the executor callable; simply forwards to :meth:`execute`."""
        return self.execute(*args, **kwargs)

    def output_callback(self, out, process_status):  # type: (Dict[Text, Any], Text) -> None
        """Append one reported status and output object to the result lists."""
        self.final_status.append(process_status)
        self.final_output.append(out)

    @abstractmethod
    def run_jobs(self,
                 process,           # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,            # type: logging.Logger
                 runtime_context    # type: RuntimeContext
                 ):  # type: (...) -> None
        """Execute the jobs for the given Process."""

    def execute(self,
                process,           # type: Process
                job_order_object,  # type: Dict[Text, Any]
                runtime_context,   # type: RuntimeContext
                logger=_logger,    # type: logging.Logger
                ):  # type: (...) -> Tuple[Optional[Union[Dict[Text, Any], List[Dict[Text, Any]]]], Text]
        """Run ``process`` on ``job_order_object`` and return its result.

        Returns a ``(output, status)`` tuple taken from the first entries
        recorded by :meth:`output_callback`, or ``(None, "permanentFail")``
        when nothing was recorded.

        Raises WorkflowException when ``runtime_context.basedir`` is unset,
        or when ``cwl:requirements`` are supplied for a document whose
        original cwlVersion is v1.0.
        """
        if not runtime_context.basedir:
            raise WorkflowException("Must provide 'basedir' in runtimeContext")

        # Remember where the caller wants the outputs before the context is
        # copied and pointed at a fresh temporary directory.
        requested_outdir = runtime_context.outdir
        wants_relocation = isinstance(requested_outdir, string_types)
        finaloutdir = os.path.abspath(requested_outdir) if wants_relocation else None

        runtime_context = runtime_context.copy()
        tmp_prefix = getdefault(runtime_context.tmp_outdir_prefix, DEFAULT_TMP_PREFIX)
        staging_dir = tempfile.mkdtemp(prefix=tmp_prefix)
        self.output_dirs.add(staging_dir)
        runtime_context.outdir = staging_dir
        runtime_context.mutation_manager = MutationManager()
        runtime_context.toplevel = True
        runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())

        # Extra requirements may come from the input object or, failing
        # that, from the document's "cwl:defaults" metadata.
        reqs_key = "https://w3id.org/cwl/cwl#requirements"
        if reqs_key in job_order_object:
            reqs_holder = job_order_object
        elif ("cwl:defaults" in process.metadata
              and reqs_key in process.metadata["cwl:defaults"]):
            reqs_holder = process.metadata["cwl:defaults"]
        else:
            reqs_holder = None

        if reqs_holder is not None:
            if process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1")
            job_reqs = reqs_holder[reqs_key]
            if job_reqs is not None:
                for req in job_reqs:
                    process.requirements.append(req)

        self.run_jobs(process, job_order_object, logger, runtime_context)

        have_first_output = bool(self.final_output) and self.final_output[0] is not None
        if have_first_output and finaloutdir is not None:
            self.final_output[0] = relocateOutputs(
                self.final_output[0], finaloutdir, self.output_dirs,
                runtime_context.move_outputs, runtime_context.make_fs_access(""),
                getdefault(runtime_context.compute_checksum, True),
                path_mapper=runtime_context.path_mapper)

        if runtime_context.rm_tmpdir:
            if runtime_context.cachedir is None:
                removable = self.output_dirs  # type: Iterable[Any]
            else:
                # Directories under the cache directory are left in place.
                removable = (
                    path for path in self.output_dirs
                    if not path.startswith(runtime_context.cachedir))
            cleanIntermediate(removable)

        if not (self.final_output and self.final_status):
            return (None, "permanentFail")

        if runtime_context.research_obj is not None \
                and isinstance(process, (JobBase, Process, WorkflowJobStep, WorkflowJob)) \
                and process.parent_wf:
            # Close the provenance record for the top-level run.
            process.parent_wf.generate_output_prov(
                self.final_output[0], None, "primary")
            process.parent_wf.document.wasEndedBy(
                process.parent_wf.workflow_run_uri, None,
                process.parent_wf.engine_uuid, datetime.datetime.now())
            process.parent_wf.finalize_prov_profile(name=None)
        return (self.final_output[0], self.final_status[0])
138
139
class SingleJobExecutor(JobExecutor):
    """Reference CWL executor that runs each job in the calling thread."""

    def run_jobs(self,
                 process,           # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,            # type: logging.Logger
                 runtime_context    # type: RuntimeContext
                 ):  # type: (...) -> None
        """Run every job yielded by ``process.job`` one after another.

        ValidationException and WorkflowException propagate unchanged; any
        other exception is logged and re-raised as a WorkflowException
        chained to the original error.
        """
        process_run_id = None  # type: Optional[str]
        is_workflow = isinstance(process, Workflow)

        # A lone tool (anything that is not a Workflow) gets its own
        # provenance profile when a research object is being recorded.
        if not is_workflow and runtime_context.research_obj is not None:
            process.provenance_object = ProvenanceProfile(
                runtime_context.research_obj,
                full_name=runtime_context.cwl_full_name,
                host_provenance=False,
                user_provenance=False,
                orcid=runtime_context.orcid,
                # single tool execution, so RO UUID = wf UUID = tool UUID
                run_uuid=runtime_context.research_obj.ro_uuid,
                fsaccess=runtime_context.make_fs_access(''))
            process.parent_wf = process.provenance_object

        jobiter = process.job(job_order_object, self.output_callback,
                              runtime_context)

        try:
            for job in jobiter:
                if job is None:
                    # The iterator has nothing runnable to offer.
                    logger.error("Workflow cannot make any more progress.")
                    break

                if runtime_context.builder is not None:
                    job.builder = runtime_context.builder
                if job.outdir is not None:
                    self.output_dirs.add(job.outdir)

                if runtime_context.research_obj is not None:
                    prov_obj = job.prov_obj if is_workflow else process.provenance_object
                    if prov_obj:
                        runtime_context.prov_obj = prov_obj
                        prov_obj.fsaccess = runtime_context.make_fs_access('')
                        prov_obj.evaluate(
                            process, job, job_order_object,
                            runtime_context.research_obj)
                        process_run_id = prov_obj.record_process_start(process, job)
                        runtime_context = runtime_context.copy()
                    runtime_context.process_run_id = process_run_id

                job.run(runtime_context)
        except (ValidationException, WorkflowException):  # pylint: disable=try-except-raise
            raise
        except Exception as err:
            logger.exception("Got workflow error")
            raise_from(WorkflowException(Text(err)), err)
199
200
class MultithreadedJobExecutor(JobExecutor):
    """
    Experimental multi-threaded CWL executor.

    Does simple resource accounting, will not start a job unless it
    has cores / ram available, but does not make any attempt to
    optimize usage.
    """

    def __init__(self):  # type: () -> None
        """Set up thread bookkeeping and probe the host's resource limits."""
        super(MultithreadedJobExecutor, self).__init__()
        self.threads = set()  # type: Set[threading.Thread]
        self.exceptions = []  # type: List[WorkflowException]
        self.pending_jobs = []  # type: List[Union[JobBase, WorkflowJob]]
        self.pending_jobs_lock = threading.Lock()

        # Memory available when the executor is created, converted from
        # bytes to MiB.
        self.max_ram = int(psutil.virtual_memory().available / 2**20)
        # psutil.cpu_count() returns None when the count cannot be
        # determined; fall back to one core so later comparisons work.
        self.max_cores = psutil.cpu_count() or 1
        self.allocated_ram = 0
        self.allocated_cores = 0

    def select_resources(self, request, runtime_context):  # pylint: disable=unused-argument
        # type: (Dict[str, int], RuntimeContext) -> Dict[str, int]
        """Naive check for available cpu cores and memory.

        ``request`` must hold ``coresMin``/``coresMax``/``ramMin``/``ramMax``.
        Returns a dict with ``cores`` and ``ram`` set to the requested
        maximum, capped at the host limit.

        Raises WorkflowException when a requested minimum exceeds the
        host limit.
        """
        result = {}  # type: Dict[str, int]
        maxrsc = {
            "cores": self.max_cores,
            "ram": self.max_ram
        }
        for rsc in ("cores", "ram"):
            if request[rsc+"Min"] > maxrsc[rsc]:
                raise WorkflowException(
                    "Requested at least %d %s but only %d available" %
                    (request[rsc+"Min"], rsc, maxrsc[rsc]))
            # Host limit goes first so that it wins on a tie, exactly as
            # the original if/else did.
            result[rsc] = min(maxrsc[rsc], request[rsc+"Max"])

        return result

    def _runner(self, job, runtime_context, TMPDIR_LOCK):
        # type: (Union[JobBase, WorkflowJob, CallbackJob], RuntimeContext, threading.Lock) -> None
        """Job running thread.

        Runs ``job`` and records any failure in ``self.exceptions`` instead
        of letting it escape the thread.  Afterwards, under the workflow
        evaluation lock, the thread deregisters itself, returns the job's
        resources to the pool and wakes up waiters.
        """
        try:
            _logger.debug("job: %s, runtime_context: %s, TMPDIR_LOCK: %s",
                          job, runtime_context, TMPDIR_LOCK)
            job.run(runtime_context, TMPDIR_LOCK)
        except WorkflowException as err:
            _logger.exception("Got workflow error")
            self.exceptions.append(err)
        except Exception as err:  # pylint: disable=broad-except
            _logger.exception("Got workflow error")
            self.exceptions.append(WorkflowException(Text(err)))
        finally:
            if runtime_context.workflow_eval_lock:
                with runtime_context.workflow_eval_lock:
                    self.threads.remove(threading.current_thread())
                    if isinstance(job, JobBase):
                        self.allocated_ram -= job.builder.resources["ram"]
                        self.allocated_cores -= job.builder.resources["cores"]
                    runtime_context.workflow_eval_lock.notify_all()

    def run_job(self,
                job,             # type: Union[JobBase, WorkflowJob, None]
                runtime_context  # type: RuntimeContext
                ):  # type: (...) -> None
        """Queue ``job`` (if any) and start every pending job that fits.

        Each started job runs in its own daemon thread.  A JobBase whose
        request exceeds the host limits is logged, dropped from the queue
        and ends this call; one that merely does not fit right now stays
        queued for a later call.
        """
        if job is not None:
            with self.pending_jobs_lock:
                self.pending_jobs.append(job)

        with self.pending_jobs_lock:
            n = 0
            while n < len(self.pending_jobs):
                job = self.pending_jobs[n]
                if isinstance(job, JobBase):
                    resources = job.builder.resources
                    if (resources["ram"] > self.max_ram
                            or resources["cores"] > self.max_cores):
                        # Only the two host limits are passed: the message
                        # has four placeholders, and extra arguments make
                        # the logging module fail to format the record.
                        _logger.error(
                            'Job "%s" cannot be run, requests more resources (%s) '
                            'than available on this host (max ram %d, max cores %d',
                            job.name, resources,
                            self.max_ram,
                            self.max_cores)
                        self.pending_jobs.remove(job)
                        return

                    if (self.allocated_ram + resources["ram"] > self.max_ram
                            or self.allocated_cores + resources["cores"]
                            > self.max_cores):
                        _logger.debug(
                            'Job "%s" cannot run yet, resources (%s) are not '
                            'available (already allocated ram is %d, allocated cores is %d, '
                            'max ram %d, max cores %d',
                            job.name, resources,
                            self.allocated_ram,
                            self.allocated_cores,
                            self.max_ram,
                            self.max_cores)
                        # Leave it queued and look at the next pending job.
                        n += 1
                        continue

                thread = threading.Thread(
                    target=self._runner, args=(job, runtime_context, TMPDIR_LOCK))
                thread.daemon = True
                self.threads.add(thread)
                if isinstance(job, JobBase):
                    self.allocated_ram += job.builder.resources["ram"]
                    self.allocated_cores += job.builder.resources["cores"]
                thread.start()
                # Removing the entry shifts the rest down, so ``n`` already
                # points at the next candidate.
                self.pending_jobs.remove(job)

    def wait_for_next_completion(self, runtime_context):
        # type: (RuntimeContext) -> None
        """Wait for jobs to finish.

        Blocks on the workflow evaluation condition (which the caller must
        already hold) and then re-raises the first exception recorded by a
        worker thread, if there is one.
        """
        if runtime_context.workflow_eval_lock is not None:
            runtime_context.workflow_eval_lock.wait()
        if self.exceptions:
            raise self.exceptions[0]

    def run_jobs(self,
                 process,           # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,            # type: logging.Logger
                 runtime_context    # type: RuntimeContext
                 ):  # type: (...) -> None
        """Run the jobs yielded by ``process.job`` using worker threads.

        Raises WorkflowException when ``runtime_context.workflow_eval_lock``
        is None, and re-raises the first failure recorded by a worker.
        """
        jobiter = process.job(job_order_object, self.output_callback,
                              runtime_context)

        if runtime_context.workflow_eval_lock is None:
            raise WorkflowException(
                "runtimeContext.workflow_eval_lock must not be None")

        # ``with`` releases the lock even when a worker failure is re-raised
        # below; a bare acquire()/release() pair left it held in that case,
        # blocking every worker thread still trying to take it.
        with runtime_context.workflow_eval_lock:
            for job in jobiter:
                if job is not None:
                    if isinstance(job, JobBase):
                        job.builder = runtime_context.builder or job.builder
                        if job.outdir is not None:
                            self.output_dirs.add(job.outdir)

                self.run_job(job, runtime_context)

                if job is None:
                    if self.threads:
                        self.wait_for_next_completion(runtime_context)
                    else:
                        logger.error("Workflow cannot make any more progress.")
                        break

            # Start whatever is still queued, then keep going until all
            # worker threads have finished.
            self.run_job(None, runtime_context)
            while self.threads:
                self.wait_for_next_completion(runtime_context)
                self.run_job(None, runtime_context)