Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/process.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """Classes and methods relevant for all CWL Proccess types.""" | |
2 import abc | |
3 import copy | |
4 import functools | |
5 import hashlib | |
6 import json | |
7 import logging | |
8 import math | |
9 import os | |
10 import shutil | |
11 import stat | |
12 import textwrap | |
13 import urllib | |
14 import uuid | |
15 from os import scandir | |
16 from typing import ( | |
17 Any, | |
18 Callable, | |
19 Dict, | |
20 Iterable, | |
21 Iterator, | |
22 List, | |
23 MutableMapping, | |
24 MutableSequence, | |
25 Optional, | |
26 Set, | |
27 Sized, | |
28 Tuple, | |
29 Type, | |
30 Union, | |
31 cast, | |
32 ) | |
33 | |
34 from pkg_resources import resource_stream | |
35 from rdflib import Graph | |
36 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
37 from schema_salad.avro.schema import ( | |
38 Names, | |
39 Schema, | |
40 SchemaParseException, | |
41 make_avsc_object, | |
42 ) | |
43 from schema_salad.exceptions import ValidationException | |
44 from schema_salad.ref_resolver import Loader, file_uri, uri_file_path | |
45 from schema_salad.schema import load_schema, make_avro_schema, make_valid_avro | |
46 from schema_salad.sourceline import SourceLine, strip_dup_lineno | |
47 from schema_salad.utils import convert_to_dict | |
48 from schema_salad.validate import validate_ex | |
49 from typing_extensions import TYPE_CHECKING | |
50 | |
51 from . import expression | |
52 from .builder import Builder, HasReqsHints | |
53 from .context import LoadingContext, RuntimeContext, getdefault | |
54 from .errors import UnsupportedRequirement, WorkflowException | |
55 from .loghandler import _logger | |
56 from .mpi import MPIRequirementName | |
57 from .pathmapper import MapperEnt, PathMapper | |
58 from .secrets import SecretStore | |
59 from .stdfsaccess import StdFsAccess | |
60 from .update import INTERNAL_VERSION | |
61 from .utils import ( | |
62 CWLObjectType, | |
63 CWLOutputAtomType, | |
64 CWLOutputType, | |
65 JobsGeneratorType, | |
66 OutputCallbackType, | |
67 adjustDirObjs, | |
68 aslist, | |
69 cmp_like_py2, | |
70 copytree_with_merge, | |
71 ensure_writable, | |
72 get_listing, | |
73 normalizeFilesDirs, | |
74 onWindows, | |
75 random_outdir, | |
76 visit_class, | |
77 ) | |
78 from .validate_js import validate_js_expressions | |
79 | |
80 if TYPE_CHECKING: | |
81 from .provenance_profile import ProvenanceProfile # pylint: disable=unused-import | |
82 | |
83 | |
class LogAsDebugFilter(logging.Filter):
    """Let records through only while the parent logger has DEBUG enabled."""

    def __init__(self, name: str, parent: logging.Logger) -> None:
        """Initialize."""
        super().__init__(str(name))
        self.parent = parent

    def filter(self, record: logging.LogRecord) -> bool:
        """Defer the pass/drop decision to the parent logger's level."""
        return self.parent.isEnabledFor(logging.DEBUG)
93 | |
94 | |
# Dedicated child logger for CWL validation warnings.  The filter below
# suppresses these records unless the main cwltool logger is itself
# running at DEBUG, so routine validation chatter stays quiet by default.
_logger_validation_warnings = logging.getLogger("cwltool.validation_warnings")
_logger_validation_warnings.setLevel(_logger.getEffectiveLevel())
_logger_validation_warnings.addFilter(
    LogAsDebugFilter("cwltool.validation_warnings", _logger)
)
100 | |
# Requirement/hint classes this implementation can satisfy (checked by
# checkRequirements()).  Both short names and the fully-qualified cwltool
# extension URIs are listed.
supportedProcessRequirements = [
    "DockerRequirement",
    "SchemaDefRequirement",
    "EnvVarRequirement",
    "ScatterFeatureRequirement",
    "SubworkflowFeatureRequirement",
    "MultipleInputFeatureRequirement",
    "InlineJavascriptRequirement",
    "ShellCommandRequirement",
    "StepInputExpressionRequirement",
    "ResourceRequirement",
    "InitialWorkDirRequirement",
    "ToolTimeLimit",
    "WorkReuse",
    "NetworkAccess",
    "InplaceUpdateRequirement",
    "LoadListingRequirement",
    MPIRequirementName,
    "http://commonwl.org/cwltool#TimeLimit",
    "http://commonwl.org/cwltool#WorkReuse",
    "http://commonwl.org/cwltool#NetworkAccess",
    "http://commonwl.org/cwltool#LoadListingRequirement",
    "http://commonwl.org/cwltool#InplaceUpdateRequirement",
]
125 | |
# CWL specification schema documents bundled with the package; loaded from
# package resources by get_schema() for each CWL version.
cwl_files = (
    "Workflow.yml",
    "CommandLineTool.yml",
    "CommonWorkflowLanguage.yml",
    "Process.yml",
    "Operation.yml",
    "concepts.md",
    "contrib.md",
    "intro.md",
    "invocation.md",
)
137 | |
# Schema Salad metaschema documents bundled with the package; loaded from
# package resources by get_schema() alongside the CWL schemas.
# NOTE: the previous version listed "vocab_res.yml" twice, which caused a
# redundant second resource load; the duplicate has been removed.
salad_files = (
    "metaschema.yml",
    "metaschema_base.yml",
    "salad.md",
    "field_name.yml",
    "import_include.md",
    "link_res.yml",
    "ident_res.yml",
    "vocab_res.yml",
    "field_name_schema.yml",
    "field_name_src.yml",
    "field_name_proc.yml",
    "ident_res_schema.yml",
    "ident_res_src.yml",
    "ident_res_proc.yml",
    "link_res_schema.yml",
    "link_res_src.yml",
    "link_res_proc.yml",
    "vocab_res_schema.yml",
    "vocab_res_src.yml",
    "vocab_res_proc.yml",
)
161 | |
# Cache of loaded CWL schemas, keyed by CWL version string.  Each value is
# the tuple returned by schema_salad.schema.load_schema().
SCHEMA_CACHE = (
    {}
)  # type: Dict[str, Tuple[Loader, Union[Names, SchemaParseException], CWLObjectType, Loader]]
# Cached schema fragments for the CWL File/Directory and salad Any types;
# populated lazily by Process.__init__ from the v1.0 schema.
SCHEMA_FILE = None  # type: Optional[CWLObjectType]
SCHEMA_DIR = None  # type: Optional[CWLObjectType]
SCHEMA_ANY = None  # type: Optional[CWLObjectType]

# User-supplied schema overrides, keyed by version: (name, schema text).
# Managed via use_custom_schema()/use_standard_schema().
custom_schemas = {}  # type: Dict[str, Tuple[str, str]]
170 | |
171 | |
def use_standard_schema(version: str) -> None:
    """Revert *version* to the bundled schema, discarding any custom override."""
    custom_schemas.pop(version, None)
    # Drop any cached schema so the next get_schema() reloads from scratch.
    SCHEMA_CACHE.pop(version, None)
177 | |
178 | |
def use_custom_schema(version: str, name: str, text: str) -> None:
    """Register a replacement schema for *version* and invalidate its cache."""
    custom_schemas[version] = (name, text)
    # Drop any cached schema so the next get_schema() picks up the override.
    SCHEMA_CACHE.pop(version, None)
183 | |
184 | |
def get_schema(
    version: str,
) -> Tuple[Loader, Union[Names, SchemaParseException], CWLObjectType, Loader]:
    """Load and memoize the CWL schema for the given version string."""
    if version in SCHEMA_CACHE:
        return SCHEMA_CACHE[version]

    cache = {}  # type: Dict[str, Union[str, Graph, bool]]
    # Normalize "scheme#vX.Y" and development versions ("vX.Y.devN")
    # down to the plain release directory name.
    version = version.split("#")[-1]
    if ".dev" in version:
        version = ".".join(version.split(".")[:-1])

    def _load_resource(path: str, uri: str) -> None:
        # Best effort: not every schema version ships every file, so a
        # missing resource is simply skipped.
        try:
            stream = resource_stream(__name__, path)
            cache[uri] = stream.read().decode("UTF-8")
            stream.close()
        except OSError:
            pass

    for fname in cwl_files:
        _load_resource(f"schemas/{version}/{fname}", "https://w3id.org/cwl/" + fname)

    for fname in salad_files:
        _load_resource(
            f"schemas/{version}/salad/schema_salad/metaschema/{fname}",
            "https://w3id.org/cwl/salad/schema_salad/metaschema/" + fname,
        )

    if version in custom_schemas:
        # A user-registered override replaces the bundled entry point.
        override_name, override_text = custom_schemas[version]
        cache[override_name] = override_text
        SCHEMA_CACHE[version] = load_schema(override_name, cache=cache)
    else:
        SCHEMA_CACHE[version] = load_schema(
            "https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache
        )

    return SCHEMA_CACHE[version]
226 | |
227 | |
def shortname(inputid: str) -> str:
    """Return the last path segment of a CWL identifier URI."""
    parsed = urllib.parse.urlparse(inputid)
    # Prefer the fragment ("...#step/name"); fall back to the URI path.
    source = parsed.fragment if parsed.fragment else parsed.path
    return source.rpartition("/")[2]
233 | |
234 | |
def checkRequirements(
    rec: Union[MutableSequence[CWLObjectType], CWLObjectType, CWLOutputType, None],
    supported_process_requirements: Iterable[str],
) -> None:
    """Recursively reject any requirement class that is not supported.

    :raises UnsupportedRequirement: on the first unsupported entry found.
    """
    if isinstance(rec, MutableMapping):
        if "requirements" in rec:
            entries = cast(MutableSequence[CWLObjectType], rec["requirements"])
            for idx, req in enumerate(entries):
                # SourceLine attaches file/line context to the raised error.
                with SourceLine(rec["requirements"], idx, UnsupportedRequirement):
                    if cast(str, req["class"]) not in supported_process_requirements:
                        raise UnsupportedRequirement(
                            "Unsupported requirement {}".format(req["class"])
                        )
        for value in rec.values():
            checkRequirements(value, supported_process_requirements)
    if isinstance(rec, MutableSequence):
        for member in rec:
            checkRequirements(member, supported_process_requirements)
254 | |
255 | |
def stage_files(
    pathmapper: PathMapper,
    stage_func: Optional[Callable[[str, str], None]] = None,
    ignore_writable: bool = False,
    symlink: bool = True,
    secret_store: Optional[SecretStore] = None,
    fix_conflicts: bool = False,
) -> None:
    """Link or copy files to their targets. Create them as needed.

    :param pathmapper: mapping from resolved source paths to staging targets
    :param stage_func: optional callable used instead of symlinking
    :param ignore_writable: when True, skip Writable* entries entirely
    :param symlink: prefer symlinks over stage_func where possible
    :param secret_store: source of real values for secret placeholders
    :param fix_conflicts: rename clashing targets instead of raising
    :raises WorkflowException: two different sources map to the same target
        and fix_conflicts is False
    """
    # First pass: detect distinct File sources that stage to the same target.
    targets = {}  # type: Dict[str, MapperEnt]
    for key, entry in pathmapper.items():
        if "File" not in entry.type:
            continue
        if entry.target not in targets:
            targets[entry.target] = entry
        elif targets[entry.target].resolved != entry.resolved:
            if fix_conflicts:
                # find first key that does not clash with an existing entry in targets
                # start with entry.target + '_' + 2 and then keep incrementing the number till there is no clash
                i = 2
                tgt = f"{entry.target}_{i}"
                while tgt in targets:
                    i += 1
                    tgt = f"{entry.target}_{i}"
                targets[tgt] = pathmapper.update(
                    key, entry.resolved, tgt, entry.type, entry.staged
                )
            else:
                raise WorkflowException(
                    "File staging conflict, trying to stage both %s and %s to the same target %s"
                    % (targets[entry.target].resolved, entry.resolved, entry.target)
                )

    # Second pass: materialize each staged entry at its target path.
    for key, entry in pathmapper.items():
        if not entry.staged:
            continue
        if not os.path.exists(os.path.dirname(entry.target)):
            os.makedirs(os.path.dirname(entry.target))
        if entry.type in ("File", "Directory") and os.path.exists(entry.resolved):
            if symlink:  # Use symlink func if allowed
                if onWindows():
                    # Windows: copy instead of symlinking.
                    if entry.type == "File":
                        shutil.copy(entry.resolved, entry.target)
                    elif entry.type == "Directory":
                        if os.path.exists(entry.target) and os.path.isdir(entry.target):
                            shutil.rmtree(entry.target)
                        copytree_with_merge(entry.resolved, entry.target)
                else:
                    os.symlink(entry.resolved, entry.target)
            elif stage_func is not None:
                stage_func(entry.resolved, entry.target)
        elif (
            entry.type == "Directory"
            and not os.path.exists(entry.target)
            and entry.resolved.startswith("_:")
        ):
            # "_:" marks a literal Directory with no on-disk source: create it.
            os.makedirs(entry.target)
        elif entry.type == "WritableFile" and not ignore_writable:
            shutil.copy(entry.resolved, entry.target)
            ensure_writable(entry.target)
        elif entry.type == "WritableDirectory" and not ignore_writable:
            if entry.resolved.startswith("_:"):
                os.makedirs(entry.target)
            else:
                shutil.copytree(entry.resolved, entry.target)
                ensure_writable(entry.target)
        elif entry.type == "CreateFile" or entry.type == "CreateWritableFile":
            # entry.resolved holds the literal file contents (or a secret
            # placeholder resolved via secret_store).
            with open(entry.target, "wb") as new:
                if secret_store is not None:
                    new.write(
                        cast(str, secret_store.retrieve(entry.resolved)).encode("utf-8")
                    )
                else:
                    new.write(entry.resolved.encode("utf-8"))
            if entry.type == "CreateFile":
                os.chmod(entry.target, stat.S_IRUSR)  # Read only
            else:  # it is a "CreateWritableFile"
                ensure_writable(entry.target)
            # Record that the entry now lives at its target location.
            pathmapper.update(key, entry.target, entry.target, entry.type, entry.staged)
335 | |
336 | |
def relocateOutputs(
    outputObj: CWLObjectType,
    destination_path: str,
    source_directories: Set[str],
    action: str,
    fs_access: StdFsAccess,
    compute_checksum: bool = True,
    path_mapper: Type[PathMapper] = PathMapper,
) -> CWLObjectType:
    """Move or copy tool outputs from working directories to their destination.

    :param outputObj: output object whose File/Directory entries get relocated
    :param destination_path: directory that will receive the outputs
    :param source_directories: directories we are allowed to delete from; a
        "move" whose source lies outside these degrades to a copy
    :param action: "move" or "copy"; anything else makes this a no-op
    :param compute_checksum: recompute File checksums after relocation
    :returns: outputObj with locations rewritten to the new paths
    """
    adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True))

    if action not in ("move", "copy"):
        return outputObj

    def _collectDirEntries(
        obj: Union[CWLObjectType, MutableSequence[CWLObjectType], None]
    ) -> Iterator[CWLObjectType]:
        # Yield only top-most File/Directory objects; children of a yielded
        # Directory are relocated along with their parent.
        if isinstance(obj, dict):
            if obj.get("class") in ("File", "Directory"):
                yield obj
            else:
                for sub_obj in obj.values():
                    yield from _collectDirEntries(sub_obj)
        elif isinstance(obj, MutableSequence):
            for sub_obj in obj:
                yield from _collectDirEntries(sub_obj)

    def _relocate(src: str, dst: str) -> None:
        if src == dst:
            return

        # If the source is not contained in source_directories we're not allowed to delete it
        src = fs_access.realpath(src)
        src_can_deleted = any(
            os.path.commonprefix([p, src]) == p for p in source_directories
        )

        _action = "move" if action == "move" and src_can_deleted else "copy"

        if _action == "move":
            _logger.debug("Moving %s to %s", src, dst)
            if fs_access.isdir(src) and fs_access.isdir(dst):
                # merge directories
                for dir_entry in scandir(src):
                    _relocate(dir_entry.path, fs_access.join(dst, dir_entry.name))
            else:
                shutil.move(src, dst)

        elif _action == "copy":
            _logger.debug("Copying %s to %s", src, dst)
            if fs_access.isdir(src):
                # Replace whatever currently occupies the destination.
                if os.path.isdir(dst):
                    shutil.rmtree(dst)
                elif os.path.isfile(dst):
                    os.unlink(dst)
                shutil.copytree(src, dst)
            else:
                shutil.copy2(src, dst)

    def _realpath(
        ob: CWLObjectType,
    ) -> None:  # should be type Union[CWLFile, CWLDirectory]
        # Canonicalize each location to a real local path before mapping.
        location = cast(str, ob["location"])
        if location.startswith("file:"):
            ob["location"] = file_uri(os.path.realpath(uri_file_path(location)))
        elif location.startswith("/"):
            ob["location"] = os.path.realpath(location)
        elif not location.startswith("_:") and ":" in location:
            ob["location"] = file_uri(fs_access.realpath(location))

    outfiles = list(_collectDirEntries(outputObj))
    visit_class(outfiles, ("File", "Directory"), _realpath)
    pm = path_mapper(outfiles, "", destination_path, separateDirs=False)
    # Reuse the staging machinery with _relocate doing the actual move/copy.
    stage_files(pm, stage_func=_relocate, symlink=False, fix_conflicts=True)

    def _check_adjust(a_file: CWLObjectType) -> CWLObjectType:
        # Point each object at its relocated path and drop literal contents.
        a_file["location"] = file_uri(pm.mapper(cast(str, a_file["location"]))[1])
        if "contents" in a_file:
            del a_file["contents"]
        return a_file

    visit_class(outputObj, ("File", "Directory"), _check_adjust)

    if compute_checksum:
        visit_class(
            outputObj, ("File",), functools.partial(compute_checksums, fs_access)
        )
    return outputObj
425 | |
426 | |
def cleanIntermediate(output_dirs: Iterable[str]) -> None:
    """Delete each intermediate output directory that still exists."""
    for directory in output_dirs:
        if not os.path.exists(directory):
            continue
        _logger.debug("Removing intermediate output directory %s", directory)
        # ignore_errors=True: cleanup is best effort.
        shutil.rmtree(directory, True)
432 | |
433 | |
def add_sizes(fsaccess: StdFsAccess, obj: CWLObjectType) -> None:
    """Fill in the "size" field of a File object when it can be determined.

    Size is best effort: objects with neither a usable location nor literal
    contents are left untouched.
    """
    if "location" in obj:
        if "size" in obj:
            return
        try:
            obj["size"] = fsaccess.size(cast(str, obj["location"]))
        except OSError:
            pass
    elif "contents" in obj:
        # Literal contents: the size is just the content length.
        obj["size"] = len(cast(Sized, obj["contents"]))
444 | |
445 | |
def fill_in_defaults(
    inputs: List[CWLObjectType],
    job: CWLObjectType,
    fsaccess: StdFsAccess,
) -> None:
    """Apply per-parameter defaults to *job* and enforce required inputs.

    :raises WorkflowException: a parameter has no value, no default, and is
        not nullable.
    """
    for index, inp in enumerate(inputs):
        with SourceLine(
            inputs, index, WorkflowException, _logger.isEnabledFor(logging.DEBUG)
        ):
            fieldname = shortname(cast(str, inp["id"]))
            if job.get(fieldname) is not None:
                # Already supplied by the caller; leave it alone.
                continue
            if "default" in inp:
                # Deep-copy so later mutation cannot corrupt the tool document.
                job[fieldname] = copy.deepcopy(inp["default"])
            elif "null" in aslist(inp["type"]):
                job[fieldname] = None
            else:
                raise WorkflowException(
                    "Missing required input parameter '%s'"
                    % shortname(cast(str, inp["id"]))
                )
467 | |
468 | |
def avroize_type(
    field_type: Union[
        CWLObjectType, MutableSequence[CWLOutputType], CWLOutputType, None
    ],
    name_prefix: str = "",
) -> None:
    """Add missing information to a type so that CWL types are valid."""
    if isinstance(field_type, MutableSequence):
        for member in field_type:
            avroize_type(member, name_prefix)
        return
    if not isinstance(field_type, MutableMapping):
        return
    type_name = field_type["type"]
    if type_name in ("enum", "record") and "name" not in field_type:
        # Anonymous enums/records need a generated, unique Avro name.
        field_type["name"] = name_prefix + str(uuid.uuid4())
    if type_name == "record":
        avroize_type(
            cast(MutableSequence[CWLOutputType], field_type["fields"]), name_prefix
        )
    if type_name == "array":
        avroize_type(
            cast(MutableSequence[CWLOutputType], field_type["items"]), name_prefix
        )
    if isinstance(type_name, MutableSequence):
        # Union type: recurse into each alternative.
        for alternative in type_name:
            avroize_type(cast(CWLOutputType, alternative), name_prefix)
494 | |
495 | |
def get_overrides(
    overrides: MutableSequence[CWLObjectType], toolid: str
) -> CWLObjectType:
    """Merge every override entry that targets *toolid* into a single dict.

    :raises ValidationException: *overrides* is not a list.
    """
    if not isinstance(overrides, MutableSequence):
        raise ValidationException(
            "Expected overrides to be a list, but was %s" % type(overrides)
        )
    merged = {}  # type: CWLObjectType
    for override in overrides:
        # Later matching entries win over earlier ones.
        if override["overrideTarget"] == toolid:
            merged.update(override)
    return merged
508 | |
509 | |
# Warning text emitted by var_spool_cwl_detector() when a tool hard-codes
# the container-specific /var/spool/cwl path.
_VAR_SPOOL_ERROR = textwrap.dedent(
    """
    Non-portable reference to /var/spool/cwl detected: '{}'.
    To fix, replace /var/spool/cwl with $(runtime.outdir) or add
    DockerRequirement to the 'requirements' section and declare
    'dockerOutputDirectory: /var/spool/cwl'.
    """
)
518 | |
519 | |
def var_spool_cwl_detector(
    obj: CWLOutputType,
    item: Optional[Any] = None,
    obj_key: Optional[Any] = None,
) -> bool:
    """Detect any textual reference to /var/spool/cwl.

    Walks strings, mappings and sequences recursively; logs a warning for
    each offending string and returns True if any was found.
    """
    found = False
    if isinstance(obj, str):
        # "dockerOutputDirectory" is the one place the path is legitimate.
        if "var/spool/cwl" in obj and obj_key != "dockerOutputDirectory":
            _logger.warning(
                SourceLine(item=item, key=obj_key, raise_type=str).makeError(
                    _VAR_SPOOL_ERROR.format(obj)
                )
            )
            found = True
    elif isinstance(obj, MutableMapping):
        for map_key, map_value in obj.items():
            found = (
                var_spool_cwl_detector(cast(CWLOutputType, map_value), obj, map_key)
                or found
            )
    elif isinstance(obj, MutableSequence):
        for seq_index, seq_value in enumerate(obj):
            found = (
                var_spool_cwl_detector(cast(CWLOutputType, seq_value), obj, seq_index)
                or found
            )
    return found
542 | |
543 | |
def eval_resource(
    builder: Builder, resource_req: Union[str, int, float]
) -> Optional[Union[str, int, float]]:
    """Evaluate a ResourceRequirement expression, passing literals through.

    :raises WorkflowException: the expression evaluates to something other
        than a string, int, or None.
    """
    if not (isinstance(resource_req, str) and expression.needs_parsing(resource_req)):
        # Plain literal value; nothing to evaluate.
        return resource_req
    result = builder.do_eval(resource_req)
    if result is None or isinstance(result, (str, int)):
        return result
    raise WorkflowException(
        "Got incorrect return type {} from resource expression evaluation of {}.".format(
            type(result), resource_req
        )
    )
557 | |
558 | |
# Threshold where the "too many files" warning kicks in
# (used by Process._init_job when deep_listing expands input directories).
FILE_COUNT_WARNING = 5000
561 | |
562 | |
563 class Process(HasReqsHints, metaclass=abc.ABCMeta): | |
564 def __init__( | |
565 self, toolpath_object: CommentedMap, loadingContext: LoadingContext | |
566 ) -> None: | |
567 """Build a Process object from the provided dictionary.""" | |
568 super().__init__() | |
569 self.metadata = getdefault(loadingContext.metadata, {}) # type: CWLObjectType | |
570 self.provenance_object = None # type: Optional[ProvenanceProfile] | |
571 self.parent_wf = None # type: Optional[ProvenanceProfile] | |
572 global SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY # pylint: disable=global-statement | |
573 if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None: | |
574 get_schema("v1.0") | |
575 SCHEMA_ANY = cast( | |
576 CWLObjectType, | |
577 SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"], | |
578 ) | |
579 SCHEMA_FILE = cast( | |
580 CWLObjectType, | |
581 SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"], | |
582 ) | |
583 SCHEMA_DIR = cast( | |
584 CWLObjectType, | |
585 SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"], | |
586 ) | |
587 | |
588 self.names = make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY], Loader({})) | |
589 self.tool = toolpath_object | |
590 self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, [])) | |
591 self.requirements.extend(self.tool.get("requirements", [])) | |
592 if "id" not in self.tool: | |
593 self.tool["id"] = "_:" + str(uuid.uuid4()) | |
594 self.requirements.extend( | |
595 cast( | |
596 List[CWLObjectType], | |
597 get_overrides( | |
598 getdefault(loadingContext.overrides_list, []), self.tool["id"] | |
599 ).get("requirements", []), | |
600 ) | |
601 ) | |
602 self.hints = copy.deepcopy(getdefault(loadingContext.hints, [])) | |
603 self.hints.extend(self.tool.get("hints", [])) | |
604 # Versions of requirements and hints which aren't mutated. | |
605 self.original_requirements = copy.deepcopy(self.requirements) | |
606 self.original_hints = copy.deepcopy(self.hints) | |
607 self.doc_loader = loadingContext.loader | |
608 self.doc_schema = loadingContext.avsc_names | |
609 | |
610 self.formatgraph = None # type: Optional[Graph] | |
611 if self.doc_loader is not None: | |
612 self.formatgraph = self.doc_loader.graph | |
613 | |
614 checkRequirements(self.tool, supportedProcessRequirements) | |
615 self.validate_hints( | |
616 cast(Names, loadingContext.avsc_names), | |
617 self.tool.get("hints", []), | |
618 strict=getdefault(loadingContext.strict, False), | |
619 ) | |
620 | |
621 self.schemaDefs = {} # type: MutableMapping[str, CWLObjectType] | |
622 | |
623 sd, _ = self.get_requirement("SchemaDefRequirement") | |
624 | |
625 if sd is not None: | |
626 sdtypes = cast(MutableSequence[CWLObjectType], sd["types"]) | |
627 avroize_type(cast(MutableSequence[CWLOutputType], sdtypes)) | |
628 av = make_valid_avro( | |
629 sdtypes, | |
630 {cast(str, t["name"]): cast(Dict[str, Any], t) for t in sdtypes}, | |
631 set(), | |
632 ) | |
633 for i in av: | |
634 self.schemaDefs[i["name"]] = i # type: ignore | |
635 make_avsc_object(convert_to_dict(av), self.names) | |
636 | |
637 # Build record schema from inputs | |
638 self.inputs_record_schema = { | |
639 "name": "input_record_schema", | |
640 "type": "record", | |
641 "fields": [], | |
642 } # type: CWLObjectType | |
643 self.outputs_record_schema = { | |
644 "name": "outputs_record_schema", | |
645 "type": "record", | |
646 "fields": [], | |
647 } # type: CWLObjectType | |
648 | |
649 for key in ("inputs", "outputs"): | |
650 for i in self.tool[key]: | |
651 c = copy.deepcopy(i) | |
652 c["name"] = shortname(c["id"]) | |
653 del c["id"] | |
654 | |
655 if "type" not in c: | |
656 raise ValidationException( | |
657 "Missing 'type' in parameter '{}'".format(c["name"]) | |
658 ) | |
659 | |
660 if "default" in c and "null" not in aslist(c["type"]): | |
661 nullable = ["null"] | |
662 nullable.extend(aslist(c["type"])) | |
663 c["type"] = nullable | |
664 else: | |
665 c["type"] = c["type"] | |
666 avroize_type(c["type"], c["name"]) | |
667 if key == "inputs": | |
668 cast( | |
669 List[CWLObjectType], self.inputs_record_schema["fields"] | |
670 ).append(c) | |
671 elif key == "outputs": | |
672 cast( | |
673 List[CWLObjectType], self.outputs_record_schema["fields"] | |
674 ).append(c) | |
675 | |
676 with SourceLine(toolpath_object, "inputs", ValidationException): | |
677 self.inputs_record_schema = cast( | |
678 CWLObjectType, | |
679 make_valid_avro(self.inputs_record_schema, {}, set()), | |
680 ) | |
681 make_avsc_object(convert_to_dict(self.inputs_record_schema), self.names) | |
682 with SourceLine(toolpath_object, "outputs", ValidationException): | |
683 self.outputs_record_schema = cast( | |
684 CWLObjectType, | |
685 make_valid_avro(self.outputs_record_schema, {}, set()), | |
686 ) | |
687 make_avsc_object(convert_to_dict(self.outputs_record_schema), self.names) | |
688 | |
689 if toolpath_object.get("class") is not None and not getdefault( | |
690 loadingContext.disable_js_validation, False | |
691 ): | |
692 validate_js_options = ( | |
693 None | |
694 ) # type: Optional[Dict[str, Union[List[str], str, int]]] | |
695 if loadingContext.js_hint_options_file is not None: | |
696 try: | |
697 with open(loadingContext.js_hint_options_file) as options_file: | |
698 validate_js_options = json.load(options_file) | |
699 except (OSError, ValueError): | |
700 _logger.error( | |
701 "Failed to read options file %s", | |
702 loadingContext.js_hint_options_file, | |
703 ) | |
704 raise | |
705 if self.doc_schema is not None: | |
706 validate_js_expressions( | |
707 toolpath_object, | |
708 self.doc_schema.names[toolpath_object["class"]], | |
709 validate_js_options, | |
710 ) | |
711 | |
712 dockerReq, is_req = self.get_requirement("DockerRequirement") | |
713 | |
714 if ( | |
715 dockerReq is not None | |
716 and "dockerOutputDirectory" in dockerReq | |
717 and is_req is not None | |
718 and not is_req | |
719 ): | |
720 _logger.warning( | |
721 SourceLine(item=dockerReq, raise_type=str).makeError( | |
722 "When 'dockerOutputDirectory' is declared, DockerRequirement " | |
723 "should go in the 'requirements' section, not 'hints'." | |
724 "" | |
725 ) | |
726 ) | |
727 | |
728 if ( | |
729 dockerReq is not None | |
730 and is_req is not None | |
731 and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl" | |
732 ): | |
733 if is_req: | |
734 # In this specific case, it is legal to have /var/spool/cwl, so skip the check. | |
735 pass | |
736 else: | |
737 # Must be a requirement | |
738 var_spool_cwl_detector(self.tool) | |
739 else: | |
740 var_spool_cwl_detector(self.tool) | |
741 | |
742 def _init_job( | |
743 self, joborder: CWLObjectType, runtime_context: RuntimeContext | |
744 ) -> Builder: | |
745 | |
746 if self.metadata.get("cwlVersion") != INTERNAL_VERSION: | |
747 raise WorkflowException( | |
748 "Process object loaded with version '%s', must update to '%s' in order to execute." | |
749 % (self.metadata.get("cwlVersion"), INTERNAL_VERSION) | |
750 ) | |
751 | |
752 job = copy.deepcopy(joborder) | |
753 | |
754 make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess) | |
755 fs_access = make_fs_access(runtime_context.basedir) | |
756 | |
757 load_listing_req, _ = self.get_requirement("LoadListingRequirement") | |
758 | |
759 load_listing = ( | |
760 cast(str, load_listing_req.get("loadListing")) | |
761 if load_listing_req is not None | |
762 else "no_listing" | |
763 ) | |
764 | |
765 # Validate job order | |
766 try: | |
767 fill_in_defaults(self.tool["inputs"], job, fs_access) | |
768 | |
769 normalizeFilesDirs(job) | |
770 schema = self.names.get_name("input_record_schema", None) | |
771 if schema is None: | |
772 raise WorkflowException( | |
773 "Missing input record schema: " "{}".format(self.names) | |
774 ) | |
775 validate_ex(schema, job, strict=False, logger=_logger_validation_warnings) | |
776 | |
777 if load_listing and load_listing != "no_listing": | |
778 get_listing(fs_access, job, recursive=(load_listing == "deep_listing")) | |
779 | |
780 visit_class(job, ("File",), functools.partial(add_sizes, fs_access)) | |
781 | |
782 if load_listing == "deep_listing": | |
783 for i, inparm in enumerate(self.tool["inputs"]): | |
784 k = shortname(inparm["id"]) | |
785 if k not in job: | |
786 continue | |
787 v = job[k] | |
788 dircount = [0] | |
789 | |
790 def inc(d): # type: (List[int]) -> None | |
791 d[0] += 1 | |
792 | |
793 visit_class(v, ("Directory",), lambda x: inc(dircount)) | |
794 if dircount[0] == 0: | |
795 continue | |
796 filecount = [0] | |
797 visit_class(v, ("File",), lambda x: inc(filecount)) | |
798 if filecount[0] > FILE_COUNT_WARNING: | |
799 # Long lines in this message are okay, will be reflowed based on terminal columns. | |
800 _logger.warning( | |
801 strip_dup_lineno( | |
802 SourceLine(self.tool["inputs"], i, str).makeError( | |
803 """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'. This may negatively affect workflow performance and memory use. | |
804 | |
805 If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior: | |
806 | |
807 $namespaces: | |
808 cwltool: "http://commonwl.org/cwltool#" | |
809 hints: | |
810 cwltool:LoadListingRequirement: | |
811 loadListing: shallow_listing | |
812 | |
813 """ | |
814 % (filecount[0], k) | |
815 ) | |
816 ) | |
817 ) | |
818 | |
819 except (ValidationException, WorkflowException) as err: | |
820 raise WorkflowException("Invalid job input record:\n" + str(err)) from err | |
821 | |
822 files = [] # type: List[CWLObjectType] | |
823 bindings = CommentedSeq() | |
824 outdir = "" | |
825 tmpdir = "" | |
826 stagedir = "" | |
827 | |
828 docker_req, _ = self.get_requirement("DockerRequirement") | |
829 default_docker = None | |
830 | |
831 if docker_req is None and runtime_context.default_container: | |
832 default_docker = runtime_context.default_container | |
833 | |
834 if (docker_req or default_docker) and runtime_context.use_container: | |
835 if docker_req is not None: | |
836 # Check if docker output directory is absolute | |
837 if docker_req.get("dockerOutputDirectory") and cast( | |
838 str, docker_req.get("dockerOutputDirectory") | |
839 ).startswith("/"): | |
840 outdir = cast(str, docker_req.get("dockerOutputDirectory")) | |
841 else: | |
842 outdir = cast( | |
843 str, | |
844 docker_req.get("dockerOutputDirectory") | |
845 or runtime_context.docker_outdir | |
846 or random_outdir(), | |
847 ) | |
848 elif default_docker is not None: | |
849 outdir = runtime_context.docker_outdir or random_outdir() | |
850 tmpdir = runtime_context.docker_tmpdir or "/tmp" # nosec | |
851 stagedir = runtime_context.docker_stagedir or "/var/lib/cwl" | |
852 else: | |
853 if self.tool["class"] == "CommandLineTool": | |
854 outdir = fs_access.realpath(runtime_context.get_outdir()) | |
855 tmpdir = fs_access.realpath(runtime_context.get_tmpdir()) | |
856 stagedir = fs_access.realpath(runtime_context.get_stagedir()) | |
857 | |
858 cwl_version = cast( | |
859 str, | |
860 self.metadata.get("http://commonwl.org/cwltool#original_cwlVersion", None), | |
861 ) | |
862 builder = Builder( | |
863 job, | |
864 files, | |
865 bindings, | |
866 self.schemaDefs, | |
867 self.names, | |
868 self.requirements, | |
869 self.hints, | |
870 {}, | |
871 runtime_context.mutation_manager, | |
872 self.formatgraph, | |
873 make_fs_access, | |
874 fs_access, | |
875 runtime_context.job_script_provider, | |
876 runtime_context.eval_timeout, | |
877 runtime_context.debug, | |
878 runtime_context.js_console, | |
879 runtime_context.force_docker_pull, | |
880 load_listing, | |
881 outdir, | |
882 tmpdir, | |
883 stagedir, | |
884 cwl_version, | |
885 ) | |
886 | |
887 bindings.extend( | |
888 builder.bind_input( | |
889 self.inputs_record_schema, | |
890 job, | |
891 discover_secondaryFiles=getdefault(runtime_context.toplevel, False), | |
892 ) | |
893 ) | |
894 | |
895 if self.tool.get("baseCommand"): | |
896 for index, command in enumerate(aslist(self.tool["baseCommand"])): | |
897 bindings.append({"position": [-1000000, index], "datum": command}) | |
898 | |
899 if self.tool.get("arguments"): | |
900 for i, arg in enumerate(self.tool["arguments"]): | |
901 lc = self.tool["arguments"].lc.data[i] | |
902 filename = self.tool["arguments"].lc.filename | |
903 bindings.lc.add_kv_line_col(len(bindings), lc) | |
904 if isinstance(arg, MutableMapping): | |
905 arg = copy.deepcopy(arg) | |
906 if arg.get("position"): | |
907 position = arg.get("position") | |
908 if isinstance(position, str): # no need to test the | |
909 # CWLVersion as the v1.0 | |
910 # schema only allows ints | |
911 position = builder.do_eval(position) | |
912 if position is None: | |
913 position = 0 | |
914 arg["position"] = [position, i] | |
915 else: | |
916 arg["position"] = [0, i] | |
917 bindings.append(arg) | |
918 elif ("$(" in arg) or ("${" in arg): | |
919 cm = CommentedMap((("position", [0, i]), ("valueFrom", arg))) | |
920 cm.lc.add_kv_line_col("valueFrom", lc) | |
921 cm.lc.filename = filename | |
922 bindings.append(cm) | |
923 else: | |
924 cm = CommentedMap((("position", [0, i]), ("datum", arg))) | |
925 cm.lc.add_kv_line_col("datum", lc) | |
926 cm.lc.filename = filename | |
927 bindings.append(cm) | |
928 | |
929 # use python2 like sorting of heterogeneous lists | |
930 # (containing str and int types), | |
931 key = functools.cmp_to_key(cmp_like_py2) | |
932 | |
933 # This awkward construction replaces the contents of | |
934 # "bindings" in place (because Builder expects it to be | |
935 # mutated in place, sigh, I'm sorry) with its contents sorted, | |
936 # supporting different versions of Python and ruamel.yaml with | |
937 # different behaviors/bugs in CommentedSeq. | |
938 bindings_copy = copy.deepcopy(bindings) | |
939 del bindings[:] | |
940 bindings.extend(sorted(bindings_copy, key=key)) | |
941 | |
942 if self.tool["class"] != "Workflow": | |
943 builder.resources = self.evalResources(builder, runtime_context) | |
944 return builder | |
945 | |
def evalResources(
    self, builder: Builder, runtimeContext: RuntimeContext
) -> Dict[str, Union[int, float, str]]:
    """Compute the effective resource request for this tool.

    Merges any ResourceRequirement with version-dependent defaults,
    evaluates min/max expressions via eval_resource, and returns the
    final cores/ram/tmpdirSize/outdirSize values — or whatever
    runtimeContext.select_resources picks from the request.
    """
    resourceReq, _ = self.get_requirement("ResourceRequirement")
    if resourceReq is None:
        resourceReq = {}

    # CWL v1.0 documents default to 1024 MiB of RAM; later versions to 256.
    cwl_version = self.metadata.get(
        "http://commonwl.org/cwltool#original_cwlVersion", None
    )
    default_ram = 1024 if cwl_version == "v1.0" else 256

    request: Dict[str, Union[int, float, str]] = {
        "coresMin": 1,
        "coresMax": 1,
        "ramMin": default_ram,
        "ramMax": default_ram,
        "tmpdirMin": 1024,
        "tmpdirMax": 1024,
        "outdirMin": 1024,
        "outdirMax": 1024,
    }

    for resource in ("cores", "ram", "tmpdir", "outdir"):
        lower = None  # type: Optional[Union[int, float]]
        upper = None  # type: Optional[Union[int, float]]
        if resourceReq.get(resource + "Min"):
            lower = cast(
                Union[int, float],
                eval_resource(
                    builder,
                    cast(Union[str, int, float], resourceReq[resource + "Min"]),
                ),
            )
        if resourceReq.get(resource + "Max"):
            upper = cast(
                Union[int, float],
                eval_resource(
                    builder,
                    cast(Union[str, int, float], resourceReq[resource + "Max"]),
                ),
            )
        # When only one bound is given, it implies the other.
        if lower is None:
            lower = upper
        elif upper is None:
            upper = lower

        if lower is not None:
            request[resource + "Min"] = lower
            request[resource + "Max"] = cast(Union[int, float], upper)

    if runtimeContext.select_resources is not None:
        # Let the executor choose concrete values within the request.
        return runtimeContext.select_resources(request, runtimeContext)

    def _ceil(value: Union[int, float, str]) -> Union[int, float, str]:
        # Round fractional sizes up; expression strings pass through untouched.
        return value if isinstance(value, str) else math.ceil(value)

    return {
        "cores": request["coresMin"],
        "ram": _ceil(request["ramMin"]),
        "tmpdirSize": _ceil(request["tmpdirMin"]),
        "outdirSize": _ceil(request["outdirMin"]),
    }
1008 | |
def validate_hints(
    self, avsc_names: Names, hints: List[CWLObjectType], strict: bool
) -> None:
    """Validate each hint against its Avro schema, if one is known."""
    for index, hint in enumerate(hints):
        sourceline = SourceLine(hints, index, ValidationException)
        with sourceline:
            classname = cast(str, hint["class"])
            if (
                avsc_names.get_name(classname, None) is not None
                and self.doc_loader is not None
            ):
                # Strip schema-salad identifier fields before validation.
                plain_hint = {
                    key: hint[key]
                    for key in hint
                    if key not in self.doc_loader.identifiers
                }
                validate_ex(
                    cast(
                        Schema,
                        avsc_names.get_name(cast(str, plain_hint["class"]), None),
                    ),
                    plain_hint,
                    strict=strict,
                )
            elif classname in ("NetworkAccess", "LoadListingRequirement"):
                # Known hints with no schema to check against.
                pass
            else:
                _logger.info(
                    str(sourceline.makeError("Unknown hint %s" % (hint["class"])))
                )
1036 | |
def visit(self, op: Callable[[CommentedMap], None]) -> None:
    """Apply the given operation to this process's tool document."""
    op(self.tool)
1039 | |
@abc.abstractmethod
def job(
    self,
    job_order: CWLObjectType,
    output_callbacks: Optional[OutputCallbackType],
    runtimeContext: RuntimeContext,
) -> JobsGeneratorType:
    """Yield the runnable job objects for this process (abstract)."""
    pass
1048 | |
1049 | |
# Module-level registry used when the caller does not supply its own set.
_names = set()  # type: Set[str]


def uniquename(stem: str, names: Optional[Set[str]] = None) -> str:
    """Return a variant of ``stem`` that is unique within ``names``.

    The first free name among ``stem``, ``stem_2``, ``stem_3``, ... is
    chosen and recorded in ``names`` (the shared module-level set when
    ``names`` is omitted) so subsequent calls avoid it.
    """
    if names is None:
        names = _names
    candidate = stem
    suffix = 1
    while candidate in names:
        suffix += 1
        candidate = "{}_{}".format(stem, suffix)
    names.add(candidate)
    return candidate
1064 | |
1065 | |
def nestdir(base: str, deps: CWLObjectType) -> CWLObjectType:
    """Wrap ``deps`` in Directory objects mirroring its path below ``base``.

    If the dependency's location lies under base's directory, one
    Directory entry is created per intermediate path component, outermost
    first; otherwise ``deps`` is returned unchanged.
    """
    parent = os.path.dirname(base) + "/"
    location = cast(str, deps["location"])
    if location.startswith(parent):
        relative = location[len(parent):]
        # Drop the final component (the entry itself); wrap in the
        # remaining directories from innermost to outermost.
        components = relative.split("/")[:-1]
        for name in reversed(components):
            deps = {"class": "Directory", "basename": name, "listing": [deps]}
    return deps
1077 | |
1078 | |
def mergedirs(listing: List[CWLObjectType]) -> List[CWLObjectType]:
    """Merge sibling entries that share a basename.

    Directory entries with the same basename have their listings
    concatenated and anonymous ("_:") locations replaced by concrete
    ones.  Non-directory entries with the same basename but different
    locations are a collision: both are renamed to their URL-quoted
    locations.  Merged Directory listings are then merged recursively.
    """
    r = []  # type: List[CWLObjectType]
    ents = {}  # type: Dict[str, CWLObjectType]
    collided = set()  # type: Set[str]
    for e in listing:
        basename = cast(str, e["basename"])
        if basename not in ents:
            ents[basename] = e
        elif e["class"] == "Directory":
            # Same-named directory seen again: concatenate its listing
            # onto the entry recorded earlier.
            if e.get("listing"):
                cast(
                    List[CWLObjectType], ents[basename].setdefault("listing", [])
                ).extend(cast(List[CWLObjectType], e["listing"]))
            # Prefer a concrete location over an anonymous "_:" one.
            if cast(str, ents[basename]["location"]).startswith("_:"):
                ents[basename]["location"] = e["location"]
        elif e["location"] != ents[basename]["location"]:
            # same basename, different location, collision,
            # rename both.
            collided.add(basename)
            e2 = ents[basename]

            # URL-quote each location to form new, unambiguous basenames.
            e["basename"] = urllib.parse.quote(cast(str, e["location"]), safe="")
            e2["basename"] = urllib.parse.quote(cast(str, e2["location"]), safe="")

            e["nameroot"], e["nameext"] = os.path.splitext(cast(str, e["basename"]))
            e2["nameroot"], e2["nameext"] = os.path.splitext(cast(str, e2["basename"]))

            ents[cast(str, e["basename"])] = e
            ents[cast(str, e2["basename"])] = e2
    # Remove the original colliding keys; only renamed entries remain.
    for c in collided:
        del ents[c]
    for e in ents.values():
        if e["class"] == "Directory" and "listing" in e:
            # Recurse so duplicates nested inside merged directories are
            # merged as well.
            e["listing"] = cast(
                MutableSequence[CWLOutputAtomType],
                mergedirs(cast(List[CWLObjectType], e["listing"])),
            )
    r.extend(ents.values())
    return r
1118 | |
1119 | |
1120 CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl" | |
1121 | |
1122 | |
def scandeps(
    base: str,
    doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
    reffields: Set[str],
    urlfields: Set[str],
    loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
    urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
    nestdirs: bool = True,
) -> MutableSequence[CWLObjectType]:
    """Recursively scan a CWL document for its external dependencies.

    :param base: base URI that relative references are joined against.
    :param doc: the document (mapping or sequence of mappings) to scan.
    :param reffields: fields whose values reference other documents,
        which are loaded via ``loadref`` and scanned in turn.
    :param urlfields: fields whose values are URLs recorded directly as
        File dependencies.
    :param loadref: callback returning the document behind a reference.
    :param urljoin: function used to resolve relative references.
    :param nestdirs: when True, wrap results in intermediate Directory
        entries via ``nestdir``.
    :return: dependency objects, normalized and merged via ``mergedirs``.
    """
    r = []  # type: MutableSequence[CWLObjectType]
    if isinstance(doc, MutableMapping):
        if "id" in doc:
            if cast(str, doc["id"]).startswith("file://"):
                # A file:// id becomes the new base for nested references.
                df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
                if base != df:
                    r.append({"class": "File", "location": df, "format": CWL_IANA})
                    base = df

        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
            u = cast(Optional[str], doc.get("location", doc.get("path")))
            if u and not u.startswith("_:"):
                # Concrete location: record this object as a dependency,
                # carrying along basename/listing/secondaryFiles.
                deps = {
                    "class": doc["class"],
                    "location": urljoin(base, u),
                }  # type: CWLObjectType
                if "basename" in doc:
                    deps["basename"] = doc["basename"]
                if doc["class"] == "Directory" and "listing" in doc:
                    deps["listing"] = doc["listing"]
                if doc["class"] == "File" and "secondaryFiles" in doc:
                    deps["secondaryFiles"] = cast(
                        CWLOutputAtomType,
                        scandeps(
                            base,
                            cast(
                                Union[CWLObjectType, MutableSequence[CWLObjectType]],
                                doc["secondaryFiles"],
                            ),
                            reffields,
                            urlfields,
                            loadref,
                            urljoin=urljoin,
                            nestdirs=nestdirs,
                        ),
                    )
                if nestdirs:
                    deps = nestdir(base, deps)
                r.append(deps)
            else:
                # Anonymous or missing location: scan the contents only.
                if doc["class"] == "Directory" and "listing" in doc:
                    r.extend(
                        scandeps(
                            base,
                            cast(MutableSequence[CWLObjectType], doc["listing"]),
                            reffields,
                            urlfields,
                            loadref,
                            urljoin=urljoin,
                            nestdirs=nestdirs,
                        )
                    )
                elif doc["class"] == "File" and "secondaryFiles" in doc:
                    r.extend(
                        scandeps(
                            base,
                            cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
                            reffields,
                            urlfields,
                            loadref,
                            urljoin=urljoin,
                            nestdirs=nestdirs,
                        )
                    )

        for k, v in doc.items():
            if k in reffields:
                # Field referencing other document(s).
                for u2 in aslist(v):
                    if isinstance(u2, MutableMapping):
                        # Inline (embedded) document: scan it in place.
                        r.extend(
                            scandeps(
                                base,
                                u2,
                                reffields,
                                urlfields,
                                loadref,
                                urljoin=urljoin,
                                nestdirs=nestdirs,
                            )
                        )
                    else:
                        # URI reference: load the target and scan it,
                        # unless it points back into the current document.
                        subid = urljoin(base, u2)
                        basedf, _ = urllib.parse.urldefrag(base)
                        subiddf, _ = urllib.parse.urldefrag(subid)
                        if basedf == subiddf:
                            continue
                        sub = cast(
                            Union[MutableSequence[CWLObjectType], CWLObjectType],
                            loadref(base, u2),
                        )
                        deps2 = {
                            "class": "File",
                            "location": subid,
                            "format": CWL_IANA,
                        }  # type: CWLObjectType
                        sf = scandeps(
                            subid,
                            sub,
                            reffields,
                            urlfields,
                            loadref,
                            urljoin=urljoin,
                            nestdirs=nestdirs,
                        )
                        if sf:
                            deps2["secondaryFiles"] = cast(
                                MutableSequence[CWLOutputAtomType], sf
                            )
                        if nestdirs:
                            deps2 = nestdir(base, deps2)
                        r.append(deps2)
            elif k in urlfields and k != "location":
                # Plain URL field: each value is a File dependency.
                for u3 in aslist(v):
                    deps = {"class": "File", "location": urljoin(base, u3)}
                    if nestdirs:
                        deps = nestdir(base, deps)
                    r.append(deps)
            elif doc.get("class") in ("File", "Directory") and k in (
                "listing",
                "secondaryFiles",
            ):
                # should be handled earlier.
                pass
            else:
                # Any other field: recurse into its value.
                r.extend(
                    scandeps(
                        base,
                        cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
                        reffields,
                        urlfields,
                        loadref,
                        urljoin=urljoin,
                        nestdirs=nestdirs,
                    )
                )
    elif isinstance(doc, MutableSequence):
        # Sequence: scan each element independently.
        for d in doc:
            r.extend(
                scandeps(
                    base,
                    d,
                    reffields,
                    urlfields,
                    loadref,
                    urljoin=urljoin,
                    nestdirs=nestdirs,
                )
            )

    if r:
        # Canonicalize entries and merge duplicate directories.
        normalizeFilesDirs(r)
        r = mergedirs(cast(List[CWLObjectType], r))

    return r
1286 | |
1287 | |
def compute_checksums(fs_access: StdFsAccess, fileobj: CWLObjectType) -> None:
    """Fill in the checksum and size of ``fileobj`` if not already set.

    The file at ``fileobj["location"]`` is streamed in 1 MiB chunks
    through SHA-1; nothing is done when a checksum is already present.
    """
    if "checksum" in fileobj:
        return
    digest = hashlib.sha1()  # nosec
    location = cast(str, fileobj["location"])
    with fs_access.open(location, "rb") as handle:
        # iter() with a b"" sentinel stops at end-of-file.
        for block in iter(lambda: handle.read(1024 * 1024), b""):
            digest.update(block)
    fileobj["checksum"] = "sha1$%s" % digest.hexdigest()
    fileobj["size"] = fs_access.size(location)