comparison env/lib/python3.7/site-packages/cwltool/procgenerator.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 import copy
2 from .process import Process, shortname
3 from .load_tool import load_tool
4 from schema_salad import validate
5 from schema_salad.sourceline import indent
6 from .errors import WorkflowException
7 from .context import RuntimeContext, LoadingContext
8 from typing import (Any, Callable, Dict, Generator, Iterable, List,
9 Mapping, MutableMapping, MutableSequence,
10 Optional, Tuple, Union, cast)
11 from .loghandler import _logger
12 from typing_extensions import Text # pylint: disable=unused-import
13
class ProcessGeneratorJob(object):
    """Two-stage job: run the generator's embedded tool, then the tool it produced."""

    def __init__(self, procgenerator):
        # type: (ProcessGenerator) -> None
        """Keep a reference to the generator; output and status start unset."""
        self.processStatus = None  # type: Optional[Text]
        self.jobout = None  # type: Optional[Dict[Text, Any]]
        self.procgenerator = procgenerator

    def receive_output(self, jobout, processStatus):
        # type: (Dict[Text, Any], Text) -> None
        """Record the output object and status reported by the embedded tool."""
        self.jobout, self.processStatus = jobout, processStatus

    def job(self,
            job_order,         # type: Mapping[Text, Any]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        """Yield the runnable items of both stages, in order.

        The embedded tool is run first with ``receive_output`` as its output
        callback.  If it does not report ``"success"``, its output and status
        are forwarded to ``output_callbacks`` and the generator ends.
        Otherwise ``procgenerator.result`` supplies the created tool and its
        inputs, and that tool's job items are yielded with
        ``output_callbacks`` as the final callback.

        Raises:
            WorkflowException: raised directly when the status is
                ``"success"`` but no output was recorded, re-raised unchanged
                from either stage, or wrapping any other exception.
        """
        # FIXME: Declare base type for what Generator yields
        try:
            generator_stage = self.procgenerator.embedded_tool.job(
                job_order, self.receive_output, runtimeContext)
            for runnable in generator_stage:
                yield runnable

            # Keep yielding None until receive_output() has recorded a status.
            while self.processStatus is None:
                yield None

            if self.processStatus == "success":
                if self.jobout is None:
                    raise WorkflowException("jobout should not be None")

                generated = self.procgenerator.result(
                    job_order, self.jobout, runtimeContext)
                created_tool, runinputs = generated

                created_stage = created_tool.job(
                    runinputs, output_callbacks, runtimeContext)
                for runnable in created_stage:
                    yield runnable
            else:
                # The generator stage failed: report it and stop.
                output_callbacks(self.jobout, self.processStatus)

        except WorkflowException:
            raise
        except Exception as failure:
            _logger.exception("Unexpected exception")
            raise WorkflowException(Text(failure))
63
64
class ProcessGenerator(Process):
    """A process whose embedded tool emits the description of the process to run.

    The ``run`` field of the tool object is loaded as ``embedded_tool``.  The
    output of that tool is read by ``result``, which loads the tool found at
    ``runProcess.location`` and works out the inputs to run it with.
    """

    def __init__(self,
                 toolpath_object,  # type: MutableMapping[Text, Any]
                 loadingContext    # type: LoadingContext
                 ):  # type: (...) -> None
        """Load the tool given by ``toolpath_object["run"]``.

        An inline mapping is built with
        ``loadingContext.construct_tool_object``; any other value is handed to
        ``load_tool``.

        Raises:
            WorkflowException: the embedded tool failed validation.
        """
        super(ProcessGenerator, self).__init__(
            toolpath_object, loadingContext)
        self.loadingContext = loadingContext  # type: LoadingContext
        try:
            if isinstance(toolpath_object["run"], MutableMapping):
                self.embedded_tool = loadingContext.construct_tool_object(
                    toolpath_object["run"], loadingContext)  # type: Process
            else:
                # Reset the metadata on a copy, as result() does, so the
                # context object owned by the caller is left untouched.
                run_context = loadingContext.copy()
                run_context.metadata = {}
                self.embedded_tool = load_tool(
                    toolpath_object["run"], run_context)
        except validate.ValidationException as vexc:
            if loadingContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                u"Tool definition %s failed validation:\n%s" %
                (toolpath_object["run"], indent(str(vexc))))

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        """Return the job generator of a fresh ``ProcessGeneratorJob``."""
        # FIXME: Declare base type for what Generator yields
        return ProcessGeneratorJob(self).job(job_order, output_callbacks, runtimeContext)

    def result(self,
               job_order,      # type: Mapping[Text, Any]
               jobout,         # type: Mapping[Text, Any]
               runtimeContext  # type: RuntimeContext
               ):  # type: (...) -> Tuple[Process, MutableMapping[Text, Any]]
        """Load the generated process and determine the inputs to run it with.

        The tool is loaded from ``jobout["runProcess"]["location"]`` using a
        copy of the stored loading context with its metadata reset.

        The inputs are ``jobout["runInputs"]`` when that key is present.
        Otherwise they are a deep copy of ``job_order`` with the embedded
        tool's own inputs (matched by short name) and the ``"id"`` key
        removed.

        Returns:
            A ``(tool, inputs)`` tuple.

        Raises:
            WorkflowException: ``jobout`` has no ``runProcess`` entry with a
                ``location``, or the generated tool failed validation.
        """
        try:
            location = jobout["runProcess"]["location"]
        except (KeyError, TypeError):
            # Report malformed generator output explicitly rather than
            # letting a bare KeyError/TypeError escape.
            raise WorkflowException(
                u"Process generator output must provide 'runProcess' with a "
                u"'location' field")

        try:
            loadingContext = self.loadingContext.copy()
            loadingContext.metadata = {}
            embedded_tool = load_tool(location, loadingContext)
        except validate.ValidationException as vexc:
            if runtimeContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                u"Tool definition %s failed validation:\n%s" %
                (jobout["runProcess"], indent(str(vexc))))

        if "runInputs" in jobout:
            runinputs = cast(MutableMapping[Text, Any], jobout["runInputs"])
        else:
            runinputs = cast(MutableMapping[Text, Any], copy.deepcopy(job_order))
            # Drop the inputs that belonged to the generator tool itself.
            for i in self.embedded_tool.tool["inputs"]:
                if shortname(i["id"]) in runinputs:
                    del runinputs[shortname(i["id"])]
            if "id" in runinputs:
                del runinputs["id"]

        return embedded_tool, runinputs