comparison: env/lib/python3.9/site-packages/cwltool/workflow.py @ 0:4f3585e2f14b (draft, default, tip)

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"

author: shellac
date: Mon, 22 Mar 2021 18:12:50 +0000
parents: (none)
children: (none)
comparison: -1:000000000000 vs 0:4f3585e2f14b (new file)
import copy
import datetime
import functools
import logging
import random
from typing import (
    Callable,
    Dict,
    List,
    Mapping,
    MutableMapping,
    MutableSequence,
    Optional,
    cast,
)
from uuid import UUID

from ruamel.yaml.comments import CommentedMap
from schema_salad.exceptions import ValidationException
from schema_salad.sourceline import SourceLine, indent

from . import command_line_tool, context, procgenerator
from .checker import static_checker
from .context import LoadingContext, RuntimeContext, getdefault
from .errors import WorkflowException
from .load_tool import load_tool
from .loghandler import _logger
from .process import Process, get_overrides, shortname
from .provenance_profile import ProvenanceProfile
from .utils import (
    CWLObjectType,
    JobsGeneratorType,
    OutputCallbackType,
    StepType,
    aslist,
)
from .workflow_job import WorkflowJob


def default_make_tool(
    toolpath_object: CommentedMap, loadingContext: LoadingContext
) -> Process:
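    """Instantiate the Process subclass matching the document's "class" field."""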
    if not isinstance(toolpath_object, MutableMapping):
        raise WorkflowException("Not a dict: '%s'" % toolpath_object)
    if "class" in toolpath_object:
        if toolpath_object["class"] == "CommandLineTool":
            return command_line_tool.CommandLineTool(toolpath_object, loadingContext)
        if toolpath_object["class"] == "ExpressionTool":
            return command_line_tool.ExpressionTool(toolpath_object, loadingContext)
        if toolpath_object["class"] == "Workflow":
            return Workflow(toolpath_object, loadingContext)
        if toolpath_object["class"] == "ProcessGenerator":
            return procgenerator.ProcessGenerator(toolpath_object, loadingContext)
        if toolpath_object["class"] == "Operation":
            return command_line_tool.AbstractOperation(toolpath_object, loadingContext)
    raise WorkflowException(
        "Missing or invalid 'class' field in "
        "%s, expecting one of: CommandLineTool, ExpressionTool, Workflow, "
        "ProcessGenerator, Operation" % toolpath_object["id"]
    )


context.default_make_tool = default_make_tool

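# Usage sketch (hypothetical caller code, not part of this module): load_tool
# and LoadingContext are the real cwltool entry points imported above; the
# file name "wf.cwl" is illustrative only.
#
#   from cwltool.context import LoadingContext
#   from cwltool.load_tool import load_tool
#
#   loading_context = LoadingContext()
#   loading_context.construct_tool_object = default_make_tool
#   process = load_tool("wf.cwl", loading_context)
#   # 'process' is a Workflow, CommandLineTool, ExpressionTool, etc.,
#   # depending on the document's "class" field.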

class Workflow(Process):
    def __init__(
        self,
        toolpath_object: CommentedMap,
        loadingContext: LoadingContext,
    ) -> None:
        """Initialize this Workflow."""
        super().__init__(toolpath_object, loadingContext)
        self.provenance_object = None  # type: Optional[ProvenanceProfile]
        if loadingContext.research_obj is not None:
            run_uuid = None  # type: Optional[UUID]
            is_main = not loadingContext.prov_obj  # Not yet set
            if is_main:
                run_uuid = loadingContext.research_obj.ro_uuid

            self.provenance_object = ProvenanceProfile(
                loadingContext.research_obj,
                full_name=loadingContext.cwl_full_name,
                host_provenance=loadingContext.host_provenance,
                user_provenance=loadingContext.user_provenance,
                orcid=loadingContext.orcid,
                run_uuid=run_uuid,
                fsaccess=loadingContext.research_obj.fsaccess,
            )  # inherit RO UUID for main wf run
            # TODO: Is Workflow(..) only called when we are the main workflow?
            self.parent_wf = self.provenance_object

        # FIXME: Won't this overwrite prov_obj for nested workflows?
        loadingContext.prov_obj = self.provenance_object
        loadingContext = loadingContext.copy()
        loadingContext.requirements = self.requirements
        loadingContext.hints = self.hints

        self.steps = []  # type: List[WorkflowStep]
        validation_errors = []
        for index, step in enumerate(self.tool.get("steps", [])):
            try:
                self.steps.append(
                    self.make_workflow_step(
                        step, index, loadingContext, loadingContext.prov_obj
                    )
                )
            except ValidationException as vexc:
                if _logger.isEnabledFor(logging.DEBUG):
                    _logger.exception("Validation failed at")
                validation_errors.append(vexc)

        if validation_errors:
            raise ValidationException("\n".join(str(v) for v in validation_errors))

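        # Step order in a CWL document carries no execution semantics
        # (execution is driven by data dependencies), so shuffle the steps
        # to keep downstream code from relying on document order.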
        random.shuffle(self.steps)

        # statically validate data links instead of doing it at runtime.
        workflow_inputs = self.tool["inputs"]
        workflow_outputs = self.tool["outputs"]

        step_inputs = []  # type: List[CWLObjectType]
        step_outputs = []  # type: List[CWLObjectType]
        param_to_step = {}  # type: Dict[str, CWLObjectType]
        for step in self.steps:
            step_inputs.extend(step.tool["inputs"])
            step_outputs.extend(step.tool["outputs"])
            for s in step.tool["inputs"]:
                param_to_step[s["id"]] = step.tool
            for s in step.tool["outputs"]:
                param_to_step[s["id"]] = step.tool

        if getdefault(loadingContext.do_validate, True):
            static_checker(
                workflow_inputs,
                workflow_outputs,
                step_inputs,
                step_outputs,
                param_to_step,
            )

    def make_workflow_step(
        self,
        toolpath_object: CommentedMap,
        pos: int,
        loadingContext: LoadingContext,
        parentworkflowProv: Optional[ProvenanceProfile] = None,
    ) -> "WorkflowStep":
        return WorkflowStep(toolpath_object, pos, loadingContext, parentworkflowProv)

    def job(
        self,
        job_order: CWLObjectType,
        output_callbacks: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        builder = self._init_job(job_order, runtimeContext)

        if runtimeContext.research_obj is not None:
            if runtimeContext.toplevel:
                # Record primary-job.json
                runtimeContext.research_obj.fsaccess = runtimeContext.make_fs_access("")
                runtimeContext.research_obj.create_job(builder.job)

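        # Yield the WorkflowJob container first so the executor can register
        # it, then delegate to it to generate the individual step jobs.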
        job = WorkflowJob(self, runtimeContext)
        yield job

        runtimeContext = runtimeContext.copy()
        runtimeContext.part_of = "workflow %s" % job.name
        runtimeContext.toplevel = False

        yield from job.job(builder.job, output_callbacks, runtimeContext)

    def visit(self, op: Callable[[CommentedMap], None]) -> None:
        op(self.tool)
        for step in self.steps:
            step.visit(op)


def used_by_step(step: StepType, shortinputid: str) -> bool:
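    """Check whether an input is referenced in a step's "valueFrom" or "when" expression."""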
    for st in cast(MutableSequence[CWLObjectType], step["in"]):
        if st.get("valueFrom"):
            if ("inputs.%s" % shortinputid) in cast(str, st.get("valueFrom")):
                return True
    if step.get("when"):
        if ("inputs.%s" % shortinputid) in cast(str, step.get("when")):
            return True
    return False


class WorkflowStep(Process):
    def __init__(
        self,
        toolpath_object: CommentedMap,
        pos: int,
        loadingContext: LoadingContext,
        parentworkflowProv: Optional[ProvenanceProfile] = None,
    ) -> None:
        """Initialize this WorkflowStep."""
        if "id" in toolpath_object:
            self.id = toolpath_object["id"]
        else:
            self.id = "#step" + str(pos)

        loadingContext = loadingContext.copy()

        loadingContext.requirements = copy.deepcopy(
            getdefault(loadingContext.requirements, [])
        )
        assert loadingContext.requirements is not None  # nosec
        loadingContext.requirements.extend(toolpath_object.get("requirements", []))
        loadingContext.requirements.extend(
            cast(
                List[CWLObjectType],
                get_overrides(
                    getdefault(loadingContext.overrides_list, []), self.id
                ).get("requirements", []),
            )
        )

        hints = copy.deepcopy(getdefault(loadingContext.hints, []))
        hints.extend(toolpath_object.get("hints", []))
        loadingContext.hints = hints

        try:
            if isinstance(toolpath_object["run"], CommentedMap):
                self.embedded_tool = loadingContext.construct_tool_object(
                    toolpath_object["run"], loadingContext
                )  # type: Process
            else:
                loadingContext.metadata = {}
                self.embedded_tool = load_tool(toolpath_object["run"], loadingContext)
        except ValidationException as vexc:
            if loadingContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                "Tool definition %s failed validation:\n%s"
                % (toolpath_object["run"], indent(str(vexc)))
            ) from vexc

        validation_errors = []
        self.tool = toolpath_object = copy.deepcopy(toolpath_object)
        bound = set()
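        # Bind each step-level "in"/"out" entry to the matching parameter of
        # the embedded tool, merging the tool's definition into the step's.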
        for stepfield, toolfield in (("in", "inputs"), ("out", "outputs")):
            toolpath_object[toolfield] = []
            for index, step_entry in enumerate(toolpath_object[stepfield]):
                if isinstance(step_entry, str):
                    param = CommentedMap()  # type: CommentedMap
                    inputid = step_entry
                else:
                    param = CommentedMap(step_entry.items())
                    inputid = step_entry["id"]

                shortinputid = shortname(inputid)
                found = False
                for tool_entry in self.embedded_tool.tool[toolfield]:
                    frag = shortname(tool_entry["id"])
                    if frag == shortinputid:
                        # If the step supplies a default for a parameter, do
                        # not let the tool's default override it.
                        step_default = None
                        if "default" in param and "default" in tool_entry:
                            step_default = param["default"]
                        param.update(tool_entry)
                        param["_tool_entry"] = tool_entry
                        if step_default is not None:
                            param["default"] = step_default
                        found = True
                        bound.add(frag)
                        break
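                # An unmatched "in" entry is tolerated (typed Any and flagged
                # not_connected); an unmatched "out" entry is a validation error.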
                if not found:
                    if stepfield == "in":
                        param["type"] = "Any"
                        param["used_by_step"] = used_by_step(self.tool, shortinputid)
                        param["not_connected"] = True
                    else:
                        if isinstance(step_entry, Mapping):
                            step_entry_name = step_entry["id"]
                        else:
                            step_entry_name = step_entry
                        validation_errors.append(
                            SourceLine(self.tool["out"], index).makeError(
                                "Workflow step output '%s' does not correspond to"
                                % shortname(step_entry_name)
                            )
                            + "\n"
                            + SourceLine(self.embedded_tool.tool, "outputs").makeError(
                                "  tool output (expected '%s')"
                                % (
                                    "', '".join(
                                        [
                                            shortname(tool_entry["id"])
                                            for tool_entry in self.embedded_tool.tool[
                                                "outputs"
                                            ]
                                        ]
                                    )
                                )
                            )
                        )
                param["id"] = inputid
                param.lc.line = toolpath_object[stepfield].lc.data[index][0]
                param.lc.col = toolpath_object[stepfield].lc.data[index][1]
                param.lc.filename = toolpath_object[stepfield].lc.filename
                toolpath_object[toolfield].append(param)

        missing_values = []
        for tool_entry in self.embedded_tool.tool["inputs"]:
            if shortname(tool_entry["id"]) not in bound:
                if "null" not in tool_entry["type"] and "default" not in tool_entry:
                    missing_values.append(shortname(tool_entry["id"]))

        if missing_values:
            validation_errors.append(
                SourceLine(self.tool, "in").makeError(
                    "Step is missing required parameter%s '%s'"
                    % (
                        "s" if len(missing_values) > 1 else "",
                        "', '".join(missing_values),
                    )
                )
            )

        if validation_errors:
            raise ValidationException("\n".join(validation_errors))

        super().__init__(toolpath_object, loadingContext)

        if self.embedded_tool.tool["class"] == "Workflow":
            (feature, _) = self.get_requirement("SubworkflowFeatureRequirement")
            if not feature:
                raise WorkflowException(
                    "Workflow contains embedded workflow but "
                    "SubworkflowFeatureRequirement not in requirements"
                )

        if "scatter" in self.tool:
            (feature, _) = self.get_requirement("ScatterFeatureRequirement")
            if not feature:
                raise WorkflowException(
                    "Workflow contains scatter but ScatterFeatureRequirement "
                    "not in requirements"
                )

            inputparms = copy.deepcopy(self.tool["inputs"])
            outputparms = copy.deepcopy(self.tool["outputs"])
            scatter = aslist(self.tool["scatter"])

            method = self.tool.get("scatterMethod")
            if method is None and len(scatter) != 1:
                raise ValidationException(
                    "Must specify scatterMethod when scattering over multiple inputs"
                )

            inp_map = {i["id"]: i for i in inputparms}
            for inp in scatter:
                if inp not in inp_map:
                    raise ValidationException(
                        SourceLine(self.tool, "scatter").makeError(
                            "Scatter parameter '%s' does not correspond to "
                            "an input parameter of this step, expecting '%s'"
                            % (
                                shortname(inp),
                                "', '".join(shortname(k) for k in inp_map.keys()),
                            )
                        )
                    )

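                # Each scattered input now accepts an array of its original type.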
                inp_map[inp]["type"] = {"type": "array", "items": inp_map[inp]["type"]}

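            # nested_crossproduct wraps each output in one array level per
            # scattered input; dotproduct and flat_crossproduct add one level.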
            if self.tool.get("scatterMethod") == "nested_crossproduct":
                nesting = len(scatter)
            else:
                nesting = 1

            for _ in range(0, nesting):
                for oparam in outputparms:
                    oparam["type"] = {"type": "array", "items": oparam["type"]}
            self.tool["inputs"] = inputparms
            self.tool["outputs"] = outputparms
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        if loadingContext.research_obj is not None:
            self.prov_obj = parentworkflowProv
            if self.embedded_tool.tool["class"] == "Workflow":
                self.parent_wf = self.embedded_tool.parent_wf
            else:
                self.parent_wf = self.prov_obj

    def receive_output(
        self,
        output_callback: OutputCallbackType,
        jobout: CWLObjectType,
        processStatus: str,
    ) -> None:
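        """Forward the embedded tool's outputs, remapped to step-level ids."""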
        output = {}
        for i in self.tool["outputs"]:
            field = shortname(i["id"])
            if field in jobout:
                output[i["id"]] = jobout[field]
            else:
                processStatus = "permanentFail"
        output_callback(output, processStatus)

    def job(
        self,
        job_order: CWLObjectType,
        output_callbacks: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """Initialize sub-workflow as a step in the parent profile."""
        if (
            self.embedded_tool.tool["class"] == "Workflow"
            and runtimeContext.research_obj
            and self.prov_obj
            and self.embedded_tool.provenance_object
        ):
            self.embedded_tool.parent_wf = self.prov_obj
            process_name = self.tool["id"].split("#")[1]
            self.prov_obj.start_process(
                process_name,
                datetime.datetime.now(),
                self.embedded_tool.provenance_object.workflow_run_uri,
            )

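        # Build the embedded tool's job order, skipping inputs that were
        # flagged not_connected during binding in __init__.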
        step_input = {}
        for inp in self.tool["inputs"]:
            field = shortname(inp["id"])
            if not inp.get("not_connected"):
                step_input[field] = job_order[inp["id"]]

        try:
            yield from self.embedded_tool.job(
                step_input,
                functools.partial(self.receive_output, output_callbacks),
                runtimeContext,
            )
        except WorkflowException:
            _logger.error("Exception on step '%s'", runtimeContext.name)
            raise
        except Exception as exc:
            _logger.exception("Unexpected exception")
            raise WorkflowException(str(exc)) from exc

    def visit(self, op: Callable[[CommentedMap], None]) -> None:
        self.embedded_tool.visit(op)
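

# Consumption sketch (hypothetical driver, modeled on cwltool's executors):
# a Process.job(...) generator yields runnable jobs, or None while it is
# waiting on dependencies, so a minimal single-threaded loop looks like:
#
#   for runnable in process.job(job_order, output_callback, runtime_context):
#       if runnable is not None:
#           runnable.run(runtime_context)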