Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/cwltool/checker.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
comparison
equal
deleted
inserted
replaced
4:79f47841a781 | 5:9b1c78e6ba9c |
---|---|
1 """Static checking of CWL workflow connectivity.""" | |
2 from collections import namedtuple | |
3 from typing import Any, Dict, List, MutableMapping, MutableSequence, Optional | |
4 | |
5 import six | |
6 from schema_salad import validate | |
7 from schema_salad.sourceline import SourceLine, bullets, strip_dup_lineno | |
8 from typing_extensions import Text # pylint: disable=unused-import | |
9 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
10 | |
11 from .errors import WorkflowException | |
12 from .loghandler import _logger | |
13 from .process import shortname | |
14 from .utils import json_dumps | |
15 | |
16 | |
17 def _get_type(tp): | |
18 # type: (Any) -> Any | |
19 if isinstance(tp, MutableMapping): | |
20 if tp.get("type") not in ("array", "record", "enum"): | |
21 return tp["type"] | |
22 return tp | |
23 | |
def check_types(srctype, sinktype, linkMerge, valueFrom):
    # type: (Any, Any, Optional[Text], Optional[Text]) -> Text
    """
    Classify how well a source type fits a sink type.

    Returns one of "pass", "warning", or "exception":

    * a sink with a valueFrom expression always passes;
    * without linkMerge, a strict match passes, a non-strict match only
      warns, and anything else is an exception;
    * with linkMerge, the source type is first wrapped ("merge_nested") or
      flattened ("merge_flattened") into an array type and then checked
      again without linkMerge.

    Raises WorkflowException for any other linkMerge value.
    """
    if valueFrom is not None:
        return "pass"

    if linkMerge is None:
        # Try the strict comparison first, then fall back to the lenient one.
        for strict, verdict in ((True, "pass"), (False, "warning")):
            if can_assign_src_to_sink(srctype, sinktype, strict=strict):
                return verdict
        return "exception"

    if linkMerge == "merge_nested":
        merged_src = {"items": _get_type(srctype), "type": "array"}
    elif linkMerge == "merge_flattened":
        merged_src = merge_flatten_type(_get_type(srctype))
    else:
        raise WorkflowException(u"Unrecognized linkMerge enum '{}'".format(linkMerge))
    return check_types(merged_src, _get_type(sinktype), None, None)
45 | |
46 | |
def merge_flatten_type(src):
    # type: (Any) -> Any
    """
    Return the merge flattened type of the source type.

    A list of types is flattened element by element, an array type mapping
    is returned as is, and any other type is wrapped as the item type of a
    new array type.
    """
    if isinstance(src, MutableSequence):
        return list(map(merge_flatten_type, src))
    already_array = isinstance(src, MutableMapping) and src.get("type") == "array"
    return src if already_array else {"items": src, "type": "array"}
55 | |
56 | |
def can_assign_src_to_sink(src, sink, strict=False):  # type: (Any, Any, bool) -> bool
    """
    Decide whether a value of type ``src`` may be fed to a sink of type ``sink``.

    Only the type information is compared; extra keys such as inputBinding
    are ignored.

    src: admissible source types
    sink: admissible sink types

    In non-strict comparison, at least one source type must match one sink type.
    In strict comparison, all source types must match at least one sink type.
    """
    if src == "Any" or sink == "Any":
        return True

    if isinstance(src, MutableMapping) and isinstance(sink, MutableMapping):
        if strict and sink.get("not_connected"):
            return False
        kinds = (src["type"], sink["type"])
        if kinds == ("array", "array"):
            return can_assign_src_to_sink(src["items"], sink["items"], strict)
        if kinds == ("record", "record"):
            return _compare_records(src, sink, strict)
        if kinds == ("File", "File"):
            offered = src.get("secondaryFiles", [])
            for wanted in sink.get("secondaryFiles", []):
                found = any(wanted == candidate for candidate in offered)
                # A secondaryFile the source does not declare only matters
                # in strict mode.
                if strict and not found:
                    return False
            return True
        return can_assign_src_to_sink(kinds[0], kinds[1], strict)

    if isinstance(src, MutableSequence):
        # Each alternative is compared with the default (non-strict) mode;
        # ``strict`` only selects between "all" and "any" here.
        verdicts = (can_assign_src_to_sink(candidate, sink) for candidate in src)
        return all(verdicts) if strict else any(verdicts)

    if isinstance(sink, MutableSequence):
        return any(can_assign_src_to_sink(src, candidate) for candidate in sink)

    return bool(src == sink)
99 | |
100 | |
def _compare_records(src, sink, strict=False):
    # type: (MutableMapping[Text, Any], MutableMapping[Text, Any], bool) -> bool
    """
    Check that every field of the sink record can be fed from the source record.

    Field names are reduced with shortname() before being matched, because
    the names are relative to the workflow step and would not compare equal
    otherwise.  A field missing from the source is compared as type "null".
    The first incompatible field is logged at info level and makes the
    comparison fail.
    """
    def _field_types(record):  # type: (MutableMapping[Text, Any]) -> MutableMapping[Text, Any]
        by_name = {}
        for entry in record["fields"]:
            short = shortname(entry["name"])
            by_name[short] = entry["type"]
        return by_name

    src_types = _field_types(src)
    sink_types = _field_types(sink)
    for field_name in sink_types:
        sink_type = sink_types[field_name]
        if can_assign_src_to_sink(src_types.get(field_name, "null"), sink_type, strict):
            continue
        if sink_type is None:
            # A sink field without a type never causes a failure.
            continue
        _logger.info("Record comparison failure for %s and %s\n"
                     "Did not match fields for %s: %s and %s",
                     src["name"], sink["name"], field_name,
                     src_types.get(field_name), sink_type)
        return False
    return True
128 | |
def missing_subset(fullset, subset):  # type: (List[Any], List[Any]) -> List[Any]
    """Return the items of ``subset`` that are absent from ``fullset``, in order."""
    return [item for item in subset if item not in fullset]
135 | |
def static_checker(workflow_inputs, workflow_outputs, step_inputs, step_outputs, param_to_step):
    # type: (List[Dict[Text, Any]], List[Dict[Text, Any]], List[Dict[Text, Any]], List[Dict[Text, Any]], Dict[Text, Dict[Text, Any]]) -> None
    """
    Check if all source and sink types of a workflow are compatible before run time.

    workflow_inputs and step_outputs are the source parameters; step_inputs
    and workflow_outputs are the sink parameters.  param_to_step is looked
    up by sink id and its entries are read for their "run" and "inputs"
    keys when reporting a sink marked "not_connected".

    Source/sink pairs classified as "warning" are logged through _logger.
    If any pair is classified as "exception", a validate.ValidationException
    carrying all collected exception messages is raised.
    """
    # source parameters: workflow_inputs and step_outputs
    # sink parameters: step_inputs and workflow_outputs

    # make a dictionary of source parameters, indexed by the "id" field
    src_parms = workflow_inputs + step_outputs
    src_dict = {}
    for parm in src_parms:
        src_dict[parm["id"]] = parm

    # Step inputs name their sources under "source", workflow outputs under
    # "outputSource".
    step_inputs_val = check_all_types(src_dict, step_inputs, "source")
    workflow_outputs_val = check_all_types(src_dict, workflow_outputs, "outputSource")

    warnings = step_inputs_val["warning"] + workflow_outputs_val["warning"]
    exceptions = step_inputs_val["exception"] + workflow_outputs_val["exception"]

    warning_msgs = []
    exception_msgs = []
    for warning in warnings:
        src = warning.src
        sink = warning.sink
        linkMerge = warning.linkMerge
        # Only sink patterns that are required (the default when "required"
        # is absent) are compared against everything the source declares.
        sinksf = sorted([p["pattern"] for p in sink.get("secondaryFiles", []) if p.get("required", True)])
        srcsf = sorted([p["pattern"] for p in src.get("secondaryFiles", [])])
        # Every secondaryFile required by the sink, should be declared
        # by the source
        missing = missing_subset(srcsf, sinksf)
        if missing:
            msg1 = "Parameter '%s' requires secondaryFiles %s but" % (shortname(sink["id"]), missing)
            msg3 = SourceLine(src, "id").makeError(
                "source '%s' does not provide those secondaryFiles." % (shortname(src["id"])))
            msg4 = SourceLine(src.get("_tool_entry", src), "secondaryFiles").makeError("To resolve, add missing secondaryFiles patterns to definition of '%s' or" % (shortname(src["id"])))
            msg5 = SourceLine(sink.get("_tool_entry", sink), "secondaryFiles").makeError("mark missing secondaryFiles in definition of '%s' as optional." % shortname(sink["id"]))
            msg = SourceLine(sink).makeError("%s\n%s" % (msg1, bullets([msg3, msg4, msg5], " ")))
        elif sink.get("not_connected"):
            # The sink names a parameter the step's tool does not have; list
            # the inputs of that step that are not flagged "not_connected".
            msg = SourceLine(sink, "type").makeError(
                "'%s' is not an input parameter of %s, expected %s"
                % (shortname(sink["id"]), param_to_step[sink["id"]]["run"],
                   ", ".join(shortname(s["id"])
                             for s in param_to_step[sink["id"]]["inputs"]
                             if not s.get("not_connected"))))
        else:
            msg = SourceLine(src, "type").makeError(
                "Source '%s' of type %s may be incompatible"
                % (shortname(src["id"]), json_dumps(src["type"]))) + "\n" + \
                SourceLine(sink, "type").makeError(
                    " with sink '%s' of type %s"
                    % (shortname(sink["id"]), json_dumps(sink["type"])))
            if linkMerge is not None:
                msg += "\n" + SourceLine(sink).makeError(" source has linkMerge method %s" % linkMerge)

        warning_msgs.append(msg)
    for exception in exceptions:
        src = exception.src
        sink = exception.sink
        linkMerge = exception.linkMerge
        msg = SourceLine(src, "type").makeError(
            "Source '%s' of type %s is incompatible"
            % (shortname(src["id"]), json_dumps(src["type"]))) + "\n" + \
            SourceLine(sink, "type").makeError(
                " with sink '%s' of type %s"
                % (shortname(sink["id"]), json_dumps(sink["type"])))
        if linkMerge is not None:
            msg += "\n" + SourceLine(sink).makeError(" source has linkMerge method %s" % linkMerge)
        exception_msgs.append(msg)

    # A step input whose type does not admit 'null' must get its value from
    # a source, a default, or a valueFrom expression.
    for sink in step_inputs:
        if ('null' != sink["type"] and 'null' not in sink["type"]
                and "source" not in sink and "default" not in sink and "valueFrom" not in sink):
            msg = SourceLine(sink).makeError(
                "Required parameter '%s' does not have source, default, or valueFrom expression"
                % shortname(sink["id"]))
            exception_msgs.append(msg)

    all_warning_msg = strip_dup_lineno("\n".join(warning_msgs))
    all_exception_msg = strip_dup_lineno("\n".join(exception_msgs))

    if warnings:
        _logger.warning("Workflow checker warning:\n%s", all_warning_msg)
    # NOTE(review): the raise is gated on `exceptions` (type mismatches), not
    # on `exception_msgs`.  "Required parameter" messages added by the loop
    # above are therefore only reported when at least one type mismatch was
    # also found -- confirm whether that is intended.
    if exceptions:
        raise validate.ValidationException(all_exception_msg)
219 | |
220 | |
# One source/sink pairing whose types were checked, together with the
# linkMerge method that was in effect for the sink (None when there is none).
SrcSink = namedtuple("SrcSink", ("src", "sink", "linkMerge"))
222 | |
def check_all_types(src_dict, sinks, sourceField):
    # type: (Dict[Text, Any], List[Dict[Text, Any]], Text) -> Dict[Text, List[SrcSink]]
    """
    Check the type of every sink against the types of its sources.

    sourceField is the key under which a sink names its source(s), either
    "source" or "outputSource".  Sinks without that key are skipped.

    Returns a dict with the keys "warning" and "exception", each holding the
    SrcSink entries that check_types() classified that way.
    """
    validation = {"warning": [], "exception": []}  # type: Dict[Text, List[SrcSink]]
    for sink in sinks:
        if sourceField not in sink:
            continue
        valueFrom = sink.get("valueFrom")
        source_ids = sink[sourceField]
        if isinstance(source_ids, MutableSequence):
            srcs_of_sink = [src_dict[parm_id] for parm_id in source_ids]
            # Several sources are merged as nested unless the sink says otherwise.
            default_merge = "merge_nested" if len(source_ids) > 1 else None
            linkMerge = sink.get("linkMerge", default_merge)
        else:
            srcs_of_sink = [src_dict[source_ids]]
            linkMerge = None
        for src in srcs_of_sink:
            verdict = check_types(src, sink, linkMerge, valueFrom)
            if verdict in ("warning", "exception"):
                validation[verdict].append(SrcSink(src, sink, linkMerge))
    return validation