comparison env/lib/python3.7/site-packages/cwltool/builder.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 from __future__ import absolute_import
2
3 import copy
4 import os
5 import logging
6 from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence,
7 Optional, Set, Tuple, Union)
8
9 from typing_extensions import Text, Type, TYPE_CHECKING # pylint: disable=unused-import
10 # move to a regular typing import when Python 3.3-3.6 is no longer supported
11
12 from rdflib import Graph, URIRef # pylint: disable=unused-import
13 from rdflib.namespace import OWL, RDFS
14 from ruamel.yaml.comments import CommentedMap
15 from schema_salad import validate
16 from schema_salad.schema import Names, convert_to_dict
17 from schema_salad.avro.schema import make_avsc_object, Schema
18 from schema_salad.sourceline import SourceLine
19 from schema_salad.ref_resolver import uri_file_path
20 from six import iteritems, string_types
21 from future.utils import raise_from
22 from typing import IO
23 from typing_extensions import (TYPE_CHECKING, # pylint: disable=unused-import
24 Text, Type)
25 # move to a regular typing import when Python 3.3-3.6 is no longer supported
26
27 from . import expression
28 from .errors import WorkflowException
29 from .loghandler import _logger
30 from .mutation import MutationManager # pylint: disable=unused-import
31 from .pathmapper import PathMapper # pylint: disable=unused-import
32 from .pathmapper import CONTENT_LIMIT, get_listing, normalizeFilesDirs, visit_class
33 from .stdfsaccess import StdFsAccess # pylint: disable=unused-import
34 from .utils import aslist, docker_windows_path_adjust, json_dumps, onWindows
35
36
37
38 if TYPE_CHECKING:
39 from .provenance import ProvenanceProfile # pylint: disable=unused-import
40
41
def content_limit_respected_read_bytes(f):  # type: (IO[bytes]) -> bytes
    """Read up to CONTENT_LIMIT bytes from *f* and return them.

    Reads one byte past the limit so that an over-long stream can be
    detected without reading it fully.

    :raises WorkflowException: if the stream holds more than CONTENT_LIMIT bytes.
    """
    contents = f.read(CONTENT_LIMIT + 1)
    if len(contents) > CONTENT_LIMIT:
        # Fixed message: was "buffer that is exceeds maximum lenght"
        raise WorkflowException(
            "loadContents handling encountered buffer that exceeds maximum "
            "length of %d bytes" % CONTENT_LIMIT)
    return contents
47
48
def content_limit_respected_read(f):  # type: (IO[bytes]) -> Text
    """Read a size-limited binary stream and decode it as UTF-8 text."""
    raw = content_limit_respected_read_bytes(f)
    return raw.decode("utf-8")
51
52
def substitute(value, replace):  # type: (Text, Text) -> Text
    """Apply a secondaryFiles suffix pattern to *value*.

    A leading ``^`` in *replace* strips one extension (everything from
    the last ``.``) from *value* before appending; each additional caret
    strips one more extension. With no caret, *replace* is appended as-is.
    """
    if not replace.startswith("^"):
        return value + replace
    try:
        stem = value[:value.rindex('.')]
    except ValueError:
        # Nothing left to strip: discard the carets and append the rest.
        return value + replace.lstrip("^")
    return substitute(stem, replace[1:])
61
def formatSubclassOf(fmt, cls, ontology, visited):
    # type: (Text, Text, Optional[Graph], Set[Text]) -> bool
    """Determine if `fmt` is a subclass of `cls`.

    Searches the ontology graph recursively, upward through rdfs:subClassOf
    and sideways through owl:equivalentClass (in both directions). The
    *visited* set guards against cycles.
    """
    if URIRef(fmt) == URIRef(cls):
        return True

    if ontology is None or fmt in visited:
        return False
    visited.add(fmt)

    fmt_ref = URIRef(fmt)

    # Walk upward through declared parent classes.
    for _s, _p, parent in ontology.triples((fmt_ref, RDFS.subClassOf, None)):
        if formatSubclassOf(parent, cls, ontology, visited):
            return True

    # Walk sideways through equivalences declared with fmt as subject...
    for _s, _p, equiv in ontology.triples((fmt_ref, OWL.equivalentClass, None)):
        if formatSubclassOf(equiv, cls, ontology, visited):
            return True

    # ...and with fmt as object.
    for equiv, _p, _o in ontology.triples((None, OWL.equivalentClass, fmt_ref)):
        if formatSubclassOf(equiv, cls, ontology, visited):
            return True

    return False
94
def check_format(actual_file,    # type: Union[Dict[Text, Any], List[Dict[Text, Any]], Text]
                 input_formats,  # type: Union[List[Text], Text]
                 ontology        # type: Optional[Graph]
                ):  # type: (...) -> None
    """Confirm that the format present is valid for the allowed formats.

    :raises validate.ValidationException: if a file lacks a format or has
        a format not matching (directly or via the ontology) any allowed one.
    """
    for afile in aslist(actual_file):
        if not afile:
            continue
        if "format" not in afile:
            raise validate.ValidationException(
                u"File has no 'format' defined: {}".format(
                    json_dumps(afile, indent=4)))
        # NOTE(review): a match returns from the whole function, so any
        # remaining files are not checked — confirm this is intended.
        if any(afile["format"] == allowed
               or formatSubclassOf(afile["format"], allowed, ontology, set())
               for allowed in aslist(input_formats)):
            return
        raise validate.ValidationException(
            u"File has an incompatible format: {}".format(
                json_dumps(afile, indent=4)))
114
class HasReqsHints(object):
    """Base class for objects that carry CWL requirements and hints lists."""

    def __init__(self):  # type: () -> None
        """Initialize this reqs decorator."""
        self.requirements = []  # type: List[Dict[Text, Any]]
        self.hints = []  # type: List[Dict[Text, Any]]

    def get_requirement(self,
                        feature  # type: Text
                       ):  # type: (...) -> Tuple[Optional[Any], Optional[bool]]
        """Look up *feature* by its "class" key, most recent entry first.

        Returns (entry, True) when found among requirements, (entry, False)
        when found among hints, and (None, None) when absent from both.
        """
        match = next((entry for entry in reversed(self.requirements)
                      if entry["class"] == feature), None)
        if match is not None:
            return (match, True)
        match = next((entry for entry in reversed(self.hints)
                      if entry["class"] == feature), None)
        if match is not None:
            return (match, False)
        return (None, None)
131
class Builder(HasReqsHints):
    """Evaluate expressions and build command-line bindings for one CWL job.

    Holds the job order, schema definitions, requirements/hints and the
    filesystem-access objects needed to bind inputs (``bind_input``),
    render arguments (``generate_arg``) and evaluate CWL expressions
    (``do_eval``).
    """

    def __init__(self,
                 job,                  # type: Dict[Text, expression.JSON]
                 files,                # type: List[Dict[Text, Text]]
                 bindings,             # type: List[Dict[Text, Any]]
                 schemaDefs,           # type: Dict[Text, Dict[Text, Any]]
                 names,                # type: Names
                 requirements,         # type: List[Dict[Text, Any]]
                 hints,                # type: List[Dict[Text, Any]]
                 resources,            # type: Dict[str, int]
                 mutation_manager,     # type: Optional[MutationManager]
                 formatgraph,          # type: Optional[Graph]
                 make_fs_access,       # type: Type[StdFsAccess]
                 fs_access,            # type: StdFsAccess
                 job_script_provider,  # type: Optional[Any]
                 timeout,              # type: float
                 debug,                # type: bool
                 js_console,           # type: bool
                 force_docker_pull,    # type: bool
                 loadListing,          # type: Text
                 outdir,               # type: Text
                 tmpdir,               # type: Text
                 stagedir              # type: Text
                ):  # type: (...) -> None
        """Initialize this Builder."""
        self.job = job
        self.files = files
        self.bindings = bindings
        self.schemaDefs = schemaDefs
        self.names = names
        self.requirements = requirements
        self.hints = hints
        self.resources = resources
        self.mutation_manager = mutation_manager
        self.formatgraph = formatgraph

        self.make_fs_access = make_fs_access
        self.fs_access = fs_access

        self.job_script_provider = job_script_provider

        # Passed through to expression.do_eval as the evaluation timeout.
        self.timeout = timeout

        self.debug = debug
        self.js_console = js_console
        self.force_docker_pull = force_docker_pull

        # One of "no_listing", "shallow_listing", "deep_listing"
        self.loadListing = loadListing

        self.outdir = outdir
        self.tmpdir = tmpdir
        self.stagedir = stagedir

        # Set by callers after construction; None until then.
        self.pathmapper = None  # type: Optional[PathMapper]
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.find_default_container = None  # type: Optional[Callable[[], Text]]

    def build_job_script(self, commands):
        # type: (List[Text]) -> Optional[Text]
        """Delegate script generation to job_script_provider, if one exists.

        Returns None when there is no provider or the provider has no
        ``build_job_script`` attribute.
        """
        build_job_script_method = getattr(self.job_script_provider, "build_job_script", None)  # type: Optional[Callable[[Builder, Union[List[str], List[Text]]], Text]]
        if build_job_script_method is not None:
            return build_job_script_method(self, commands)
        return None

    def bind_input(self,
                   schema,                   # type: MutableMapping[Text, Any]
                   datum,                    # type: Any
                   discover_secondaryFiles,  # type: bool
                   lead_pos=None,            # type: Optional[Union[int, List[int]]]
                   tail_pos=None,            # type: Optional[List[int]]
                  ):  # type: (...) -> List[MutableMapping[Text, Any]]
        """Recursively bind an input value to its schema, producing bindings.

        Side effects: appends File/Directory objects to ``self.files``,
        may load file contents and discover/validate secondaryFiles, and
        may mutate *datum* (record defaults, secondaryFiles, contents).
        Returns the list of command-line binding dicts, each carrying a
        "position" sort key and the bound "datum".
        """
        if tail_pos is None:
            tail_pos = []
        if lead_pos is None:
            lead_pos = []

        bindings = []  # type: List[MutableMapping[Text, Text]]
        binding = {}  # type: Union[MutableMapping[Text, Text], CommentedMap]
        value_from_expression = False
        if "inputBinding" in schema and isinstance(schema["inputBinding"], MutableMapping):
            binding = CommentedMap(schema["inputBinding"].items())

            # Build the hierarchical sort key: lead positions, then this
            # binding's position (default 0), then tail positions.
            bp = list(aslist(lead_pos))
            if "position" in binding:
                position = binding["position"]
                if isinstance(position, str):  # no need to test the CWL Version
                    # the schema for v1.0 only allow ints
                    binding['position'] = self.do_eval(position, context=datum)
                    bp.append(binding['position'])
                else:
                    bp.extend(aslist(binding['position']))
            else:
                bp.append(0)
            bp.extend(aslist(tail_pos))
            binding["position"] = bp

            binding["datum"] = datum
            if "valueFrom" in binding:
                value_from_expression = True

        # Handle union types
        if isinstance(schema["type"], MutableSequence):
            bound_input = False
            for t in schema["type"]:
                avsc = None  # type: Optional[Schema]
                if isinstance(t, string_types) and self.names.has_name(t, ""):
                    avsc = self.names.get_name(t, "")
                elif isinstance(t, MutableMapping) and "name" in t and self.names.has_name(t["name"], ""):
                    avsc = self.names.get_name(t["name"], "")
                if not avsc:
                    avsc = make_avsc_object(convert_to_dict(t), self.names)
                if validate.validate(avsc, datum):
                    # Re-bind against the first union member the datum
                    # validates against.
                    schema = copy.deepcopy(schema)
                    schema["type"] = t
                    if not value_from_expression:
                        return self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
                    else:
                        # With valueFrom present, recurse for side effects
                        # only; the binding itself is appended below.
                        self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
                        bound_input = True
            if not bound_input:
                raise validate.ValidationException(u"'%s' is not a valid union %s" % (datum, schema["type"]))
        elif isinstance(schema["type"], MutableMapping):
            # Nested (inline) type definition.
            st = copy.deepcopy(schema["type"])
            if binding and "inputBinding" not in st\
               and "type" in st\
               and st["type"] == "array"\
               and "itemSeparator" not in binding:
                st["inputBinding"] = {}
            for k in ("secondaryFiles", "format", "streamable"):
                if k in schema:
                    st[k] = schema[k]
            if value_from_expression:
                self.bind_input(st, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
            else:
                bindings.extend(self.bind_input(st, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles))
        else:
            # Named type: resolve against schemaDefs if present.
            if schema["type"] in self.schemaDefs:
                schema = self.schemaDefs[schema["type"]]

            if schema["type"] == "record":
                for f in schema["fields"]:
                    if f["name"] in datum and datum[f["name"]] is not None:
                        bindings.extend(self.bind_input(f, datum[f["name"]], lead_pos=lead_pos, tail_pos=f["name"], discover_secondaryFiles=discover_secondaryFiles))
                    else:
                        # Fill in the field default (possibly None).
                        datum[f["name"]] = f.get("default")

            if schema["type"] == "array":
                for n, item in enumerate(datum):
                    b2 = None
                    if binding:
                        b2 = copy.deepcopy(binding)
                        b2["datum"] = item
                    itemschema = {
                        u"type": schema["items"],
                        u"inputBinding": b2
                    }
                    for k in ("secondaryFiles", "format", "streamable"):
                        if k in schema:
                            itemschema[k] = schema[k]
                    bindings.extend(
                        self.bind_input(itemschema, item, lead_pos=n, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles))
                # The array binding was copied onto each item; clear it so
                # it is not appended again at the end.
                binding = {}

            def _capture_files(f):  # type: (Dict[Text, Text]) -> Dict[Text, Text]
                # visit_class callback: record every File/Directory seen.
                self.files.append(f)
                return f

            if schema["type"] == "File":
                self.files.append(datum)
                if (binding and binding.get("loadContents")) or schema.get("loadContents"):
                    with self.fs_access.open(datum["location"], "rb") as f:
                        datum["contents"] = content_limit_respected_read(f)

                if "secondaryFiles" in schema:
                    if "secondaryFiles" not in datum:
                        datum["secondaryFiles"] = []
                    for sf in aslist(schema["secondaryFiles"]):
                        if 'required' in sf:
                            sf_required = self.do_eval(sf['required'], context=datum)
                        else:
                            sf_required = True

                        # Pattern is either a CWL expression or a suffix
                        # substitution on the primary file's basename.
                        if "$(" in sf["pattern"] or "${" in sf["pattern"]:
                            sfpath = self.do_eval(sf["pattern"], context=datum)
                        else:
                            sfpath = substitute(datum["basename"], sf["pattern"])

                        for sfname in aslist(sfpath):
                            if not sfname:
                                continue
                            found = False
                            for d in datum["secondaryFiles"]:
                                if not d.get("basename"):
                                    d["basename"] = d["location"][d["location"].rindex("/")+1:]
                                if d["basename"] == sfname:
                                    found = True
                            if not found:
                                # Look next to the primary file's location.
                                sf_location = datum["location"][0:datum["location"].rindex("/")+1]+sfname
                                if isinstance(sfname, MutableMapping):
                                    # Expression returned a full File object.
                                    datum["secondaryFiles"].append(sfname)
                                elif discover_secondaryFiles and self.fs_access.exists(sf_location):
                                    datum["secondaryFiles"].append({
                                        "location": sf_location,
                                        "basename": sfname,
                                        "class": "File"})
                                elif sf_required:
                                    raise WorkflowException("Missing required secondary file '%s' from file object: %s" % (
                                        sfname, json_dumps(datum, indent=4)))

                    normalizeFilesDirs(datum["secondaryFiles"])

                if "format" in schema:
                    try:
                        check_format(datum, self.do_eval(schema["format"]),
                                     self.formatgraph)
                    except validate.ValidationException as ve:
                        raise_from(WorkflowException(
                            "Expected value of '%s' to have format %s but\n "
                            " %s" % (schema["name"], schema["format"], ve)), ve)

                visit_class(datum.get("secondaryFiles", []), ("File", "Directory"), _capture_files)

            if schema["type"] == "Directory":
                ll = schema.get("loadListing") or self.loadListing
                if ll and ll != "no_listing":
                    get_listing(self.fs_access, datum, (ll == "deep_listing"))
                self.files.append(datum)

            if schema["type"] == "Any":
                visit_class(datum, ("File", "Directory"), _capture_files)

        # Position to front of the sort key
        if binding:
            for bi in bindings:
                bi["position"] = binding["position"] + bi["position"]
            bindings.append(binding)

        return bindings

    def tostr(self, value):  # type: (Union[MutableMapping[Text, Text], Any]) -> Text
        """Convert a bound value to its command-line string form.

        File/Directory objects are rendered as their "path"; anything else
        goes through Text().
        """
        if isinstance(value, MutableMapping) and value.get("class") in ("File", "Directory"):
            if "path" not in value:
                raise WorkflowException(u"%s object missing \"path\": %s" % (value["class"], value))

            # Path adjust for windows file path when passing to docker, docker accepts unix like path only
            (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
            if onWindows() and docker_req is not None:
                # docker_req is none only when there is no dockerRequirement
                # mentioned in hints and Requirement
                path = docker_windows_path_adjust(value["path"])
                return path
            return value["path"]
        else:
            return Text(value)

    def generate_arg(self, binding):  # type: (Dict[Text, Any]) -> List[Text]
        """Render one input binding into its command-line argument strings.

        Applies valueFrom, prefix, separate and itemSeparator semantics;
        returns a (possibly empty) list of argument strings.
        """
        value = binding.get("datum")
        if "valueFrom" in binding:
            with SourceLine(binding, "valueFrom", WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
                value = self.do_eval(binding["valueFrom"], context=value)

        prefix = binding.get("prefix")  # type: Optional[Text]
        sep = binding.get("separate", True)
        if prefix is None and not sep:
            with SourceLine(binding, "separate", WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
                raise WorkflowException("'separate' option can not be specified without prefix")

        argl = []  # type: MutableSequence[MutableMapping[Text, Text]]
        if isinstance(value, MutableSequence):
            if binding.get("itemSeparator") and value:
                # Join list items into a single argument string.
                argl = [binding["itemSeparator"].join([self.tostr(v) for v in value])]
            elif binding.get("valueFrom"):
                value = [self.tostr(v) for v in value]
                return ([prefix] if prefix else []) + value
            elif prefix and value:
                # Non-empty list, no itemSeparator: emit just the prefix.
                return [prefix]
            else:
                return []
        elif isinstance(value, MutableMapping) and value.get("class") in ("File", "Directory"):
            argl = [value]
        elif isinstance(value, MutableMapping):
            return [prefix] if prefix else []
        elif value is True and prefix:
            # Boolean flag set: the prefix alone is the argument.
            return [prefix]
        elif value is False or value is None or (value is True and not prefix):
            return []
        else:
            argl = [value]

        args = []
        for j in argl:
            if sep:
                args.extend([prefix, self.tostr(j)])
            else:
                args.append(self.tostr(j) if prefix is None else prefix + self.tostr(j))

        # Drop the None placeholders produced when prefix is unset.
        return [a for a in args if a is not None]

    def do_eval(self, ex, context=None, recursive=False, strip_whitespace=True):
        # type: (Union[Dict[Text, Text], Text], Any, bool, bool) -> Any
        """Evaluate a CWL expression (or, if recursive, a whole structure).

        NOTE(review): recursive calls do not forward strip_whitespace, so
        nested values are always evaluated with the default — confirm
        this is intended.
        """
        if recursive:
            if isinstance(ex, MutableMapping):
                return {k: self.do_eval(v, context, recursive)
                        for k, v in iteritems(ex)}
            if isinstance(ex, MutableSequence):
                return [self.do_eval(v, context, recursive)
                        for v in ex]

        return expression.do_eval(ex, self.job, self.requirements,
                                  self.outdir, self.tmpdir,
                                  self.resources,
                                  context=context,
                                  timeout=self.timeout,
                                  debug=self.debug,
                                  js_console=self.js_console,
                                  force_docker_pull=self.force_docker_pull,
                                  strip_whitespace=strip_whitespace)