Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/process.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 """Classes and methods relevant for all CWL Proccess types.""" | |
| 2 import abc | |
| 3 import copy | |
| 4 import functools | |
| 5 import hashlib | |
| 6 import json | |
| 7 import logging | |
| 8 import math | |
| 9 import os | |
| 10 import shutil | |
| 11 import stat | |
| 12 import textwrap | |
| 13 import urllib | |
| 14 import uuid | |
| 15 from os import scandir | |
| 16 from typing import ( | |
| 17 Any, | |
| 18 Callable, | |
| 19 Dict, | |
| 20 Iterable, | |
| 21 Iterator, | |
| 22 List, | |
| 23 MutableMapping, | |
| 24 MutableSequence, | |
| 25 Optional, | |
| 26 Set, | |
| 27 Sized, | |
| 28 Tuple, | |
| 29 Type, | |
| 30 Union, | |
| 31 cast, | |
| 32 ) | |
| 33 | |
| 34 from pkg_resources import resource_stream | |
| 35 from rdflib import Graph | |
| 36 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
| 37 from schema_salad.avro.schema import ( | |
| 38 Names, | |
| 39 Schema, | |
| 40 SchemaParseException, | |
| 41 make_avsc_object, | |
| 42 ) | |
| 43 from schema_salad.exceptions import ValidationException | |
| 44 from schema_salad.ref_resolver import Loader, file_uri, uri_file_path | |
| 45 from schema_salad.schema import load_schema, make_avro_schema, make_valid_avro | |
| 46 from schema_salad.sourceline import SourceLine, strip_dup_lineno | |
| 47 from schema_salad.utils import convert_to_dict | |
| 48 from schema_salad.validate import validate_ex | |
| 49 from typing_extensions import TYPE_CHECKING | |
| 50 | |
| 51 from . import expression | |
| 52 from .builder import Builder, HasReqsHints | |
| 53 from .context import LoadingContext, RuntimeContext, getdefault | |
| 54 from .errors import UnsupportedRequirement, WorkflowException | |
| 55 from .loghandler import _logger | |
| 56 from .mpi import MPIRequirementName | |
| 57 from .pathmapper import MapperEnt, PathMapper | |
| 58 from .secrets import SecretStore | |
| 59 from .stdfsaccess import StdFsAccess | |
| 60 from .update import INTERNAL_VERSION | |
| 61 from .utils import ( | |
| 62 CWLObjectType, | |
| 63 CWLOutputAtomType, | |
| 64 CWLOutputType, | |
| 65 JobsGeneratorType, | |
| 66 OutputCallbackType, | |
| 67 adjustDirObjs, | |
| 68 aslist, | |
| 69 cmp_like_py2, | |
| 70 copytree_with_merge, | |
| 71 ensure_writable, | |
| 72 get_listing, | |
| 73 normalizeFilesDirs, | |
| 74 onWindows, | |
| 75 random_outdir, | |
| 76 visit_class, | |
| 77 ) | |
| 78 from .validate_js import validate_js_expressions | |
| 79 | |
| 80 if TYPE_CHECKING: | |
| 81 from .provenance_profile import ProvenanceProfile # pylint: disable=unused-import | |
| 82 | |
| 83 | |
class LogAsDebugFilter(logging.Filter):
    """Let records through only while the parent logger has DEBUG enabled."""

    def __init__(self, name: str, parent: logging.Logger) -> None:
        """Initialize."""
        super().__init__(str(name))
        self.parent = parent

    def filter(self, record: logging.LogRecord) -> bool:
        """Delegate the decision to the parent logger's effective level."""
        return self.parent.isEnabledFor(logging.DEBUG)
| 94 | |
# Dedicated logger for schema-validation warnings: they are filtered down to
# DEBUG-only visibility unless the main cwltool logger itself is at DEBUG.
_logger_validation_warnings = logging.getLogger("cwltool.validation_warnings")
_logger_validation_warnings.setLevel(_logger.getEffectiveLevel())
_logger_validation_warnings.addFilter(
    LogAsDebugFilter("cwltool.validation_warnings", _logger)
)
| 100 | |
# Requirement classes this runner knows how to satisfy; checkRequirements()
# rejects any process that declares a requirement not listed here.  The
# http://commonwl.org/cwltool# entries are the namespaced forms of the
# cwltool extensions.
supportedProcessRequirements = [
    "DockerRequirement",
    "SchemaDefRequirement",
    "EnvVarRequirement",
    "ScatterFeatureRequirement",
    "SubworkflowFeatureRequirement",
    "MultipleInputFeatureRequirement",
    "InlineJavascriptRequirement",
    "ShellCommandRequirement",
    "StepInputExpressionRequirement",
    "ResourceRequirement",
    "InitialWorkDirRequirement",
    "ToolTimeLimit",
    "WorkReuse",
    "NetworkAccess",
    "InplaceUpdateRequirement",
    "LoadListingRequirement",
    MPIRequirementName,
    "http://commonwl.org/cwltool#TimeLimit",
    "http://commonwl.org/cwltool#WorkReuse",
    "http://commonwl.org/cwltool#NetworkAccess",
    "http://commonwl.org/cwltool#LoadListingRequirement",
    "http://commonwl.org/cwltool#InplaceUpdateRequirement",
]
| 125 | |
# CWL schema/documentation resources bundled per CWL version; each is loaded
# from the package data by get_schema() (missing files are tolerated there).
cwl_files = (
    "Workflow.yml",
    "CommandLineTool.yml",
    "CommonWorkflowLanguage.yml",
    "Process.yml",
    "Operation.yml",
    "concepts.md",
    "contrib.md",
    "intro.md",
    "invocation.md",
)
| 137 | |
# Schema Salad metaschema resources bundled per CWL version; loaded into the
# document cache by get_schema() (missing files are tolerated there).
# NOTE: the original list contained "vocab_res.yml" twice, which caused the
# same resource to be read and cached a second time for no effect; the
# duplicate has been removed.
salad_files = (
    "metaschema.yml",
    "metaschema_base.yml",
    "salad.md",
    "field_name.yml",
    "import_include.md",
    "link_res.yml",
    "ident_res.yml",
    "vocab_res.yml",
    "field_name_schema.yml",
    "field_name_src.yml",
    "field_name_proc.yml",
    "ident_res_schema.yml",
    "ident_res_src.yml",
    "ident_res_proc.yml",
    "link_res_schema.yml",
    "link_res_src.yml",
    "link_res_proc.yml",
    "vocab_res_schema.yml",
    "vocab_res_src.yml",
    "vocab_res_proc.yml",
)
| 161 | |
# Cache of parsed CWL document schemas, keyed by CWL version string.
SCHEMA_CACHE = (
    {}
)  # type: Dict[str, Tuple[Loader, Union[Names, SchemaParseException], CWLObjectType, Loader]]
# Lazily-populated schema fragments for the File, Directory, and Any types;
# filled in on first Process construction.
SCHEMA_FILE = None  # type: Optional[CWLObjectType]
SCHEMA_DIR = None  # type: Optional[CWLObjectType]
SCHEMA_ANY = None  # type: Optional[CWLObjectType]

# User-registered replacement schemas: version -> (name, text).
custom_schemas = {}  # type: Dict[str, Tuple[str, str]]
| 170 | |
| 171 | |
def use_standard_schema(version: str) -> None:
    """Revert to the bundled schema for *version*, discarding any custom one."""
    custom_schemas.pop(version, None)
    SCHEMA_CACHE.pop(version, None)
| 177 | |
| 178 | |
def use_custom_schema(version: str, name: str, text: str) -> None:
    """Register a custom schema for *version*, invalidating any cached one."""
    custom_schemas[version] = (name, text)
    SCHEMA_CACHE.pop(version, None)
| 183 | |
| 184 | |
def get_schema(
    version: str,
) -> Tuple[Loader, Union[Names, SchemaParseException], CWLObjectType, Loader]:
    """Load (and memoize in SCHEMA_CACHE) the CWL schema for *version*."""
    if version in SCHEMA_CACHE:
        return SCHEMA_CACHE[version]

    cache = {}  # type: Dict[str, Union[str, Graph, bool]]
    # Normalize the version: drop any URI fragment prefix, and map a ".dev"
    # pre-release (e.g. "v1.2.0-dev1") onto its base release directory.
    version = version.split("#")[-1]
    if ".dev" in version:
        version = ".".join(version.split(".")[:-1])
    # Pre-populate the loader cache with the bundled schema documents so no
    # network fetch of the w3id.org URIs is needed.  Missing resources are
    # silently skipped (not every version ships every file).
    for f in cwl_files:
        try:
            res = resource_stream(__name__, f"schemas/{version}/{f}")
            cache["https://w3id.org/cwl/" + f] = res.read().decode("UTF-8")
            res.close()
        except OSError:
            pass

    for f in salad_files:
        try:
            res = resource_stream(
                __name__,
                f"schemas/{version}/salad/schema_salad/metaschema/{f}",
            )
            cache[
                "https://w3id.org/cwl/salad/schema_salad/metaschema/" + f
            ] = res.read().decode("UTF-8")
            res.close()
        except OSError:
            pass

    # A registered custom schema (see use_custom_schema) overrides the
    # bundled entry point for this version.
    if version in custom_schemas:
        cache[custom_schemas[version][0]] = custom_schemas[version][1]
        SCHEMA_CACHE[version] = load_schema(custom_schemas[version][0], cache=cache)
    else:
        SCHEMA_CACHE[version] = load_schema(
            "https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache
        )

    return SCHEMA_CACHE[version]
| 226 | |
| 227 | |
def shortname(inputid: str) -> str:
    """Return the last path segment of a CWL identifier URI."""
    parsed = urllib.parse.urlparse(inputid)
    # Prefer the fragment (e.g. "#step/out"); otherwise fall back to the path.
    source = parsed.fragment if parsed.fragment else parsed.path
    return source.rpartition("/")[2]
| 233 | |
| 234 | |
def checkRequirements(
    rec: Union[MutableSequence[CWLObjectType], CWLObjectType, CWLOutputType, None],
    supported_process_requirements: Iterable[str],
) -> None:
    """Recursively reject any requirement class outside the supported set."""
    if isinstance(rec, MutableMapping):
        if "requirements" in rec:
            entries = cast(MutableSequence[CWLObjectType], rec["requirements"])
            for idx, entry in enumerate(entries):
                # SourceLine attaches document position info to the error.
                with SourceLine(rec["requirements"], idx, UnsupportedRequirement):
                    entry_class = cast(str, entry["class"])
                    if entry_class not in supported_process_requirements:
                        raise UnsupportedRequirement(
                            "Unsupported requirement {}".format(entry_class)
                        )
        for value in rec.values():
            checkRequirements(value, supported_process_requirements)
    elif isinstance(rec, MutableSequence):
        for member in rec:
            checkRequirements(member, supported_process_requirements)
| 254 | |
| 255 | |
def stage_files(
    pathmapper: PathMapper,
    stage_func: Optional[Callable[[str, str], None]] = None,
    ignore_writable: bool = False,
    symlink: bool = True,
    secret_store: Optional[SecretStore] = None,
    fix_conflicts: bool = False,
) -> None:
    """Link or copy files to their targets. Create them as needed.

    :param pathmapper: maps resolved source paths to staging targets.
    :param stage_func: fallback copier used when symlinking is disabled.
    :param ignore_writable: skip staging of Writable{File,Directory} entries.
    :param symlink: prefer symlinks where the platform allows them.
    :param secret_store: if given, Create*File contents are looked up in it.
    :param fix_conflicts: rename rather than fail on target collisions.
    :raises WorkflowException: two different sources map to one target and
        fix_conflicts is False.
    """
    # First pass: detect distinct sources mapped to the same target.
    targets = {}  # type: Dict[str, MapperEnt]
    for key, entry in pathmapper.items():
        if "File" not in entry.type:
            continue
        if entry.target not in targets:
            targets[entry.target] = entry
        elif targets[entry.target].resolved != entry.resolved:
            if fix_conflicts:
                # find first key that does not clash with an existing entry in targets
                # start with entry.target + '_' + 2 and then keep incrementing the number till there is no clash
                i = 2
                tgt = f"{entry.target}_{i}"
                while tgt in targets:
                    i += 1
                    tgt = f"{entry.target}_{i}"
                targets[tgt] = pathmapper.update(
                    key, entry.resolved, tgt, entry.type, entry.staged
                )
            else:
                raise WorkflowException(
                    "File staging conflict, trying to stage both %s and %s to the same target %s"
                    % (targets[entry.target].resolved, entry.resolved, entry.target)
                )

    # Second pass: actually materialize each staged entry at its target.
    for key, entry in pathmapper.items():
        if not entry.staged:
            continue
        if not os.path.exists(os.path.dirname(entry.target)):
            os.makedirs(os.path.dirname(entry.target))
        if entry.type in ("File", "Directory") and os.path.exists(entry.resolved):
            if symlink:  # Use symlink func if allowed
                if onWindows():
                    # Windows symlinks need privileges, so copy instead.
                    if entry.type == "File":
                        shutil.copy(entry.resolved, entry.target)
                    elif entry.type == "Directory":
                        if os.path.exists(entry.target) and os.path.isdir(entry.target):
                            shutil.rmtree(entry.target)
                        copytree_with_merge(entry.resolved, entry.target)
                else:
                    os.symlink(entry.resolved, entry.target)
            elif stage_func is not None:
                stage_func(entry.resolved, entry.target)
        elif (
            entry.type == "Directory"
            and not os.path.exists(entry.target)
            and entry.resolved.startswith("_:")
        ):
            # "_:" marks a literal (in-memory) directory: just create it.
            os.makedirs(entry.target)
        elif entry.type == "WritableFile" and not ignore_writable:
            shutil.copy(entry.resolved, entry.target)
            ensure_writable(entry.target)
        elif entry.type == "WritableDirectory" and not ignore_writable:
            if entry.resolved.startswith("_:"):
                os.makedirs(entry.target)
            else:
                shutil.copytree(entry.resolved, entry.target)
                ensure_writable(entry.target)
        elif entry.type == "CreateFile" or entry.type == "CreateWritableFile":
            # For literal files, entry.resolved holds the contents (or the
            # secret-store key for them), not a filesystem path.
            with open(entry.target, "wb") as new:
                if secret_store is not None:
                    new.write(
                        cast(str, secret_store.retrieve(entry.resolved)).encode("utf-8")
                    )
                else:
                    new.write(entry.resolved.encode("utf-8"))
            if entry.type == "CreateFile":
                os.chmod(entry.target, stat.S_IRUSR)  # Read only
            else:  # it is a "CreateWritableFile"
                ensure_writable(entry.target)
            pathmapper.update(key, entry.target, entry.target, entry.type, entry.staged)
| 335 | |
| 336 | |
def relocateOutputs(
    outputObj: CWLObjectType,
    destination_path: str,
    source_directories: Set[str],
    action: str,
    fs_access: StdFsAccess,
    compute_checksum: bool = True,
    path_mapper: Type[PathMapper] = PathMapper,
) -> CWLObjectType:
    """Move or copy the output File/Directory objects to destination_path.

    Updates each object's "location" in place to the final URI.  If *action*
    is neither "move" nor "copy" the object is returned unchanged.
    """
    adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True))

    if action not in ("move", "copy"):
        return outputObj

    def _collectDirEntries(
        obj: Union[CWLObjectType, MutableSequence[CWLObjectType], None]
    ) -> Iterator[CWLObjectType]:
        # Yield only the outermost File/Directory objects; contents of a
        # matched Directory are relocated along with it.
        if isinstance(obj, dict):
            if obj.get("class") in ("File", "Directory"):
                yield obj
            else:
                for sub_obj in obj.values():
                    yield from _collectDirEntries(sub_obj)
        elif isinstance(obj, MutableSequence):
            for sub_obj in obj:
                yield from _collectDirEntries(sub_obj)

    def _relocate(src: str, dst: str) -> None:
        if src == dst:
            return

        # If the source is not contained in source_directories we're not allowed to delete it
        src = fs_access.realpath(src)
        src_can_deleted = any(
            os.path.commonprefix([p, src]) == p for p in source_directories
        )

        # Downgrade "move" to "copy" when deleting the source isn't allowed.
        _action = "move" if action == "move" and src_can_deleted else "copy"

        if _action == "move":
            _logger.debug("Moving %s to %s", src, dst)
            if fs_access.isdir(src) and fs_access.isdir(dst):
                # merge directories
                for dir_entry in scandir(src):
                    _relocate(dir_entry.path, fs_access.join(dst, dir_entry.name))
            else:
                shutil.move(src, dst)

        elif _action == "copy":
            _logger.debug("Copying %s to %s", src, dst)
            if fs_access.isdir(src):
                # Replace whatever currently occupies the destination.
                if os.path.isdir(dst):
                    shutil.rmtree(dst)
                elif os.path.isfile(dst):
                    os.unlink(dst)
                shutil.copytree(src, dst)
            else:
                shutil.copy2(src, dst)

    def _realpath(
        ob: CWLObjectType,
    ) -> None:  # should be type Union[CWLFile, CWLDirectory]
        # Canonicalize "location" to a resolved file: URI or absolute path;
        # "_:" literals and non-file schemes other than those handled here
        # are left untouched.
        location = cast(str, ob["location"])
        if location.startswith("file:"):
            ob["location"] = file_uri(os.path.realpath(uri_file_path(location)))
        elif location.startswith("/"):
            ob["location"] = os.path.realpath(location)
        elif not location.startswith("_:") and ":" in location:
            ob["location"] = file_uri(fs_access.realpath(location))

    outfiles = list(_collectDirEntries(outputObj))
    visit_class(outfiles, ("File", "Directory"), _realpath)
    pm = path_mapper(outfiles, "", destination_path, separateDirs=False)
    stage_files(pm, stage_func=_relocate, symlink=False, fix_conflicts=True)

    def _check_adjust(a_file: CWLObjectType) -> CWLObjectType:
        # Point the object at its staged destination and drop any inline
        # contents now that the data exists on disk.
        a_file["location"] = file_uri(pm.mapper(cast(str, a_file["location"]))[1])
        if "contents" in a_file:
            del a_file["contents"]
        return a_file

    visit_class(outputObj, ("File", "Directory"), _check_adjust)

    if compute_checksum:
        visit_class(
            outputObj, ("File",), functools.partial(compute_checksums, fs_access)
        )
    return outputObj
| 425 | |
| 426 | |
def cleanIntermediate(output_dirs: Iterable[str]) -> None:
    """Best-effort removal of the given intermediate output directories."""
    for directory in output_dirs:
        if not os.path.exists(directory):
            continue
        _logger.debug("Removing intermediate output directory %s", directory)
        shutil.rmtree(directory, True)
| 432 | |
| 433 | |
def add_sizes(fsaccess: "StdFsAccess", obj: "CWLObjectType") -> None:
    """Best-effort: record the byte size of a File object under "size"."""
    if "location" in obj:
        if "size" in obj:
            return
        try:
            obj["size"] = fsaccess.size(cast(str, obj["location"]))
        except OSError:
            # Size is optional metadata; ignore unreadable locations.
            pass
    elif "contents" in obj:
        obj["size"] = len(cast(Sized, obj["contents"]))
| 444 | |
| 445 | |
def fill_in_defaults(
    inputs: List[CWLObjectType],
    job: CWLObjectType,
    fsaccess: StdFsAccess,
) -> None:
    """Populate missing job-order entries from defaults or null.

    :raises WorkflowException: a required input has no value and no default.
    """
    for index, inp in enumerate(inputs):
        with SourceLine(
            inputs, index, WorkflowException, _logger.isEnabledFor(logging.DEBUG)
        ):
            fieldname = shortname(cast(str, inp["id"]))
            if job.get(fieldname) is not None:
                # Caller already supplied a value; leave it alone.
                continue
            if "default" in inp:
                job[fieldname] = copy.deepcopy(inp["default"])
            elif "null" in aslist(inp["type"]):
                job[fieldname] = None
            else:
                raise WorkflowException(
                    "Missing required input parameter '%s'"
                    % shortname(cast(str, inp["id"]))
                )
| 467 | |
| 468 | |
def avroize_type(
    field_type: "Union[CWLObjectType, MutableSequence[CWLOutputType], CWLOutputType, None]",
    name_prefix: str = "",
) -> None:
    """Add missing information to a type so that CWL types are valid."""
    if isinstance(field_type, MutableSequence):
        for member in field_type:
            avroize_type(member, name_prefix)
    elif isinstance(field_type, MutableMapping):
        type_name = field_type["type"]
        # Named types (enum/record) need a "name"; generate a unique one.
        if type_name in ("enum", "record") and "name" not in field_type:
            field_type["name"] = name_prefix + str(uuid.uuid4())
        if type_name == "record":
            avroize_type(field_type["fields"], name_prefix)
        if type_name == "array":
            avroize_type(field_type["items"], name_prefix)
        if isinstance(type_name, MutableSequence):
            # A union of types: fix up each alternative.
            for member in type_name:
                avroize_type(member, name_prefix)
| 494 | |
| 495 | |
def get_overrides(
    overrides: "MutableSequence[CWLObjectType]", toolid: str
) -> "CWLObjectType":
    """Merge all override entries whose overrideTarget matches *toolid*.

    :raises ValidationException: *overrides* is not a list.
    """
    if not isinstance(overrides, MutableSequence):
        raise ValidationException(
            "Expected overrides to be a list, but was %s" % type(overrides)
        )
    req = {}  # type: CWLObjectType
    for override in overrides:
        if override["overrideTarget"] == toolid:
            req.update(override)
    return req
| 508 | |
| 509 | |
# Warning template emitted by var_spool_cwl_detector() when a tool references
# /var/spool/cwl without declaring it as its dockerOutputDirectory.
_VAR_SPOOL_ERROR = textwrap.dedent(
    """
    Non-portable reference to /var/spool/cwl detected: '{}'.
    To fix, replace /var/spool/cwl with $(runtime.outdir) or add
    DockerRequirement to the 'requirements' section and declare
    'dockerOutputDirectory: /var/spool/cwl'.
    """
)
| 518 | |
| 519 | |
def var_spool_cwl_detector(
    obj: "CWLOutputType",
    item: "Optional[Any]" = None,
    obj_key: "Optional[Any]" = None,
) -> bool:
    """Detect any textual reference to /var/spool/cwl."""
    found = False
    if isinstance(obj, str):
        # A dockerOutputDirectory of /var/spool/cwl is the one legal use.
        if "var/spool/cwl" in obj and obj_key != "dockerOutputDirectory":
            _logger.warning(
                SourceLine(item=item, key=obj_key, raise_type=str).makeError(
                    _VAR_SPOOL_ERROR.format(obj)
                )
            )
            found = True
    elif isinstance(obj, MutableMapping):
        for child_key, child in obj.items():
            found = var_spool_cwl_detector(child, obj, child_key) or found
    elif isinstance(obj, MutableSequence):
        for position, child in enumerate(obj):
            found = var_spool_cwl_detector(child, obj, position) or found
    return found
| 542 | |
| 543 | |
def eval_resource(
    builder: "Builder", resource_req: "Union[str, int, float]"
) -> "Optional[Union[str, int, float]]":
    """Evaluate a resource requirement if it is a CWL expression.

    :raises WorkflowException: the expression yields a non-scalar result.
    """
    # Non-string (or non-expression string) requirements pass through as-is.
    if not (isinstance(resource_req, str) and expression.needs_parsing(resource_req)):
        return resource_req
    result = builder.do_eval(resource_req)
    if result is None or isinstance(result, (str, int)):
        return result
    raise WorkflowException(
        "Got incorrect return type {} from resource expression evaluation of {}.".format(
            type(result), resource_req
        )
    )
| 557 | |
| 558 | |
| 559 # Threshold where the "too many files" warning kicks in | |
| 560 FILE_COUNT_WARNING = 5000 | |
| 561 | |
| 562 | |
| 563 class Process(HasReqsHints, metaclass=abc.ABCMeta): | |
    def __init__(
        self, toolpath_object: CommentedMap, loadingContext: LoadingContext
    ) -> None:
        """Build a Process object from the provided dictionary."""
        super().__init__()
        self.metadata = getdefault(loadingContext.metadata, {})  # type: CWLObjectType
        self.provenance_object = None  # type: Optional[ProvenanceProfile]
        self.parent_wf = None  # type: Optional[ProvenanceProfile]
        # Lazily bootstrap the shared File/Directory/Any schema fragments from
        # the v1.0 schema on first Process construction.
        global SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY  # pylint: disable=global-statement
        if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None:
            get_schema("v1.0")
            SCHEMA_ANY = cast(
                CWLObjectType,
                SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"],
            )
            SCHEMA_FILE = cast(
                CWLObjectType,
                SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"],
            )
            SCHEMA_DIR = cast(
                CWLObjectType,
                SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"],
            )

        self.names = make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY], Loader({}))
        self.tool = toolpath_object
        # Merge requirements from the loading context, the tool document, and
        # any user-supplied overrides targeting this tool id.
        self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
        self.requirements.extend(self.tool.get("requirements", []))
        if "id" not in self.tool:
            self.tool["id"] = "_:" + str(uuid.uuid4())
        self.requirements.extend(
            cast(
                List[CWLObjectType],
                get_overrides(
                    getdefault(loadingContext.overrides_list, []), self.tool["id"]
                ).get("requirements", []),
            )
        )
        self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
        self.hints.extend(self.tool.get("hints", []))
        # Versions of requirements and hints which aren't mutated.
        self.original_requirements = copy.deepcopy(self.requirements)
        self.original_hints = copy.deepcopy(self.hints)
        self.doc_loader = loadingContext.loader
        self.doc_schema = loadingContext.avsc_names

        self.formatgraph = None  # type: Optional[Graph]
        if self.doc_loader is not None:
            self.formatgraph = self.doc_loader.graph

        # Reject requirements this runner cannot satisfy, and validate hints.
        checkRequirements(self.tool, supportedProcessRequirements)
        self.validate_hints(
            cast(Names, loadingContext.avsc_names),
            self.tool.get("hints", []),
            strict=getdefault(loadingContext.strict, False),
        )

        self.schemaDefs = {}  # type: MutableMapping[str, CWLObjectType]

        sd, _ = self.get_requirement("SchemaDefRequirement")

        if sd is not None:
            # Register user-defined types with the avro name registry.
            sdtypes = cast(MutableSequence[CWLObjectType], sd["types"])
            avroize_type(cast(MutableSequence[CWLOutputType], sdtypes))
            av = make_valid_avro(
                sdtypes,
                {cast(str, t["name"]): cast(Dict[str, Any], t) for t in sdtypes},
                set(),
            )
            for i in av:
                self.schemaDefs[i["name"]] = i  # type: ignore
            make_avsc_object(convert_to_dict(av), self.names)

        # Build record schema from inputs
        self.inputs_record_schema = {
            "name": "input_record_schema",
            "type": "record",
            "fields": [],
        }  # type: CWLObjectType
        self.outputs_record_schema = {
            "name": "outputs_record_schema",
            "type": "record",
            "fields": [],
        }  # type: CWLObjectType

        for key in ("inputs", "outputs"):
            for i in self.tool[key]:
                c = copy.deepcopy(i)
                c["name"] = shortname(c["id"])
                del c["id"]

                if "type" not in c:
                    raise ValidationException(
                        "Missing 'type' in parameter '{}'".format(c["name"])
                    )

                # A parameter with a default is implicitly optional: widen its
                # type to a union with "null".
                if "default" in c and "null" not in aslist(c["type"]):
                    nullable = ["null"]
                    nullable.extend(aslist(c["type"]))
                    c["type"] = nullable
                else:
                    c["type"] = c["type"]
                avroize_type(c["type"], c["name"])
                if key == "inputs":
                    cast(
                        List[CWLObjectType], self.inputs_record_schema["fields"]
                    ).append(c)
                elif key == "outputs":
                    cast(
                        List[CWLObjectType], self.outputs_record_schema["fields"]
                    ).append(c)

        with SourceLine(toolpath_object, "inputs", ValidationException):
            self.inputs_record_schema = cast(
                CWLObjectType,
                make_valid_avro(self.inputs_record_schema, {}, set()),
            )
            make_avsc_object(convert_to_dict(self.inputs_record_schema), self.names)
        with SourceLine(toolpath_object, "outputs", ValidationException):
            self.outputs_record_schema = cast(
                CWLObjectType,
                make_valid_avro(self.outputs_record_schema, {}, set()),
            )
            make_avsc_object(convert_to_dict(self.outputs_record_schema), self.names)

        # Validate any JavaScript expressions in the document unless disabled.
        if toolpath_object.get("class") is not None and not getdefault(
            loadingContext.disable_js_validation, False
        ):
            validate_js_options = (
                None
            )  # type: Optional[Dict[str, Union[List[str], str, int]]]
            if loadingContext.js_hint_options_file is not None:
                try:
                    with open(loadingContext.js_hint_options_file) as options_file:
                        validate_js_options = json.load(options_file)
                except (OSError, ValueError):
                    _logger.error(
                        "Failed to read options file %s",
                        loadingContext.js_hint_options_file,
                    )
                    raise
            if self.doc_schema is not None:
                validate_js_expressions(
                    toolpath_object,
                    self.doc_schema.names[toolpath_object["class"]],
                    validate_js_options,
                )

        dockerReq, is_req = self.get_requirement("DockerRequirement")

        # dockerOutputDirectory only takes effect when DockerRequirement is a
        # hard requirement, so warn when it appears only as a hint.
        if (
            dockerReq is not None
            and "dockerOutputDirectory" in dockerReq
            and is_req is not None
            and not is_req
        ):
            _logger.warning(
                SourceLine(item=dockerReq, raise_type=str).makeError(
                    "When 'dockerOutputDirectory' is declared, DockerRequirement "
                    "should go in the 'requirements' section, not 'hints'."
                    ""
                )
            )

        if (
            dockerReq is not None
            and is_req is not None
            and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl"
        ):
            if is_req:
                # In this specific case, it is legal to have /var/spool/cwl, so skip the check.
                pass
            else:
                # Must be a requirement
                var_spool_cwl_detector(self.tool)
        else:
            var_spool_cwl_detector(self.tool)
| 742 def _init_job( | |
| 743 self, joborder: CWLObjectType, runtime_context: RuntimeContext | |
| 744 ) -> Builder: | |
| 745 | |
| 746 if self.metadata.get("cwlVersion") != INTERNAL_VERSION: | |
| 747 raise WorkflowException( | |
| 748 "Process object loaded with version '%s', must update to '%s' in order to execute." | |
| 749 % (self.metadata.get("cwlVersion"), INTERNAL_VERSION) | |
| 750 ) | |
| 751 | |
| 752 job = copy.deepcopy(joborder) | |
| 753 | |
| 754 make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess) | |
| 755 fs_access = make_fs_access(runtime_context.basedir) | |
| 756 | |
| 757 load_listing_req, _ = self.get_requirement("LoadListingRequirement") | |
| 758 | |
| 759 load_listing = ( | |
| 760 cast(str, load_listing_req.get("loadListing")) | |
| 761 if load_listing_req is not None | |
| 762 else "no_listing" | |
| 763 ) | |
| 764 | |
| 765 # Validate job order | |
| 766 try: | |
| 767 fill_in_defaults(self.tool["inputs"], job, fs_access) | |
| 768 | |
| 769 normalizeFilesDirs(job) | |
| 770 schema = self.names.get_name("input_record_schema", None) | |
| 771 if schema is None: | |
| 772 raise WorkflowException( | |
| 773 "Missing input record schema: " "{}".format(self.names) | |
| 774 ) | |
| 775 validate_ex(schema, job, strict=False, logger=_logger_validation_warnings) | |
| 776 | |
| 777 if load_listing and load_listing != "no_listing": | |
| 778 get_listing(fs_access, job, recursive=(load_listing == "deep_listing")) | |
| 779 | |
| 780 visit_class(job, ("File",), functools.partial(add_sizes, fs_access)) | |
| 781 | |
| 782 if load_listing == "deep_listing": | |
| 783 for i, inparm in enumerate(self.tool["inputs"]): | |
| 784 k = shortname(inparm["id"]) | |
| 785 if k not in job: | |
| 786 continue | |
| 787 v = job[k] | |
| 788 dircount = [0] | |
| 789 | |
| 790 def inc(d): # type: (List[int]) -> None | |
| 791 d[0] += 1 | |
| 792 | |
| 793 visit_class(v, ("Directory",), lambda x: inc(dircount)) | |
| 794 if dircount[0] == 0: | |
| 795 continue | |
| 796 filecount = [0] | |
| 797 visit_class(v, ("File",), lambda x: inc(filecount)) | |
| 798 if filecount[0] > FILE_COUNT_WARNING: | |
| 799 # Long lines in this message are okay, will be reflowed based on terminal columns. | |
| 800 _logger.warning( | |
| 801 strip_dup_lineno( | |
| 802 SourceLine(self.tool["inputs"], i, str).makeError( | |
| 803 """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'. This may negatively affect workflow performance and memory use. | |
| 804 | |
| 805 If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior: | |
| 806 | |
| 807 $namespaces: | |
| 808 cwltool: "http://commonwl.org/cwltool#" | |
| 809 hints: | |
| 810 cwltool:LoadListingRequirement: | |
| 811 loadListing: shallow_listing | |
| 812 | |
| 813 """ | |
| 814 % (filecount[0], k) | |
| 815 ) | |
| 816 ) | |
| 817 ) | |
| 818 | |
| 819 except (ValidationException, WorkflowException) as err: | |
| 820 raise WorkflowException("Invalid job input record:\n" + str(err)) from err | |
| 821 | |
| 822 files = [] # type: List[CWLObjectType] | |
| 823 bindings = CommentedSeq() | |
| 824 outdir = "" | |
| 825 tmpdir = "" | |
| 826 stagedir = "" | |
| 827 | |
| 828 docker_req, _ = self.get_requirement("DockerRequirement") | |
| 829 default_docker = None | |
| 830 | |
| 831 if docker_req is None and runtime_context.default_container: | |
| 832 default_docker = runtime_context.default_container | |
| 833 | |
| 834 if (docker_req or default_docker) and runtime_context.use_container: | |
| 835 if docker_req is not None: | |
| 836 # Check if docker output directory is absolute | |
| 837 if docker_req.get("dockerOutputDirectory") and cast( | |
| 838 str, docker_req.get("dockerOutputDirectory") | |
| 839 ).startswith("/"): | |
| 840 outdir = cast(str, docker_req.get("dockerOutputDirectory")) | |
| 841 else: | |
| 842 outdir = cast( | |
| 843 str, | |
| 844 docker_req.get("dockerOutputDirectory") | |
| 845 or runtime_context.docker_outdir | |
| 846 or random_outdir(), | |
| 847 ) | |
| 848 elif default_docker is not None: | |
| 849 outdir = runtime_context.docker_outdir or random_outdir() | |
| 850 tmpdir = runtime_context.docker_tmpdir or "/tmp" # nosec | |
| 851 stagedir = runtime_context.docker_stagedir or "/var/lib/cwl" | |
| 852 else: | |
| 853 if self.tool["class"] == "CommandLineTool": | |
| 854 outdir = fs_access.realpath(runtime_context.get_outdir()) | |
| 855 tmpdir = fs_access.realpath(runtime_context.get_tmpdir()) | |
| 856 stagedir = fs_access.realpath(runtime_context.get_stagedir()) | |
| 857 | |
| 858 cwl_version = cast( | |
| 859 str, | |
| 860 self.metadata.get("http://commonwl.org/cwltool#original_cwlVersion", None), | |
| 861 ) | |
| 862 builder = Builder( | |
| 863 job, | |
| 864 files, | |
| 865 bindings, | |
| 866 self.schemaDefs, | |
| 867 self.names, | |
| 868 self.requirements, | |
| 869 self.hints, | |
| 870 {}, | |
| 871 runtime_context.mutation_manager, | |
| 872 self.formatgraph, | |
| 873 make_fs_access, | |
| 874 fs_access, | |
| 875 runtime_context.job_script_provider, | |
| 876 runtime_context.eval_timeout, | |
| 877 runtime_context.debug, | |
| 878 runtime_context.js_console, | |
| 879 runtime_context.force_docker_pull, | |
| 880 load_listing, | |
| 881 outdir, | |
| 882 tmpdir, | |
| 883 stagedir, | |
| 884 cwl_version, | |
| 885 ) | |
| 886 | |
| 887 bindings.extend( | |
| 888 builder.bind_input( | |
| 889 self.inputs_record_schema, | |
| 890 job, | |
| 891 discover_secondaryFiles=getdefault(runtime_context.toplevel, False), | |
| 892 ) | |
| 893 ) | |
| 894 | |
| 895 if self.tool.get("baseCommand"): | |
| 896 for index, command in enumerate(aslist(self.tool["baseCommand"])): | |
| 897 bindings.append({"position": [-1000000, index], "datum": command}) | |
| 898 | |
| 899 if self.tool.get("arguments"): | |
| 900 for i, arg in enumerate(self.tool["arguments"]): | |
| 901 lc = self.tool["arguments"].lc.data[i] | |
| 902 filename = self.tool["arguments"].lc.filename | |
| 903 bindings.lc.add_kv_line_col(len(bindings), lc) | |
| 904 if isinstance(arg, MutableMapping): | |
| 905 arg = copy.deepcopy(arg) | |
| 906 if arg.get("position"): | |
| 907 position = arg.get("position") | |
| 908 if isinstance(position, str): # no need to test the | |
| 909 # CWLVersion as the v1.0 | |
| 910 # schema only allows ints | |
| 911 position = builder.do_eval(position) | |
| 912 if position is None: | |
| 913 position = 0 | |
| 914 arg["position"] = [position, i] | |
| 915 else: | |
| 916 arg["position"] = [0, i] | |
| 917 bindings.append(arg) | |
| 918 elif ("$(" in arg) or ("${" in arg): | |
| 919 cm = CommentedMap((("position", [0, i]), ("valueFrom", arg))) | |
| 920 cm.lc.add_kv_line_col("valueFrom", lc) | |
| 921 cm.lc.filename = filename | |
| 922 bindings.append(cm) | |
| 923 else: | |
| 924 cm = CommentedMap((("position", [0, i]), ("datum", arg))) | |
| 925 cm.lc.add_kv_line_col("datum", lc) | |
| 926 cm.lc.filename = filename | |
| 927 bindings.append(cm) | |
| 928 | |
| 929 # use python2 like sorting of heterogeneous lists | |
| 930 # (containing str and int types), | |
| 931 key = functools.cmp_to_key(cmp_like_py2) | |
| 932 | |
| 933 # This awkward construction replaces the contents of | |
| 934 # "bindings" in place (because Builder expects it to be | |
| 935 # mutated in place, sigh, I'm sorry) with its contents sorted, | |
| 936 # supporting different versions of Python and ruamel.yaml with | |
| 937 # different behaviors/bugs in CommentedSeq. | |
| 938 bindings_copy = copy.deepcopy(bindings) | |
| 939 del bindings[:] | |
| 940 bindings.extend(sorted(bindings_copy, key=key)) | |
| 941 | |
| 942 if self.tool["class"] != "Workflow": | |
| 943 builder.resources = self.evalResources(builder, runtime_context) | |
| 944 return builder | |
| 945 | |
| 946 def evalResources( | |
| 947 self, builder: Builder, runtimeContext: RuntimeContext | |
| 948 ) -> Dict[str, Union[int, float, str]]: | |
| 949 resourceReq, _ = self.get_requirement("ResourceRequirement") | |
| 950 if resourceReq is None: | |
| 951 resourceReq = {} | |
| 952 cwl_version = self.metadata.get( | |
| 953 "http://commonwl.org/cwltool#original_cwlVersion", None | |
| 954 ) | |
| 955 if cwl_version == "v1.0": | |
| 956 ram = 1024 | |
| 957 else: | |
| 958 ram = 256 | |
| 959 request: Dict[str, Union[int, float, str]] = { | |
| 960 "coresMin": 1, | |
| 961 "coresMax": 1, | |
| 962 "ramMin": ram, | |
| 963 "ramMax": ram, | |
| 964 "tmpdirMin": 1024, | |
| 965 "tmpdirMax": 1024, | |
| 966 "outdirMin": 1024, | |
| 967 "outdirMax": 1024, | |
| 968 } | |
| 969 for a in ("cores", "ram", "tmpdir", "outdir"): | |
| 970 mn = mx = None # type: Optional[Union[int, float]] | |
| 971 if resourceReq.get(a + "Min"): | |
| 972 mn = cast( | |
| 973 Union[int, float], | |
| 974 eval_resource( | |
| 975 builder, cast(Union[str, int, float], resourceReq[a + "Min"]) | |
| 976 ), | |
| 977 ) | |
| 978 if resourceReq.get(a + "Max"): | |
| 979 mx = cast( | |
| 980 Union[int, float], | |
| 981 eval_resource( | |
| 982 builder, cast(Union[str, int, float], resourceReq[a + "Max"]) | |
| 983 ), | |
| 984 ) | |
| 985 if mn is None: | |
| 986 mn = mx | |
| 987 elif mx is None: | |
| 988 mx = mn | |
| 989 | |
| 990 if mn is not None: | |
| 991 request[a + "Min"] = mn | |
| 992 request[a + "Max"] = cast(Union[int, float], mx) | |
| 993 | |
| 994 if runtimeContext.select_resources is not None: | |
| 995 return runtimeContext.select_resources(request, runtimeContext) | |
| 996 return { | |
| 997 "cores": request["coresMin"], | |
| 998 "ram": math.ceil(request["ramMin"]) | |
| 999 if not isinstance(request["ramMin"], str) | |
| 1000 else request["ramMin"], | |
| 1001 "tmpdirSize": math.ceil(request["tmpdirMin"]) | |
| 1002 if not isinstance(request["tmpdirMin"], str) | |
| 1003 else request["tmpdirMin"], | |
| 1004 "outdirSize": math.ceil(request["outdirMin"]) | |
| 1005 if not isinstance(request["outdirMin"], str) | |
| 1006 else request["outdirMin"], | |
| 1007 } | |
| 1008 | |
| 1009 def validate_hints( | |
| 1010 self, avsc_names: Names, hints: List[CWLObjectType], strict: bool | |
| 1011 ) -> None: | |
| 1012 for i, r in enumerate(hints): | |
| 1013 sl = SourceLine(hints, i, ValidationException) | |
| 1014 with sl: | |
| 1015 if ( | |
| 1016 avsc_names.get_name(cast(str, r["class"]), None) is not None | |
| 1017 and self.doc_loader is not None | |
| 1018 ): | |
| 1019 plain_hint = { | |
| 1020 key: r[key] | |
| 1021 for key in r | |
| 1022 if key not in self.doc_loader.identifiers | |
| 1023 } # strip identifiers | |
| 1024 validate_ex( | |
| 1025 cast( | |
| 1026 Schema, | |
| 1027 avsc_names.get_name(cast(str, plain_hint["class"]), None), | |
| 1028 ), | |
| 1029 plain_hint, | |
| 1030 strict=strict, | |
| 1031 ) | |
| 1032 elif r["class"] in ("NetworkAccess", "LoadListingRequirement"): | |
| 1033 pass | |
| 1034 else: | |
| 1035 _logger.info(str(sl.makeError("Unknown hint %s" % (r["class"])))) | |
| 1036 | |
    def visit(self, op: Callable[[CommentedMap], None]) -> None:
        """Apply ``op`` to this process's tool document."""
        op(self.tool)
| 1039 | |
    @abc.abstractmethod
    def job(
        self,
        job_order: CWLObjectType,
        output_callbacks: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """Produce the job generator for this process; implemented by subclasses."""
        pass
| 1048 | |
| 1049 | |
| 1050 _names = set() # type: Set[str] | |
| 1051 | |
| 1052 | |
def uniquename(stem: str, names: Optional[Set[str]] = None) -> str:
    """Return *stem*, suffixed with ``_<n>`` if needed to be unique in *names*.

    The chosen name is recorded in *names* (or in the module-level default
    set when *names* is omitted) so later calls stay unique.
    """
    global _names
    if names is None:
        names = _names
    counter = 1
    candidate = stem
    while candidate in names:
        counter += 1
        candidate = f"{stem}_{counter}"
    names.add(candidate)
    return candidate
| 1064 | |
| 1065 | |
def nestdir(base: str, deps: CWLObjectType) -> CWLObjectType:
    """Wrap *deps* in synthesized Directory objects mirroring its path below *base*.

    If the entry's location is under ``dirname(base)``, one nested Directory
    is created per intermediate path segment (innermost first); otherwise
    *deps* is returned unchanged.
    """
    parent = os.path.dirname(base) + "/"
    location = cast(str, deps["location"])
    if location.startswith(parent):
        segments = location[len(parent) :].split("/")
        segments.pop()  # drop the entry's own basename
        for segment in reversed(segments):
            deps = {"class": "Directory", "basename": segment, "listing": [deps]}
    return deps
| 1077 | |
| 1078 | |
def mergedirs(listing: List[CWLObjectType]) -> List[CWLObjectType]:
    """Merge entries in *listing* that share a basename, recursively.

    Directories with the same basename have their listings concatenated and
    are merged into one object.  Two entries with the same basename but
    different locations are a collision: both are renamed to their
    URL-quoted locations and the original basename entry is dropped.
    Returns a new list; directory entries are merged in place.
    """
    r = []  # type: List[CWLObjectType]
    ents = {}  # type: Dict[str, CWLObjectType]
    collided = set()  # type: Set[str]
    for e in listing:
        basename = cast(str, e["basename"])
        if basename not in ents:
            # First time this basename is seen; keep the entry as-is.
            ents[basename] = e
        elif e["class"] == "Directory":
            # Same-named directory: append this listing onto the kept entry.
            if e.get("listing"):
                cast(
                    List[CWLObjectType], ents[basename].setdefault("listing", [])
                ).extend(cast(List[CWLObjectType], e["listing"]))
            # Prefer a concrete location over an anonymous ("_:") one.
            if cast(str, ents[basename]["location"]).startswith("_:"):
                ents[basename]["location"] = e["location"]
        elif e["location"] != ents[basename]["location"]:
            # same basename, different location, collision,
            # rename both.
            collided.add(basename)
            e2 = ents[basename]

            e["basename"] = urllib.parse.quote(cast(str, e["location"]), safe="")
            e2["basename"] = urllib.parse.quote(cast(str, e2["location"]), safe="")

            e["nameroot"], e["nameext"] = os.path.splitext(cast(str, e["basename"]))
            e2["nameroot"], e2["nameext"] = os.path.splitext(cast(str, e2["basename"]))

            ents[cast(str, e["basename"])] = e
            ents[cast(str, e2["basename"])] = e2
    # Remove the original (now-renamed) colliding basenames.
    for c in collided:
        del ents[c]
    # Recurse into merged directory listings.
    for e in ents.values():
        if e["class"] == "Directory" and "listing" in e:
            e["listing"] = cast(
                MutableSequence[CWLOutputAtomType],
                mergedirs(cast(List[CWLObjectType], e["listing"])),
            )
    r.extend(ents.values())
    return r
| 1118 | |
| 1119 | |
| 1120 CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl" | |
| 1121 | |
| 1122 | |
def scandeps(
    base: str,
    doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
    reffields: Set[str],
    urlfields: Set[str],
    loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
    urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
    nestdirs: bool = True,
) -> MutableSequence[CWLObjectType]:
    """Recursively find the file/directory dependencies of a CWL document fragment.

    :param base: URI that references in ``doc`` are resolved against.
    :param doc: a document object, or a list of them, to scan.
    :param reffields: field names whose values reference other documents;
        these are fetched with ``loadref`` and scanned recursively.
    :param urlfields: field names whose values are URLs to record directly.
    :param loadref: callback that resolves a reference to its document.
    :param urljoin: how a reference is joined with ``base``.
    :param nestdirs: when True, wrap each dependency in synthesized Directory
        objects mirroring its path below ``base`` (see ``nestdir``).
    :return: the discovered File/Directory objects, normalized and with
        same-basename directories merged.
    """
    r = []  # type: MutableSequence[CWLObjectType]
    if isinstance(doc, MutableMapping):
        if "id" in doc:
            # A document with its own file:// id is itself a dependency;
            # subsequent references are resolved relative to it.
            if cast(str, doc["id"]).startswith("file://"):
                df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
                if base != df:
                    r.append({"class": "File", "location": df, "format": CWL_IANA})
                    base = df

        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
            # Literal File/Directory object: record it (unless anonymous "_:").
            u = cast(Optional[str], doc.get("location", doc.get("path")))
            if u and not u.startswith("_:"):
                deps = {
                    "class": doc["class"],
                    "location": urljoin(base, u),
                }  # type: CWLObjectType
                if "basename" in doc:
                    deps["basename"] = doc["basename"]
                if doc["class"] == "Directory" and "listing" in doc:
                    deps["listing"] = doc["listing"]
                if doc["class"] == "File" and "secondaryFiles" in doc:
                    # Secondary files become nested dependencies of this File.
                    deps["secondaryFiles"] = cast(
                        CWLOutputAtomType,
                        scandeps(
                            base,
                            cast(
                                Union[CWLObjectType, MutableSequence[CWLObjectType]],
                                doc["secondaryFiles"],
                            ),
                            reffields,
                            urlfields,
                            loadref,
                            urljoin=urljoin,
                            nestdirs=nestdirs,
                        ),
                    )
                if nestdirs:
                    deps = nestdir(base, deps)
                r.append(deps)
            else:
                # No usable location; still scan the listing/secondaryFiles.
                if doc["class"] == "Directory" and "listing" in doc:
                    r.extend(
                        scandeps(
                            base,
                            cast(MutableSequence[CWLObjectType], doc["listing"]),
                            reffields,
                            urlfields,
                            loadref,
                            urljoin=urljoin,
                            nestdirs=nestdirs,
                        )
                    )
                elif doc["class"] == "File" and "secondaryFiles" in doc:
                    r.extend(
                        scandeps(
                            base,
                            cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
                            reffields,
                            urlfields,
                            loadref,
                            urljoin=urljoin,
                            nestdirs=nestdirs,
                        )
                    )

        for k, v in doc.items():
            if k in reffields:
                # Reference field: each value is either an inline mapping
                # (scan directly) or a URI to load and scan.
                for u2 in aslist(v):
                    if isinstance(u2, MutableMapping):
                        r.extend(
                            scandeps(
                                base,
                                u2,
                                reffields,
                                urlfields,
                                loadref,
                                urljoin=urljoin,
                                nestdirs=nestdirs,
                            )
                        )
                    else:
                        subid = urljoin(base, u2)
                        basedf, _ = urllib.parse.urldefrag(base)
                        subiddf, _ = urllib.parse.urldefrag(subid)
                        # Fragment-only reference into the same document: skip.
                        if basedf == subiddf:
                            continue
                        sub = cast(
                            Union[MutableSequence[CWLObjectType], CWLObjectType],
                            loadref(base, u2),
                        )
                        deps2 = {
                            "class": "File",
                            "location": subid,
                            "format": CWL_IANA,
                        }  # type: CWLObjectType
                        sf = scandeps(
                            subid,
                            sub,
                            reffields,
                            urlfields,
                            loadref,
                            urljoin=urljoin,
                            nestdirs=nestdirs,
                        )
                        if sf:
                            deps2["secondaryFiles"] = cast(
                                MutableSequence[CWLOutputAtomType], sf
                            )
                        if nestdirs:
                            deps2 = nestdir(base, deps2)
                        r.append(deps2)
            elif k in urlfields and k != "location":
                # Plain URL field (e.g. schema/format URLs): record as a File.
                for u3 in aslist(v):
                    deps = {"class": "File", "location": urljoin(base, u3)}
                    if nestdirs:
                        deps = nestdir(base, deps)
                    r.append(deps)
            elif doc.get("class") in ("File", "Directory") and k in (
                "listing",
                "secondaryFiles",
            ):
                # should be handled earlier.
                pass
            else:
                # Any other field: recurse looking for nested dependencies.
                r.extend(
                    scandeps(
                        base,
                        cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
                        reffields,
                        urlfields,
                        loadref,
                        urljoin=urljoin,
                        nestdirs=nestdirs,
                    )
                )
    elif isinstance(doc, MutableSequence):
        # A list of documents: scan each element.
        for d in doc:
            r.extend(
                scandeps(
                    base,
                    d,
                    reffields,
                    urlfields,
                    loadref,
                    urljoin=urljoin,
                    nestdirs=nestdirs,
                )
            )

    if r:
        normalizeFilesDirs(r)
        r = mergedirs(cast(List[CWLObjectType], r))

    return r
| 1286 | |
| 1287 | |
def compute_checksums(fs_access: StdFsAccess, fileobj: CWLObjectType) -> None:
    """Fill in the sha1 checksum and size of a File object, if not yet present.

    Reads the file at ``fileobj["location"]`` through *fs_access* in 1 MiB
    chunks; does nothing when a checksum already exists.
    """
    if "checksum" in fileobj:
        return
    digest = hashlib.sha1()  # nosec
    location = cast(str, fileobj["location"])
    with fs_access.open(location, "rb") as handle:
        for block in iter(lambda: handle.read(1024 * 1024), b""):
            digest.update(block)
    fileobj["checksum"] = "sha1$%s" % digest.hexdigest()
    fileobj["size"] = fs_access.size(location)
