Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/cwltool/process.py @ 3:758bc20232e8 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:20:52 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
from __future__ import absolute_import import abc import copy import errno import functools import hashlib import json import logging import os import shutil import stat import tempfile import textwrap import uuid from io import open from typing import (Any, Callable, Dict, Generator, Iterator, List, Mapping, MutableMapping, MutableSequence, Optional, Set, Tuple, Type, Union, cast) from pkg_resources import resource_stream from rdflib import Graph # pylint: disable=unused-import from ruamel.yaml.comments import CommentedMap, CommentedSeq from six import PY3, iteritems, itervalues, string_types, with_metaclass from six.moves import urllib from future.utils import raise_from from typing_extensions import (TYPE_CHECKING, # pylint: disable=unused-import Text) from schema_salad import schema, validate from schema_salad.ref_resolver import Loader, file_uri, uri_file_path from schema_salad.sourceline import SourceLine, strip_dup_lineno from . import expression from .builder import Builder, HasReqsHints from .context import LoadingContext # pylint: disable=unused-import from .context import RuntimeContext, getdefault from .errors import UnsupportedRequirement, WorkflowException from .loghandler import _logger from .mutation import MutationManager # pylint: disable=unused-import from .pathmapper import (PathMapper, adjustDirObjs, ensure_writable, get_listing, normalizeFilesDirs, visit_class, MapperEnt) from .secrets import SecretStore # pylint: disable=unused-import from .software_requirements import ( # pylint: disable=unused-import DependenciesConfiguration) from .stdfsaccess import StdFsAccess from .utils import (DEFAULT_TMP_PREFIX, aslist, cmp_like_py2, copytree_with_merge, onWindows, random_outdir) from .validate_js import validate_js_expressions from .update import INTERNAL_VERSION try: from os import scandir # type: ignore except ImportError: from scandir import scandir # type: ignore if TYPE_CHECKING: from .provenance import ProvenanceProfile # pylint: disable=unused-import if PY3: from collections.abc import Iterable # only works on python 3.3+ else: from collections import Iterable # pylint: disable=unused-import class LogAsDebugFilter(logging.Filter): def __init__(self, name, parent): # type: (Text, logging.Logger) -> None """Initialize.""" name = str(name) super(LogAsDebugFilter, self).__init__(name) self.parent = parent def filter(self, record): # type: (logging.LogRecord) -> bool return self.parent.isEnabledFor(logging.DEBUG) _logger_validation_warnings = logging.getLogger("cwltool.validation_warnings") _logger_validation_warnings.setLevel(_logger.getEffectiveLevel()) _logger_validation_warnings.addFilter(LogAsDebugFilter("cwltool.validation_warnings", _logger)) supportedProcessRequirements = ["DockerRequirement", "SchemaDefRequirement", "EnvVarRequirement", "ScatterFeatureRequirement", "SubworkflowFeatureRequirement", "MultipleInputFeatureRequirement", "InlineJavascriptRequirement", "ShellCommandRequirement", "StepInputExpressionRequirement", "ResourceRequirement", "InitialWorkDirRequirement", "ToolTimeLimit", "WorkReuse", "NetworkAccess", "InplaceUpdateRequirement", "LoadListingRequirement", "http://commonwl.org/cwltool#TimeLimit", "http://commonwl.org/cwltool#WorkReuse", "http://commonwl.org/cwltool#NetworkAccess", "http://commonwl.org/cwltool#LoadListingRequirement", "http://commonwl.org/cwltool#InplaceUpdateRequirement"] cwl_files = ( "Workflow.yml", "CommandLineTool.yml", "CommonWorkflowLanguage.yml", "Process.yml", "concepts.md", "contrib.md", "intro.md", "invocation.md") salad_files = ('metaschema.yml', 'metaschema_base.yml', 'salad.md', 'field_name.yml', 'import_include.md', 'link_res.yml', 'ident_res.yml', 'vocab_res.yml', 'vocab_res.yml', 'field_name_schema.yml', 'field_name_src.yml', 'field_name_proc.yml', 'ident_res_schema.yml', 'ident_res_src.yml', 'ident_res_proc.yml', 'link_res_schema.yml', 'link_res_src.yml', 'link_res_proc.yml', 'vocab_res_schema.yml', 'vocab_res_src.yml', 'vocab_res_proc.yml') SCHEMA_CACHE = {} # type: Dict[Text, Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text, Any], Loader]] SCHEMA_FILE = None # type: Optional[Dict[Text, Any]] SCHEMA_DIR = None # type: Optional[Dict[Text, Any]] SCHEMA_ANY = None # type: Optional[Dict[Text, Any]] custom_schemas = {} # type: Dict[Text, Tuple[Text, Text]] def use_standard_schema(version): # type: (Text) -> None if version in custom_schemas: del custom_schemas[version] if version in SCHEMA_CACHE: del SCHEMA_CACHE[version] def use_custom_schema(version, name, text): # type: (Text, Text, Union[Text, bytes]) -> None if isinstance(text, bytes): text2 = text.decode() else: text2 = text custom_schemas[version] = (name, text2) if version in SCHEMA_CACHE: del SCHEMA_CACHE[version] def get_schema(version): # type: (Text) -> Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text,Any], Loader] if version in SCHEMA_CACHE: return SCHEMA_CACHE[version] cache = {} # type: Dict[Text, Any] version = version.split("#")[-1] if '.dev' in version: version = ".".join(version.split(".")[:-1]) for f in cwl_files: try: res = resource_stream(__name__, 'schemas/%s/%s' % (version, f)) cache["https://w3id.org/cwl/" + f] = res.read() res.close() except IOError: pass for f in salad_files: try: res = resource_stream( __name__, 'schemas/{}/salad/schema_salad/metaschema/{}'.format( version, f)) cache["https://w3id.org/cwl/salad/schema_salad/metaschema/" + f] = res.read() res.close() except IOError: pass if version in custom_schemas: cache[custom_schemas[version][0]] = custom_schemas[version][1] SCHEMA_CACHE[version] = schema.load_schema( custom_schemas[version][0], cache=cache) else: SCHEMA_CACHE[version] = schema.load_schema( "https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache) return SCHEMA_CACHE[version] def shortname(inputid): # type: (Text) -> Text d = urllib.parse.urlparse(inputid) if d.fragment: return d.fragment.split(u"/")[-1] return d.path.split(u"/")[-1] def checkRequirements(rec, supported_process_requirements): # type: (Any, Iterable[Any]) -> None if isinstance(rec, MutableMapping): if "requirements" in rec: for i, entry in enumerate(rec["requirements"]): with SourceLine(rec["requirements"], i, UnsupportedRequirement): if entry["class"] not in supported_process_requirements: raise UnsupportedRequirement( u"Unsupported requirement {}".format(entry["class"])) for key in rec: checkRequirements(rec[key], supported_process_requirements) if isinstance(rec, MutableSequence): for entry in rec: checkRequirements(entry, supported_process_requirements) def stage_files(pathmapper, # type: PathMapper stage_func=None, # type: Optional[Callable[..., Any]] ignore_writable=False, # type: bool symlink=True, # type: bool secret_store=None, # type: Optional[SecretStore] fix_conflicts=False # type: bool ): # type: (...) -> None """Link or copy files to their targets. Create them as needed.""" targets = {} # type: Dict[Text, MapperEnt] for key, entry in pathmapper.items(): if not 'File' in entry.type: continue if entry.target not in targets: targets[entry.target] = entry elif targets[entry.target].resolved != entry.resolved: if fix_conflicts: tgt = entry.target i = 2 tgt = "%s_%s" % (tgt, i) while tgt in targets: i += 1 tgt = "%s_%s" % (tgt, i) targets[tgt] = pathmapper.update(key, entry.resolved, tgt, entry.type, entry.staged) else: raise WorkflowException("File staging conflict, trying to stage both %s and %s to the same target %s" % ( targets[entry.target].resolved, entry.resolved, entry.target)) for key, entry in pathmapper.items(): if not entry.staged: continue if not os.path.exists(os.path.dirname(entry.target)): os.makedirs(os.path.dirname(entry.target)) if entry.type in ("File", "Directory") and os.path.exists(entry.resolved): if symlink: # Use symlink func if allowed if onWindows(): if entry.type == "File": shutil.copy(entry.resolved, entry.target) elif entry.type == "Directory": if os.path.exists(entry.target) \ and os.path.isdir(entry.target): shutil.rmtree(entry.target) copytree_with_merge(entry.resolved, entry.target) else: os.symlink(entry.resolved, entry.target) elif stage_func is not None: stage_func(entry.resolved, entry.target) elif entry.type == "Directory" and not os.path.exists(entry.target) \ and entry.resolved.startswith("_:"): os.makedirs(entry.target) elif entry.type == "WritableFile" and not ignore_writable: shutil.copy(entry.resolved, entry.target) ensure_writable(entry.target) elif entry.type == "WritableDirectory" and not ignore_writable: if entry.resolved.startswith("_:"): os.makedirs(entry.target) else: shutil.copytree(entry.resolved, entry.target) ensure_writable(entry.target) elif entry.type == "CreateFile" or entry.type == "CreateWritableFile": with open(entry.target, "wb") as new: if secret_store is not None: new.write( secret_store.retrieve(entry.resolved).encode("utf-8")) else: new.write(entry.resolved.encode("utf-8")) if entry.type == "CreateFile": os.chmod(entry.target, stat.S_IRUSR) # Read only else: # it is a "CreateWritableFile" ensure_writable(entry.target) pathmapper.update( key, entry.target, entry.target, entry.type, entry.staged) def relocateOutputs(outputObj, # type: Union[Dict[Text, Any], List[Dict[Text, Any]]] destination_path, # type: Text source_directories, # type: Set[Text] action, # type: Text fs_access, # type: StdFsAccess compute_checksum=True, # type: bool path_mapper=PathMapper # type: Type[PathMapper] ): # type: (...) -> Union[Dict[Text, Any], List[Dict[Text, Any]]] adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True)) if action not in ("move", "copy"): return outputObj def _collectDirEntries(obj): # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> Iterator[Dict[Text, Any]] if isinstance(obj, dict): if obj.get("class") in ("File", "Directory"): yield obj else: for sub_obj in obj.values(): for dir_entry in _collectDirEntries(sub_obj): yield dir_entry elif isinstance(obj, MutableSequence): for sub_obj in obj: for dir_entry in _collectDirEntries(sub_obj): yield dir_entry def _relocate(src, dst): # type: (Text, Text) -> None if src == dst: return # If the source is not contained in source_directories we're not allowed to delete it src = fs_access.realpath(src) src_can_deleted = any(os.path.commonprefix([p, src]) == p for p in source_directories) _action = "move" if action == "move" and src_can_deleted else "copy" if _action == "move": _logger.debug("Moving %s to %s", src, dst) if fs_access.isdir(src) and fs_access.isdir(dst): # merge directories for dir_entry in scandir(src): _relocate(dir_entry.path, fs_access.join(dst, dir_entry.name)) else: shutil.move(src, dst) elif _action == "copy": _logger.debug("Copying %s to %s", src, dst) if fs_access.isdir(src): if os.path.isdir(dst): shutil.rmtree(dst) elif os.path.isfile(dst): os.unlink(dst) shutil.copytree(src, dst) else: shutil.copy2(src, dst) def _realpath(ob): # type: (Dict[Text, Any]) -> None if ob["location"].startswith("file:"): ob["location"] = file_uri(os.path.realpath(uri_file_path(ob["location"]))) if ob["location"].startswith("/"): ob["location"] = os.path.realpath(ob["location"]) outfiles = list(_collectDirEntries(outputObj)) visit_class(outfiles, ("File", "Directory"), _realpath) pm = path_mapper(outfiles, "", destination_path, separateDirs=False) stage_files(pm, stage_func=_relocate, symlink=False, fix_conflicts=True) def _check_adjust(a_file): # type: (Dict[Text, Text]) -> Dict[Text, Text] a_file["location"] = file_uri(pm.mapper(a_file["location"])[1]) if "contents" in a_file: del a_file["contents"] return a_file visit_class(outputObj, ("File", "Directory"), _check_adjust) if compute_checksum: visit_class(outputObj, ("File",), functools.partial( compute_checksums, fs_access)) return outputObj def cleanIntermediate(output_dirs): # type: (Iterable[Text]) -> None for a in output_dirs: if os.path.exists(a): _logger.debug(u"Removing intermediate output directory %s", a) shutil.rmtree(a, True) def add_sizes(fsaccess, obj): # type: (StdFsAccess, Dict[Text, Any]) -> None if 'location' in obj: try: if "size" not in obj: obj["size"] = fsaccess.size(obj["location"]) except OSError: pass elif 'contents' in obj: obj["size"] = len(obj['contents']) else: return # best effort def fill_in_defaults(inputs, # type: List[Dict[Text, Text]] job, # type: Dict[Text, expression.JSON] fsaccess # type: StdFsAccess ): # type: (...) -> None for e, inp in enumerate(inputs): with SourceLine(inputs, e, WorkflowException, _logger.isEnabledFor(logging.DEBUG)): fieldname = shortname(inp[u"id"]) if job.get(fieldname) is not None: pass elif job.get(fieldname) is None and u"default" in inp: job[fieldname] = copy.deepcopy(inp[u"default"]) elif job.get(fieldname) is None and u"null" in aslist(inp[u"type"]): job[fieldname] = None else: raise WorkflowException("Missing required input parameter '%s'" % shortname(inp["id"])) def avroize_type(field_type, name_prefix=""): # type: (Union[List[Dict[Text, Any]], Dict[Text, Any]], Text) -> Any """Add missing information to a type so that CWL types are valid.""" if isinstance(field_type, MutableSequence): for field in field_type: avroize_type(field, name_prefix) elif isinstance(field_type, MutableMapping): if field_type["type"] in ("enum", "record"): if "name" not in field_type: field_type["name"] = name_prefix + Text(uuid.uuid4()) if field_type["type"] == "record": avroize_type(field_type["fields"], name_prefix) if field_type["type"] == "array": avroize_type(field_type["items"], name_prefix) if isinstance(field_type["type"], MutableSequence): for ctype in field_type["type"]: avroize_type(ctype, name_prefix) return field_type def get_overrides(overrides, toolid): # type: (List[Dict[Text, Any]], Text) -> Dict[Text, Any] req = {} # type: Dict[Text, Any] if not isinstance(overrides, MutableSequence): raise validate.ValidationException("Expected overrides to be a list, but was %s" % type(overrides)) for ov in overrides: if ov["overrideTarget"] == toolid: req.update(ov) return req _VAR_SPOOL_ERROR = textwrap.dedent( """ Non-portable reference to /var/spool/cwl detected: '{}'. To fix, replace /var/spool/cwl with $(runtime.outdir) or add DockerRequirement to the 'requirements' section and declare 'dockerOutputDirectory: /var/spool/cwl'. """) def var_spool_cwl_detector(obj, # type: Union[MutableMapping[Text, Text], List[Dict[Text, Any]], Text] item=None, # type: Optional[Any] obj_key=None, # type: Optional[Any] ): # type: (...)->bool """Detect any textual reference to /var/spool/cwl.""" r = False if isinstance(obj, string_types): if "var/spool/cwl" in obj and obj_key != "dockerOutputDirectory": _logger.warning( SourceLine(item=item, key=obj_key, raise_type=Text).makeError( _VAR_SPOOL_ERROR.format(obj))) r = True elif isinstance(obj, MutableMapping): for mkey, mvalue in iteritems(obj): r = var_spool_cwl_detector(mvalue, obj, mkey) or r elif isinstance(obj, MutableSequence): for lkey, lvalue in enumerate(obj): r = var_spool_cwl_detector(lvalue, obj, lkey) or r return r def eval_resource(builder, resource_req): # type: (Builder, Text) -> Any if expression.needs_parsing(resource_req): return builder.do_eval(resource_req) return resource_req # Threshold where the "too many files" warning kicks in FILE_COUNT_WARNING = 5000 class Process(with_metaclass(abc.ABCMeta, HasReqsHints)): def __init__(self, toolpath_object, # type: MutableMapping[Text, Any] loadingContext # type: LoadingContext ): # type: (...) -> None """Build a Process object from the provided dictionary.""" self.metadata = getdefault(loadingContext.metadata, {}) # type: Dict[Text,Any] self.provenance_object = None # type: Optional[ProvenanceProfile] self.parent_wf = None # type: Optional[ProvenanceProfile] global SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY # pylint: disable=global-statement if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None: get_schema("v1.0") SCHEMA_ANY = cast(Dict[Text, Any], SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"]) SCHEMA_FILE = cast(Dict[Text, Any], SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"]) SCHEMA_DIR = cast(Dict[Text, Any], SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"]) self.names = schema.make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY], Loader({})) self.tool = toolpath_object self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, [])) self.requirements.extend(self.tool.get("requirements", [])) if "id" not in self.tool: self.tool["id"] = "_:" + Text(uuid.uuid4()) self.requirements.extend(get_overrides(getdefault(loadingContext.overrides_list, []), self.tool["id"]).get("requirements", [])) self.hints = copy.deepcopy(getdefault(loadingContext.hints, [])) self.hints.extend(self.tool.get("hints", [])) # Versions of requirements and hints which aren't mutated. self.original_requirements = copy.deepcopy(self.requirements) self.original_hints = copy.deepcopy(self.hints) self.doc_loader = loadingContext.loader self.doc_schema = loadingContext.avsc_names self.formatgraph = None # type: Optional[Graph] if self.doc_loader is not None: self.formatgraph = self.doc_loader.graph checkRequirements(self.tool, supportedProcessRequirements) self.validate_hints(loadingContext.avsc_names, self.tool.get("hints", []), strict=getdefault(loadingContext.strict, False)) self.schemaDefs = {} # type: Dict[Text,Dict[Text, Any]] sd, _ = self.get_requirement("SchemaDefRequirement") if sd is not None: sdtypes = avroize_type(sd["types"]) av = schema.make_valid_avro(sdtypes, {t["name"]: t for t in sdtypes}, set()) for i in av: self.schemaDefs[i["name"]] = i # type: ignore schema.make_avsc_object(schema.convert_to_dict(av), self.names) # Build record schema from inputs self.inputs_record_schema = { "name": "input_record_schema", "type": "record", "fields": []} # type: Dict[Text, Any] self.outputs_record_schema = { "name": "outputs_record_schema", "type": "record", "fields": []} # type: Dict[Text, Any] for key in ("inputs", "outputs"): for i in self.tool[key]: c = copy.deepcopy(i) c["name"] = shortname(c["id"]) del c["id"] if "type" not in c: raise validate.ValidationException( u"Missing 'type' in parameter '{}'".format(c["name"])) if "default" in c and "null" not in aslist(c["type"]): nullable = ["null"] nullable.extend(aslist(c["type"])) c["type"] = nullable else: c["type"] = c["type"] c["type"] = avroize_type(c["type"], c["name"]) if key == "inputs": self.inputs_record_schema["fields"].append(c) elif key == "outputs": self.outputs_record_schema["fields"].append(c) with SourceLine(toolpath_object, "inputs", validate.ValidationException): self.inputs_record_schema = cast( Dict[Text, Any], schema.make_valid_avro( self.inputs_record_schema, {}, set())) schema.make_avsc_object( schema.convert_to_dict(self.inputs_record_schema), self.names) with SourceLine(toolpath_object, "outputs", validate.ValidationException): self.outputs_record_schema = cast( Dict[Text, Any], schema.make_valid_avro(self.outputs_record_schema, {}, set())) schema.make_avsc_object( schema.convert_to_dict(self.outputs_record_schema), self.names) if toolpath_object.get("class") is not None \ and not getdefault(loadingContext.disable_js_validation, False): if loadingContext.js_hint_options_file is not None: try: with open(loadingContext.js_hint_options_file) as options_file: validate_js_options = json.load(options_file) except (OSError, ValueError) as err: _logger.error( "Failed to read options file %s", loadingContext.js_hint_options_file) raise else: validate_js_options = None if self.doc_schema is not None: validate_js_expressions( cast(CommentedMap, toolpath_object), self.doc_schema.names[toolpath_object["class"]], validate_js_options) dockerReq, is_req = self.get_requirement("DockerRequirement") if dockerReq is not None and "dockerOutputDirectory" in dockerReq\ and is_req is not None and not is_req: _logger.warning(SourceLine( item=dockerReq, raise_type=Text).makeError( "When 'dockerOutputDirectory' is declared, DockerRequirement " "should go in the 'requirements' section, not 'hints'.""")) if dockerReq is not None and is_req is not None\ and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl": if is_req: # In this specific case, it is legal to have /var/spool/cwl, so skip the check. pass else: # Must be a requirement var_spool_cwl_detector(self.tool) else: var_spool_cwl_detector(self.tool) def _init_job(self, joborder, runtime_context): # type: (Mapping[Text, Text], RuntimeContext) -> Builder if self.metadata.get("cwlVersion") != INTERNAL_VERSION: raise WorkflowException("Process object loaded with version '%s', must update to '%s' in order to execute." % ( self.metadata.get("cwlVersion"), INTERNAL_VERSION)) job = cast(Dict[Text, expression.JSON], copy.deepcopy(joborder)) make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess) fs_access = make_fs_access(runtime_context.basedir) load_listing_req, _ = self.get_requirement( "LoadListingRequirement") if load_listing_req is not None: load_listing = load_listing_req.get("loadListing") else: load_listing = "no_listing" # Validate job order try: fill_in_defaults(self.tool[u"inputs"], job, fs_access) normalizeFilesDirs(job) schema = self.names.get_name("input_record_schema", "") if schema is None: raise WorkflowException("Missing input record schema: " "{}".format(self.names)) validate.validate_ex(schema, job, strict=False, logger=_logger_validation_warnings) if load_listing and load_listing != "no_listing": get_listing(fs_access, job, recursive=(load_listing == "deep_listing")) visit_class(job, ("File",), functools.partial(add_sizes, fs_access)) if load_listing == "deep_listing": for i, inparm in enumerate(self.tool["inputs"]): k = shortname(inparm["id"]) if k not in job: continue v = job[k] dircount = [0] def inc(d): # type: (List[int]) -> None d[0] += 1 visit_class(v, ("Directory",), lambda x: inc(dircount)) if dircount[0] == 0: continue filecount = [0] visit_class(v, ("File",), lambda x: inc(filecount)) if filecount[0] > FILE_COUNT_WARNING: # Long lines in this message are okay, will be reflowed based on terminal columns. _logger.warning(strip_dup_lineno(SourceLine(self.tool["inputs"], i, Text).makeError( """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'. This may negatively affect workflow performance and memory use. If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior: $namespaces: cwltool: "http://commonwl.org/cwltool#" hints: cwltool:LoadListingRequirement: loadListing: shallow_listing """ % (filecount[0], k)))) except (validate.ValidationException, WorkflowException) as err: raise_from(WorkflowException("Invalid job input record:\n" + Text(err)), err) files = [] # type: List[Dict[Text, Text]] bindings = CommentedSeq() tmpdir = u"" stagedir = u"" docker_req, _ = self.get_requirement("DockerRequirement") default_docker = None if docker_req is None and runtime_context.default_container: default_docker = runtime_context.default_container if (docker_req or default_docker) and runtime_context.use_container: if docker_req is not None: # Check if docker output directory is absolute if docker_req.get("dockerOutputDirectory") and \ docker_req.get("dockerOutputDirectory").startswith('/'): outdir = docker_req.get("dockerOutputDirectory") else: outdir = docker_req.get("dockerOutputDirectory") or \ runtime_context.docker_outdir or random_outdir() elif default_docker is not None: outdir = runtime_context.docker_outdir or random_outdir() tmpdir = runtime_context.docker_tmpdir or "/tmp" # nosec stagedir = runtime_context.docker_stagedir or "/var/lib/cwl" else: outdir = fs_access.realpath( runtime_context.outdir or tempfile.mkdtemp( prefix=getdefault(runtime_context.tmp_outdir_prefix, DEFAULT_TMP_PREFIX))) if self.tool[u"class"] != 'Workflow': tmpdir = fs_access.realpath(runtime_context.tmpdir or tempfile.mkdtemp()) stagedir = fs_access.realpath(runtime_context.stagedir or tempfile.mkdtemp()) builder = Builder(job, files, bindings, self.schemaDefs, self.names, self.requirements, self.hints, {}, runtime_context.mutation_manager, self.formatgraph, make_fs_access, fs_access, runtime_context.job_script_provider, runtime_context.eval_timeout, runtime_context.debug, runtime_context.js_console, runtime_context.force_docker_pull, load_listing, outdir, tmpdir, stagedir) bindings.extend(builder.bind_input( self.inputs_record_schema, job, discover_secondaryFiles=getdefault(runtime_context.toplevel, False))) if self.tool.get("baseCommand"): for index, command in enumerate(aslist(self.tool["baseCommand"])): bindings.append({ "position": [-1000000, index], "datum": command }) if self.tool.get("arguments"): for i, arg in enumerate(self.tool["arguments"]): lc = self.tool["arguments"].lc.data[i] filename = self.tool["arguments"].lc.filename bindings.lc.add_kv_line_col(len(bindings), lc) if isinstance(arg, MutableMapping): arg = copy.deepcopy(arg) if arg.get("position"): position = arg.get("position") if isinstance(position, str): # no need to test the # CWLVersion as the v1.0 # schema only allows ints position = builder.do_eval(position) if position is None: position = 0 arg["position"] = [position, i] else: arg["position"] = [0, i] bindings.append(arg) elif ("$(" in arg) or ("${" in arg): cm = CommentedMap(( ("position", [0, i]), ("valueFrom", arg) )) cm.lc.add_kv_line_col("valueFrom", lc) cm.lc.filename = filename bindings.append(cm) else: cm = CommentedMap(( ("position", [0, i]), ("datum", arg) )) cm.lc.add_kv_line_col("datum", lc) cm.lc.filename = filename bindings.append(cm) # use python2 like sorting of heterogeneous lists # (containing str and int types), if PY3: key = functools.cmp_to_key(cmp_like_py2) else: # PY2 key = lambda d: d["position"] # This awkward construction replaces the contents of # "bindings" in place (because Builder expects it to be # mutated in place, sigh, I'm sorry) with its contents sorted, # supporting different versions of Python and ruamel.yaml with # different behaviors/bugs in CommentedSeq. bindings_copy = copy.deepcopy(bindings) del bindings[:] bindings.extend(sorted(bindings_copy, key=key)) if self.tool[u"class"] != 'Workflow': builder.resources = self.evalResources(builder, runtime_context) return builder def evalResources(self, builder, runtimeContext): # type: (Builder, RuntimeContext) -> Dict[str, int] resourceReq, _ = self.get_requirement("ResourceRequirement") if resourceReq is None: resourceReq = {} cwl_version = self.metadata.get( "http://commonwl.org/cwltool#original_cwlVersion", None) if cwl_version == "v1.0": ram = 1024 else: ram = 256 request = { "coresMin": 1, "coresMax": 1, "ramMin": ram, "ramMax": ram, "tmpdirMin": 1024, "tmpdirMax": 1024, "outdirMin": 1024, "outdirMax": 1024 } # type: Dict[str, int] for a in ("cores", "ram", "tmpdir", "outdir"): mn = None mx = None if resourceReq.get(a + "Min"): mn = eval_resource(builder, resourceReq[a + "Min"]) if resourceReq.get(a + "Max"): mx = eval_resource(builder, resourceReq[a + "Max"]) if mn is None: mn = mx elif mx is None: mx = mn if mn is not None: request[a + "Min"] = cast(int, mn) request[a + "Max"] = cast(int, mx) if runtimeContext.select_resources is not None: return runtimeContext.select_resources(request, runtimeContext) return { "cores": request["coresMin"], "ram": request["ramMin"], "tmpdirSize": request["tmpdirMin"], "outdirSize": request["outdirMin"], } def validate_hints(self, avsc_names, hints, strict): # type: (Any, List[Dict[Text, Any]], bool) -> None for i, r in enumerate(hints): sl = SourceLine(hints, i, validate.ValidationException) with sl: if avsc_names.get_name(r["class"], "") is not None and self.doc_loader is not None: plain_hint = dict((key, r[key]) for key in r if key not in self.doc_loader.identifiers) # strip identifiers validate.validate_ex( avsc_names.get_name(plain_hint["class"], ""), plain_hint, strict=strict) elif r["class"] in ("NetworkAccess", "LoadListingRequirement"): pass else: _logger.info(Text(sl.makeError(u"Unknown hint %s" % (r["class"])))) def visit(self, op): # type: (Callable[[MutableMapping[Text, Any]], None]) -> None op(self.tool) @abc.abstractmethod def job(self, job_order, # type: Mapping[Text, Text] output_callbacks, # type: Callable[[Any, Any], Any] runtimeContext # type: RuntimeContext ): # type: (...) -> Generator[Any, None, None] # FIXME: Declare base type for what Generator yields pass _names = set() # type: Set[Text] def uniquename(stem, names=None): # type: (Text, Optional[Set[Text]]) -> Text global _names if names is None: names = _names c = 1 u = stem while u in names: c += 1 u = u"%s_%s" % (stem, c) names.add(u) return u def nestdir(base, deps): # type: (Text, Dict[Text, Any]) -> Dict[Text, Any] dirname = os.path.dirname(base) + "/" subid = deps["location"] if subid.startswith(dirname): s2 = subid[len(dirname):] sp = s2.split('/') sp.pop() while sp: nx = sp.pop() deps = { "class": "Directory", "basename": nx, "listing": [deps] } return deps def mergedirs(listing): # type: (List[Dict[Text, Any]]) -> List[Dict[Text, Any]] r = [] # type: List[Dict[Text, Any]] ents = {} # type: Dict[Text, Any] collided = set() # type: Set[Text] for e in listing: if e["basename"] not in ents: ents[e["basename"]] = e elif e["class"] == "Directory": if e.get("listing"): ents[e["basename"]].setdefault("listing", []).extend(e["listing"]) if ents[e["basename"]]["location"].startswith("_:"): ents[e["basename"]]["location"] = e["location"] elif e["location"] != ents[e["basename"]]["location"]: # same basename, different location, collision, # rename both. collided.add(e["basename"]) e2 = ents[e["basename"]] e["basename"] = urllib.parse.quote(e["location"], safe="") e2["basename"] = urllib.parse.quote(e2["location"], safe="") e["nameroot"], e["nameext"] = os.path.splitext(e["basename"]) e2["nameroot"], e2["nameext"] = os.path.splitext(e2["basename"]) ents[e["basename"]] = e ents[e2["basename"]] = e2 for c in collided: del ents[c] for e in itervalues(ents): if e["class"] == "Directory" and "listing" in e: e["listing"] = mergedirs(e["listing"]) r.extend(itervalues(ents)) return r CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl" def scandeps(base, # type: Text doc, # type: Any reffields, # type: Set[Text] urlfields, # type: Set[Text] loadref, # type: Callable[[Text, Text], Text] urljoin=urllib.parse.urljoin, # type: Callable[[Text, Text], Text] nestdirs=True # type: bool ): # type: (...) -> List[Dict[Text, Text]] r = [] # type: List[Dict[Text, Text]] if isinstance(doc, MutableMapping): if "id" in doc: if doc["id"].startswith("file://"): df, _ = urllib.parse.urldefrag(doc["id"]) if base != df: r.append({ "class": "File", "location": df, "format": CWL_IANA }) base = df if doc.get("class") in ("File", "Directory") and "location" in urlfields: u = doc.get("location", doc.get("path")) if u and not u.startswith("_:"): deps = {"class": doc["class"], "location": urljoin(base, u) } # type: Dict[Text, Any] if "basename" in doc: deps["basename"] = doc["basename"] if doc["class"] == "Directory" and "listing" in doc: deps["listing"] = doc["listing"] if doc["class"] == "File" and "secondaryFiles" in doc: deps["secondaryFiles"] = doc["secondaryFiles"] if nestdirs: deps = nestdir(base, deps) r.append(deps) else: if doc["class"] == "Directory" and "listing" in doc: r.extend(scandeps( base, doc["listing"], reffields, urlfields, loadref, urljoin=urljoin, nestdirs=nestdirs)) elif doc["class"] == "File" and "secondaryFiles" in doc: r.extend(scandeps( base, doc["secondaryFiles"], reffields, urlfields, loadref, urljoin=urljoin, nestdirs=nestdirs)) for k, v in iteritems(doc): if k in reffields: for u in aslist(v): if isinstance(u, MutableMapping): r.extend(scandeps( base, u, reffields, urlfields, loadref, urljoin=urljoin, nestdirs=nestdirs)) else: subid = urljoin(base, u) basedf, _ = urllib.parse.urldefrag(base) subiddf, _ = urllib.parse.urldefrag(subid) if basedf == subiddf: continue sub = loadref(base, u) deps = { "class": "File", "location": subid, "format": CWL_IANA } sf = scandeps( subid, sub, reffields, urlfields, loadref, urljoin=urljoin, nestdirs=nestdirs) if sf: deps["secondaryFiles"] = sf if nestdirs: deps = nestdir(base, deps) r.append(deps) elif k in urlfields and k != "location": for u in aslist(v): deps = { "class": "File", "location": urljoin(base, u) } if nestdirs: deps = nestdir(base, deps) r.append(deps) elif k not in ("listing", "secondaryFiles"): r.extend(scandeps( base, v, reffields, urlfields, loadref, urljoin=urljoin, nestdirs=nestdirs)) elif isinstance(doc, MutableSequence): for d in doc: r.extend(scandeps( base, d, reffields, urlfields, loadref, urljoin=urljoin, nestdirs=nestdirs)) if r: normalizeFilesDirs(r) r = mergedirs(r) return r def compute_checksums(fs_access, fileobj): # type: (StdFsAccess, Dict[Text, Any]) -> None if "checksum" not in fileobj: checksum = hashlib.sha1() # nosec with fs_access.open(fileobj["location"], "rb") as f: contents = f.read(1024 * 1024) while contents != b"": checksum.update(contents) contents = f.read(1024 * 1024) fileobj["checksum"] = "sha1$%s" % checksum.hexdigest() fileobj["size"] = fs_access.size(fileobj["location"])