shellac/guppy_basecaller: diff env/lib/python3.7/site-packages/cwltool/process.py @ 5:9b1c78e6ba9c (draft default tip)
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| field | value |
|---|---|
| author | shellac |
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children | |
--- a/env/lib/python3.7/site-packages/cwltool/process.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1067 +0,0 @@
-from __future__ import absolute_import
-
-import abc
-import copy
-import errno
-import functools
-import hashlib
-import json
-import logging
-import os
-import shutil
-import stat
-import tempfile
-import textwrap
-import uuid
-
-from io import open
-from typing import (Any, Callable, Dict, Generator, Iterator, List,
-                    Mapping, MutableMapping, MutableSequence, Optional, Set, Tuple,
-                    Type, Union, cast)
-
-from pkg_resources import resource_stream
-from rdflib import Graph  # pylint: disable=unused-import
-from ruamel.yaml.comments import CommentedMap, CommentedSeq
-from six import PY3, iteritems, itervalues, string_types, with_metaclass
-from six.moves import urllib
-from future.utils import raise_from
-from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
-                               Text)
-from schema_salad import schema, validate
-from schema_salad.ref_resolver import Loader, file_uri, uri_file_path
-from schema_salad.sourceline import SourceLine, strip_dup_lineno
-
-from . import expression
-from .builder import Builder, HasReqsHints
-from .context import LoadingContext  # pylint: disable=unused-import
-from .context import RuntimeContext, getdefault
-from .errors import UnsupportedRequirement, WorkflowException
-from .loghandler import _logger
-from .mutation import MutationManager  # pylint: disable=unused-import
-from .pathmapper import (PathMapper, adjustDirObjs, ensure_writable,
-                         get_listing, normalizeFilesDirs, visit_class,
-                         MapperEnt)
-from .secrets import SecretStore  # pylint: disable=unused-import
-from .software_requirements import (  # pylint: disable=unused-import
-    DependenciesConfiguration)
-from .stdfsaccess import StdFsAccess
-from .utils import (DEFAULT_TMP_PREFIX, aslist, cmp_like_py2,
-                    copytree_with_merge, onWindows, random_outdir)
-from .validate_js import validate_js_expressions
-from .update import INTERNAL_VERSION
-
-try:
-    from os import scandir  # type: ignore
-except ImportError:
-    from scandir import scandir  # type: ignore
-if TYPE_CHECKING:
-    from .provenance import ProvenanceProfile  # pylint: disable=unused-import
-
-if PY3:
-    from collections.abc import Iterable  # only works on python 3.3+
-else:
-    from collections import Iterable  # pylint: disable=unused-import
-
-class LogAsDebugFilter(logging.Filter):
-    def __init__(self, name, parent):  # type: (Text, logging.Logger) -> None
-        """Initialize."""
-        name = str(name)
-        super(LogAsDebugFilter, self).__init__(name)
-        self.parent = parent
-
-    def filter(self, record):  # type: (logging.LogRecord) -> bool
-        return self.parent.isEnabledFor(logging.DEBUG)
-
-
-_logger_validation_warnings = logging.getLogger("cwltool.validation_warnings")
-_logger_validation_warnings.setLevel(_logger.getEffectiveLevel())
-_logger_validation_warnings.addFilter(LogAsDebugFilter("cwltool.validation_warnings", _logger))
-
-supportedProcessRequirements = ["DockerRequirement",
-                                "SchemaDefRequirement",
-                                "EnvVarRequirement",
-                                "ScatterFeatureRequirement",
-                                "SubworkflowFeatureRequirement",
-                                "MultipleInputFeatureRequirement",
-                                "InlineJavascriptRequirement",
-                                "ShellCommandRequirement",
-                                "StepInputExpressionRequirement",
-                                "ResourceRequirement",
-                                "InitialWorkDirRequirement",
-                                "ToolTimeLimit",
-                                "WorkReuse",
-                                "NetworkAccess",
-                                "InplaceUpdateRequirement",
-                                "LoadListingRequirement",
-                                "http://commonwl.org/cwltool#TimeLimit",
-                                "http://commonwl.org/cwltool#WorkReuse",
-                                "http://commonwl.org/cwltool#NetworkAccess",
-                                "http://commonwl.org/cwltool#LoadListingRequirement",
-                                "http://commonwl.org/cwltool#InplaceUpdateRequirement"]
-
-cwl_files = (
-    "Workflow.yml",
-    "CommandLineTool.yml",
-    "CommonWorkflowLanguage.yml",
-    "Process.yml",
-    "concepts.md",
-    "contrib.md",
-    "intro.md",
-    "invocation.md")
-
-salad_files = ('metaschema.yml',
-               'metaschema_base.yml',
-               'salad.md',
-               'field_name.yml',
-               'import_include.md',
-               'link_res.yml',
-               'ident_res.yml',
-               'vocab_res.yml',
-               'vocab_res.yml',
-               'field_name_schema.yml',
-               'field_name_src.yml',
-               'field_name_proc.yml',
-               'ident_res_schema.yml',
-               'ident_res_src.yml',
-               'ident_res_proc.yml',
-               'link_res_schema.yml',
-               'link_res_src.yml',
-               'link_res_proc.yml',
-               'vocab_res_schema.yml',
-               'vocab_res_src.yml',
-               'vocab_res_proc.yml')
-
-SCHEMA_CACHE = {}  # type: Dict[Text, Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text, Any], Loader]]
-SCHEMA_FILE = None  # type: Optional[Dict[Text, Any]]
-SCHEMA_DIR = None  # type: Optional[Dict[Text, Any]]
-SCHEMA_ANY = None  # type: Optional[Dict[Text, Any]]
-
-custom_schemas = {}  # type: Dict[Text, Tuple[Text, Text]]
-
-def use_standard_schema(version):
-    # type: (Text) -> None
-    if version in custom_schemas:
-        del custom_schemas[version]
-    if version in SCHEMA_CACHE:
-        del SCHEMA_CACHE[version]
-
-def use_custom_schema(version, name, text):
-    # type: (Text, Text, Union[Text, bytes]) -> None
-    if isinstance(text, bytes):
-        text2 = text.decode()
-    else:
-        text2 = text
-    custom_schemas[version] = (name, text2)
-    if version in SCHEMA_CACHE:
-        del SCHEMA_CACHE[version]
-
-def get_schema(version):
-    # type: (Text) -> Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text,Any], Loader]
-
-    if version in SCHEMA_CACHE:
-        return SCHEMA_CACHE[version]
-
-    cache = {}  # type: Dict[Text, Any]
-    version = version.split("#")[-1]
-    if '.dev' in version:
-        version = ".".join(version.split(".")[:-1])
-    for f in cwl_files:
-        try:
-            res = resource_stream(__name__, 'schemas/%s/%s' % (version, f))
-            cache["https://w3id.org/cwl/" + f] = res.read()
-            res.close()
-        except IOError:
-            pass
-
-    for f in salad_files:
-        try:
-            res = resource_stream(
-                __name__, 'schemas/{}/salad/schema_salad/metaschema/{}'.format(
-                    version, f))
-            cache["https://w3id.org/cwl/salad/schema_salad/metaschema/"
-                  + f] = res.read()
-            res.close()
-        except IOError:
-            pass
-
-    if version in custom_schemas:
-        cache[custom_schemas[version][0]] = custom_schemas[version][1]
-        SCHEMA_CACHE[version] = schema.load_schema(
-            custom_schemas[version][0], cache=cache)
-    else:
-        SCHEMA_CACHE[version] = schema.load_schema(
-            "https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache)
-
-    return SCHEMA_CACHE[version]
-
-
-def shortname(inputid):
-    # type: (Text) -> Text
-    d = urllib.parse.urlparse(inputid)
-    if d.fragment:
-        return d.fragment.split(u"/")[-1]
-    return d.path.split(u"/")[-1]
-
-
-def checkRequirements(rec, supported_process_requirements):
-    # type: (Any, Iterable[Any]) -> None
-    if isinstance(rec, MutableMapping):
-        if "requirements" in rec:
-            for i, entry in enumerate(rec["requirements"]):
-                with SourceLine(rec["requirements"], i, UnsupportedRequirement):
-                    if entry["class"] not in supported_process_requirements:
-                        raise UnsupportedRequirement(
-                            u"Unsupported requirement {}".format(entry["class"]))
-        for key in rec:
-            checkRequirements(rec[key], supported_process_requirements)
-    if isinstance(rec, MutableSequence):
-        for entry in rec:
-            checkRequirements(entry, supported_process_requirements)
-
-
-def stage_files(pathmapper,             # type: PathMapper
-                stage_func=None,        # type: Optional[Callable[..., Any]]
-                ignore_writable=False,  # type: bool
-                symlink=True,           # type: bool
-                secret_store=None,      # type: Optional[SecretStore]
-                fix_conflicts=False     # type: bool
-                ):  # type: (...) -> None
-    """Link or copy files to their targets. Create them as needed."""
-
-    targets = {}  # type: Dict[Text, MapperEnt]
-    for key, entry in pathmapper.items():
-        if not 'File' in entry.type:
-            continue
-        if entry.target not in targets:
-            targets[entry.target] = entry
-        elif targets[entry.target].resolved != entry.resolved:
-            if fix_conflicts:
-                tgt = entry.target
-                i = 2
-                tgt = "%s_%s" % (tgt, i)
-                while tgt in targets:
-                    i += 1
-                    tgt = "%s_%s" % (tgt, i)
-                targets[tgt] = pathmapper.update(key, entry.resolved, tgt, entry.type, entry.staged)
-            else:
-                raise WorkflowException("File staging conflict, trying to stage both %s and %s to the same target %s" % (
-                    targets[entry.target].resolved, entry.resolved, entry.target))
-
-    for key, entry in pathmapper.items():
-        if not entry.staged:
-            continue
-        if not os.path.exists(os.path.dirname(entry.target)):
-            os.makedirs(os.path.dirname(entry.target))
-        if entry.type in ("File", "Directory") and os.path.exists(entry.resolved):
-            if symlink:  # Use symlink func if allowed
-                if onWindows():
-                    if entry.type == "File":
-                        shutil.copy(entry.resolved, entry.target)
-                    elif entry.type == "Directory":
-                        if os.path.exists(entry.target) \
-                                and os.path.isdir(entry.target):
-                            shutil.rmtree(entry.target)
-                        copytree_with_merge(entry.resolved, entry.target)
-                else:
-                    os.symlink(entry.resolved, entry.target)
-            elif stage_func is not None:
-                stage_func(entry.resolved, entry.target)
-        elif entry.type == "Directory" and not os.path.exists(entry.target) \
-                and entry.resolved.startswith("_:"):
-            os.makedirs(entry.target)
-        elif entry.type == "WritableFile" and not ignore_writable:
-            shutil.copy(entry.resolved, entry.target)
-            ensure_writable(entry.target)
-        elif entry.type == "WritableDirectory" and not ignore_writable:
-            if entry.resolved.startswith("_:"):
-                os.makedirs(entry.target)
-            else:
-                shutil.copytree(entry.resolved, entry.target)
-                ensure_writable(entry.target)
-        elif entry.type == "CreateFile" or entry.type == "CreateWritableFile":
-            with open(entry.target, "wb") as new:
-                if secret_store is not None:
-                    new.write(
-                        secret_store.retrieve(entry.resolved).encode("utf-8"))
-                else:
-                    new.write(entry.resolved.encode("utf-8"))
-            if entry.type == "CreateFile":
-                os.chmod(entry.target, stat.S_IRUSR)  # Read only
-            else:  # it is a "CreateWritableFile"
-                ensure_writable(entry.target)
-            pathmapper.update(
-                key, entry.target, entry.target, entry.type, entry.staged)
-
-
-def relocateOutputs(outputObj,              # type: Union[Dict[Text, Any], List[Dict[Text, Any]]]
-                    destination_path,       # type: Text
-                    source_directories,     # type: Set[Text]
-                    action,                 # type: Text
-                    fs_access,              # type: StdFsAccess
-                    compute_checksum=True,  # type: bool
-                    path_mapper=PathMapper  # type: Type[PathMapper]
-                    ):
-    # type: (...) -> Union[Dict[Text, Any], List[Dict[Text, Any]]]
-    adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True))
-
-    if action not in ("move", "copy"):
-        return outputObj
-
-    def _collectDirEntries(obj):
-        # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> Iterator[Dict[Text, Any]]
-        if isinstance(obj, dict):
-            if obj.get("class") in ("File", "Directory"):
-                yield obj
-            else:
-                for sub_obj in obj.values():
-                    for dir_entry in _collectDirEntries(sub_obj):
-                        yield dir_entry
-        elif isinstance(obj, MutableSequence):
-            for sub_obj in obj:
-                for dir_entry in _collectDirEntries(sub_obj):
-                    yield dir_entry
-
-    def _relocate(src, dst):  # type: (Text, Text) -> None
-        if src == dst:
-            return
-
-        # If the source is not contained in source_directories we're not allowed to delete it
-        src = fs_access.realpath(src)
-        src_can_deleted = any(os.path.commonprefix([p, src]) == p for p in source_directories)
-
-        _action = "move" if action == "move" and src_can_deleted else "copy"
-
-        if _action == "move":
-            _logger.debug("Moving %s to %s", src, dst)
-            if fs_access.isdir(src) and fs_access.isdir(dst):
-                # merge directories
-                for dir_entry in scandir(src):
-                    _relocate(dir_entry.path, fs_access.join(dst, dir_entry.name))
-            else:
-                shutil.move(src, dst)
-
-        elif _action == "copy":
-            _logger.debug("Copying %s to %s", src, dst)
-            if fs_access.isdir(src):
-                if os.path.isdir(dst):
-                    shutil.rmtree(dst)
-                elif os.path.isfile(dst):
-                    os.unlink(dst)
-                shutil.copytree(src, dst)
-            else:
-                shutil.copy2(src, dst)
-
-    def _realpath(ob):  # type: (Dict[Text, Any]) -> None
-        if ob["location"].startswith("file:"):
-            ob["location"] = file_uri(os.path.realpath(uri_file_path(ob["location"])))
-        if ob["location"].startswith("/"):
-            ob["location"] = os.path.realpath(ob["location"])
-
-    outfiles = list(_collectDirEntries(outputObj))
-    visit_class(outfiles, ("File", "Directory"), _realpath)
-    pm = path_mapper(outfiles, "", destination_path, separateDirs=False)
-    stage_files(pm, stage_func=_relocate, symlink=False, fix_conflicts=True)
-
-    def _check_adjust(a_file):  # type: (Dict[Text, Text]) -> Dict[Text, Text]
-        a_file["location"] = file_uri(pm.mapper(a_file["location"])[1])
-        if "contents" in a_file:
-            del a_file["contents"]
-        return a_file
-
-    visit_class(outputObj, ("File", "Directory"), _check_adjust)
-
-    if compute_checksum:
-        visit_class(outputObj, ("File",), functools.partial(
-            compute_checksums, fs_access))
-    return outputObj
-
-
-def cleanIntermediate(output_dirs):  # type: (Iterable[Text]) -> None
-    for a in output_dirs:
-        if os.path.exists(a):
-            _logger.debug(u"Removing intermediate output directory %s", a)
-            shutil.rmtree(a, True)
-
-def add_sizes(fsaccess, obj):  # type: (StdFsAccess, Dict[Text, Any]) -> None
-    if 'location' in obj:
-        try:
-            if "size" not in obj:
-                obj["size"] = fsaccess.size(obj["location"])
-        except OSError:
-            pass
-    elif 'contents' in obj:
-        obj["size"] = len(obj['contents'])
-    else:
-        return  # best effort
-
-def fill_in_defaults(inputs,   # type: List[Dict[Text, Text]]
-                     job,      # type: Dict[Text, expression.JSON]
-                     fsaccess  # type: StdFsAccess
-                     ):  # type: (...) -> None
-    for e, inp in enumerate(inputs):
-        with SourceLine(inputs, e, WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
-            fieldname = shortname(inp[u"id"])
-            if job.get(fieldname) is not None:
-                pass
-            elif job.get(fieldname) is None and u"default" in inp:
-                job[fieldname] = copy.deepcopy(inp[u"default"])
-            elif job.get(fieldname) is None and u"null" in aslist(inp[u"type"]):
-                job[fieldname] = None
-            else:
-                raise WorkflowException("Missing required input parameter '%s'" % shortname(inp["id"]))
-
-
-def avroize_type(field_type, name_prefix=""):
-    # type: (Union[List[Dict[Text, Any]], Dict[Text, Any]], Text) -> Any
-    """Add missing information to a type so that CWL types are valid."""
-    if isinstance(field_type, MutableSequence):
-        for field in field_type:
-            avroize_type(field, name_prefix)
-    elif isinstance(field_type, MutableMapping):
-        if field_type["type"] in ("enum", "record"):
-            if "name" not in field_type:
-                field_type["name"] = name_prefix + Text(uuid.uuid4())
-        if field_type["type"] == "record":
-            avroize_type(field_type["fields"], name_prefix)
-        if field_type["type"] == "array":
-            avroize_type(field_type["items"], name_prefix)
-        if isinstance(field_type["type"], MutableSequence):
-            for ctype in field_type["type"]:
-                avroize_type(ctype, name_prefix)
-    return field_type
-
-def get_overrides(overrides, toolid):  # type: (List[Dict[Text, Any]], Text) -> Dict[Text, Any]
-    req = {}  # type: Dict[Text, Any]
-    if not isinstance(overrides, MutableSequence):
-        raise validate.ValidationException("Expected overrides to be a list, but was %s" % type(overrides))
-    for ov in overrides:
-        if ov["overrideTarget"] == toolid:
-            req.update(ov)
-    return req
-
-
-_VAR_SPOOL_ERROR = textwrap.dedent(
-    """
-    Non-portable reference to /var/spool/cwl detected: '{}'.
-    To fix, replace /var/spool/cwl with $(runtime.outdir) or add
-    DockerRequirement to the 'requirements' section and declare
-    'dockerOutputDirectory: /var/spool/cwl'.
-    """)
-
-
-def var_spool_cwl_detector(obj,           # type: Union[MutableMapping[Text, Text], List[Dict[Text, Any]], Text]
-                           item=None,     # type: Optional[Any]
-                           obj_key=None,  # type: Optional[Any]
-                           ):  # type: (...)->bool
-    """Detect any textual reference to /var/spool/cwl."""
-    r = False
-    if isinstance(obj, string_types):
-        if "var/spool/cwl" in obj and obj_key != "dockerOutputDirectory":
-            _logger.warning(
-                SourceLine(item=item, key=obj_key, raise_type=Text).makeError(
-                    _VAR_SPOOL_ERROR.format(obj)))
-            r = True
-    elif isinstance(obj, MutableMapping):
-        for mkey, mvalue in iteritems(obj):
-            r = var_spool_cwl_detector(mvalue, obj, mkey) or r
-    elif isinstance(obj, MutableSequence):
-        for lkey, lvalue in enumerate(obj):
-            r = var_spool_cwl_detector(lvalue, obj, lkey) or r
-    return r
-
-def eval_resource(builder, resource_req):  # type: (Builder, Text) -> Any
-    if expression.needs_parsing(resource_req):
-        return builder.do_eval(resource_req)
-    return resource_req
-
-
-# Threshold where the "too many files" warning kicks in
-FILE_COUNT_WARNING = 5000
-
-class Process(with_metaclass(abc.ABCMeta, HasReqsHints)):
-    def __init__(self,
-                 toolpath_object,  # type: MutableMapping[Text, Any]
-                 loadingContext    # type: LoadingContext
-                 ):  # type: (...) -> None
-        """Build a Process object from the provided dictionary."""
-        self.metadata = getdefault(loadingContext.metadata, {})  # type: Dict[Text,Any]
-        self.provenance_object = None  # type: Optional[ProvenanceProfile]
-        self.parent_wf = None  # type: Optional[ProvenanceProfile]
-        global SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY  # pylint: disable=global-statement
-        if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None:
-            get_schema("v1.0")
-            SCHEMA_ANY = cast(Dict[Text, Any],
-                              SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"])
-            SCHEMA_FILE = cast(Dict[Text, Any],
-                               SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"])
-            SCHEMA_DIR = cast(Dict[Text, Any],
-                              SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"])
-
-        self.names = schema.make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY],
-                                             Loader({}))
-        self.tool = toolpath_object
-        self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
-        self.requirements.extend(self.tool.get("requirements", []))
-        if "id" not in self.tool:
-            self.tool["id"] = "_:" + Text(uuid.uuid4())
-        self.requirements.extend(get_overrides(getdefault(loadingContext.overrides_list, []),
-                                               self.tool["id"]).get("requirements", []))
-        self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
-        self.hints.extend(self.tool.get("hints", []))
-        # Versions of requirements and hints which aren't mutated.
-        self.original_requirements = copy.deepcopy(self.requirements)
-        self.original_hints = copy.deepcopy(self.hints)
-        self.doc_loader = loadingContext.loader
-        self.doc_schema = loadingContext.avsc_names
-
-        self.formatgraph = None  # type: Optional[Graph]
-        if self.doc_loader is not None:
-            self.formatgraph = self.doc_loader.graph
-
-        checkRequirements(self.tool, supportedProcessRequirements)
-        self.validate_hints(loadingContext.avsc_names, self.tool.get("hints", []),
-                            strict=getdefault(loadingContext.strict, False))
-
-        self.schemaDefs = {}  # type: Dict[Text,Dict[Text, Any]]
-
-        sd, _ = self.get_requirement("SchemaDefRequirement")
-
-        if sd is not None:
-            sdtypes = avroize_type(sd["types"])
-            av = schema.make_valid_avro(sdtypes, {t["name"]: t for t in sdtypes}, set())
-            for i in av:
-                self.schemaDefs[i["name"]] = i  # type: ignore
-            schema.make_avsc_object(schema.convert_to_dict(av), self.names)
-
-        # Build record schema from inputs
-        self.inputs_record_schema = {
-            "name": "input_record_schema", "type": "record",
-            "fields": []}  # type: Dict[Text, Any]
-        self.outputs_record_schema = {
-            "name": "outputs_record_schema", "type": "record",
-            "fields": []}  # type: Dict[Text, Any]
-
-        for key in ("inputs", "outputs"):
-            for i in self.tool[key]:
-                c = copy.deepcopy(i)
-                c["name"] = shortname(c["id"])
-                del c["id"]
-
-                if "type" not in c:
-                    raise validate.ValidationException(
-                        u"Missing 'type' in parameter '{}'".format(c["name"]))
-
-                if "default" in c and "null" not in aslist(c["type"]):
-                    nullable = ["null"]
-                    nullable.extend(aslist(c["type"]))
-                    c["type"] = nullable
-                else:
-                    c["type"] = c["type"]
-                c["type"] = avroize_type(c["type"], c["name"])
-                if key == "inputs":
-                    self.inputs_record_schema["fields"].append(c)
-                elif key == "outputs":
-                    self.outputs_record_schema["fields"].append(c)
-
-        with SourceLine(toolpath_object, "inputs", validate.ValidationException):
-            self.inputs_record_schema = cast(
-                Dict[Text, Any], schema.make_valid_avro(
-                    self.inputs_record_schema, {}, set()))
-            schema.make_avsc_object(
-                schema.convert_to_dict(self.inputs_record_schema), self.names)
-        with SourceLine(toolpath_object, "outputs", validate.ValidationException):
-            self.outputs_record_schema = cast(
-                Dict[Text, Any],
-                schema.make_valid_avro(self.outputs_record_schema, {}, set()))
-            schema.make_avsc_object(
-                schema.convert_to_dict(self.outputs_record_schema), self.names)
-
-        if toolpath_object.get("class") is not None \
-                and not getdefault(loadingContext.disable_js_validation, False):
-            if loadingContext.js_hint_options_file is not None:
-                try:
-                    with open(loadingContext.js_hint_options_file) as options_file:
-                        validate_js_options = json.load(options_file)
-                except (OSError, ValueError) as err:
-                    _logger.error(
-                        "Failed to read options file %s",
-                        loadingContext.js_hint_options_file)
-                    raise
-            else:
-                validate_js_options = None
-            if self.doc_schema is not None:
-                validate_js_expressions(
-                    cast(CommentedMap, toolpath_object),
-                    self.doc_schema.names[toolpath_object["class"]],
-                    validate_js_options)
-
-        dockerReq, is_req = self.get_requirement("DockerRequirement")
-
-        if dockerReq is not None and "dockerOutputDirectory" in dockerReq\
-                and is_req is not None and not is_req:
-            _logger.warning(SourceLine(
-                item=dockerReq, raise_type=Text).makeError(
-                    "When 'dockerOutputDirectory' is declared, DockerRequirement "
-                    "should go in the 'requirements' section, not 'hints'."""))
-
-        if dockerReq is not None and is_req is not None\
-                and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl":
-            if is_req:
-                # In this specific case, it is legal to have /var/spool/cwl, so skip the check.
-                pass
-            else:
-                # Must be a requirement
-                var_spool_cwl_detector(self.tool)
-        else:
-            var_spool_cwl_detector(self.tool)
-
-    def _init_job(self, joborder, runtime_context):
-        # type: (Mapping[Text, Text], RuntimeContext) -> Builder
-
-        if self.metadata.get("cwlVersion") != INTERNAL_VERSION:
-            raise WorkflowException("Process object loaded with version '%s', must update to '%s' in order to execute." % (
-                self.metadata.get("cwlVersion"), INTERNAL_VERSION))
-
-        job = cast(Dict[Text, expression.JSON], copy.deepcopy(joborder))
-
-        make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess)
-        fs_access = make_fs_access(runtime_context.basedir)
-
-        load_listing_req, _ = self.get_requirement(
-            "LoadListingRequirement")
-
-        if load_listing_req is not None:
-            load_listing = load_listing_req.get("loadListing")
-        else:
-            load_listing = "no_listing"
-
-        # Validate job order
-        try:
-            fill_in_defaults(self.tool[u"inputs"], job, fs_access)
-
-            normalizeFilesDirs(job)
-            schema = self.names.get_name("input_record_schema", "")
-            if schema is None:
-                raise WorkflowException("Missing input record schema: "
-                                        "{}".format(self.names))
-            validate.validate_ex(schema, job, strict=False,
-                                 logger=_logger_validation_warnings)
-
-            if load_listing and load_listing != "no_listing":
-                get_listing(fs_access, job, recursive=(load_listing == "deep_listing"))
-
-            visit_class(job, ("File",), functools.partial(add_sizes, fs_access))
-
-            if load_listing == "deep_listing":
-                for i, inparm in enumerate(self.tool["inputs"]):
-                    k = shortname(inparm["id"])
-                    if k not in job:
-                        continue
-                    v = job[k]
-                    dircount = [0]
-
-                    def inc(d):  # type: (List[int]) -> None
-                        d[0] += 1
-                    visit_class(v, ("Directory",), lambda x: inc(dircount))
-                    if dircount[0] == 0:
-                        continue
-                    filecount = [0]
-                    visit_class(v, ("File",), lambda x: inc(filecount))
-                    if filecount[0] > FILE_COUNT_WARNING:
-                        # Long lines in this message are okay, will be reflowed based on terminal columns.
-                        _logger.warning(strip_dup_lineno(SourceLine(self.tool["inputs"], i, Text).makeError(
-                            """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'. This may negatively affect workflow performance and memory use.
-
-If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior:
-
-$namespaces:
-  cwltool: "http://commonwl.org/cwltool#"
-hints:
-  cwltool:LoadListingRequirement:
-    loadListing: shallow_listing
-
-""" % (filecount[0], k))))
-
-        except (validate.ValidationException, WorkflowException) as err:
-            raise_from(WorkflowException("Invalid job input record:\n" + Text(err)), err)
-
-        files = []  # type: List[Dict[Text, Text]]
-        bindings = CommentedSeq()
-        tmpdir = u""
-        stagedir = u""
-
-        docker_req, _ = self.get_requirement("DockerRequirement")
-        default_docker = None
-
-        if docker_req is None and runtime_context.default_container:
-            default_docker = runtime_context.default_container
-
-        if (docker_req or default_docker) and runtime_context.use_container:
-            if docker_req is not None:
-                # Check if docker output directory is absolute
-                if docker_req.get("dockerOutputDirectory") and \
-                        docker_req.get("dockerOutputDirectory").startswith('/'):
-                    outdir = docker_req.get("dockerOutputDirectory")
-                else:
-                    outdir = docker_req.get("dockerOutputDirectory") or \
-                        runtime_context.docker_outdir or random_outdir()
-            elif default_docker is not None:
-                outdir = runtime_context.docker_outdir or random_outdir()
-            tmpdir = runtime_context.docker_tmpdir or "/tmp"  # nosec
-            stagedir = runtime_context.docker_stagedir or "/var/lib/cwl"
-        else:
-            outdir = fs_access.realpath(
-                runtime_context.outdir or tempfile.mkdtemp(
-                    prefix=getdefault(runtime_context.tmp_outdir_prefix,
-                                      DEFAULT_TMP_PREFIX)))
-            if self.tool[u"class"] != 'Workflow':
-                tmpdir = fs_access.realpath(runtime_context.tmpdir
-                                            or tempfile.mkdtemp())
-                stagedir = fs_access.realpath(runtime_context.stagedir
-                                              or tempfile.mkdtemp())
-
-        builder = Builder(job,
-                          files,
-                          bindings,
-                          self.schemaDefs,
-                          self.names,
-                          self.requirements,
-                          self.hints,
-                          {},
-                          runtime_context.mutation_manager,
-                          self.formatgraph,
-                          make_fs_access,
-                          fs_access,
-                          runtime_context.job_script_provider,
-                          runtime_context.eval_timeout,
-                          runtime_context.debug,
-                          runtime_context.js_console,
-                          runtime_context.force_docker_pull,
-                          load_listing,
-                          outdir,
-                          tmpdir,
-                          stagedir)
-
-        bindings.extend(builder.bind_input(
-            self.inputs_record_schema, job,
-            discover_secondaryFiles=getdefault(runtime_context.toplevel, False)))
-
-        if self.tool.get("baseCommand"):
-            for index, command in enumerate(aslist(self.tool["baseCommand"])):
-                bindings.append({
-                    "position": [-1000000, index],
-                    "datum": command
-                })
-
-        if self.tool.get("arguments"):
-            for i, arg in enumerate(self.tool["arguments"]):
-                lc = self.tool["arguments"].lc.data[i]
-                filename = self.tool["arguments"].lc.filename
-                bindings.lc.add_kv_line_col(len(bindings), lc)
-                if isinstance(arg, MutableMapping):
-                    arg = copy.deepcopy(arg)
-                    if arg.get("position"):
-                        position = arg.get("position")
-                        if isinstance(position, str):  # no need to test the
-                            # CWLVersion as the v1.0
-                            # schema only allows ints
-                            position = builder.do_eval(position)
-                            if position is None:
-                                position = 0
-                        arg["position"] = [position, i]
-                    else:
-                        arg["position"] = [0, i]
-                    bindings.append(arg)
-                elif ("$(" in arg) or ("${" in arg):
-                    cm = CommentedMap((
-                        ("position", [0, i]),
-                        ("valueFrom", arg)
-                    ))
-                    cm.lc.add_kv_line_col("valueFrom", lc)
-                    cm.lc.filename = filename
-                    bindings.append(cm)
-                else:
-                    cm = CommentedMap((
-                        ("position", [0, i]),
-                        ("datum", arg)
-                    ))
-                    cm.lc.add_kv_line_col("datum", lc)
-                    cm.lc.filename = filename
-                    bindings.append(cm)
-
-        # use python2 like sorting of heterogeneous lists
-        # (containing str and int types),
-        if PY3:
-            key = functools.cmp_to_key(cmp_like_py2)
-        else:  # PY2
-            key = lambda d: d["position"]
-
-        # This awkward construction replaces the contents of
-        # "bindings" in place (because Builder expects it to be
-        # mutated in place, sigh, I'm sorry) with its contents sorted,
-        # supporting different versions of Python and ruamel.yaml with
-        # different behaviors/bugs in CommentedSeq.
-        bindings_copy = copy.deepcopy(bindings)
-        del bindings[:]
-        bindings.extend(sorted(bindings_copy, key=key))
-
-        if self.tool[u"class"] != 'Workflow':
-            builder.resources = self.evalResources(builder, runtime_context)
-        return builder
-
-    def evalResources(self, builder, runtimeContext):
-        # type: (Builder, RuntimeContext) -> Dict[str, int]
-        resourceReq, _ = self.get_requirement("ResourceRequirement")
-        if resourceReq is None:
-            resourceReq = {}
-        cwl_version = self.metadata.get(
-            "http://commonwl.org/cwltool#original_cwlVersion", None)
-        if cwl_version == "v1.0":
-            ram = 1024
-        else:
-            ram = 256
-        request = {
-            "coresMin": 1,
-            "coresMax": 1,
-            "ramMin": ram,
-            "ramMax": ram,
-            "tmpdirMin": 1024,
-            "tmpdirMax": 1024,
-            "outdirMin": 1024,
-            "outdirMax": 1024
-        }  # type: Dict[str, int]
-        for a in ("cores", "ram", "tmpdir", "outdir"):
-            mn = None
-            mx = None
-            if resourceReq.get(a + "Min"):
-                mn = eval_resource(builder, resourceReq[a + "Min"])
-            if resourceReq.get(a + "Max"):
-                mx = eval_resource(builder, resourceReq[a + "Max"])
-            if mn is None:
-                mn = mx
-            elif mx is None:
-                mx = mn
-
-            if mn is not None:
-                request[a + "Min"] = cast(int, mn)
-                request[a + "Max"] = cast(int, mx)
-
-        if runtimeContext.select_resources is not None:
-            return runtimeContext.select_resources(request, runtimeContext)
-        return {
-            "cores": request["coresMin"],
-            "ram": request["ramMin"],
-            "tmpdirSize": request["tmpdirMin"],
-            "outdirSize": request["outdirMin"],
-        }
-
-    def validate_hints(self, avsc_names, hints, strict):
-        # type: (Any, List[Dict[Text, Any]], bool) -> None
-        for i, r in enumerate(hints):
-            sl = SourceLine(hints, i, validate.ValidationException)
-            with sl:
-                if avsc_names.get_name(r["class"], "") is not None and self.doc_loader is not None:
-                    plain_hint = dict((key, r[key]) for key in r if key not in
-                                      self.doc_loader.identifiers)  # strip identifiers
-                    validate.validate_ex(
-                        avsc_names.get_name(plain_hint["class"], ""),
-                        plain_hint, strict=strict)
-                elif r["class"] in ("NetworkAccess", "LoadListingRequirement"):
-                    pass
-                else:
-                    _logger.info(Text(sl.makeError(u"Unknown hint %s" % (r["class"]))))
-
-    def visit(self, op):  # type: (Callable[[MutableMapping[Text, Any]], None]) -> None
-        op(self.tool)
-
-    @abc.abstractmethod
-    def job(self,
-            job_order,         # type: Mapping[Text, Text]
-            output_callbacks,  # type: Callable[[Any, Any], Any]
-            runtimeContext     # type: RuntimeContext
-            ):  # type: (...) -> Generator[Any, None, None]
-        # FIXME: Declare base type for what Generator yields
-        pass
-
-
-_names = set()  # type: Set[Text]
-
-
-def uniquename(stem, names=None):  # type: (Text, Optional[Set[Text]]) -> Text
-    global _names
-    if names is None:
-        names = _names
-    c = 1
-    u = stem
-    while u in names:
-        c += 1
-        u = u"%s_%s" % (stem, c)
-    names.add(u)
-    return u
-
-
-def nestdir(base, deps):
-    # type: (Text, Dict[Text, Any]) -> Dict[Text, Any]
-    dirname = os.path.dirname(base) + "/"
-    subid = deps["location"]
-    if subid.startswith(dirname):
-        s2 = subid[len(dirname):]
-        sp = s2.split('/')
-        sp.pop()
-        while sp:
-            nx = sp.pop()
-            deps = {
-                "class": "Directory",
-                "basename": nx,
-                "listing": [deps]
-            }
-    return deps
-
-
-def mergedirs(listing):
-    # type: (List[Dict[Text, Any]]) -> List[Dict[Text, Any]]
-    r = []  # type: List[Dict[Text, Any]]
-    ents = {}  # type: Dict[Text, Any]
-    collided = set()  # type: Set[Text]
-    for e in listing:
-        if e["basename"] not in ents:
-            ents[e["basename"]] = e
-        elif e["class"] == "Directory":
-            if e.get("listing"):
-                ents[e["basename"]].setdefault("listing", []).extend(e["listing"])
-            if ents[e["basename"]]["location"].startswith("_:"):
-                ents[e["basename"]]["location"] = e["location"]
-        elif e["location"] != ents[e["basename"]]["location"]:
-            # same basename, different location, collision,
-            # rename both.
-            collided.add(e["basename"])
-            e2 = ents[e["basename"]]
-
-            e["basename"] = urllib.parse.quote(e["location"], safe="")
-            e2["basename"] = urllib.parse.quote(e2["location"], safe="")
-
-            e["nameroot"], e["nameext"] = os.path.splitext(e["basename"])
-            e2["nameroot"], e2["nameext"] = os.path.splitext(e2["basename"])
-
-            ents[e["basename"]] = e
-            ents[e2["basename"]] = e2
-    for c in collided:
-        del ents[c]
-    for e in itervalues(ents):
-        if e["class"] == "Directory" and "listing" in e:
-            e["listing"] = mergedirs(e["listing"])
-    r.extend(itervalues(ents))
-    return r
-
-
-CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
-
-def scandeps(base,                          # type: Text
-             doc,                           # type: Any
-             reffields,                     # type: Set[Text]
-             urlfields,                     # type: Set[Text]
-             loadref,                       # type: Callable[[Text, Text], Text]
-             urljoin=urllib.parse.urljoin,  # type: Callable[[Text, Text], Text]
-             nestdirs=True                  # type: bool
-             ):  # type: (...) -> List[Dict[Text, Text]]
-    r = []  # type: List[Dict[Text, Text]]
-    if isinstance(doc, MutableMapping):
-        if "id" in doc:
-            if doc["id"].startswith("file://"):
-                df, _ = urllib.parse.urldefrag(doc["id"])
-                if base != df:
-                    r.append({
-                        "class": "File",
-                        "location": df,
-                        "format": CWL_IANA
-                    })
-                    base = df
-
-        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
-            u = doc.get("location", doc.get("path"))
-            if u and not u.startswith("_:"):
-                deps = {"class": doc["class"],
-                        "location": urljoin(base, u)
-                        }  # type: Dict[Text, Any]
-                if "basename" in doc:
-                    deps["basename"] = doc["basename"]
-                if doc["class"] == "Directory" and "listing" in doc:
-                    deps["listing"] = doc["listing"]
-                if doc["class"] == "File" and "secondaryFiles" in doc:
-                    deps["secondaryFiles"] = doc["secondaryFiles"]
-                if nestdirs:
-                    deps = nestdir(base, deps)
-                r.append(deps)
-            else:
-                if doc["class"] == "Directory" and "listing" in doc:
-                    r.extend(scandeps(
-                        base, doc["listing"], reffields, urlfields, loadref,
-                        urljoin=urljoin, nestdirs=nestdirs))
-                elif doc["class"] == "File" and "secondaryFiles" in doc:
-                    r.extend(scandeps(
-                        base, doc["secondaryFiles"], reffields, urlfields,
-                        loadref, urljoin=urljoin, nestdirs=nestdirs))
-
-        for k, v in iteritems(doc):
-            if k in reffields:
-                for u in aslist(v):
-                    if isinstance(u, MutableMapping):
-                        r.extend(scandeps(
-                            base, u, reffields, urlfields, loadref,
-                            urljoin=urljoin, nestdirs=nestdirs))
-                    else:
-                        subid = urljoin(base, u)
-                        basedf, _ = urllib.parse.urldefrag(base)
-                        subiddf, _ = urllib.parse.urldefrag(subid)
-                        if basedf == subiddf:
-                            continue
-                        sub = loadref(base, u)
-                        deps = {
-                            "class": "File",
-                            "location": subid,
-                            "format": CWL_IANA
-                        }
-                        sf = scandeps(
-                            subid, sub, reffields, urlfields, loadref,
-                            urljoin=urljoin, nestdirs=nestdirs)
-                        if sf:
-                            deps["secondaryFiles"] = sf
-                        if nestdirs:
-                            deps = nestdir(base, deps)
-                        r.append(deps)
-            elif k in urlfields and k != "location":
-                for u in aslist(v):
-                    deps = {
-                        "class": "File",
-                        "location": urljoin(base, u)
-                    }
-                    if nestdirs:
-                        deps = nestdir(base, deps)
-                    r.append(deps)
-            elif k not in ("listing", "secondaryFiles"):
-                r.extend(scandeps(
-                    base, v, reffields, urlfields, loadref, urljoin=urljoin,
-                    nestdirs=nestdirs))
-    elif isinstance(doc, MutableSequence):
-        for d in doc:
-            r.extend(scandeps(
-                base, d, reffields, urlfields, loadref, urljoin=urljoin,
-                nestdirs=nestdirs))
-
-    if r:
-        normalizeFilesDirs(r)
-        r = mergedirs(r)
-
-    return r
-
-
-def compute_checksums(fs_access, fileobj):  # type: (StdFsAccess, Dict[Text, Any]) -> None
-    if "checksum" not in fileobj:
-        checksum = hashlib.sha1()  # nosec
-        with fs_access.open(fileobj["location"], "rb") as f:
-            contents = f.read(1024 * 1024)
-            while contents != b"":
-                checksum.update(contents)
-                contents = f.read(1024 * 1024)
-        fileobj["checksum"] = "sha1$%s" % checksum.hexdigest()
-        fileobj["size"] = fs_access.size(fileobj["location"])
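As orientation for reviewers (this note is not part of the changeset): the file removed above is cwltool's core `Process` machinery, and a couple of its helpers are small pure functions that are easy to exercise directly. The sketch below is a minimal illustration, assuming a cwltool release of this vintage is importable; the URIs and stems are made-up example inputs.

```python
# Illustrative only -- not part of the deleted file or of this changeset.
# Assumes a contemporary cwltool release is installed (pip install cwltool).
from cwltool.process import shortname, uniquename

# shortname() reduces a parameter URI to its final fragment (or path) segment.
print(shortname("file:///tmp/wf.cwl#step1/out_fastq"))  # -> out_fastq
print(shortname("http://example.com/tool.cwl#reads"))   # -> reads

# uniquename() appends a numeric suffix once a stem has already been seen.
seen = set()
print(uniquename("step", seen))  # -> step
print(uniquename("step", seen))  # -> step_2
print(uniquename("step", seen))  # -> step_3
```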