comparison env/lib/python3.7/site-packages/cwltool/checker.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 """Static checking of CWL workflow connectivity."""
2 from collections import namedtuple
3 from typing import Any, Dict, List, MutableMapping, MutableSequence, Optional
4
5 import six
6 from schema_salad import validate
7 from schema_salad.sourceline import SourceLine, bullets, strip_dup_lineno
8 from typing_extensions import Text # pylint: disable=unused-import
9 # move to a regular typing import when Python 3.3-3.6 is no longer supported
10
11 from .errors import WorkflowException
12 from .loghandler import _logger
13 from .process import shortname
14 from .utils import json_dumps
15
16
17 def _get_type(tp):
18 # type: (Any) -> Any
19 if isinstance(tp, MutableMapping):
20 if tp.get("type") not in ("array", "record", "enum"):
21 return tp["type"]
22 return tp
23
def check_types(srctype, sinktype, linkMerge, valueFrom):
    # type: (Any, Any, Optional[Text], Optional[Text]) -> Text
    """
    Classify the connection from a source type to a sink type.

    Returns "pass" when the types are compatible, "warning" when only a
    non-strict match exists, and "exception" when no match exists.

    When linkMerge is "merge_nested" or "merge_flattened", the source type
    is first transformed into the merged array type and then checked
    without linkMerge.

    :raises WorkflowException: if linkMerge is set to an unrecognized value.
    """
    # A valueFrom expression skips the type check entirely (presumably
    # because its result type cannot be known statically).
    if valueFrom is not None:
        return "pass"

    if linkMerge is None:
        # Try the strict comparison first, then fall back to the lenient one.
        for strict, verdict in ((True, "pass"), (False, "warning")):
            if can_assign_src_to_sink(srctype, sinktype, strict=strict):
                return verdict
        return "exception"

    if linkMerge == "merge_nested":
        nested = {"items": _get_type(srctype), "type": "array"}
        return check_types(nested, _get_type(sinktype), None, None)

    if linkMerge == "merge_flattened":
        flattened = merge_flatten_type(_get_type(srctype))
        return check_types(flattened, _get_type(sinktype), None, None)

    raise WorkflowException(u"Unrecognized linkMerge enum '{}'".format(linkMerge))
45
46
def merge_flatten_type(src):
    # type: (Any) -> Any
    """
    Return the merge flattened type of the source type.

    A list of alternatives is transformed element by element. A mapping that
    is already an array type is returned as-is. Anything else is wrapped in a
    new array type whose "items" is the original value.
    """
    if isinstance(src, MutableSequence):
        return list(map(merge_flatten_type, src))
    is_array = isinstance(src, MutableMapping) and src.get("type") == "array"
    return src if is_array else {"items": src, "type": "array"}
55
56
def can_assign_src_to_sink(src, sink, strict=False):  # type: (Any, Any, bool) -> bool
    """
    Check for identical type specifications, ignoring extra keys like inputBinding.

    src: admissible source types (a type name, a type mapping, or a list of alternatives)
    sink: admissible sink types (same forms as src)

    In non-strict comparison, at least one source type must match one sink type.
    In strict comparison, all source types must match at least one sink type.

    Note: alternatives inside a list are always compared with the default
    (non-strict) setting; only the top-level call honours ``strict``.
    """
    # "Any" on either side accepts everything.
    if src == "Any" or sink == "Any":
        return True

    if isinstance(src, MutableMapping) and isinstance(sink, MutableMapping):
        # A sink flagged "not_connected" never matches strictly.
        if strict and sink.get("not_connected"):
            return False
        kinds = (src["type"], sink["type"])
        if kinds == ("array", "array"):
            return can_assign_src_to_sink(src["items"], sink["items"], strict)
        if kinds == ("record", "record"):
            return _compare_records(src, sink, strict)
        if kinds == ("File", "File"):
            # Strictly, every secondaryFile the sink lists must also be
            # listed (by equality) on the source.
            for wanted in sink.get("secondaryFiles", []):
                offered = any(wanted == have for have in src.get("secondaryFiles", []))
                if not offered and strict:
                    return False
            return True
        return can_assign_src_to_sink(kinds[0], kinds[1], strict)

    if isinstance(src, MutableSequence):
        if strict:
            return all(can_assign_src_to_sink(option, sink) for option in src)
        return any(can_assign_src_to_sink(option, sink) for option in src)

    if isinstance(sink, MutableSequence):
        return any(can_assign_src_to_sink(src, option) for option in sink)

    return bool(src == sink)
99
100
def _compare_records(src, sink, strict=False):
    # type: (MutableMapping[Text, Any], MutableMapping[Text, Any], bool) -> bool
    """
    Compare two records, ensuring they have compatible fields.

    Field names are reduced with shortname() before matching, because the
    full names are relative to the workflow step and would otherwise never
    line up. Each sink field is checked with can_assign_src_to_sink(); a
    field absent from the source is compared as "null". On the first
    incompatible field (whose sink type is not None) the mismatch is logged
    at INFO level and False is returned; otherwise True.
    """
    def _rec_fields(rec):  # type: (MutableMapping[Text, Any]) -> MutableMapping[Text, Any]
        # Short field name -> declared field type (a later duplicate wins).
        return {shortname(field["name"]): field["type"] for field in rec["fields"]}

    src_types = _rec_fields(src)
    sink_types = _rec_fields(sink)
    for name, sink_type in sink_types.items():
        compatible = can_assign_src_to_sink(src_types.get(name, "null"), sink_type, strict)
        if not compatible and sink_type is not None:
            _logger.info("Record comparison failure for %s and %s\n"
                         "Did not match fields for %s: %s and %s",
                         src["name"], sink["name"], name, src_types.get(name),
                         sink_type)
            return False
    return True
128
def missing_subset(fullset, subset):  # type: (List[Any], List[Any]) -> List[Any]
    """
    Return the items of ``subset`` that do not occur in ``fullset``.

    Order and duplicates from ``subset`` are preserved.
    """
    return [item for item in subset if item not in fullset]
135
def static_checker(workflow_inputs, workflow_outputs, step_inputs, step_outputs, param_to_step):
    # type: (List[Dict[Text, Any]], List[Dict[Text, Any]], List[Dict[Text, Any]], List[Dict[Text, Any]], Dict[Text, Dict[Text, Any]]) -> None
    """
    Check if all source and sink types of a workflow are compatible before run time.

    Source parameters are the workflow inputs and step outputs. Sink
    parameters are the step inputs (linked through "source") and the
    workflow outputs (linked through "outputSource").

    Connections that check_types() classifies as "warning" are logged as one
    warning. Connections classified as "exception", plus required step
    inputs that have no source, default, or valueFrom, are gathered into a
    single error.

    :param param_to_step: maps a sink id to a mapping with "run" and "inputs"
        entries; only consulted to describe sinks marked "not_connected".
    :raises validate.ValidationException: if any error message was gathered.
    """
    # Index the source parameters by their "id" (a later duplicate id wins).
    src_dict = {}  # type: Dict[Text, Any]
    for parm in workflow_inputs + step_outputs:
        src_dict[parm["id"]] = parm

    step_inputs_val = check_all_types(src_dict, step_inputs, "source")
    workflow_outputs_val = check_all_types(src_dict, workflow_outputs, "outputSource")

    warnings = step_inputs_val["warning"] + workflow_outputs_val["warning"]
    exceptions = step_inputs_val["exception"] + workflow_outputs_val["exception"]

    warning_msgs = [_warning_message(warning, param_to_step) for warning in warnings]
    exception_msgs = [
        _mismatch_message(exception.src, exception.sink, exception.linkMerge, "is incompatible")
        for exception in exceptions
    ]
    exception_msgs.extend(_missing_source_messages(step_inputs))

    if warning_msgs:
        _logger.warning("Workflow checker warning:\n%s",
                        strip_dup_lineno("\n".join(warning_msgs)))
    # Gate on the collected messages rather than on the type mismatches
    # alone. Otherwise a "Required parameter ..." error is silently dropped
    # whenever no type incompatibility happens to accompany it.
    if exception_msgs:
        raise validate.ValidationException(strip_dup_lineno("\n".join(exception_msgs)))


def _mismatch_message(src, sink, linkMerge, verdict):
    # type: (Dict[Text, Any], Dict[Text, Any], Optional[Text], Text) -> Text
    """Describe a source/sink type mismatch; ``verdict`` reads e.g. "is incompatible"."""
    msg = (
        SourceLine(src, "type").makeError(
            "Source '%s' of type %s %s"
            % (shortname(src["id"]), json_dumps(src["type"]), verdict))
        + "\n"
        + SourceLine(sink, "type").makeError(
            " with sink '%s' of type %s"
            % (shortname(sink["id"]), json_dumps(sink["type"])))
    )
    if linkMerge is not None:
        msg += "\n" + SourceLine(sink).makeError(" source has linkMerge method %s" % linkMerge)
    return msg


def _warning_message(warning, param_to_step):
    # type: (Any, Dict[Text, Dict[Text, Any]]) -> Text
    """Build the message for one connection that check_types() rated "warning"."""
    src = warning.src
    sink = warning.sink
    sinksf = sorted([p["pattern"] for p in sink.get("secondaryFiles", []) if p.get("required", True)])
    srcsf = sorted([p["pattern"] for p in src.get("secondaryFiles", [])])
    # Every secondaryFile required by the sink should be declared by the source.
    missing = missing_subset(srcsf, sinksf)
    if missing:
        msg1 = "Parameter '%s' requires secondaryFiles %s but" % (shortname(sink["id"]), missing)
        msg3 = SourceLine(src, "id").makeError(
            "source '%s' does not provide those secondaryFiles." % (shortname(src["id"])))
        msg4 = SourceLine(src.get("_tool_entry", src), "secondaryFiles").makeError(
            "To resolve, add missing secondaryFiles patterns to definition of '%s' or"
            % (shortname(src["id"])))
        msg5 = SourceLine(sink.get("_tool_entry", sink), "secondaryFiles").makeError(
            "mark missing secondaryFiles in definition of '%s' as optional." % shortname(sink["id"]))
        return SourceLine(sink).makeError("%s\n%s" % (msg1, bullets([msg3, msg4, msg5], " ")))
    if sink.get("not_connected"):
        return SourceLine(sink, "type").makeError(
            "'%s' is not an input parameter of %s, expected %s"
            % (shortname(sink["id"]), param_to_step[sink["id"]]["run"],
               ", ".join(shortname(s["id"])
                         for s in param_to_step[sink["id"]]["inputs"]
                         if not s.get("not_connected"))))
    return _mismatch_message(src, sink, warning.linkMerge, "may be incompatible")


def _missing_source_messages(step_inputs):
    # type: (List[Dict[Text, Any]]) -> List[Text]
    """Report required step inputs that have no source, default, or valueFrom."""
    msgs = []  # type: List[Text]
    for sink in step_inputs:
        # A "null" type, or a type containing "null", marks the input optional.
        if ('null' != sink["type"] and 'null' not in sink["type"]
                and "source" not in sink and "default" not in sink and "valueFrom" not in sink):
            msgs.append(SourceLine(sink).makeError(
                "Required parameter '%s' does not have source, default, or valueFrom expression"
                % shortname(sink["id"])))
    return msgs
219
220
# One checked connection: the source parameter, the sink parameter, and the
# linkMerge method in effect for it (None when no merging applies).
SrcSink = namedtuple("SrcSink", "src sink linkMerge")
222
def check_all_types(src_dict, sinks, sourceField):
    # type: (Dict[Text, Any], List[Dict[Text, Any]], Text) -> Dict[Text, List[SrcSink]]
    """
    Given a list of sinks, check if their types match with the types of their sources.

    sourceField is either "source" or "outputSource".

    :param src_dict: source parameters indexed by their "id".
    :param sinks: sink parameters; sinks without ``sourceField`` are skipped.
    :returns: {"warning": [...], "exception": [...]}. Each list holds SrcSink
        tuples for the connections that check_types() put in that category;
        connections that pass are not recorded.
    :raises validate.ValidationException: if a sink names a source id that is
        not present in src_dict.
    """
    validation = {"warning": [], "exception": []}  # type: Dict[Text, List[SrcSink]]
    for sink in sinks:
        if sourceField not in sink:
            continue
        valueFrom = sink.get("valueFrom")
        if isinstance(sink[sourceField], MutableSequence):
            parm_ids = sink[sourceField]
            # Several sources default to merge_nested; an explicit linkMerge wins.
            linkMerge = sink.get("linkMerge", ("merge_nested"
                                               if len(parm_ids) > 1 else None))
        else:
            parm_ids = [sink[sourceField]]
            linkMerge = None

        srcs_of_sink = []  # type: List[Any]
        for parm_id in parm_ids:
            # A dangling reference becomes a located validation error instead
            # of an unexplained KeyError.
            if parm_id not in src_dict:
                raise SourceLine(sink, sourceField, validate.ValidationException).makeError(
                    "%s not found: %s" % (sourceField, parm_id))
            srcs_of_sink.append(src_dict[parm_id])

        for src in srcs_of_sink:
            check_result = check_types(src, sink, linkMerge, valueFrom)
            if check_result == "warning":
                validation["warning"].append(SrcSink(src, sink, linkMerge))
            elif check_result == "exception":
                validation["exception"].append(SrcSink(src, sink, linkMerge))
    return validation