Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/cwltool/process.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
"""Classes and methods relevant for all CWL Proccess types.""" import abc import copy import functools import hashlib import json import logging import math import os import shutil import stat import textwrap import urllib import uuid from os import scandir from typing import ( Any, Callable, Dict, Iterable, Iterator, List, MutableMapping, MutableSequence, Optional, Set, Sized, Tuple, Type, Union, cast, ) from pkg_resources import resource_stream from rdflib import Graph from ruamel.yaml.comments import CommentedMap, CommentedSeq from schema_salad.avro.schema import ( Names, Schema, SchemaParseException, make_avsc_object, ) from schema_salad.exceptions import ValidationException from schema_salad.ref_resolver import Loader, file_uri, uri_file_path from schema_salad.schema import load_schema, make_avro_schema, make_valid_avro from schema_salad.sourceline import SourceLine, strip_dup_lineno from schema_salad.utils import convert_to_dict from schema_salad.validate import validate_ex from typing_extensions import TYPE_CHECKING from . import expression from .builder import Builder, HasReqsHints from .context import LoadingContext, RuntimeContext, getdefault from .errors import UnsupportedRequirement, WorkflowException from .loghandler import _logger from .mpi import MPIRequirementName from .pathmapper import MapperEnt, PathMapper from .secrets import SecretStore from .stdfsaccess import StdFsAccess from .update import INTERNAL_VERSION from .utils import ( CWLObjectType, CWLOutputAtomType, CWLOutputType, JobsGeneratorType, OutputCallbackType, adjustDirObjs, aslist, cmp_like_py2, copytree_with_merge, ensure_writable, get_listing, normalizeFilesDirs, onWindows, random_outdir, visit_class, ) from .validate_js import validate_js_expressions if TYPE_CHECKING: from .provenance_profile import ProvenanceProfile # pylint: disable=unused-import class LogAsDebugFilter(logging.Filter): def __init__(self, name: str, parent: logging.Logger) -> None: """Initialize.""" name = str(name) super().__init__(name) self.parent = parent def filter(self, record: logging.LogRecord) -> bool: return self.parent.isEnabledFor(logging.DEBUG) _logger_validation_warnings = logging.getLogger("cwltool.validation_warnings") _logger_validation_warnings.setLevel(_logger.getEffectiveLevel()) _logger_validation_warnings.addFilter( LogAsDebugFilter("cwltool.validation_warnings", _logger) ) supportedProcessRequirements = [ "DockerRequirement", "SchemaDefRequirement", "EnvVarRequirement", "ScatterFeatureRequirement", "SubworkflowFeatureRequirement", "MultipleInputFeatureRequirement", "InlineJavascriptRequirement", "ShellCommandRequirement", "StepInputExpressionRequirement", "ResourceRequirement", "InitialWorkDirRequirement", "ToolTimeLimit", "WorkReuse", "NetworkAccess", "InplaceUpdateRequirement", "LoadListingRequirement", MPIRequirementName, "http://commonwl.org/cwltool#TimeLimit", "http://commonwl.org/cwltool#WorkReuse", "http://commonwl.org/cwltool#NetworkAccess", "http://commonwl.org/cwltool#LoadListingRequirement", "http://commonwl.org/cwltool#InplaceUpdateRequirement", ] cwl_files = ( "Workflow.yml", "CommandLineTool.yml", "CommonWorkflowLanguage.yml", "Process.yml", "Operation.yml", "concepts.md", "contrib.md", "intro.md", "invocation.md", ) salad_files = ( "metaschema.yml", "metaschema_base.yml", "salad.md", "field_name.yml", "import_include.md", "link_res.yml", "ident_res.yml", "vocab_res.yml", "vocab_res.yml", "field_name_schema.yml", "field_name_src.yml", "field_name_proc.yml", "ident_res_schema.yml", "ident_res_src.yml", "ident_res_proc.yml", "link_res_schema.yml", "link_res_src.yml", "link_res_proc.yml", "vocab_res_schema.yml", "vocab_res_src.yml", "vocab_res_proc.yml", ) SCHEMA_CACHE = ( {} ) # type: Dict[str, Tuple[Loader, Union[Names, SchemaParseException], CWLObjectType, Loader]] SCHEMA_FILE = None # type: Optional[CWLObjectType] SCHEMA_DIR = None # type: Optional[CWLObjectType] SCHEMA_ANY = None # type: Optional[CWLObjectType] custom_schemas = {} # type: Dict[str, Tuple[str, str]] def use_standard_schema(version: str) -> None: if version in custom_schemas: del custom_schemas[version] if version in SCHEMA_CACHE: del SCHEMA_CACHE[version] def use_custom_schema(version: str, name: str, text: str) -> None: custom_schemas[version] = (name, text) if version in SCHEMA_CACHE: del SCHEMA_CACHE[version] def get_schema( version: str, ) -> Tuple[Loader, Union[Names, SchemaParseException], CWLObjectType, Loader]: if version in SCHEMA_CACHE: return SCHEMA_CACHE[version] cache = {} # type: Dict[str, Union[str, Graph, bool]] version = version.split("#")[-1] if ".dev" in version: version = ".".join(version.split(".")[:-1]) for f in cwl_files: try: res = resource_stream(__name__, f"schemas/{version}/{f}") cache["https://w3id.org/cwl/" + f] = res.read().decode("UTF-8") res.close() except OSError: pass for f in salad_files: try: res = resource_stream( __name__, f"schemas/{version}/salad/schema_salad/metaschema/{f}", ) cache[ "https://w3id.org/cwl/salad/schema_salad/metaschema/" + f ] = res.read().decode("UTF-8") res.close() except OSError: pass if version in custom_schemas: cache[custom_schemas[version][0]] = custom_schemas[version][1] SCHEMA_CACHE[version] = load_schema(custom_schemas[version][0], cache=cache) else: SCHEMA_CACHE[version] = load_schema( "https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache ) return SCHEMA_CACHE[version] def shortname(inputid: str) -> str: d = urllib.parse.urlparse(inputid) if d.fragment: return d.fragment.split("/")[-1] return d.path.split("/")[-1] def checkRequirements( rec: Union[MutableSequence[CWLObjectType], CWLObjectType, CWLOutputType, None], supported_process_requirements: Iterable[str], ) -> None: if isinstance(rec, MutableMapping): if "requirements" in rec: for i, entry in enumerate( cast(MutableSequence[CWLObjectType], rec["requirements"]) ): with SourceLine(rec["requirements"], i, UnsupportedRequirement): if cast(str, entry["class"]) not in supported_process_requirements: raise UnsupportedRequirement( "Unsupported requirement {}".format(entry["class"]) ) for key in rec: checkRequirements(rec[key], supported_process_requirements) if isinstance(rec, MutableSequence): for entry2 in rec: checkRequirements(entry2, supported_process_requirements) def stage_files( pathmapper: PathMapper, stage_func: Optional[Callable[[str, str], None]] = None, ignore_writable: bool = False, symlink: bool = True, secret_store: Optional[SecretStore] = None, fix_conflicts: bool = False, ) -> None: """Link or copy files to their targets. Create them as needed.""" targets = {} # type: Dict[str, MapperEnt] for key, entry in pathmapper.items(): if "File" not in entry.type: continue if entry.target not in targets: targets[entry.target] = entry elif targets[entry.target].resolved != entry.resolved: if fix_conflicts: # find first key that does not clash with an existing entry in targets # start with entry.target + '_' + 2 and then keep incrementing the number till there is no clash i = 2 tgt = f"{entry.target}_{i}" while tgt in targets: i += 1 tgt = f"{entry.target}_{i}" targets[tgt] = pathmapper.update( key, entry.resolved, tgt, entry.type, entry.staged ) else: raise WorkflowException( "File staging conflict, trying to stage both %s and %s to the same target %s" % (targets[entry.target].resolved, entry.resolved, entry.target) ) for key, entry in pathmapper.items(): if not entry.staged: continue if not os.path.exists(os.path.dirname(entry.target)): os.makedirs(os.path.dirname(entry.target)) if entry.type in ("File", "Directory") and os.path.exists(entry.resolved): if symlink: # Use symlink func if allowed if onWindows(): if entry.type == "File": shutil.copy(entry.resolved, entry.target) elif entry.type == "Directory": if os.path.exists(entry.target) and os.path.isdir(entry.target): shutil.rmtree(entry.target) copytree_with_merge(entry.resolved, entry.target) else: os.symlink(entry.resolved, entry.target) elif stage_func is not None: stage_func(entry.resolved, entry.target) elif ( entry.type == "Directory" and not os.path.exists(entry.target) and entry.resolved.startswith("_:") ): os.makedirs(entry.target) elif entry.type == "WritableFile" and not ignore_writable: shutil.copy(entry.resolved, entry.target) ensure_writable(entry.target) elif entry.type == "WritableDirectory" and not ignore_writable: if entry.resolved.startswith("_:"): os.makedirs(entry.target) else: shutil.copytree(entry.resolved, entry.target) ensure_writable(entry.target) elif entry.type == "CreateFile" or entry.type == "CreateWritableFile": with open(entry.target, "wb") as new: if secret_store is not None: new.write( cast(str, secret_store.retrieve(entry.resolved)).encode("utf-8") ) else: new.write(entry.resolved.encode("utf-8")) if entry.type == "CreateFile": os.chmod(entry.target, stat.S_IRUSR) # Read only else: # it is a "CreateWritableFile" ensure_writable(entry.target) pathmapper.update(key, entry.target, entry.target, entry.type, entry.staged) def relocateOutputs( outputObj: CWLObjectType, destination_path: str, source_directories: Set[str], action: str, fs_access: StdFsAccess, compute_checksum: bool = True, path_mapper: Type[PathMapper] = PathMapper, ) -> CWLObjectType: adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True)) if action not in ("move", "copy"): return outputObj def _collectDirEntries( obj: Union[CWLObjectType, MutableSequence[CWLObjectType], None] ) -> Iterator[CWLObjectType]: if isinstance(obj, dict): if obj.get("class") in ("File", "Directory"): yield obj else: for sub_obj in obj.values(): yield from _collectDirEntries(sub_obj) elif isinstance(obj, MutableSequence): for sub_obj in obj: yield from _collectDirEntries(sub_obj) def _relocate(src: str, dst: str) -> None: if src == dst: return # If the source is not contained in source_directories we're not allowed to delete it src = fs_access.realpath(src) src_can_deleted = any( os.path.commonprefix([p, src]) == p for p in source_directories ) _action = "move" if action == "move" and src_can_deleted else "copy" if _action == "move": _logger.debug("Moving %s to %s", src, dst) if fs_access.isdir(src) and fs_access.isdir(dst): # merge directories for dir_entry in scandir(src): _relocate(dir_entry.path, fs_access.join(dst, dir_entry.name)) else: shutil.move(src, dst) elif _action == "copy": _logger.debug("Copying %s to %s", src, dst) if fs_access.isdir(src): if os.path.isdir(dst): shutil.rmtree(dst) elif os.path.isfile(dst): os.unlink(dst) shutil.copytree(src, dst) else: shutil.copy2(src, dst) def _realpath( ob: CWLObjectType, ) -> None: # should be type Union[CWLFile, CWLDirectory] location = cast(str, ob["location"]) if location.startswith("file:"): ob["location"] = file_uri(os.path.realpath(uri_file_path(location))) elif location.startswith("/"): ob["location"] = os.path.realpath(location) elif not location.startswith("_:") and ":" in location: ob["location"] = file_uri(fs_access.realpath(location)) outfiles = list(_collectDirEntries(outputObj)) visit_class(outfiles, ("File", "Directory"), _realpath) pm = path_mapper(outfiles, "", destination_path, separateDirs=False) stage_files(pm, stage_func=_relocate, symlink=False, fix_conflicts=True) def _check_adjust(a_file: CWLObjectType) -> CWLObjectType: a_file["location"] = file_uri(pm.mapper(cast(str, a_file["location"]))[1]) if "contents" in a_file: del a_file["contents"] return a_file visit_class(outputObj, ("File", "Directory"), _check_adjust) if compute_checksum: visit_class( outputObj, ("File",), functools.partial(compute_checksums, fs_access) ) return outputObj def cleanIntermediate(output_dirs: Iterable[str]) -> None: for a in output_dirs: if os.path.exists(a): _logger.debug("Removing intermediate output directory %s", a) shutil.rmtree(a, True) def add_sizes(fsaccess: StdFsAccess, obj: CWLObjectType) -> None: if "location" in obj: try: if "size" not in obj: obj["size"] = fsaccess.size(cast(str, obj["location"])) except OSError: pass elif "contents" in obj: obj["size"] = len(cast(Sized, obj["contents"])) return # best effort def fill_in_defaults( inputs: List[CWLObjectType], job: CWLObjectType, fsaccess: StdFsAccess, ) -> None: for e, inp in enumerate(inputs): with SourceLine( inputs, e, WorkflowException, _logger.isEnabledFor(logging.DEBUG) ): fieldname = shortname(cast(str, inp["id"])) if job.get(fieldname) is not None: pass elif job.get(fieldname) is None and "default" in inp: job[fieldname] = copy.deepcopy(inp["default"]) elif job.get(fieldname) is None and "null" in aslist(inp["type"]): job[fieldname] = None else: raise WorkflowException( "Missing required input parameter '%s'" % shortname(cast(str, inp["id"])) ) def avroize_type( field_type: Union[ CWLObjectType, MutableSequence[CWLOutputType], CWLOutputType, None ], name_prefix: str = "", ) -> None: """Add missing information to a type so that CWL types are valid.""" if isinstance(field_type, MutableSequence): for field in field_type: avroize_type(field, name_prefix) elif isinstance(field_type, MutableMapping): if field_type["type"] in ("enum", "record"): if "name" not in field_type: field_type["name"] = name_prefix + str(uuid.uuid4()) if field_type["type"] == "record": avroize_type( cast(MutableSequence[CWLOutputType], field_type["fields"]), name_prefix ) if field_type["type"] == "array": avroize_type( cast(MutableSequence[CWLOutputType], field_type["items"]), name_prefix ) if isinstance(field_type["type"], MutableSequence): for ctype in field_type["type"]: avroize_type(cast(CWLOutputType, ctype), name_prefix) def get_overrides( overrides: MutableSequence[CWLObjectType], toolid: str ) -> CWLObjectType: req = {} # type: CWLObjectType if not isinstance(overrides, MutableSequence): raise ValidationException( "Expected overrides to be a list, but was %s" % type(overrides) ) for ov in overrides: if ov["overrideTarget"] == toolid: req.update(ov) return req _VAR_SPOOL_ERROR = textwrap.dedent( """ Non-portable reference to /var/spool/cwl detected: '{}'. To fix, replace /var/spool/cwl with $(runtime.outdir) or add DockerRequirement to the 'requirements' section and declare 'dockerOutputDirectory: /var/spool/cwl'. """ ) def var_spool_cwl_detector( obj: CWLOutputType, item: Optional[Any] = None, obj_key: Optional[Any] = None, ) -> bool: """Detect any textual reference to /var/spool/cwl.""" r = False if isinstance(obj, str): if "var/spool/cwl" in obj and obj_key != "dockerOutputDirectory": _logger.warning( SourceLine(item=item, key=obj_key, raise_type=str).makeError( _VAR_SPOOL_ERROR.format(obj) ) ) r = True elif isinstance(obj, MutableMapping): for mkey, mvalue in obj.items(): r = var_spool_cwl_detector(cast(CWLOutputType, mvalue), obj, mkey) or r elif isinstance(obj, MutableSequence): for lkey, lvalue in enumerate(obj): r = var_spool_cwl_detector(cast(CWLOutputType, lvalue), obj, lkey) or r return r def eval_resource( builder: Builder, resource_req: Union[str, int, float] ) -> Optional[Union[str, int, float]]: if isinstance(resource_req, str) and expression.needs_parsing(resource_req): result = builder.do_eval(resource_req) if isinstance(result, (str, int)) or result is None: return result raise WorkflowException( "Got incorrect return type {} from resource expression evaluation of {}.".format( type(result), resource_req ) ) return resource_req # Threshold where the "too many files" warning kicks in FILE_COUNT_WARNING = 5000 class Process(HasReqsHints, metaclass=abc.ABCMeta): def __init__( self, toolpath_object: CommentedMap, loadingContext: LoadingContext ) -> None: """Build a Process object from the provided dictionary.""" super().__init__() self.metadata = getdefault(loadingContext.metadata, {}) # type: CWLObjectType self.provenance_object = None # type: Optional[ProvenanceProfile] self.parent_wf = None # type: Optional[ProvenanceProfile] global SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY # pylint: disable=global-statement if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None: get_schema("v1.0") SCHEMA_ANY = cast( CWLObjectType, SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"], ) SCHEMA_FILE = cast( CWLObjectType, SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"], ) SCHEMA_DIR = cast( CWLObjectType, SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"], ) self.names = make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY], Loader({})) self.tool = toolpath_object self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, [])) self.requirements.extend(self.tool.get("requirements", [])) if "id" not in self.tool: self.tool["id"] = "_:" + str(uuid.uuid4()) self.requirements.extend( cast( List[CWLObjectType], get_overrides( getdefault(loadingContext.overrides_list, []), self.tool["id"] ).get("requirements", []), ) ) self.hints = copy.deepcopy(getdefault(loadingContext.hints, [])) self.hints.extend(self.tool.get("hints", [])) # Versions of requirements and hints which aren't mutated. self.original_requirements = copy.deepcopy(self.requirements) self.original_hints = copy.deepcopy(self.hints) self.doc_loader = loadingContext.loader self.doc_schema = loadingContext.avsc_names self.formatgraph = None # type: Optional[Graph] if self.doc_loader is not None: self.formatgraph = self.doc_loader.graph checkRequirements(self.tool, supportedProcessRequirements) self.validate_hints( cast(Names, loadingContext.avsc_names), self.tool.get("hints", []), strict=getdefault(loadingContext.strict, False), ) self.schemaDefs = {} # type: MutableMapping[str, CWLObjectType] sd, _ = self.get_requirement("SchemaDefRequirement") if sd is not None: sdtypes = cast(MutableSequence[CWLObjectType], sd["types"]) avroize_type(cast(MutableSequence[CWLOutputType], sdtypes)) av = make_valid_avro( sdtypes, {cast(str, t["name"]): cast(Dict[str, Any], t) for t in sdtypes}, set(), ) for i in av: self.schemaDefs[i["name"]] = i # type: ignore make_avsc_object(convert_to_dict(av), self.names) # Build record schema from inputs self.inputs_record_schema = { "name": "input_record_schema", "type": "record", "fields": [], } # type: CWLObjectType self.outputs_record_schema = { "name": "outputs_record_schema", "type": "record", "fields": [], } # type: CWLObjectType for key in ("inputs", "outputs"): for i in self.tool[key]: c = copy.deepcopy(i) c["name"] = shortname(c["id"]) del c["id"] if "type" not in c: raise ValidationException( "Missing 'type' in parameter '{}'".format(c["name"]) ) if "default" in c and "null" not in aslist(c["type"]): nullable = ["null"] nullable.extend(aslist(c["type"])) c["type"] = nullable else: c["type"] = c["type"] avroize_type(c["type"], c["name"]) if key == "inputs": cast( List[CWLObjectType], self.inputs_record_schema["fields"] ).append(c) elif key == "outputs": cast( List[CWLObjectType], self.outputs_record_schema["fields"] ).append(c) with SourceLine(toolpath_object, "inputs", ValidationException): self.inputs_record_schema = cast( CWLObjectType, make_valid_avro(self.inputs_record_schema, {}, set()), ) make_avsc_object(convert_to_dict(self.inputs_record_schema), self.names) with SourceLine(toolpath_object, "outputs", ValidationException): self.outputs_record_schema = cast( CWLObjectType, make_valid_avro(self.outputs_record_schema, {}, set()), ) make_avsc_object(convert_to_dict(self.outputs_record_schema), self.names) if toolpath_object.get("class") is not None and not getdefault( loadingContext.disable_js_validation, False ): validate_js_options = ( None ) # type: Optional[Dict[str, Union[List[str], str, int]]] if loadingContext.js_hint_options_file is not None: try: with open(loadingContext.js_hint_options_file) as options_file: validate_js_options = json.load(options_file) except (OSError, ValueError): _logger.error( "Failed to read options file %s", loadingContext.js_hint_options_file, ) raise if self.doc_schema is not None: validate_js_expressions( toolpath_object, self.doc_schema.names[toolpath_object["class"]], validate_js_options, ) dockerReq, is_req = self.get_requirement("DockerRequirement") if ( dockerReq is not None and "dockerOutputDirectory" in dockerReq and is_req is not None and not is_req ): _logger.warning( SourceLine(item=dockerReq, raise_type=str).makeError( "When 'dockerOutputDirectory' is declared, DockerRequirement " "should go in the 'requirements' section, not 'hints'." "" ) ) if ( dockerReq is not None and is_req is not None and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl" ): if is_req: # In this specific case, it is legal to have /var/spool/cwl, so skip the check. pass else: # Must be a requirement var_spool_cwl_detector(self.tool) else: var_spool_cwl_detector(self.tool) def _init_job( self, joborder: CWLObjectType, runtime_context: RuntimeContext ) -> Builder: if self.metadata.get("cwlVersion") != INTERNAL_VERSION: raise WorkflowException( "Process object loaded with version '%s', must update to '%s' in order to execute." % (self.metadata.get("cwlVersion"), INTERNAL_VERSION) ) job = copy.deepcopy(joborder) make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess) fs_access = make_fs_access(runtime_context.basedir) load_listing_req, _ = self.get_requirement("LoadListingRequirement") load_listing = ( cast(str, load_listing_req.get("loadListing")) if load_listing_req is not None else "no_listing" ) # Validate job order try: fill_in_defaults(self.tool["inputs"], job, fs_access) normalizeFilesDirs(job) schema = self.names.get_name("input_record_schema", None) if schema is None: raise WorkflowException( "Missing input record schema: " "{}".format(self.names) ) validate_ex(schema, job, strict=False, logger=_logger_validation_warnings) if load_listing and load_listing != "no_listing": get_listing(fs_access, job, recursive=(load_listing == "deep_listing")) visit_class(job, ("File",), functools.partial(add_sizes, fs_access)) if load_listing == "deep_listing": for i, inparm in enumerate(self.tool["inputs"]): k = shortname(inparm["id"]) if k not in job: continue v = job[k] dircount = [0] def inc(d): # type: (List[int]) -> None d[0] += 1 visit_class(v, ("Directory",), lambda x: inc(dircount)) if dircount[0] == 0: continue filecount = [0] visit_class(v, ("File",), lambda x: inc(filecount)) if filecount[0] > FILE_COUNT_WARNING: # Long lines in this message are okay, will be reflowed based on terminal columns. _logger.warning( strip_dup_lineno( SourceLine(self.tool["inputs"], i, str).makeError( """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'. This may negatively affect workflow performance and memory use. If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior: $namespaces: cwltool: "http://commonwl.org/cwltool#" hints: cwltool:LoadListingRequirement: loadListing: shallow_listing """ % (filecount[0], k) ) ) ) except (ValidationException, WorkflowException) as err: raise WorkflowException("Invalid job input record:\n" + str(err)) from err files = [] # type: List[CWLObjectType] bindings = CommentedSeq() outdir = "" tmpdir = "" stagedir = "" docker_req, _ = self.get_requirement("DockerRequirement") default_docker = None if docker_req is None and runtime_context.default_container: default_docker = runtime_context.default_container if (docker_req or default_docker) and runtime_context.use_container: if docker_req is not None: # Check if docker output directory is absolute if docker_req.get("dockerOutputDirectory") and cast( str, docker_req.get("dockerOutputDirectory") ).startswith("/"): outdir = cast(str, docker_req.get("dockerOutputDirectory")) else: outdir = cast( str, docker_req.get("dockerOutputDirectory") or runtime_context.docker_outdir or random_outdir(), ) elif default_docker is not None: outdir = runtime_context.docker_outdir or random_outdir() tmpdir = runtime_context.docker_tmpdir or "/tmp" # nosec stagedir = runtime_context.docker_stagedir or "/var/lib/cwl" else: if self.tool["class"] == "CommandLineTool": outdir = fs_access.realpath(runtime_context.get_outdir()) tmpdir = fs_access.realpath(runtime_context.get_tmpdir()) stagedir = fs_access.realpath(runtime_context.get_stagedir()) cwl_version = cast( str, self.metadata.get("http://commonwl.org/cwltool#original_cwlVersion", None), ) builder = Builder( job, files, bindings, self.schemaDefs, self.names, self.requirements, self.hints, {}, runtime_context.mutation_manager, self.formatgraph, make_fs_access, fs_access, runtime_context.job_script_provider, runtime_context.eval_timeout, runtime_context.debug, runtime_context.js_console, runtime_context.force_docker_pull, load_listing, outdir, tmpdir, stagedir, cwl_version, ) bindings.extend( builder.bind_input( self.inputs_record_schema, job, discover_secondaryFiles=getdefault(runtime_context.toplevel, False), ) ) if self.tool.get("baseCommand"): for index, command in enumerate(aslist(self.tool["baseCommand"])): bindings.append({"position": [-1000000, index], "datum": command}) if self.tool.get("arguments"): for i, arg in enumerate(self.tool["arguments"]): lc = self.tool["arguments"].lc.data[i] filename = self.tool["arguments"].lc.filename bindings.lc.add_kv_line_col(len(bindings), lc) if isinstance(arg, MutableMapping): arg = copy.deepcopy(arg) if arg.get("position"): position = arg.get("position") if isinstance(position, str): # no need to test the # CWLVersion as the v1.0 # schema only allows ints position = builder.do_eval(position) if position is None: position = 0 arg["position"] = [position, i] else: arg["position"] = [0, i] bindings.append(arg) elif ("$(" in arg) or ("${" in arg): cm = CommentedMap((("position", [0, i]), ("valueFrom", arg))) cm.lc.add_kv_line_col("valueFrom", lc) cm.lc.filename = filename bindings.append(cm) else: cm = CommentedMap((("position", [0, i]), ("datum", arg))) cm.lc.add_kv_line_col("datum", lc) cm.lc.filename = filename bindings.append(cm) # use python2 like sorting of heterogeneous lists # (containing str and int types), key = functools.cmp_to_key(cmp_like_py2) # This awkward construction replaces the contents of # "bindings" in place (because Builder expects it to be # mutated in place, sigh, I'm sorry) with its contents sorted, # supporting different versions of Python and ruamel.yaml with # different behaviors/bugs in CommentedSeq. bindings_copy = copy.deepcopy(bindings) del bindings[:] bindings.extend(sorted(bindings_copy, key=key)) if self.tool["class"] != "Workflow": builder.resources = self.evalResources(builder, runtime_context) return builder def evalResources( self, builder: Builder, runtimeContext: RuntimeContext ) -> Dict[str, Union[int, float, str]]: resourceReq, _ = self.get_requirement("ResourceRequirement") if resourceReq is None: resourceReq = {} cwl_version = self.metadata.get( "http://commonwl.org/cwltool#original_cwlVersion", None ) if cwl_version == "v1.0": ram = 1024 else: ram = 256 request: Dict[str, Union[int, float, str]] = { "coresMin": 1, "coresMax": 1, "ramMin": ram, "ramMax": ram, "tmpdirMin": 1024, "tmpdirMax": 1024, "outdirMin": 1024, "outdirMax": 1024, } for a in ("cores", "ram", "tmpdir", "outdir"): mn = mx = None # type: Optional[Union[int, float]] if resourceReq.get(a + "Min"): mn = cast( Union[int, float], eval_resource( builder, cast(Union[str, int, float], resourceReq[a + "Min"]) ), ) if resourceReq.get(a + "Max"): mx = cast( Union[int, float], eval_resource( builder, cast(Union[str, int, float], resourceReq[a + "Max"]) ), ) if mn is None: mn = mx elif mx is None: mx = mn if mn is not None: request[a + "Min"] = mn request[a + "Max"] = cast(Union[int, float], mx) if runtimeContext.select_resources is not None: return runtimeContext.select_resources(request, runtimeContext) return { "cores": request["coresMin"], "ram": math.ceil(request["ramMin"]) if not isinstance(request["ramMin"], str) else request["ramMin"], "tmpdirSize": math.ceil(request["tmpdirMin"]) if not isinstance(request["tmpdirMin"], str) else request["tmpdirMin"], "outdirSize": math.ceil(request["outdirMin"]) if not isinstance(request["outdirMin"], str) else request["outdirMin"], } def validate_hints( self, avsc_names: Names, hints: List[CWLObjectType], strict: bool ) -> None: for i, r in enumerate(hints): sl = SourceLine(hints, i, ValidationException) with sl: if ( avsc_names.get_name(cast(str, r["class"]), None) is not None and self.doc_loader is not None ): plain_hint = { key: r[key] for key in r if key not in self.doc_loader.identifiers } # strip identifiers validate_ex( cast( Schema, avsc_names.get_name(cast(str, plain_hint["class"]), None), ), plain_hint, strict=strict, ) elif r["class"] in ("NetworkAccess", "LoadListingRequirement"): pass else: _logger.info(str(sl.makeError("Unknown hint %s" % (r["class"])))) def visit(self, op: Callable[[CommentedMap], None]) -> None: op(self.tool) @abc.abstractmethod def job( self, job_order: CWLObjectType, output_callbacks: Optional[OutputCallbackType], runtimeContext: RuntimeContext, ) -> JobsGeneratorType: pass _names = set() # type: Set[str] def uniquename(stem: str, names: Optional[Set[str]] = None) -> str: global _names if names is None: names = _names c = 1 u = stem while u in names: c += 1 u = f"{stem}_{c}" names.add(u) return u def nestdir(base: str, deps: CWLObjectType) -> CWLObjectType: dirname = os.path.dirname(base) + "/" subid = cast(str, deps["location"]) if subid.startswith(dirname): s2 = subid[len(dirname) :] sp = s2.split("/") sp.pop() while sp: nx = sp.pop() deps = {"class": "Directory", "basename": nx, "listing": [deps]} return deps def mergedirs(listing: List[CWLObjectType]) -> List[CWLObjectType]: r = [] # type: List[CWLObjectType] ents = {} # type: Dict[str, CWLObjectType] collided = set() # type: Set[str] for e in listing: basename = cast(str, e["basename"]) if basename not in ents: ents[basename] = e elif e["class"] == "Directory": if e.get("listing"): cast( List[CWLObjectType], ents[basename].setdefault("listing", []) ).extend(cast(List[CWLObjectType], e["listing"])) if cast(str, ents[basename]["location"]).startswith("_:"): ents[basename]["location"] = e["location"] elif e["location"] != ents[basename]["location"]: # same basename, different location, collision, # rename both. collided.add(basename) e2 = ents[basename] e["basename"] = urllib.parse.quote(cast(str, e["location"]), safe="") e2["basename"] = urllib.parse.quote(cast(str, e2["location"]), safe="") e["nameroot"], e["nameext"] = os.path.splitext(cast(str, e["basename"])) e2["nameroot"], e2["nameext"] = os.path.splitext(cast(str, e2["basename"])) ents[cast(str, e["basename"])] = e ents[cast(str, e2["basename"])] = e2 for c in collided: del ents[c] for e in ents.values(): if e["class"] == "Directory" and "listing" in e: e["listing"] = cast( MutableSequence[CWLOutputAtomType], mergedirs(cast(List[CWLObjectType], e["listing"])), ) r.extend(ents.values()) return r CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl" def scandeps( base: str, doc: Union[CWLObjectType, MutableSequence[CWLObjectType]], reffields: Set[str], urlfields: Set[str], loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]], urljoin: Callable[[str, str], str] = urllib.parse.urljoin, nestdirs: bool = True, ) -> MutableSequence[CWLObjectType]: r = [] # type: MutableSequence[CWLObjectType] if isinstance(doc, MutableMapping): if "id" in doc: if cast(str, doc["id"]).startswith("file://"): df, _ = urllib.parse.urldefrag(cast(str, doc["id"])) if base != df: r.append({"class": "File", "location": df, "format": CWL_IANA}) base = df if doc.get("class") in ("File", "Directory") and "location" in urlfields: u = cast(Optional[str], doc.get("location", doc.get("path"))) if u and not u.startswith("_:"): deps = { "class": doc["class"], "location": urljoin(base, u), } # type: CWLObjectType if "basename" in doc: deps["basename"] = doc["basename"] if doc["class"] == "Directory" and "listing" in doc: deps["listing"] = doc["listing"] if doc["class"] == "File" and "secondaryFiles" in doc: deps["secondaryFiles"] = cast( CWLOutputAtomType, scandeps( base, cast( Union[CWLObjectType, MutableSequence[CWLObjectType]], doc["secondaryFiles"], ), reffields, urlfields, loadref, urljoin=urljoin, nestdirs=nestdirs, ), ) if nestdirs: deps = nestdir(base, deps) r.append(deps) else: if doc["class"] == "Directory" and "listing" in doc: r.extend( scandeps( base, cast(MutableSequence[CWLObjectType], doc["listing"]), reffields, urlfields, loadref, urljoin=urljoin, nestdirs=nestdirs, ) ) elif doc["class"] == "File" and "secondaryFiles" in doc: r.extend( scandeps( base, cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]), reffields, urlfields, loadref, urljoin=urljoin, nestdirs=nestdirs, ) ) for k, v in doc.items(): if k in reffields: for u2 in aslist(v): if isinstance(u2, MutableMapping): r.extend( scandeps( base, u2, reffields, urlfields, loadref, urljoin=urljoin, nestdirs=nestdirs, ) ) else: subid = urljoin(base, u2) basedf, _ = urllib.parse.urldefrag(base) subiddf, _ = urllib.parse.urldefrag(subid) if basedf == subiddf: continue sub = cast( Union[MutableSequence[CWLObjectType], CWLObjectType], loadref(base, u2), ) deps2 = { "class": "File", "location": subid, "format": CWL_IANA, } # type: CWLObjectType sf = scandeps( subid, sub, reffields, urlfields, loadref, urljoin=urljoin, nestdirs=nestdirs, ) if sf: deps2["secondaryFiles"] = cast( MutableSequence[CWLOutputAtomType], sf ) if nestdirs: deps2 = nestdir(base, deps2) r.append(deps2) elif k in urlfields and k != "location": for u3 in aslist(v): deps = {"class": "File", "location": urljoin(base, u3)} if nestdirs: deps = nestdir(base, deps) r.append(deps) elif doc.get("class") in ("File", "Directory") and k in ( "listing", "secondaryFiles", ): # should be handled earlier. pass else: r.extend( scandeps( base, cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v), reffields, urlfields, loadref, urljoin=urljoin, nestdirs=nestdirs, ) ) elif isinstance(doc, MutableSequence): for d in doc: r.extend( scandeps( base, d, reffields, urlfields, loadref, urljoin=urljoin, nestdirs=nestdirs, ) ) if r: normalizeFilesDirs(r) r = mergedirs(cast(List[CWLObjectType], r)) return r def compute_checksums(fs_access: StdFsAccess, fileobj: CWLObjectType) -> None: if "checksum" not in fileobj: checksum = hashlib.sha1() # nosec location = cast(str, fileobj["location"]) with fs_access.open(location, "rb") as f: contents = f.read(1024 * 1024) while contents != b"": checksum.update(contents) contents = f.read(1024 * 1024) fileobj["checksum"] = "sha1$%s" % checksum.hexdigest() fileobj["size"] = fs_access.size(location)