Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/cwltool/builder.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26e78fe6e8c4 |
---|---|
1 from __future__ import absolute_import | |
2 | |
3 import copy | |
4 import os | |
5 import logging | |
6 from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence, | |
7 Optional, Set, Tuple, Union) | |
8 | |
9 from typing_extensions import Text, Type, TYPE_CHECKING # pylint: disable=unused-import | |
10 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
11 | |
12 from rdflib import Graph, URIRef # pylint: disable=unused-import | |
13 from rdflib.namespace import OWL, RDFS | |
14 from ruamel.yaml.comments import CommentedMap | |
15 from schema_salad import validate | |
16 from schema_salad.schema import Names, convert_to_dict | |
17 from schema_salad.avro.schema import make_avsc_object, Schema | |
18 from schema_salad.sourceline import SourceLine | |
19 from schema_salad.ref_resolver import uri_file_path | |
20 from six import iteritems, string_types | |
21 from future.utils import raise_from | |
22 from typing import IO | |
23 from typing_extensions import (TYPE_CHECKING, # pylint: disable=unused-import | |
24 Text, Type) | |
25 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
26 | |
27 from . import expression | |
28 from .errors import WorkflowException | |
29 from .loghandler import _logger | |
30 from .mutation import MutationManager # pylint: disable=unused-import | |
31 from .pathmapper import PathMapper # pylint: disable=unused-import | |
32 from .pathmapper import CONTENT_LIMIT, get_listing, normalizeFilesDirs, visit_class | |
33 from .stdfsaccess import StdFsAccess # pylint: disable=unused-import | |
34 from .utils import aslist, docker_windows_path_adjust, json_dumps, onWindows | |
35 | |
36 | |
37 | |
38 if TYPE_CHECKING: | |
39 from .provenance import ProvenanceProfile # pylint: disable=unused-import | |
40 | |
41 | |
def content_limit_respected_read_bytes(f):  # type: (IO[bytes]) -> bytes
    """Read the contents of `f`, refusing anything longer than CONTENT_LIMIT.

    One byte more than the limit is requested so that a stream holding
    exactly CONTENT_LIMIT bytes can be told apart from a longer one.

    :param f: binary file-like object; it is read from its current position
    :returns: the bytes read, never more than CONTENT_LIMIT of them
    :raises WorkflowException: if more than CONTENT_LIMIT bytes were read
    """
    contents = f.read(CONTENT_LIMIT + 1)
    if len(contents) > CONTENT_LIMIT:
        raise WorkflowException(
            "loadContents handling encountered buffer that exceeds "
            "maximum length of %d bytes" % CONTENT_LIMIT)
    return contents
47 | |
48 | |
def content_limit_respected_read(f):  # type: (IO[bytes]) -> Text
    """Return the size-limited contents of `f` decoded as UTF-8 text.

    The size check itself is delegated to
    content_limit_respected_read_bytes().
    """
    raw = content_limit_respected_read_bytes(f)
    return raw.decode("utf-8")
51 | |
52 | |
def substitute(value, replace):  # type: (Text, Text) -> Text
    """Append the suffix `replace` to `value`.

    Every leading "^" in `replace` strips one trailing ".ext" component from
    `value` before the remainder of `replace` is appended.  Once `value` has
    no "." left, any carets still pending are ignored.
    """
    while replace.startswith("^"):
        try:
            cut = value.rindex('.')
        except ValueError:
            # No extension to remove
            return value + replace.lstrip("^")
        value = value[0:cut]
        replace = replace[1:]
    return value + replace
61 | |
def formatSubclassOf(fmt, cls, ontology, visited):
    # type: (Text, Text, Optional[Graph], Set[Text]) -> bool
    """Determine if `fmt` is a subclass of `cls`.

    `fmt` matches when it is the same URI as `cls`, or when `cls` can be
    reached from it in `ontology` through rdfs:subClassOf links and
    owl:equivalentClass links (the latter followed in both directions).
    `visited` collects the formats already expanded so that cycles in the
    ontology terminate; it is mutated in place.
    """
    fmt_ref = URIRef(fmt)
    if fmt_ref == URIRef(cls):
        return True

    if ontology is None or fmt in visited:
        return False
    visited.add(fmt)

    def reaches_cls(candidate):
        return formatSubclassOf(candidate, cls, ontology, visited)

    # Find parent classes of `fmt` and search upward
    if any(reaches_cls(parent) for _, _, parent
           in ontology.triples((fmt_ref, RDFS.subClassOf, None))):
        return True

    # Find equivalent classes of `fmt` and search horizontally
    if any(reaches_cls(peer) for _, _, peer
           in ontology.triples((fmt_ref, OWL.equivalentClass, None))):
        return True

    # Same search, with `fmt` on the object side of the equivalence
    return any(reaches_cls(peer) for peer, _, _
               in ontology.triples((None, OWL.equivalentClass, fmt_ref)))
94 | |
def check_format(actual_file,    # type: Union[Dict[Text, Any], List[Dict[Text, Any]], Text]
                 input_formats,  # type: Union[List[Text], Text]
                 ontology        # type: Optional[Graph]
                 ):  # type: (...) -> None
    """Confirm that the format present is valid for the allowed formats.

    Every non-empty entry of `actual_file` is checked: it must carry a
    "format" that equals one of `input_formats` or is a subclass of one
    according to `ontology`.  Validation no longer stops at the first
    acceptable file, so the result does not depend on the order of the list.

    :param actual_file: a file object or a list of them; falsy entries are
        skipped
    :param input_formats: one allowed format or a list of them
    :param ontology: graph handed to formatSubclassOf(); may be None
    :raises validate.ValidationException: if a file has no "format" or its
        format matches none of the allowed ones
    """
    allowed_formats = aslist(input_formats)
    for afile in aslist(actual_file):
        if not afile:
            continue
        if "format" not in afile:
            raise validate.ValidationException(
                u"File has no 'format' defined: {}".format(
                    json_dumps(afile, indent=4)))
        actual_format = afile["format"]
        # A fresh `visited` set per candidate keeps the searches independent.
        compatible = any(
            actual_format == inpf
            or formatSubclassOf(actual_format, inpf, ontology, set())
            for inpf in allowed_formats)
        if not compatible:
            raise validate.ValidationException(
                u"File has an incompatible format: {}".format(
                    json_dumps(afile, indent=4)))
114 | |
class HasReqsHints(object):
    """Holder of `requirements` and `hints` lists with a lookup by class."""

    def __init__(self):  # type: () -> None
        """Initialize this reqs decorator."""
        self.requirements = []  # type: List[Dict[Text, Any]]
        self.hints = []  # type: List[Dict[Text, Any]]

    def get_requirement(self, feature):
        # type: (Text) -> Tuple[Optional[Any], Optional[bool]]
        """Look up the entry whose "class" equals `feature`.

        Requirements are searched before hints, and each list is scanned from
        its last entry backwards, so later entries win.  Returns
        ``(entry, True)`` for a requirement, ``(entry, False)`` for a hint and
        ``(None, None)`` when nothing matches.
        """
        required = next(
            (entry for entry in reversed(self.requirements)
             if entry["class"] == feature), None)
        if required is not None:
            return (required, True)

        hinted = next(
            (entry for entry in reversed(self.hints)
             if entry["class"] == feature), None)
        if hinted is not None:
            return (hinted, False)

        return (None, None)
131 | |
132 class Builder(HasReqsHints): | |
def __init__(self,
             job,                  # type: Dict[Text, expression.JSON]
             files,                # type: List[Dict[Text, Text]]
             bindings,             # type: List[Dict[Text, Any]]
             schemaDefs,           # type: Dict[Text, Dict[Text, Any]]
             names,                # type: Names
             requirements,         # type: List[Dict[Text, Any]]
             hints,                # type: List[Dict[Text, Any]]
             resources,            # type: Dict[str, int]
             mutation_manager,     # type: Optional[MutationManager]
             formatgraph,          # type: Optional[Graph]
             make_fs_access,       # type: Type[StdFsAccess]
             fs_access,            # type: StdFsAccess
             job_script_provider,  # type: Optional[Any]
             timeout,              # type: float
             debug,                # type: bool
             js_console,           # type: bool
             force_docker_pull,    # type: bool
             loadListing,          # type: Text
             outdir,               # type: Text
             tmpdir,               # type: Text
             stagedir              # type: Text
             ):  # type: (...) -> None
    """Store every constructor argument on the instance under its own name.

    Three further attributes (`pathmapper`, `prov_obj` and
    `find_default_container`) start out as None.
    """
    # Job description and the schema information used to bind it.
    self.job = job
    self.files = files
    self.bindings = bindings
    self.schemaDefs = schemaDefs
    self.names = names
    self.formatgraph = formatgraph

    # Requirements/hints consulted through get_requirement().
    self.requirements = requirements
    self.hints = hints

    self.resources = resources
    self.mutation_manager = mutation_manager

    # File system access and working locations.
    self.make_fs_access = make_fs_access
    self.fs_access = fs_access
    self.outdir = outdir
    self.tmpdir = tmpdir
    self.stagedir = stagedir

    self.job_script_provider = job_script_provider

    # Options forwarded to expression evaluation (see do_eval).
    self.timeout = timeout
    self.debug = debug
    self.js_console = js_console
    self.force_docker_pull = force_docker_pull

    # One of "no_listing", "shallow_listing", "deep_listing"
    self.loadListing = loadListing

    # Not supplied by the constructor.
    self.pathmapper = None  # type: Optional[PathMapper]
    self.prov_obj = None  # type: Optional[ProvenanceProfile]
    self.find_default_container = None  # type: Optional[Callable[[], Text]]
189 | |
def build_job_script(self, commands):
    # type: (List[Text]) -> Optional[Text]
    """Ask the job script provider, if it offers one, to build a script.

    Returns whatever the provider's `build_job_script(builder, commands)`
    returns, or None when the provider has no such attribute (which includes
    the provider being None).
    """
    provider_hook = getattr(self.job_script_provider, "build_job_script", None)  # type: Optional[Callable[[Builder, Union[List[str],List[Text]]], Text]]
    if provider_hook is None:
        return None
    return provider_hook(self, commands)
196 | |
def bind_input(self,
               schema,                   # type: MutableMapping[Text, Any]
               datum,                    # type: Any
               discover_secondaryFiles,  # type: bool
               lead_pos=None,            # type: Optional[Union[int, List[int]]]
               tail_pos=None,            # type: Optional[List[int]]
               ):  # type: (...) -> List[MutableMapping[Text, Any]]
    """Collect the command line bindings for `datum` as described by `schema`.

    The schema's "type" decides how the value is handled: a list is treated
    as a union, a mapping as an inline type definition, and anything else as
    a type name (looked up in `self.schemaDefs` first).  Records and arrays
    recurse into their fields/items.  File and Directory values are appended
    to `self.files`; for File values contents, secondary files and format
    are processed as requested by the schema.

    `datum` is modified in place (record defaults, "contents",
    "secondaryFiles", directory listings).

    :param schema: parameter schema; must contain "type"
    :param datum: the value to bind
    :param discover_secondaryFiles: when true, secondary files that exist
        according to `self.fs_access` are added to the File object
    :param lead_pos: sort-key components placed before the binding's own
        position
    :param tail_pos: sort-key components placed after it
    :returns: the bindings found; each has "position" (a list used as the
        sort key) and "datum"
    :raises validate.ValidationException: if `datum` fits no member of a
        union type
    :raises WorkflowException: if a required secondary file is missing or
        the file format is not acceptable
    """
    if tail_pos is None:
        tail_pos = []
    if lead_pos is None:
        lead_pos = []

    bindings = []  # type: List[MutableMapping[Text, Text]]
    binding = {}  # type: Union[MutableMapping[Text, Text], CommentedMap]
    value_from_expression = False
    if "inputBinding" in schema and isinstance(schema["inputBinding"], MutableMapping):
        binding = CommentedMap(schema["inputBinding"].items())

        # Sort key: lead_pos + own position (0 if absent) + tail_pos.
        bp = list(aslist(lead_pos))
        if "position" in binding:
            position = binding["position"]
            # string_types keeps this in line with the other string checks
            # in this method (it is just `str` on Python 3).
            if isinstance(position, string_types):  # no need to test the CWL Version
                # the schema for v1.0 only allow ints
                binding['position'] = self.do_eval(position, context=datum)
                bp.append(binding['position'])
            else:
                bp.extend(aslist(binding['position']))
        else:
            bp.append(0)
        bp.extend(aslist(tail_pos))
        binding["position"] = bp

        binding["datum"] = datum
        if "valueFrom" in binding:
            value_from_expression = True

    # Handle union types
    if isinstance(schema["type"], MutableSequence):
        bound_input = False
        for t in schema["type"]:
            avsc = None  # type: Optional[Schema]
            if isinstance(t, string_types) and self.names.has_name(t, ""):
                avsc = self.names.get_name(t, "")
            elif isinstance(t, MutableMapping) and "name" in t and self.names.has_name(t["name"], ""):
                avsc = self.names.get_name(t["name"], "")
            if not avsc:
                avsc = make_avsc_object(convert_to_dict(t), self.names)
            if validate.validate(avsc, datum):
                schema = copy.deepcopy(schema)
                schema["type"] = t
                if not value_from_expression:
                    return self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
                else:
                    # With valueFrom the nested bindings are discarded; the
                    # call is made for its effects on datum and self.files.
                    self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
                    bound_input = True
        if not bound_input:
            raise validate.ValidationException(u"'%s' is not a valid union %s" % (datum, schema["type"]))
    elif isinstance(schema["type"], MutableMapping):
        st = copy.deepcopy(schema["type"])
        if binding and "inputBinding" not in st\
                and "type" in st\
                and st["type"] == "array"\
                and "itemSeparator" not in binding:
            st["inputBinding"] = {}
        # Carry file-related settings over to the inline type.
        for k in ("secondaryFiles", "format", "streamable"):
            if k in schema:
                st[k] = schema[k]
        if value_from_expression:
            self.bind_input(st, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
        else:
            bindings.extend(self.bind_input(st, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles))
    else:
        if schema["type"] in self.schemaDefs:
            schema = self.schemaDefs[schema["type"]]

        if schema["type"] == "record":
            for f in schema["fields"]:
                if f["name"] in datum and datum[f["name"]] is not None:
                    bindings.extend(self.bind_input(f, datum[f["name"]], lead_pos=lead_pos, tail_pos=f["name"], discover_secondaryFiles=discover_secondaryFiles))
                else:
                    datum[f["name"]] = f.get("default")

        if schema["type"] == "array":
            for n, item in enumerate(datum):
                b2 = None
                if binding:
                    b2 = copy.deepcopy(binding)
                    b2["datum"] = item
                itemschema = {
                    u"type": schema["items"],
                    u"inputBinding": b2
                }
                for k in ("secondaryFiles", "format", "streamable"):
                    if k in schema:
                        itemschema[k] = schema[k]
                bindings.extend(
                    self.bind_input(itemschema, item, lead_pos=n, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles))
            # Each item received its own copy of the binding above, so the
            # array as a whole must not be added again at the end.
            binding = {}

        def _capture_files(f):  # type: (Dict[Text, Text]) -> Dict[Text, Text]
            self.files.append(f)
            return f

        if schema["type"] == "File":
            self.files.append(datum)
            if (binding and binding.get("loadContents")) or schema.get("loadContents"):
                with self.fs_access.open(datum["location"], "rb") as f:
                    datum["contents"] = content_limit_respected_read(f)

            if "secondaryFiles" in schema:
                if "secondaryFiles" not in datum:
                    datum["secondaryFiles"] = []
                for sf in aslist(schema["secondaryFiles"]):
                    if 'required' in sf:
                        sf_required = self.do_eval(sf['required'], context=datum)
                    else:
                        sf_required = True

                    # A pattern is either an expression or a suffix rule
                    # applied to the primary file's basename.
                    if "$(" in sf["pattern"] or "${" in sf["pattern"]:
                        sfpath = self.do_eval(sf["pattern"], context=datum)
                    else:
                        sfpath = substitute(datum["basename"], sf["pattern"])

                    for sfname in aslist(sfpath):
                        if not sfname:
                            continue
                        # An expression may produce a complete file object
                        # rather than a name; compare by its basename then.
                        sf_is_object = isinstance(sfname, MutableMapping)
                        wanted = sfname.get("basename") if sf_is_object else sfname
                        found = False
                        for d in datum["secondaryFiles"]:
                            if not d.get("basename"):
                                d["basename"] = d["location"][d["location"].rindex("/")+1:]
                            if d["basename"] == wanted:
                                found = True
                        if not found:
                            if sf_is_object:
                                datum["secondaryFiles"].append(sfname)
                            else:
                                # Only a plain name can be joined onto the
                                # directory part of the primary location.
                                sf_location = datum["location"][0:datum["location"].rindex("/")+1]+sfname
                                if discover_secondaryFiles and self.fs_access.exists(sf_location):
                                    datum["secondaryFiles"].append({
                                        "location": sf_location,
                                        "basename": sfname,
                                        "class": "File"})
                                elif sf_required:
                                    raise WorkflowException("Missing required secondary file '%s' from file object: %s" % (
                                        sfname, json_dumps(datum, indent=4)))

                normalizeFilesDirs(datum["secondaryFiles"])

            if "format" in schema:
                try:
                    check_format(datum, self.do_eval(schema["format"]),
                                 self.formatgraph)
                except validate.ValidationException as ve:
                    raise_from(WorkflowException(
                        "Expected value of '%s' to have format %s but\n "
                        " %s" % (schema["name"], schema["format"], ve)), ve)

            visit_class(datum.get("secondaryFiles", []), ("File", "Directory"), _capture_files)

        if schema["type"] == "Directory":
            ll = schema.get("loadListing") or self.loadListing
            if ll and ll != "no_listing":
                get_listing(self.fs_access, datum, (ll == "deep_listing"))
            self.files.append(datum)

        if schema["type"] == "Any":
            visit_class(datum, ("File", "Directory"), _capture_files)

    # Position to front of the sort key
    if binding:
        for bi in bindings:
            bi["position"] = binding["position"] + bi["position"]
        bindings.append(binding)

    return bindings
373 | |
def tostr(self, value):  # type: (Union[MutableMapping[Text, Text], Any]) -> Text
    """Render `value` as the text of a command line argument.

    File and Directory objects are represented by their "path" (a missing
    "path" is an error); any other value is converted with Text().
    """
    is_fs_object = (isinstance(value, MutableMapping)
                    and value.get("class") in ("File", "Directory"))
    if not is_fs_object:
        return Text(value)

    if "path" not in value:
        raise WorkflowException(u"%s object missing \"path\": %s" % (value["class"], value))

    # Path adjust for windows file path when passing to docker, docker accepts unix like path only
    docker_req, _ = self.get_requirement("DockerRequirement")
    # docker_req is none only when there is no dockerRequirement
    # mentioned in hints and Requirement
    if onWindows() and docker_req is not None:
        return docker_windows_path_adjust(value["path"])
    return value["path"]
389 | |
def generate_arg(self, binding):  # type: (Dict[Text, Any]) -> List[Text]
    """Turn one binding into the list of command line tokens it contributes.

    The value is the binding's "datum", or the result of evaluating
    "valueFrom" with that datum as context.  "prefix", "separate" and
    "itemSeparator" control how the value is rendered; booleans only ever
    contribute the prefix, and None/False contribute nothing.
    """
    value = binding.get("datum")
    if "valueFrom" in binding:
        with SourceLine(binding, "valueFrom", WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
            value = self.do_eval(binding["valueFrom"], context=value)

    prefix = binding.get("prefix")  # type: Optional[Text]
    separate = binding.get("separate", True)
    if not separate and prefix is None:
        with SourceLine(binding, "separate", WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
            raise WorkflowException("'separate' option can not be specified without prefix")

    # What a binding contributes when only its prefix is emitted.
    prefix_only = [prefix] if prefix else []

    if isinstance(value, MutableSequence):
        if binding.get("itemSeparator") and value:
            pieces = [binding["itemSeparator"].join([self.tostr(v) for v in value])]
        elif binding.get("valueFrom"):
            return prefix_only + [self.tostr(v) for v in value]
        elif value:
            return prefix_only
        else:
            return []
    elif isinstance(value, MutableMapping):
        if value.get("class") not in ("File", "Directory"):
            return prefix_only
        pieces = [value]
    elif value is True:
        return prefix_only
    elif value is False or value is None:
        return []
    else:
        pieces = [value]

    args = []
    for piece in pieces:
        rendered = self.tostr(piece)
        if separate:
            args.extend([prefix, rendered])
        elif prefix is None:
            args.append(rendered)
        else:
            args.append(prefix + rendered)

    # A missing prefix shows up as None in the separate form; drop it.
    return [arg for arg in args if arg is not None]
432 | |
def do_eval(self, ex, context=None, recursive=False, strip_whitespace=True):
    # type: (Union[Dict[Text, Text], Text], Any, bool, bool) -> Any
    """Evaluate `ex` using this builder's job, requirements and settings.

    :param ex: the expression to evaluate; with `recursive` it may also be a
        mapping or sequence whose values/items are evaluated individually
    :param context: passed on to expression.do_eval() as `context`
    :param recursive: descend into mappings and sequences, returning a new
        dict or list of the evaluated members
    :param strip_whitespace: passed on to expression.do_eval(); it is now
        honoured for nested members as well instead of silently reverting
        to the default
    :returns: whatever expression.do_eval() returns, or the dict/list of
        such results when recursing
    """
    if recursive:
        if isinstance(ex, MutableMapping):
            return {key: self.do_eval(member, context, recursive, strip_whitespace)
                    for key, member in ex.items()}
        if isinstance(ex, MutableSequence):
            return [self.do_eval(member, context, recursive, strip_whitespace)
                    for member in ex]

    return expression.do_eval(ex, self.job, self.requirements,
                              self.outdir, self.tmpdir,
                              self.resources,
                              context=context,
                              timeout=self.timeout,
                              debug=self.debug,
                              js_console=self.js_console,
                              force_docker_pull=self.force_docker_pull,
                              strip_whitespace=strip_whitespace)