Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/cwltool/procgenerator.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author | shellac |
---|---|
date | Thu, 14 May 2020 14:56:58 -0400 |
parents | 26e78fe6e8c4 |
children |
comparison
equal
deleted
inserted
replaced
1:75ca89e9b81c | 2:6af9afd405e9 |
---|---|
1 import copy | |
2 from .process import Process, shortname | |
3 from .load_tool import load_tool | |
4 from schema_salad import validate | |
5 from schema_salad.sourceline import indent | |
6 from .errors import WorkflowException | |
7 from .context import RuntimeContext, LoadingContext | |
8 from typing import (Any, Callable, Dict, Generator, Iterable, List, | |
9 Mapping, MutableMapping, MutableSequence, | |
10 Optional, Tuple, Union, cast) | |
11 from .loghandler import _logger | |
12 from typing_extensions import Text # pylint: disable=unused-import | |
13 | |
class ProcessGeneratorJob(object):
    """Two-stage job iterator for a ProcessGenerator.

    Stage one runs the generator's embedded tool and captures its output
    through :meth:`receive_output`.  Stage two asks the generator to turn
    that output into a new process (via ``procgenerator.result``) and runs
    that process, reporting to the caller's ``output_callbacks``.
    """

    def __init__(self, procgenerator):
        # type: (ProcessGenerator) -> None
        """Remember the owning generator; no output has been received yet."""
        # Both fields stay None until receive_output() is called.
        self.processStatus = None  # type: Optional[Text]
        self.jobout = None  # type: Optional[Dict[Text, Any]]
        self.procgenerator = procgenerator

    def receive_output(self, jobout, processStatus):
        # type: (Dict[Text, Any], Text) -> None
        """Output callback handed to the embedded tool: store what it reports."""
        self.jobout, self.processStatus = jobout, processStatus

    def job(self,
            job_order,         # type: Mapping[Text, Any]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        """Yield the runnables of the embedded tool, then of the generated process.

        While the embedded tool has not reported a status yet, ``None`` is
        yielded.  If the reported status is anything other than
        ``"success"``, the stored output and status are forwarded to
        ``output_callbacks`` and iteration stops.  Otherwise the generated
        process is run with ``output_callbacks`` as its callback.

        Raises WorkflowException; any other exception raised along the way
        is logged and re-raised as a WorkflowException carrying its text.
        """
        # FIXME: Declare base type for what Generator yields

        try:
            # Stage one: run the embedded tool, collecting its output
            # through self.receive_output.
            generator_stage = self.procgenerator.embedded_tool.job(
                job_order, self.receive_output, runtimeContext)
            for runnable in generator_stage:
                yield runnable

            # Keep handing control back until a status has been reported.
            while self.processStatus is None:
                yield None

            if self.processStatus != "success":
                output_callbacks(self.jobout, self.processStatus)
                return

            if self.jobout is None:
                raise WorkflowException("jobout should not be None")

            # Stage two: build the generated process and run it.
            generated = self.procgenerator.result(
                job_order, self.jobout, runtimeContext)
            generated_process, generated_inputs = generated
            generated_stage = generated_process.job(
                generated_inputs, output_callbacks, runtimeContext)
            for runnable in generated_stage:
                yield runnable

        except WorkflowException:
            # Already the expected error type: propagate untouched.
            raise
        except Exception as exc:
            _logger.exception("Unexpected exception")
            raise WorkflowException(Text(exc))
63 | |
64 | |
class ProcessGenerator(Process):
    """Process whose embedded tool emits the description of another process.

    The tool named by the ``run`` field is loaded at construction time.
    When the job runs (see ProcessGeneratorJob), that embedded tool is
    executed first; :meth:`result` then loads the process referenced by
    its ``runProcess`` output so that it can be run in turn.
    """

    def __init__(self,
                 toolpath_object,  # type: MutableMapping[Text, Any]
                 loadingContext    # type: LoadingContext
                ):  # type: (...) -> None
        """Load the embedded tool named by ``toolpath_object["run"]``.

        Raises WorkflowException if the embedded tool fails validation.
        """
        super(ProcessGenerator, self).__init__(
            toolpath_object, loadingContext)
        self.loadingContext = loadingContext  # type: LoadingContext
        try:
            if isinstance(toolpath_object["run"], MutableMapping):
                # Inline tool definition: construct it directly.
                self.embedded_tool = loadingContext.construct_tool_object(
                    toolpath_object["run"], loadingContext)  # type: Process
            else:
                # Reference to an external document: load it.
                # NOTE: this resets ``metadata`` on the context object that
                # was passed in (no copy is made here, unlike in result()).
                loadingContext.metadata = {}
                self.embedded_tool = load_tool(
                    toolpath_object["run"], loadingContext)
        except validate.ValidationException as vexc:
            if loadingContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                u"Tool definition %s failed validation:\n%s" %
                (toolpath_object["run"], indent(str(vexc))))

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        """Return the generator produced by a fresh ProcessGeneratorJob."""
        # FIXME: Declare base type for what Generator yields
        return ProcessGeneratorJob(self).job(job_order, output_callbacks, runtimeContext)

    def result(self,
               job_order,      # type: Mapping[Text, Any]
               jobout,         # type: Mapping[Text, Any]
               runtimeContext  # type: RuntimeContext
              ):  # type: (...) -> Tuple[Process, MutableMapping[Text, Any]]
        """Load the generated process and work out the inputs to run it with.

        ``jobout`` is the output of the embedded tool.  Its ``runProcess``
        entry must be a mapping with a ``location`` naming the document to
        load.  If ``jobout`` has a ``runInputs`` entry, that is used as the
        input object; otherwise a deep copy of ``job_order`` is used, minus
        the embedded tool's own inputs and the ``id`` key.

        Returns a ``(process, inputs)`` tuple.

        Raises WorkflowException if ``runProcess`` is missing or malformed,
        or if the generated tool definition fails validation.
        """
        # Fail with a descriptive error rather than a bare KeyError/TypeError
        # when the generator tool did not emit a usable ``runProcess``.
        run_process = jobout.get("runProcess")
        if not isinstance(run_process, Mapping) or "location" not in run_process:
            raise WorkflowException(
                u"Process generator output must contain a 'runProcess' "
                u"object with a 'location' field, but got: %s"
                % (run_process,))

        try:
            # Load with a private copy of the context and fresh metadata.
            loadingContext = self.loadingContext.copy()
            loadingContext.metadata = {}
            embedded_tool = load_tool(run_process["location"], loadingContext)
        except validate.ValidationException as vexc:
            if runtimeContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                u"Tool definition %s failed validation:\n%s" %
                (run_process, indent(str(vexc))))

        if "runInputs" in jobout:
            runinputs = cast(MutableMapping[Text, Any], jobout["runInputs"])
        else:
            runinputs = cast(MutableMapping[Text, Any], copy.deepcopy(job_order))
            # Drop the inputs that belong to the generator tool itself.
            for generator_input in self.embedded_tool.tool["inputs"]:
                runinputs.pop(shortname(generator_input["id"]), None)
            runinputs.pop("id", None)

        return embedded_tool, runinputs