Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/cwltool/process.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/cwltool/process.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,1298 @@ +"""Classes and methods relevant for all CWL Proccess types.""" +import abc +import copy +import functools +import hashlib +import json +import logging +import math +import os +import shutil +import stat +import textwrap +import urllib +import uuid +from os import scandir +from typing import ( + Any, + Callable, + Dict, + Iterable, + Iterator, + List, + MutableMapping, + MutableSequence, + Optional, + Set, + Sized, + Tuple, + Type, + Union, + cast, +) + +from pkg_resources import resource_stream +from rdflib import Graph +from ruamel.yaml.comments import CommentedMap, CommentedSeq +from schema_salad.avro.schema import ( + Names, + Schema, + SchemaParseException, + make_avsc_object, +) +from schema_salad.exceptions import ValidationException +from schema_salad.ref_resolver import Loader, file_uri, uri_file_path +from schema_salad.schema import load_schema, make_avro_schema, make_valid_avro +from schema_salad.sourceline import SourceLine, strip_dup_lineno +from schema_salad.utils import convert_to_dict +from schema_salad.validate import validate_ex +from typing_extensions import TYPE_CHECKING + +from . 
class LogAsDebugFilter(logging.Filter):
    """Pass records through only while the parent logger has DEBUG enabled."""

    def __init__(self, name: str, parent: logging.Logger) -> None:
        """Initialize."""
        super().__init__(str(name))
        self.parent = parent

    def filter(self, record: logging.LogRecord) -> bool:
        # Defer entirely to the parent logger's effective level; the record
        # itself is not inspected.
        return self.parent.isEnabledFor(logging.DEBUG)
# Schema Salad metaschema documents bundled with cwltool; loaded into the
# schema cache by get_schema().  Each name is resolved relative to
# schemas/<version>/salad/schema_salad/metaschema/.
salad_files = (
    "metaschema.yml",
    "metaschema_base.yml",
    "salad.md",
    "field_name.yml",
    "import_include.md",
    "link_res.yml",
    "ident_res.yml",
    "vocab_res.yml",
    "field_name_schema.yml",
    "field_name_src.yml",
    "field_name_proc.yml",
    "ident_res_schema.yml",
    "ident_res_src.yml",
    "ident_res_proc.yml",
    "link_res_schema.yml",
    "link_res_src.yml",
    "link_res_proc.yml",
    "vocab_res_schema.yml",
    "vocab_res_src.yml",
    "vocab_res_proc.yml",
)
def shortname(inputid: str) -> str:
    """Return the last path segment of the URI fragment, or of the path."""
    parsed = urllib.parse.urlparse(inputid)
    # Prefer the fragment ("#step/name") when present; fall back to the path.
    source = parsed.fragment if parsed.fragment else parsed.path
    return source.rpartition("/")[2]
def stage_files(
    pathmapper: PathMapper,
    stage_func: Optional[Callable[[str, str], None]] = None,
    ignore_writable: bool = False,
    symlink: bool = True,
    secret_store: Optional[SecretStore] = None,
    fix_conflicts: bool = False,
) -> None:
    """Link or copy files to their targets. Create them as needed."""
    # First pass: detect two distinct sources mapped onto the same target.
    seen: Dict[str, MapperEnt] = {}
    for key, ent in pathmapper.items():
        if "File" not in ent.type:
            continue
        prior = seen.get(ent.target)
        if prior is None:
            seen[ent.target] = ent
        elif prior.resolved != ent.resolved:
            if not fix_conflicts:
                raise WorkflowException(
                    "File staging conflict, trying to stage both %s and %s to the same target %s"
                    % (prior.resolved, ent.resolved, ent.target)
                )
            # Probe <target>_2, <target>_3, ... until an unused name is found.
            suffix = 2
            candidate = f"{ent.target}_{suffix}"
            while candidate in seen:
                suffix += 1
                candidate = f"{ent.target}_{suffix}"
            seen[candidate] = pathmapper.update(
                key, ent.resolved, candidate, ent.type, ent.staged
            )

    # Second pass: materialise every staged entry at its target path.
    for key, ent in pathmapper.items():
        if not ent.staged:
            continue
        parent = os.path.dirname(ent.target)
        if not os.path.exists(parent):
            os.makedirs(parent)
        if ent.type in ("File", "Directory") and os.path.exists(ent.resolved):
            if symlink:  # Use symlink func if allowed
                if onWindows():
                    # Windows has no usable symlinks; copy/merge instead.
                    if ent.type == "File":
                        shutil.copy(ent.resolved, ent.target)
                    elif ent.type == "Directory":
                        if os.path.exists(ent.target) and os.path.isdir(ent.target):
                            shutil.rmtree(ent.target)
                        copytree_with_merge(ent.resolved, ent.target)
                else:
                    os.symlink(ent.resolved, ent.target)
            elif stage_func is not None:
                stage_func(ent.resolved, ent.target)
        elif (
            ent.type == "Directory"
            and not os.path.exists(ent.target)
            and ent.resolved.startswith("_:")
        ):
            # Literal (anonymous) directory: just create it.
            os.makedirs(ent.target)
        elif ent.type == "WritableFile" and not ignore_writable:
            shutil.copy(ent.resolved, ent.target)
            ensure_writable(ent.target)
        elif ent.type == "WritableDirectory" and not ignore_writable:
            if ent.resolved.startswith("_:"):
                os.makedirs(ent.target)
            else:
                shutil.copytree(ent.resolved, ent.target)
            ensure_writable(ent.target)
        elif ent.type in ("CreateFile", "CreateWritableFile"):
            # Contents are inline (possibly via the secret store), not a path.
            with open(ent.target, "wb") as new:
                if secret_store is not None:
                    new.write(
                        cast(str, secret_store.retrieve(ent.resolved)).encode("utf-8")
                    )
                else:
                    new.write(ent.resolved.encode("utf-8"))
            if ent.type == "CreateFile":
                os.chmod(ent.target, stat.S_IRUSR)  # Read only
            else:  # it is a "CreateWritableFile"
                ensure_writable(ent.target)
            pathmapper.update(key, ent.target, ent.target, ent.type, ent.staged)
def add_sizes(fsaccess: StdFsAccess, obj: CWLObjectType) -> None:
    """Attach a 'size' field to a File object, best effort."""
    if "location" in obj:
        if "size" in obj:
            return
        try:
            # Stat the backing store; unreadable locations are silently skipped.
            obj["size"] = fsaccess.size(cast(str, obj["location"]))
        except OSError:
            pass
    elif "contents" in obj:
        # Literal contents: the size is simply the content length.
        obj["size"] = len(cast(Sized, obj["contents"]))
    # Neither 'location' nor 'contents': leave the object untouched.
def avroize_type(
    field_type: Union[
        CWLObjectType, MutableSequence[CWLOutputType], CWLOutputType, None
    ],
    name_prefix: str = "",
) -> None:
    """Add missing information to a type so that CWL types are valid."""
    if isinstance(field_type, MutableSequence):
        # A union of types: fix up every alternative.
        for alternative in field_type:
            avroize_type(alternative, name_prefix)
        return
    if not isinstance(field_type, MutableMapping):
        # Plain type names (strings) need no fixing.
        return
    type_name = field_type["type"]
    # Named Avro types must carry a "name"; invent a unique one if absent.
    if type_name in ("enum", "record") and "name" not in field_type:
        field_type["name"] = name_prefix + str(uuid.uuid4())
    if type_name == "record":
        avroize_type(field_type["fields"], name_prefix)
    if type_name == "array":
        avroize_type(field_type["items"], name_prefix)
    if isinstance(type_name, MutableSequence):
        # The "type" field itself may be a union; recurse into each member.
        for member in type_name:
            avroize_type(member, name_prefix)
def var_spool_cwl_detector(
    obj: CWLOutputType,
    item: Optional[Any] = None,
    obj_key: Optional[Any] = None,
) -> bool:
    """Detect any textual reference to /var/spool/cwl."""
    found = False
    if isinstance(obj, str):
        # "dockerOutputDirectory" is the one place the path is legitimate.
        if "var/spool/cwl" in obj and obj_key != "dockerOutputDirectory":
            message = _VAR_SPOOL_ERROR.format(obj)
            _logger.warning(
                SourceLine(item=item, key=obj_key, raise_type=str).makeError(message)
            )
            found = True
    elif isinstance(obj, MutableMapping):
        # Recurse into values, remembering the key for error reporting.
        for child_key, child in obj.items():
            found = var_spool_cwl_detector(child, obj, child_key) or found
    elif isinstance(obj, MutableSequence):
        for index, child in enumerate(obj):
            found = var_spool_cwl_detector(child, obj, index) or found
    return found
disable=global-statement + if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None: + get_schema("v1.0") + SCHEMA_ANY = cast( + CWLObjectType, + SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"], + ) + SCHEMA_FILE = cast( + CWLObjectType, + SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"], + ) + SCHEMA_DIR = cast( + CWLObjectType, + SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"], + ) + + self.names = make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY], Loader({})) + self.tool = toolpath_object + self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, [])) + self.requirements.extend(self.tool.get("requirements", [])) + if "id" not in self.tool: + self.tool["id"] = "_:" + str(uuid.uuid4()) + self.requirements.extend( + cast( + List[CWLObjectType], + get_overrides( + getdefault(loadingContext.overrides_list, []), self.tool["id"] + ).get("requirements", []), + ) + ) + self.hints = copy.deepcopy(getdefault(loadingContext.hints, [])) + self.hints.extend(self.tool.get("hints", [])) + # Versions of requirements and hints which aren't mutated. 
+ self.original_requirements = copy.deepcopy(self.requirements) + self.original_hints = copy.deepcopy(self.hints) + self.doc_loader = loadingContext.loader + self.doc_schema = loadingContext.avsc_names + + self.formatgraph = None # type: Optional[Graph] + if self.doc_loader is not None: + self.formatgraph = self.doc_loader.graph + + checkRequirements(self.tool, supportedProcessRequirements) + self.validate_hints( + cast(Names, loadingContext.avsc_names), + self.tool.get("hints", []), + strict=getdefault(loadingContext.strict, False), + ) + + self.schemaDefs = {} # type: MutableMapping[str, CWLObjectType] + + sd, _ = self.get_requirement("SchemaDefRequirement") + + if sd is not None: + sdtypes = cast(MutableSequence[CWLObjectType], sd["types"]) + avroize_type(cast(MutableSequence[CWLOutputType], sdtypes)) + av = make_valid_avro( + sdtypes, + {cast(str, t["name"]): cast(Dict[str, Any], t) for t in sdtypes}, + set(), + ) + for i in av: + self.schemaDefs[i["name"]] = i # type: ignore + make_avsc_object(convert_to_dict(av), self.names) + + # Build record schema from inputs + self.inputs_record_schema = { + "name": "input_record_schema", + "type": "record", + "fields": [], + } # type: CWLObjectType + self.outputs_record_schema = { + "name": "outputs_record_schema", + "type": "record", + "fields": [], + } # type: CWLObjectType + + for key in ("inputs", "outputs"): + for i in self.tool[key]: + c = copy.deepcopy(i) + c["name"] = shortname(c["id"]) + del c["id"] + + if "type" not in c: + raise ValidationException( + "Missing 'type' in parameter '{}'".format(c["name"]) + ) + + if "default" in c and "null" not in aslist(c["type"]): + nullable = ["null"] + nullable.extend(aslist(c["type"])) + c["type"] = nullable + else: + c["type"] = c["type"] + avroize_type(c["type"], c["name"]) + if key == "inputs": + cast( + List[CWLObjectType], self.inputs_record_schema["fields"] + ).append(c) + elif key == "outputs": + cast( + List[CWLObjectType], self.outputs_record_schema["fields"] + 
).append(c) + + with SourceLine(toolpath_object, "inputs", ValidationException): + self.inputs_record_schema = cast( + CWLObjectType, + make_valid_avro(self.inputs_record_schema, {}, set()), + ) + make_avsc_object(convert_to_dict(self.inputs_record_schema), self.names) + with SourceLine(toolpath_object, "outputs", ValidationException): + self.outputs_record_schema = cast( + CWLObjectType, + make_valid_avro(self.outputs_record_schema, {}, set()), + ) + make_avsc_object(convert_to_dict(self.outputs_record_schema), self.names) + + if toolpath_object.get("class") is not None and not getdefault( + loadingContext.disable_js_validation, False + ): + validate_js_options = ( + None + ) # type: Optional[Dict[str, Union[List[str], str, int]]] + if loadingContext.js_hint_options_file is not None: + try: + with open(loadingContext.js_hint_options_file) as options_file: + validate_js_options = json.load(options_file) + except (OSError, ValueError): + _logger.error( + "Failed to read options file %s", + loadingContext.js_hint_options_file, + ) + raise + if self.doc_schema is not None: + validate_js_expressions( + toolpath_object, + self.doc_schema.names[toolpath_object["class"]], + validate_js_options, + ) + + dockerReq, is_req = self.get_requirement("DockerRequirement") + + if ( + dockerReq is not None + and "dockerOutputDirectory" in dockerReq + and is_req is not None + and not is_req + ): + _logger.warning( + SourceLine(item=dockerReq, raise_type=str).makeError( + "When 'dockerOutputDirectory' is declared, DockerRequirement " + "should go in the 'requirements' section, not 'hints'." + "" + ) + ) + + if ( + dockerReq is not None + and is_req is not None + and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl" + ): + if is_req: + # In this specific case, it is legal to have /var/spool/cwl, so skip the check. 
+ pass + else: + # Must be a requirement + var_spool_cwl_detector(self.tool) + else: + var_spool_cwl_detector(self.tool) + + def _init_job( + self, joborder: CWLObjectType, runtime_context: RuntimeContext + ) -> Builder: + + if self.metadata.get("cwlVersion") != INTERNAL_VERSION: + raise WorkflowException( + "Process object loaded with version '%s', must update to '%s' in order to execute." + % (self.metadata.get("cwlVersion"), INTERNAL_VERSION) + ) + + job = copy.deepcopy(joborder) + + make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess) + fs_access = make_fs_access(runtime_context.basedir) + + load_listing_req, _ = self.get_requirement("LoadListingRequirement") + + load_listing = ( + cast(str, load_listing_req.get("loadListing")) + if load_listing_req is not None + else "no_listing" + ) + + # Validate job order + try: + fill_in_defaults(self.tool["inputs"], job, fs_access) + + normalizeFilesDirs(job) + schema = self.names.get_name("input_record_schema", None) + if schema is None: + raise WorkflowException( + "Missing input record schema: " "{}".format(self.names) + ) + validate_ex(schema, job, strict=False, logger=_logger_validation_warnings) + + if load_listing and load_listing != "no_listing": + get_listing(fs_access, job, recursive=(load_listing == "deep_listing")) + + visit_class(job, ("File",), functools.partial(add_sizes, fs_access)) + + if load_listing == "deep_listing": + for i, inparm in enumerate(self.tool["inputs"]): + k = shortname(inparm["id"]) + if k not in job: + continue + v = job[k] + dircount = [0] + + def inc(d): # type: (List[int]) -> None + d[0] += 1 + + visit_class(v, ("Directory",), lambda x: inc(dircount)) + if dircount[0] == 0: + continue + filecount = [0] + visit_class(v, ("File",), lambda x: inc(filecount)) + if filecount[0] > FILE_COUNT_WARNING: + # Long lines in this message are okay, will be reflowed based on terminal columns. 
+ _logger.warning( + strip_dup_lineno( + SourceLine(self.tool["inputs"], i, str).makeError( + """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'. This may negatively affect workflow performance and memory use. + +If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior: + +$namespaces: + cwltool: "http://commonwl.org/cwltool#" +hints: + cwltool:LoadListingRequirement: + loadListing: shallow_listing + +""" + % (filecount[0], k) + ) + ) + ) + + except (ValidationException, WorkflowException) as err: + raise WorkflowException("Invalid job input record:\n" + str(err)) from err + + files = [] # type: List[CWLObjectType] + bindings = CommentedSeq() + outdir = "" + tmpdir = "" + stagedir = "" + + docker_req, _ = self.get_requirement("DockerRequirement") + default_docker = None + + if docker_req is None and runtime_context.default_container: + default_docker = runtime_context.default_container + + if (docker_req or default_docker) and runtime_context.use_container: + if docker_req is not None: + # Check if docker output directory is absolute + if docker_req.get("dockerOutputDirectory") and cast( + str, docker_req.get("dockerOutputDirectory") + ).startswith("/"): + outdir = cast(str, docker_req.get("dockerOutputDirectory")) + else: + outdir = cast( + str, + docker_req.get("dockerOutputDirectory") + or runtime_context.docker_outdir + or random_outdir(), + ) + elif default_docker is not None: + outdir = runtime_context.docker_outdir or random_outdir() + tmpdir = runtime_context.docker_tmpdir or "/tmp" # nosec + stagedir = runtime_context.docker_stagedir or "/var/lib/cwl" + else: + if self.tool["class"] == "CommandLineTool": + outdir = fs_access.realpath(runtime_context.get_outdir()) + tmpdir = fs_access.realpath(runtime_context.get_tmpdir()) + stagedir = fs_access.realpath(runtime_context.get_stagedir()) + + cwl_version 
= cast( + str, + self.metadata.get("http://commonwl.org/cwltool#original_cwlVersion", None), + ) + builder = Builder( + job, + files, + bindings, + self.schemaDefs, + self.names, + self.requirements, + self.hints, + {}, + runtime_context.mutation_manager, + self.formatgraph, + make_fs_access, + fs_access, + runtime_context.job_script_provider, + runtime_context.eval_timeout, + runtime_context.debug, + runtime_context.js_console, + runtime_context.force_docker_pull, + load_listing, + outdir, + tmpdir, + stagedir, + cwl_version, + ) + + bindings.extend( + builder.bind_input( + self.inputs_record_schema, + job, + discover_secondaryFiles=getdefault(runtime_context.toplevel, False), + ) + ) + + if self.tool.get("baseCommand"): + for index, command in enumerate(aslist(self.tool["baseCommand"])): + bindings.append({"position": [-1000000, index], "datum": command}) + + if self.tool.get("arguments"): + for i, arg in enumerate(self.tool["arguments"]): + lc = self.tool["arguments"].lc.data[i] + filename = self.tool["arguments"].lc.filename + bindings.lc.add_kv_line_col(len(bindings), lc) + if isinstance(arg, MutableMapping): + arg = copy.deepcopy(arg) + if arg.get("position"): + position = arg.get("position") + if isinstance(position, str): # no need to test the + # CWLVersion as the v1.0 + # schema only allows ints + position = builder.do_eval(position) + if position is None: + position = 0 + arg["position"] = [position, i] + else: + arg["position"] = [0, i] + bindings.append(arg) + elif ("$(" in arg) or ("${" in arg): + cm = CommentedMap((("position", [0, i]), ("valueFrom", arg))) + cm.lc.add_kv_line_col("valueFrom", lc) + cm.lc.filename = filename + bindings.append(cm) + else: + cm = CommentedMap((("position", [0, i]), ("datum", arg))) + cm.lc.add_kv_line_col("datum", lc) + cm.lc.filename = filename + bindings.append(cm) + + # use python2 like sorting of heterogeneous lists + # (containing str and int types), + key = functools.cmp_to_key(cmp_like_py2) + + # This awkward 
construction replaces the contents of + # "bindings" in place (because Builder expects it to be + # mutated in place, sigh, I'm sorry) with its contents sorted, + # supporting different versions of Python and ruamel.yaml with + # different behaviors/bugs in CommentedSeq. + bindings_copy = copy.deepcopy(bindings) + del bindings[:] + bindings.extend(sorted(bindings_copy, key=key)) + + if self.tool["class"] != "Workflow": + builder.resources = self.evalResources(builder, runtime_context) + return builder + + def evalResources( + self, builder: Builder, runtimeContext: RuntimeContext + ) -> Dict[str, Union[int, float, str]]: + resourceReq, _ = self.get_requirement("ResourceRequirement") + if resourceReq is None: + resourceReq = {} + cwl_version = self.metadata.get( + "http://commonwl.org/cwltool#original_cwlVersion", None + ) + if cwl_version == "v1.0": + ram = 1024 + else: + ram = 256 + request: Dict[str, Union[int, float, str]] = { + "coresMin": 1, + "coresMax": 1, + "ramMin": ram, + "ramMax": ram, + "tmpdirMin": 1024, + "tmpdirMax": 1024, + "outdirMin": 1024, + "outdirMax": 1024, + } + for a in ("cores", "ram", "tmpdir", "outdir"): + mn = mx = None # type: Optional[Union[int, float]] + if resourceReq.get(a + "Min"): + mn = cast( + Union[int, float], + eval_resource( + builder, cast(Union[str, int, float], resourceReq[a + "Min"]) + ), + ) + if resourceReq.get(a + "Max"): + mx = cast( + Union[int, float], + eval_resource( + builder, cast(Union[str, int, float], resourceReq[a + "Max"]) + ), + ) + if mn is None: + mn = mx + elif mx is None: + mx = mn + + if mn is not None: + request[a + "Min"] = mn + request[a + "Max"] = cast(Union[int, float], mx) + + if runtimeContext.select_resources is not None: + return runtimeContext.select_resources(request, runtimeContext) + return { + "cores": request["coresMin"], + "ram": math.ceil(request["ramMin"]) + if not isinstance(request["ramMin"], str) + else request["ramMin"], + "tmpdirSize": math.ceil(request["tmpdirMin"]) + if not 
def nestdir(base: str, deps: CWLObjectType) -> CWLObjectType:
    """Wrap *deps* in Directory objects mirroring its path below *base*'s directory."""
    parent = os.path.dirname(base) + "/"
    location = deps["location"]
    if location.startswith(parent):
        # Intermediate path components only; the leaf entry itself is dropped.
        components = location[len(parent):].split("/")[:-1]
        # Build the nesting inside-out, innermost directory first.
        for component in reversed(components):
            deps = {"class": "Directory", "basename": component, "listing": [deps]}
    return deps
cast(str, e["basename"]) + if basename not in ents: + ents[basename] = e + elif e["class"] == "Directory": + if e.get("listing"): + cast( + List[CWLObjectType], ents[basename].setdefault("listing", []) + ).extend(cast(List[CWLObjectType], e["listing"])) + if cast(str, ents[basename]["location"]).startswith("_:"): + ents[basename]["location"] = e["location"] + elif e["location"] != ents[basename]["location"]: + # same basename, different location, collision, + # rename both. + collided.add(basename) + e2 = ents[basename] + + e["basename"] = urllib.parse.quote(cast(str, e["location"]), safe="") + e2["basename"] = urllib.parse.quote(cast(str, e2["location"]), safe="") + + e["nameroot"], e["nameext"] = os.path.splitext(cast(str, e["basename"])) + e2["nameroot"], e2["nameext"] = os.path.splitext(cast(str, e2["basename"])) + + ents[cast(str, e["basename"])] = e + ents[cast(str, e2["basename"])] = e2 + for c in collided: + del ents[c] + for e in ents.values(): + if e["class"] == "Directory" and "listing" in e: + e["listing"] = cast( + MutableSequence[CWLOutputAtomType], + mergedirs(cast(List[CWLObjectType], e["listing"])), + ) + r.extend(ents.values()) + return r + + +CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl" + + +def scandeps( + base: str, + doc: Union[CWLObjectType, MutableSequence[CWLObjectType]], + reffields: Set[str], + urlfields: Set[str], + loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]], + urljoin: Callable[[str, str], str] = urllib.parse.urljoin, + nestdirs: bool = True, +) -> MutableSequence[CWLObjectType]: + r = [] # type: MutableSequence[CWLObjectType] + if isinstance(doc, MutableMapping): + if "id" in doc: + if cast(str, doc["id"]).startswith("file://"): + df, _ = urllib.parse.urldefrag(cast(str, doc["id"])) + if base != df: + r.append({"class": "File", "location": df, "format": CWL_IANA}) + base = df + + if doc.get("class") in ("File", "Directory") and "location" in urlfields: + u = 
cast(Optional[str], doc.get("location", doc.get("path"))) + if u and not u.startswith("_:"): + deps = { + "class": doc["class"], + "location": urljoin(base, u), + } # type: CWLObjectType + if "basename" in doc: + deps["basename"] = doc["basename"] + if doc["class"] == "Directory" and "listing" in doc: + deps["listing"] = doc["listing"] + if doc["class"] == "File" and "secondaryFiles" in doc: + deps["secondaryFiles"] = cast( + CWLOutputAtomType, + scandeps( + base, + cast( + Union[CWLObjectType, MutableSequence[CWLObjectType]], + doc["secondaryFiles"], + ), + reffields, + urlfields, + loadref, + urljoin=urljoin, + nestdirs=nestdirs, + ), + ) + if nestdirs: + deps = nestdir(base, deps) + r.append(deps) + else: + if doc["class"] == "Directory" and "listing" in doc: + r.extend( + scandeps( + base, + cast(MutableSequence[CWLObjectType], doc["listing"]), + reffields, + urlfields, + loadref, + urljoin=urljoin, + nestdirs=nestdirs, + ) + ) + elif doc["class"] == "File" and "secondaryFiles" in doc: + r.extend( + scandeps( + base, + cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]), + reffields, + urlfields, + loadref, + urljoin=urljoin, + nestdirs=nestdirs, + ) + ) + + for k, v in doc.items(): + if k in reffields: + for u2 in aslist(v): + if isinstance(u2, MutableMapping): + r.extend( + scandeps( + base, + u2, + reffields, + urlfields, + loadref, + urljoin=urljoin, + nestdirs=nestdirs, + ) + ) + else: + subid = urljoin(base, u2) + basedf, _ = urllib.parse.urldefrag(base) + subiddf, _ = urllib.parse.urldefrag(subid) + if basedf == subiddf: + continue + sub = cast( + Union[MutableSequence[CWLObjectType], CWLObjectType], + loadref(base, u2), + ) + deps2 = { + "class": "File", + "location": subid, + "format": CWL_IANA, + } # type: CWLObjectType + sf = scandeps( + subid, + sub, + reffields, + urlfields, + loadref, + urljoin=urljoin, + nestdirs=nestdirs, + ) + if sf: + deps2["secondaryFiles"] = cast( + MutableSequence[CWLOutputAtomType], sf + ) + if nestdirs: + deps2 = 
nestdir(base, deps2) + r.append(deps2) + elif k in urlfields and k != "location": + for u3 in aslist(v): + deps = {"class": "File", "location": urljoin(base, u3)} + if nestdirs: + deps = nestdir(base, deps) + r.append(deps) + elif doc.get("class") in ("File", "Directory") and k in ( + "listing", + "secondaryFiles", + ): + # should be handled earlier. + pass + else: + r.extend( + scandeps( + base, + cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v), + reffields, + urlfields, + loadref, + urljoin=urljoin, + nestdirs=nestdirs, + ) + ) + elif isinstance(doc, MutableSequence): + for d in doc: + r.extend( + scandeps( + base, + d, + reffields, + urlfields, + loadref, + urljoin=urljoin, + nestdirs=nestdirs, + ) + ) + + if r: + normalizeFilesDirs(r) + r = mergedirs(cast(List[CWLObjectType], r)) + + return r + + +def compute_checksums(fs_access: StdFsAccess, fileobj: CWLObjectType) -> None: + if "checksum" not in fileobj: + checksum = hashlib.sha1() # nosec + location = cast(str, fileobj["location"]) + with fs_access.open(location, "rb") as f: + contents = f.read(1024 * 1024) + while contents != b"": + checksum.update(contents) + contents = f.read(1024 * 1024) + fileobj["checksum"] = "sha1$%s" % checksum.hexdigest() + fileobj["size"] = fs_access.size(location)