diff env/lib/python3.7/site-packages/cwltool/command_line_tool.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
| author | shellac |
|---|---|
| date | Thu, 14 May 2020 14:56:58 -0400 |
| parents | 26e78fe6e8c4 |
| children | |
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/cwltool/command_line_tool.py Thu May 14 14:56:58 2020 -0400 @@ -0,0 +1,853 @@ +"""Implementation of CommandLineTool.""" +from __future__ import absolute_import + +import copy +import hashlib +import json +import locale +import logging +import os +import re +import shutil +import tempfile +import threading +from functools import cmp_to_key, partial +from typing import (Any, Callable, Dict, Generator, IO, List, Mapping, + MutableMapping, MutableSequence, Optional, Set, Union, cast) + +from typing_extensions import Text, Type, TYPE_CHECKING # pylint: disable=unused-import +# move to a regular typing import when Python 3.3-3.6 is no longer supported + +import shellescape +from schema_salad import validate +from schema_salad.avro.schema import Schema +from schema_salad.ref_resolver import file_uri, uri_file_path +from schema_salad.sourceline import SourceLine +from six import string_types +from future.utils import raise_from + +from six.moves import map, urllib +from typing_extensions import (TYPE_CHECKING, # pylint: disable=unused-import + Text, Type) +# move to a regular typing import when Python 3.3-3.6 is no longer supported + +from .builder import (Builder, content_limit_respected_read_bytes, # pylint: disable=unused-import + substitute) +from .context import LoadingContext # pylint: disable=unused-import +from .context import RuntimeContext, getdefault +from .docker import DockerCommandLineJob +from .errors import WorkflowException +from .flatten import flatten +from .job import CommandLineJob, JobBase # pylint: disable=unused-import +from .loghandler import _logger +from .mutation import MutationManager # pylint: disable=unused-import +from .pathmapper import (PathMapper, adjustDirObjs, adjustFileObjs, + get_listing, trim_listing, visit_class) +from .process import (Process, UnsupportedRequirement, + _logger_validation_warnings, compute_checksums, + normalizeFilesDirs, shortname, uniquename) +from .singularity import SingularityCommandLineJob +from .software_requirements import ( # pylint: disable=unused-import + DependenciesConfiguration) +from .stdfsaccess import StdFsAccess # pylint: disable=unused-import +from .utils import (aslist, convert_pathsep_to_unix, + docker_windows_path_adjust, json_dumps, onWindows, + random_outdir, windows_default_container_id, + shared_file_lock, upgrade_lock) +if TYPE_CHECKING: + from .provenance import ProvenanceProfile # pylint: disable=unused-import + +ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$") +ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*") # Accept anything +ACCEPTLIST_RE = ACCEPTLIST_EN_STRICT_RE +DEFAULT_CONTAINER_MSG = """ +We are on Microsoft Windows and not all components of this CWL description have a +container specified. This means that these steps will be executed in the default container, +which is %s. + +Note, this could affect portability if this CWL description relies on non-POSIX features +or commands in this container. 
For best results add the following to your CWL +description's hints section: + +hints: + DockerRequirement: + dockerPull: %s +""" + + +class ExpressionTool(Process): + class ExpressionJob(object): + """Job for ExpressionTools.""" + + def __init__(self, + builder, # type: Builder + script, # type: Dict[Text, Text] + output_callback, # type: Callable[[Any, Any], Any] + requirements, # type: List[Dict[Text, Text]] + hints, # type: List[Dict[Text, Text]] + outdir=None, # type: Optional[Text] + tmpdir=None, # type: Optional[Text] + ): # type: (...) -> None + """Initializet this ExpressionJob.""" + self.builder = builder + self.requirements = requirements + self.hints = hints + self.collect_outputs = None # type: Optional[Callable[[Any], Any]] + self.output_callback = output_callback + self.outdir = outdir + self.tmpdir = tmpdir + self.script = script + self.prov_obj = None # type: Optional[ProvenanceProfile] + + def run(self, + runtimeContext, # type: RuntimeContext + tmpdir_lock=None # type: Optional[threading.Lock] + ): # type: (...) -> None + try: + normalizeFilesDirs(self.builder.job) + ev = self.builder.do_eval(self.script) + normalizeFilesDirs(ev) + self.output_callback(ev, "success") + except Exception as err: + _logger.warning(u"Failed to evaluate expression:\n%s", + Text(err), exc_info=runtimeContext.debug) + self.output_callback({}, "permanentFail") + + def job(self, + job_order, # type: Mapping[Text, Text] + output_callbacks, # type: Callable[[Any, Any], Any] + runtimeContext # type: RuntimeContext + ): + # type: (...) -> Generator[ExpressionTool.ExpressionJob, None, None] + builder = self._init_job(job_order, runtimeContext) + + job = ExpressionTool.ExpressionJob( + builder, self.tool["expression"], output_callbacks, + self.requirements, self.hints) + job.prov_obj = runtimeContext.prov_obj + yield job + + +def remove_path(f): # type: (Dict[Text, Any]) -> None + if "path" in f: + del f["path"] + + +def revmap_file(builder, outdir, f): + # type: (Builder, Text, Dict[Text, Any]) -> Union[Dict[Text, Any], None] + """ + Remap a file from internal path to external path. + + For Docker, this maps from the path inside tho container to the path + outside the container. Recognizes files in the pathmapper or remaps + internal output directories to the external directory. 
+ """ + split = urllib.parse.urlsplit(outdir) + if not split.scheme: + outdir = file_uri(str(outdir)) + + # builder.outdir is the inner (container/compute node) output directory + # outdir is the outer (host/storage system) output directory + + if "location" in f and "path" not in f: + if f["location"].startswith("file://"): + f["path"] = convert_pathsep_to_unix(uri_file_path(f["location"])) + else: + return f + + if "path" in f: + path = f["path"] + uripath = file_uri(path) + del f["path"] + + if "basename" not in f: + f["basename"] = os.path.basename(path) + + if not builder.pathmapper: + raise ValueError("Do not call revmap_file using a builder that doesn't have a pathmapper.") + revmap_f = builder.pathmapper.reversemap(path) + + if revmap_f and not builder.pathmapper.mapper(revmap_f[0]).type.startswith("Writable"): + f["location"] = revmap_f[1] + elif uripath == outdir or uripath.startswith(outdir+os.sep) or uripath.startswith(outdir+'/'): + f["location"] = file_uri(path) + elif path == builder.outdir or path.startswith(builder.outdir+os.sep) or path.startswith(builder.outdir+'/'): + f["location"] = builder.fs_access.join(outdir, path[len(builder.outdir) + 1:]) + elif not os.path.isabs(path): + f["location"] = builder.fs_access.join(outdir, path) + else: + raise WorkflowException(u"Output file path %s must be within designated output directory (%s) or an input " + u"file pass through." % (path, builder.outdir)) + return f + + raise WorkflowException(u"Output File object is missing both 'location' " + "and 'path' fields: %s" % f) + + +class CallbackJob(object): + def __init__(self, job, output_callback, cachebuilder, jobcache): + # type: (CommandLineTool, Callable[[Any, Any], Any], Builder, Text) -> None + """Initialize this CallbackJob.""" + self.job = job + self.output_callback = output_callback + self.cachebuilder = cachebuilder + self.outdir = jobcache + self.prov_obj = None # type: Optional[ProvenanceProfile] + + def run(self, + runtimeContext, # type: RuntimeContext + tmpdir_lock=None # type: Optional[threading.Lock] + ): # type: (...) -> None + self.output_callback(self.job.collect_output_ports( + self.job.tool["outputs"], + self.cachebuilder, + self.outdir, + getdefault(runtimeContext.compute_checksum, True)), "success") + + +def check_adjust(builder, file_o): + # type: (Builder, Dict[Text, Any]) -> Dict[Text, Any] + """ + Map files to assigned path inside a container. 
+ + We need to also explicitly walk over input, as implicit reassignment + doesn't reach everything in builder.bindings + """ + if not builder.pathmapper: + raise ValueError("Do not call check_adjust using a builder that doesn't have a pathmapper.") + file_o["path"] = docker_windows_path_adjust( + builder.pathmapper.mapper(file_o["location"])[1]) + dn, bn = os.path.split(file_o["path"]) + if file_o.get("dirname") != dn: + file_o["dirname"] = Text(dn) + if file_o.get("basename") != bn: + file_o["basename"] = Text(bn) + if file_o["class"] == "File": + nr, ne = os.path.splitext(file_o["basename"]) + if file_o.get("nameroot") != nr: + file_o["nameroot"] = Text(nr) + if file_o.get("nameext") != ne: + file_o["nameext"] = Text(ne) + if not ACCEPTLIST_RE.match(file_o["basename"]): + raise WorkflowException( + "Invalid filename: '{}' contains illegal characters".format( + file_o["basename"])) + return file_o + +def check_valid_locations(fs_access, ob): # type: (StdFsAccess, Dict[Text, Any]) -> None + if ob["location"].startswith("_:"): + pass + if ob["class"] == "File" and not fs_access.isfile(ob["location"]): + raise validate.ValidationException("Does not exist or is not a File: '%s'" % ob["location"]) + if ob["class"] == "Directory" and not fs_access.isdir(ob["location"]): + raise validate.ValidationException("Does not exist or is not a Directory: '%s'" % ob["location"]) + + +OutputPorts = Dict[Text, Union[None, Text, List[Union[Dict[Text, Any], Text]], Dict[Text, Any]]] + +class CommandLineTool(Process): + def __init__(self, toolpath_object, loadingContext): + # type: (MutableMapping[Text, Any], LoadingContext) -> None + """Initialize this CommandLineTool.""" + super(CommandLineTool, self).__init__(toolpath_object, loadingContext) + self.prov_obj = loadingContext.prov_obj + + def make_job_runner(self, + runtimeContext # type: RuntimeContext + ): # type: (...) 
-> Type[JobBase] + dockerReq, _ = self.get_requirement("DockerRequirement") + if not dockerReq and runtimeContext.use_container: + if runtimeContext.find_default_container is not None: + default_container = runtimeContext.find_default_container(self) + if default_container is not None: + self.requirements.insert(0, { + "class": "DockerRequirement", + "dockerPull": default_container + }) + dockerReq = self.requirements[0] + if default_container == windows_default_container_id \ + and runtimeContext.use_container and onWindows(): + _logger.warning( + DEFAULT_CONTAINER_MSG, windows_default_container_id, + windows_default_container_id) + + if dockerReq is not None and runtimeContext.use_container: + if runtimeContext.singularity: + return SingularityCommandLineJob + return DockerCommandLineJob + for t in reversed(self.requirements): + if t["class"] == "DockerRequirement": + raise UnsupportedRequirement( + "--no-container, but this CommandLineTool has " + "DockerRequirement under 'requirements'.") + return CommandLineJob + + def make_path_mapper(self, reffiles, stagedir, runtimeContext, separateDirs): + # type: (List[Any], Text, RuntimeContext, bool) -> PathMapper + return PathMapper(reffiles, runtimeContext.basedir, stagedir, separateDirs) + + def updatePathmap(self, outdir, pathmap, fn): + # type: (Text, PathMapper, Dict[Text, Any]) -> None + if "location" in fn and fn["location"] in pathmap: + pathmap.update(fn["location"], pathmap.mapper(fn["location"]).resolved, + os.path.join(outdir, fn["basename"]), + ("Writable" if fn.get("writable") else "") + fn["class"], False) + for sf in fn.get("secondaryFiles", []): + self.updatePathmap(outdir, pathmap, sf) + for ls in fn.get("listing", []): + self.updatePathmap(os.path.join(outdir, fn["basename"]), pathmap, ls) + + def job(self, + job_order, # type: Mapping[Text, Text] + output_callbacks, # type: Callable[[Any, Any], Any] + runtimeContext # type: RuntimeContext + ): + # type: (...) 
-> Generator[Union[JobBase, CallbackJob], None, None] + + workReuse, _ = self.get_requirement("WorkReuse") + enableReuse = workReuse.get("enableReuse", True) if workReuse else True + + jobname = uniquename(runtimeContext.name or shortname(self.tool.get("id", "job"))) + if runtimeContext.cachedir and enableReuse: + cachecontext = runtimeContext.copy() + cachecontext.outdir = "/out" + cachecontext.tmpdir = "/tmp" # nosec + cachecontext.stagedir = "/stage" + cachebuilder = self._init_job(job_order, cachecontext) + cachebuilder.pathmapper = PathMapper(cachebuilder.files, + runtimeContext.basedir, + cachebuilder.stagedir, + separateDirs=False) + _check_adjust = partial(check_adjust, cachebuilder) + visit_class([cachebuilder.files, cachebuilder.bindings], + ("File", "Directory"), _check_adjust) + + cmdline = flatten(list(map(cachebuilder.generate_arg, cachebuilder.bindings))) + docker_req, _ = self.get_requirement("DockerRequirement") + if docker_req is not None and runtimeContext.use_container: + dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull") + elif runtimeContext.default_container is not None and runtimeContext.use_container: + dockerimg = runtimeContext.default_container + else: + dockerimg = None + + if dockerimg is not None: + cmdline = ["docker", "run", dockerimg] + cmdline + # not really run using docker, just for hashing purposes + keydict = {u"cmdline": cmdline} # type: Dict[Text, Union[Dict[Text, Any], List[Any]]] + + for shortcut in ["stdin", "stdout", "stderr"]: + if shortcut in self.tool: + keydict[shortcut] = self.tool[shortcut] + + for location, fobj in cachebuilder.pathmapper.items(): + if fobj.type == "File": + checksum = next( + (e['checksum'] for e in cachebuilder.files + if 'location' in e and e['location'] == location + and 'checksum' in e + and e['checksum'] != 'sha1$hash'), None) + fobj_stat = os.stat(fobj.resolved) + if checksum is not None: + keydict[fobj.resolved] = [fobj_stat.st_size, checksum] + else: + keydict[fobj.resolved] = [fobj_stat.st_size, + int(fobj_stat.st_mtime * 1000)] + + interesting = {"DockerRequirement", + "EnvVarRequirement", + "InitialWorkDirRequirement", + "ShellCommandRequirement", + "NetworkAccess"} + for rh in (self.original_requirements, self.original_hints): + for r in reversed(rh): + if r["class"] in interesting and r["class"] not in keydict: + keydict[r["class"]] = r + + keydictstr = json_dumps(keydict, separators=(',', ':'), + sort_keys=True) + cachekey = hashlib.md5( # nosec + keydictstr.encode('utf-8')).hexdigest() + + _logger.debug("[job %s] keydictstr is %s -> %s", jobname, + keydictstr, cachekey) + + jobcache = os.path.join(runtimeContext.cachedir, cachekey) + + # Create a lockfile to manage cache status. + jobcachepending = "{}.status".format(jobcache) + jobcachelock = None + jobstatus = None + + # Opens the file for read/write, or creates an empty file. 
+ jobcachelock = open(jobcachepending, "a+") + + # get the shared lock to ensure no other process is trying + # to write to this cache + shared_file_lock(jobcachelock) + jobcachelock.seek(0) + jobstatus = jobcachelock.read() + + if os.path.isdir(jobcache) and jobstatus == "success": + if docker_req and runtimeContext.use_container: + cachebuilder.outdir = runtimeContext.docker_outdir or random_outdir() + else: + cachebuilder.outdir = jobcache + + _logger.info("[job %s] Using cached output in %s", jobname, jobcache) + yield CallbackJob(self, output_callbacks, cachebuilder, jobcache) + # we're done with the cache so release lock + jobcachelock.close() + return + else: + _logger.info("[job %s] Output of job will be cached in %s", jobname, jobcache) + + # turn shared lock into an exclusive lock since we'll + # be writing the cache directory + upgrade_lock(jobcachelock) + + shutil.rmtree(jobcache, True) + os.makedirs(jobcache) + runtimeContext = runtimeContext.copy() + runtimeContext.outdir = jobcache + + def update_status_output_callback( + output_callbacks, # type: Callable[[List[Dict[Text, Any]], Text], None] + jobcachelock, # type: IO[Any] + outputs, # type: List[Dict[Text, Any]] + processStatus # type: Text + ): # type: (...) -> None + # save status to the lockfile then release the lock + jobcachelock.seek(0) + jobcachelock.truncate() + jobcachelock.write(processStatus) + jobcachelock.close() + output_callbacks(outputs, processStatus) + + output_callbacks = partial( + update_status_output_callback, output_callbacks, jobcachelock) + + builder = self._init_job(job_order, runtimeContext) + + reffiles = copy.deepcopy(builder.files) + + j = self.make_job_runner(runtimeContext)( + builder, builder.job, self.make_path_mapper, self.requirements, + self.hints, jobname) + j.prov_obj = self.prov_obj + + j.successCodes = self.tool.get("successCodes", []) + j.temporaryFailCodes = self.tool.get("temporaryFailCodes", []) + j.permanentFailCodes = self.tool.get("permanentFailCodes", []) + + debug = _logger.isEnabledFor(logging.DEBUG) + + if debug: + _logger.debug(u"[job %s] initializing from %s%s", + j.name, + self.tool.get("id", ""), + u" as part of %s" % runtimeContext.part_of + if runtimeContext.part_of else "") + _logger.debug(u"[job %s] %s", j.name, json_dumps(builder.job, + indent=4)) + + builder.pathmapper = self.make_path_mapper( + reffiles, builder.stagedir, runtimeContext, True) + builder.requirements = j.requirements + + _check_adjust = partial(check_adjust, builder) + + visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust) + + initialWorkdir, _ = self.get_requirement("InitialWorkDirRequirement") + if initialWorkdir is not None: + ls = [] # type: List[Dict[Text, Any]] + if isinstance(initialWorkdir["listing"], string_types): + ls = builder.do_eval(initialWorkdir["listing"]) + else: + for t in initialWorkdir["listing"]: + if isinstance(t, Mapping) and "entry" in t: + entry_exp = builder.do_eval(t["entry"], strip_whitespace=False) + for entry in aslist(entry_exp): + et = {u"entry": entry} + if "entryname" in t: + et["entryname"] = builder.do_eval(t["entryname"]) + else: + et["entryname"] = None + et["writable"] = t.get("writable", False) + if et[u"entry"] is not None: + ls.append(et) + else: + initwd_item = builder.do_eval(t) + if not initwd_item: + continue + if isinstance(initwd_item, MutableSequence): + ls.extend(initwd_item) + else: + ls.append(initwd_item) + for i, t in enumerate(ls): + if "entry" in t: + if isinstance(t["entry"], string_types): + ls[i] = { + 
"class": "File", + "basename": t["entryname"], + "contents": t["entry"], + "writable": t.get("writable") + } + else: + if t.get("entryname") or t.get("writable"): + t = copy.deepcopy(t) + if t.get("entryname"): + t["entry"]["basename"] = t["entryname"] + t["entry"]["writable"] = t.get("writable") + ls[i] = t["entry"] + j.generatefiles["listing"] = ls + for l in ls: + self.updatePathmap(builder.outdir, builder.pathmapper, l) + visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust) + + if debug: + _logger.debug(u"[job %s] path mappings is %s", j.name, + json_dumps({p: builder.pathmapper.mapper(p) + for p in builder.pathmapper.files()}, + indent=4)) + + if self.tool.get("stdin"): + with SourceLine(self.tool, "stdin", validate.ValidationException, debug): + j.stdin = builder.do_eval(self.tool["stdin"]) + if j.stdin: + reffiles.append({"class": "File", "path": j.stdin}) + + if self.tool.get("stderr"): + with SourceLine(self.tool, "stderr", validate.ValidationException, debug): + j.stderr = builder.do_eval(self.tool["stderr"]) + if j.stderr: + if os.path.isabs(j.stderr) or ".." in j.stderr: + raise validate.ValidationException( + "stderr must be a relative path, got '%s'" % j.stderr) + + if self.tool.get("stdout"): + with SourceLine(self.tool, "stdout", validate.ValidationException, debug): + j.stdout = builder.do_eval(self.tool["stdout"]) + if j.stdout: + if os.path.isabs(j.stdout) or ".." in j.stdout or not j.stdout: + raise validate.ValidationException( + "stdout must be a relative path, got '%s'" % j.stdout) + + if debug: + _logger.debug(u"[job %s] command line bindings is %s", j.name, + json_dumps(builder.bindings, indent=4)) + dockerReq, _ = self.get_requirement("DockerRequirement") + if dockerReq is not None and runtimeContext.use_container: + out_dir, out_prefix = os.path.split( + runtimeContext.tmp_outdir_prefix) + j.outdir = runtimeContext.outdir or \ + tempfile.mkdtemp(prefix=out_prefix, dir=out_dir) + tmpdir_dir, tmpdir_prefix = os.path.split( + runtimeContext.tmpdir_prefix) + j.tmpdir = runtimeContext.tmpdir or \ + tempfile.mkdtemp(prefix=tmpdir_prefix, dir=tmpdir_dir) + j.stagedir = tempfile.mkdtemp(prefix=tmpdir_prefix, dir=tmpdir_dir) + else: + j.outdir = builder.outdir + j.tmpdir = builder.tmpdir + j.stagedir = builder.stagedir + + inplaceUpdateReq, _ = self.get_requirement("InplaceUpdateRequirement") + if inplaceUpdateReq is not None: + j.inplace_update = inplaceUpdateReq["inplaceUpdate"] + normalizeFilesDirs(j.generatefiles) + + readers = {} # type: Dict[Text, Any] + muts = set() # type: Set[Text] + + if builder.mutation_manager is not None: + def register_mut(f): # type: (Dict[Text, Any]) -> None + mm = cast(MutationManager, builder.mutation_manager) + muts.add(f["location"]) + mm.register_mutation(j.name, f) + + def register_reader(f): # type: (Dict[Text, Any]) -> None + mm = cast(MutationManager, builder.mutation_manager) + if f["location"] not in muts: + mm.register_reader(j.name, f) + readers[f["location"]] = copy.deepcopy(f) + + for li in j.generatefiles["listing"]: + li = cast(Dict[Text, Any], li) + if li.get("writable") and j.inplace_update: + adjustFileObjs(li, register_mut) + adjustDirObjs(li, register_mut) + else: + adjustFileObjs(li, register_reader) + adjustDirObjs(li, register_reader) + + adjustFileObjs(builder.files, register_reader) + adjustFileObjs(builder.bindings, register_reader) + adjustDirObjs(builder.files, register_reader) + adjustDirObjs(builder.bindings, register_reader) + + timelimit, _ = 
self.get_requirement("ToolTimeLimit") + if timelimit is not None: + with SourceLine(timelimit, "timelimit", validate.ValidationException, debug): + j.timelimit = builder.do_eval(timelimit["timelimit"]) + if not isinstance(j.timelimit, int) or j.timelimit < 0: + raise Exception("timelimit must be an integer >= 0, got: %s" % j.timelimit) + + networkaccess, _ = self.get_requirement("NetworkAccess") + if networkaccess is not None: + with SourceLine(networkaccess, "networkAccess", validate.ValidationException, debug): + j.networkaccess = builder.do_eval(networkaccess["networkAccess"]) + if not isinstance(j.networkaccess, bool): + raise Exception("networkAccess must be a boolean, got: %s" % j.networkaccess) + + j.environment = {} + evr, _ = self.get_requirement("EnvVarRequirement") + if evr is not None: + for t in evr["envDef"]: + j.environment[t["envName"]] = builder.do_eval(t["envValue"]) + + shellcmd, _ = self.get_requirement("ShellCommandRequirement") + if shellcmd is not None: + cmd = [] # type: List[Text] + for b in builder.bindings: + arg = builder.generate_arg(b) + if b.get("shellQuote", True): + arg = [shellescape.quote(a) for a in aslist(arg)] + cmd.extend(aslist(arg)) + j.command_line = ["/bin/sh", "-c", " ".join(cmd)] + else: + j.command_line = flatten(list(map(builder.generate_arg, builder.bindings))) + + j.pathmapper = builder.pathmapper + j.collect_outputs = partial( + self.collect_output_ports, self.tool["outputs"], builder, + compute_checksum=getdefault(runtimeContext.compute_checksum, True), + jobname=jobname, + readers=readers) + j.output_callback = output_callbacks + + yield j + + def collect_output_ports(self, + ports, # type: Set[Dict[Text, Any]] + builder, # type: Builder + outdir, # type: Text + rcode, # type: int + compute_checksum=True, # type: bool + jobname="", # type: Text + readers=None # type: Optional[Dict[Text, Any]] + ): # type: (...) 
-> OutputPorts + ret = {} # type: OutputPorts + debug = _logger.isEnabledFor(logging.DEBUG) + cwl_version = self.metadata.get( + "http://commonwl.org/cwltool#original_cwlVersion", None) + if cwl_version != "v1.0": + builder.resources["exitCode"] = rcode + try: + fs_access = builder.make_fs_access(outdir) + custom_output = fs_access.join(outdir, "cwl.output.json") + if fs_access.exists(custom_output): + with fs_access.open(custom_output, "r") as f: + ret = json.load(f) + if debug: + _logger.debug(u"Raw output from %s: %s", custom_output, + json_dumps(ret, indent=4)) + else: + for i, port in enumerate(ports): + class ParameterOutputWorkflowException(WorkflowException): + def __init__(self, msg, **kwargs): # type: (Text, **Any) -> None + super(ParameterOutputWorkflowException, self).__init__( + u"Error collecting output for parameter '%s':\n%s" + % (shortname(port["id"]), msg), kwargs) + with SourceLine(ports, i, ParameterOutputWorkflowException, debug): + fragment = shortname(port["id"]) + ret[fragment] = self.collect_output(port, builder, outdir, fs_access, + compute_checksum=compute_checksum) + if ret: + revmap = partial(revmap_file, builder, outdir) + adjustDirObjs(ret, trim_listing) + visit_class(ret, ("File", "Directory"), cast(Callable[[Any], Any], revmap)) + visit_class(ret, ("File", "Directory"), remove_path) + normalizeFilesDirs(ret) + visit_class(ret, ("File", "Directory"), partial(check_valid_locations, fs_access)) + + if compute_checksum: + adjustFileObjs(ret, partial(compute_checksums, fs_access)) + expected_schema = cast(Schema, self.names.get_name( + "outputs_record_schema", "")) + validate.validate_ex(expected_schema, ret, + strict=False, logger=_logger_validation_warnings) + if ret is not None and builder.mutation_manager is not None: + adjustFileObjs(ret, builder.mutation_manager.set_generation) + return ret if ret is not None else {} + except validate.ValidationException as e: + raise_from(WorkflowException( + "Error validating output record. " + Text(e) + "\n in " + + json_dumps(ret, indent=4)), e) + finally: + if builder.mutation_manager and readers: + for r in readers.values(): + builder.mutation_manager.release_reader(jobname, r) + + def collect_output(self, + schema, # type: Dict[Text, Any] + builder, # type: Builder + outdir, # type: Text + fs_access, # type: StdFsAccess + compute_checksum=True # type: bool + ): + # type: (...) 
-> Optional[Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]]] + r = [] # type: List[Any] + empty_and_optional = False + debug = _logger.isEnabledFor(logging.DEBUG) + if "outputBinding" in schema: + binding = schema["outputBinding"] + globpatterns = [] # type: List[Text] + + revmap = partial(revmap_file, builder, outdir) + + if "glob" in binding: + with SourceLine(binding, "glob", WorkflowException, debug): + for gb in aslist(binding["glob"]): + gb = builder.do_eval(gb) + if gb: + globpatterns.extend(aslist(gb)) + + for gb in globpatterns: + if gb.startswith(builder.outdir): + gb = gb[len(builder.outdir) + 1:] + elif gb == ".": + gb = outdir + elif gb.startswith("/"): + raise WorkflowException( + "glob patterns must not start with '/'") + try: + prefix = fs_access.glob(outdir) + r.extend([{"location": g, + "path": fs_access.join(builder.outdir, + g[len(prefix[0])+1:]), + "basename": os.path.basename(g), + "nameroot": os.path.splitext( + os.path.basename(g))[0], + "nameext": os.path.splitext( + os.path.basename(g))[1], + "class": "File" if fs_access.isfile(g) + else "Directory"} + for g in sorted(fs_access.glob( + fs_access.join(outdir, gb)), + key=cmp_to_key(cast( + Callable[[Text, Text], + int], locale.strcoll)))]) + except (OSError, IOError) as e: + _logger.warning(Text(e)) + except Exception: + _logger.error("Unexpected error from fs_access", exc_info=True) + raise + + for files in r: + rfile = files.copy() + revmap(rfile) + if files["class"] == "Directory": + ll = schema.get("loadListing") or builder.loadListing + if ll and ll != "no_listing": + get_listing(fs_access, files, (ll == "deep_listing")) + else: + if binding.get("loadContents"): + with fs_access.open(rfile["location"], "rb") as f: + files["contents"] = content_limit_respected_read_bytes(f).decode("utf-8") + if compute_checksum: + with fs_access.open(rfile["location"], "rb") as f: + checksum = hashlib.sha1() # nosec + contents = f.read(1024 * 1024) + while contents != b"": + checksum.update(contents) + contents = f.read(1024 * 1024) + files["checksum"] = "sha1$%s" % checksum.hexdigest() + files["size"] = fs_access.size(rfile["location"]) + + optional = False + single = False + if isinstance(schema["type"], MutableSequence): + if "null" in schema["type"]: + optional = True + if "File" in schema["type"] or "Directory" in schema["type"]: + single = True + elif schema["type"] == "File" or schema["type"] == "Directory": + single = True + + if "outputEval" in binding: + with SourceLine(binding, "outputEval", WorkflowException, debug): + r = builder.do_eval(binding["outputEval"], context=r) + + if single: + if not r and not optional: + with SourceLine(binding, "glob", WorkflowException, debug): + raise WorkflowException("Did not find output file with glob pattern: '{}'".format(globpatterns)) + elif not r and optional: + pass + elif isinstance(r, MutableSequence): + if len(r) > 1: + raise WorkflowException("Multiple matches for output item that is a single file.") + else: + r = r[0] + + if "secondaryFiles" in schema: + with SourceLine(schema, "secondaryFiles", WorkflowException, debug): + for primary in aslist(r): + if isinstance(primary, MutableMapping): + primary.setdefault("secondaryFiles", []) + pathprefix = primary["path"][0:primary["path"].rindex("/")+1] + for sf in aslist(schema["secondaryFiles"]): + if 'required' in sf: + sf_required = builder.do_eval(sf['required'], context=primary) + else: + sf_required = False + + if "$(" in sf["pattern"] or "${" in sf["pattern"]: + sfpath = builder.do_eval(sf["pattern"], 
context=primary) + else: + sfpath = substitute(primary["basename"], sf["pattern"]) + + for sfitem in aslist(sfpath): + if not sfitem: + continue + if isinstance(sfitem, string_types): + sfitem = {"path": pathprefix+sfitem} + if not fs_access.exists(sfitem['path']) and sf_required: + raise WorkflowException( + "Missing required secondary file '%s'" % ( + sfitem["path"])) + if "path" in sfitem and "location" not in sfitem: + revmap(sfitem) + if fs_access.isfile(sfitem["location"]): + sfitem["class"] = "File" + primary["secondaryFiles"].append(sfitem) + elif fs_access.isdir(sfitem["location"]): + sfitem["class"] = "Directory" + primary["secondaryFiles"].append(sfitem) + + if "format" in schema: + for primary in aslist(r): + primary["format"] = builder.do_eval(schema["format"], context=primary) + + # Ensure files point to local references outside of the run environment + adjustFileObjs(r, revmap) + + if not r and optional: + # Don't convert zero or empty string to None + if r in [0, '']: + return r + # For [] or None, return None + else: + return None + + if (not empty_and_optional and isinstance(schema["type"], MutableMapping) + and schema["type"]["type"] == "record"): + out = {} + for f in schema["type"]["fields"]: + out[shortname(f["name"])] = self.collect_output( # type: ignore + f, builder, outdir, fs_access, + compute_checksum=compute_checksum) + return out + return r
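
For readers skimming the diff, the job-caching logic in CommandLineTool.job() above boils down to hashing a canonical JSON rendering of the command line plus per-input-file metadata, and using the digest as the cache directory name. The sketch below is a minimal, standalone illustration of that scheme; the helper name cache_key and its arguments are invented for the example, and the real code additionally folds stdin/stdout/stderr shortcuts and selected requirements/hints into the key dictionary before hashing.

    import hashlib
    import json
    import os


    def cache_key(cmdline, input_files, cachedir):
        """Minimal sketch of the cache-key scheme in CommandLineTool.job().

        cmdline     -- list of command-line arguments (with any "docker run
                       <image>" prefix already prepended, as in the diff)
        input_files -- mapping of resolved path -> (size, checksum_or_mtime)
        cachedir    -- base cache directory (runtimeContext.cachedir)
        """
        keydict = {"cmdline": cmdline}
        for path, (size, stamp) in input_files.items():
            # The real code prefers a sha1 checksum and falls back to the
            # file's mtime in milliseconds when no checksum is available.
            keydict[path] = [size, stamp]
        # Compact, key-sorted JSON gives a stable string to hash.
        keydictstr = json.dumps(keydict, separators=(',', ':'), sort_keys=True)
        cachekey = hashlib.md5(keydictstr.encode('utf-8')).hexdigest()  # nosec
        return os.path.join(cachedir, cachekey)


    print(cache_key(["echo", "hello"],
                    {"/data/in.txt": (42, "sha1$deadbeef")},
                    "/tmp/cwl-cache"))

If the resulting directory exists and its ".status" lockfile reads "success", the cached output is reused; otherwise the job runs with its output directory pointed at that cache path.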
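Likewise, the substitute(primary["basename"], sf["pattern"]) call used when collecting secondaryFiles follows the CWL convention that each leading "^" in the pattern strips one extension from the primary basename before the rest of the pattern is appended. A rough sketch of that rule, using os.path.splitext-style extension handling rather than cwltool's exact implementation, and an illustrative helper name expand_secondary:

    import os


    def expand_secondary(basename, pattern):
        """Rough sketch of CWL secondaryFiles pattern expansion.

        Each leading '^' drops one extension from the primary file's
        basename; whatever remains of the pattern is then appended.
        """
        while pattern.startswith("^"):
            basename, _ext = os.path.splitext(basename)
            pattern = pattern[1:]
        return basename + pattern


    # ".bai"  -> sibling file next to the primary: "reads.bam.bai"
    assert expand_secondary("reads.bam", ".bai") == "reads.bam.bai"
    # "^.bai" -> replace the ".bam" extension: "reads.bai"
    assert expand_secondary("reads.bam", "^.bai") == "reads.bai"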