diff env/lib/python3.7/site-packages/cwltool/command_line_tool.py @ 2:6af9afd405e9 draft

"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author shellac
date Thu, 14 May 2020 14:56:58 -0400
parents 26e78fe6e8c4
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/cwltool/command_line_tool.py	Thu May 14 14:56:58 2020 -0400
@@ -0,0 +1,853 @@
+"""Implementation of CommandLineTool."""
+from __future__ import absolute_import
+
+import copy
+import hashlib
+import json
+import locale
+import logging
+import os
+import re
+import shutil
+import tempfile
+import threading
+from functools import cmp_to_key, partial
+from typing import (Any, Callable, Dict, Generator, IO, List, Mapping,
+                    MutableMapping, MutableSequence, Optional, Set, Union, cast)
+
+from typing_extensions import Text, Type, TYPE_CHECKING  # pylint: disable=unused-import
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+import shellescape
+from schema_salad import validate
+from schema_salad.avro.schema import Schema
+from schema_salad.ref_resolver import file_uri, uri_file_path
+from schema_salad.sourceline import SourceLine
+from six import string_types
+from future.utils import raise_from
+
+from six.moves import map, urllib
+from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
+                               Text, Type)
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+from .builder import (Builder, content_limit_respected_read_bytes, # pylint: disable=unused-import
+                      substitute)
+from .context import LoadingContext  # pylint: disable=unused-import
+from .context import RuntimeContext, getdefault
+from .docker import DockerCommandLineJob
+from .errors import WorkflowException
+from .flatten import flatten
+from .job import CommandLineJob, JobBase  # pylint: disable=unused-import
+from .loghandler import _logger
+from .mutation import MutationManager  # pylint: disable=unused-import
+from .pathmapper import (PathMapper, adjustDirObjs, adjustFileObjs,
+                         get_listing, trim_listing, visit_class)
+from .process import (Process, UnsupportedRequirement,
+                      _logger_validation_warnings, compute_checksums,
+                      normalizeFilesDirs, shortname, uniquename)
+from .singularity import SingularityCommandLineJob
+from .software_requirements import (  # pylint: disable=unused-import
+    DependenciesConfiguration)
+from .stdfsaccess import StdFsAccess  # pylint: disable=unused-import
+from .utils import (aslist, convert_pathsep_to_unix,
+                    docker_windows_path_adjust, json_dumps, onWindows,
+                    random_outdir, windows_default_container_id,
+                    shared_file_lock, upgrade_lock)
+if TYPE_CHECKING:
+    from .provenance import ProvenanceProfile  # pylint: disable=unused-import
+
+ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")  # Accept anything
+ACCEPTLIST_RE = ACCEPTLIST_EN_STRICT_RE
+DEFAULT_CONTAINER_MSG = """
+We are on Microsoft Windows and not all components of this CWL description have a
+container specified. This means that these steps will be executed in the default container,
+which is %s.
+
+Note, this could affect portability if this CWL description relies on non-POSIX features
+or commands in this container. For best results add the following to your CWL
+description's hints section:
+
+hints:
+  DockerRequirement:
+    dockerPull: %s
+"""
+
+
+class ExpressionTool(Process):
+    class ExpressionJob(object):
+        """Job for ExpressionTools."""
+
+        def __init__(self,
+                     builder,          # type: Builder
+                     script,           # type: Dict[Text, Text]
+                     output_callback,  # type: Callable[[Any, Any], Any]
+                     requirements,     # type: List[Dict[Text, Text]]
+                     hints,            # type: List[Dict[Text, Text]]
+                     outdir=None,      # type: Optional[Text]
+                     tmpdir=None,      # type: Optional[Text]
+                    ):  # type: (...) -> None
+            """Initializet this ExpressionJob."""
+            self.builder = builder
+            self.requirements = requirements
+            self.hints = hints
+            self.collect_outputs = None  # type: Optional[Callable[[Any], Any]]
+            self.output_callback = output_callback
+            self.outdir = outdir
+            self.tmpdir = tmpdir
+            self.script = script
+            self.prov_obj = None  # type: Optional[ProvenanceProfile]
+
+        def run(self,
+                runtimeContext,   # type: RuntimeContext
+                tmpdir_lock=None  # type: Optional[threading.Lock]
+               ):  # type: (...) -> None
+            try:
+                normalizeFilesDirs(self.builder.job)
+                ev = self.builder.do_eval(self.script)
+                normalizeFilesDirs(ev)
+                self.output_callback(ev, "success")
+            except Exception as err:
+                _logger.warning(u"Failed to evaluate expression:\n%s",
+                                Text(err), exc_info=runtimeContext.debug)
+                self.output_callback({}, "permanentFail")
+
+    def job(self,
+            job_order,         # type: Mapping[Text, Text]
+            output_callbacks,  # type: Callable[[Any, Any], Any]
+            runtimeContext     # type: RuntimeContext
+           ):
+        # type: (...) -> Generator[ExpressionTool.ExpressionJob, None, None]
+        builder = self._init_job(job_order, runtimeContext)
+
+        job = ExpressionTool.ExpressionJob(
+            builder, self.tool["expression"], output_callbacks,
+            self.requirements, self.hints)
+        job.prov_obj = runtimeContext.prov_obj
+        yield job
+
+
+def remove_path(f):  # type: (Dict[Text, Any]) -> None
+    if "path" in f:
+        del f["path"]
+
+
+def revmap_file(builder, outdir, f):
+    # type: (Builder, Text, Dict[Text, Any]) -> Union[Dict[Text, Any], None]
+    """
+    Remap a file from internal path to external path.
+
+    For Docker, this maps from the path inside tho container to the path
+    outside the container. Recognizes files in the pathmapper or remaps
+    internal output directories to the external directory.
+    """
+    split = urllib.parse.urlsplit(outdir)
+    if not split.scheme:
+        outdir = file_uri(str(outdir))
+
+    # builder.outdir is the inner (container/compute node) output directory
+    # outdir is the outer (host/storage system) output directory
+
+    if "location" in f and "path" not in f:
+        if f["location"].startswith("file://"):
+            f["path"] = convert_pathsep_to_unix(uri_file_path(f["location"]))
+        else:
+            return f
+
+    if "path" in f:
+        path = f["path"]
+        uripath = file_uri(path)
+        del f["path"]
+
+        if "basename" not in f:
+            f["basename"] = os.path.basename(path)
+
+        if not builder.pathmapper:
+            raise ValueError("Do not call revmap_file using a builder that doesn't have a pathmapper.")
+        revmap_f = builder.pathmapper.reversemap(path)
+
+        if revmap_f and not builder.pathmapper.mapper(revmap_f[0]).type.startswith("Writable"):
+            f["location"] = revmap_f[1]
+        elif uripath == outdir or uripath.startswith(outdir+os.sep) or uripath.startswith(outdir+'/'):
+            f["location"] = file_uri(path)
+        elif path == builder.outdir or path.startswith(builder.outdir+os.sep) or path.startswith(builder.outdir+'/'):
+            f["location"] = builder.fs_access.join(outdir, path[len(builder.outdir) + 1:])
+        elif not os.path.isabs(path):
+            f["location"] = builder.fs_access.join(outdir, path)
+        else:
+            raise WorkflowException(u"Output file path %s must be within designated output directory (%s) or an input "
+                                    u"file pass through." % (path, builder.outdir))
+        return f
+
+    raise WorkflowException(u"Output File object is missing both 'location' "
+                            "and 'path' fields: %s" % f)
+
+
+class CallbackJob(object):
+    def __init__(self, job, output_callback, cachebuilder, jobcache):
+        # type: (CommandLineTool, Callable[[Any, Any], Any], Builder, Text) -> None
+        """Initialize this CallbackJob."""
+        self.job = job
+        self.output_callback = output_callback
+        self.cachebuilder = cachebuilder
+        self.outdir = jobcache
+        self.prov_obj = None  # type: Optional[ProvenanceProfile]
+
+    def run(self,
+            runtimeContext,   # type: RuntimeContext
+            tmpdir_lock=None  # type: Optional[threading.Lock]
+            ):  # type: (...) -> None
+        self.output_callback(self.job.collect_output_ports(
+            self.job.tool["outputs"],
+            self.cachebuilder,
+            self.outdir,
+            getdefault(runtimeContext.compute_checksum, True)), "success")
+
+
+def check_adjust(builder, file_o):
+    # type: (Builder, Dict[Text, Any]) -> Dict[Text, Any]
+    """
+    Map files to assigned path inside a container.
+
+    We need to also explicitly walk over input, as implicit reassignment
+    doesn't reach everything in builder.bindings
+    """
+    if not builder.pathmapper:
+            raise ValueError("Do not call check_adjust using a builder that doesn't have a pathmapper.")
+    file_o["path"] = docker_windows_path_adjust(
+        builder.pathmapper.mapper(file_o["location"])[1])
+    dn, bn = os.path.split(file_o["path"])
+    if file_o.get("dirname") != dn:
+        file_o["dirname"] = Text(dn)
+    if file_o.get("basename") != bn:
+        file_o["basename"] = Text(bn)
+    if file_o["class"] == "File":
+        nr, ne = os.path.splitext(file_o["basename"])
+        if file_o.get("nameroot") != nr:
+            file_o["nameroot"] = Text(nr)
+        if file_o.get("nameext") != ne:
+            file_o["nameext"] = Text(ne)
+    if not ACCEPTLIST_RE.match(file_o["basename"]):
+        raise WorkflowException(
+            "Invalid filename: '{}' contains illegal characters".format(
+                file_o["basename"]))
+    return file_o
+
+def check_valid_locations(fs_access, ob):  # type: (StdFsAccess, Dict[Text, Any]) -> None
+    if ob["location"].startswith("_:"):
+        pass
+    if ob["class"] == "File" and not fs_access.isfile(ob["location"]):
+        raise validate.ValidationException("Does not exist or is not a File: '%s'" % ob["location"])
+    if ob["class"] == "Directory" and not fs_access.isdir(ob["location"]):
+        raise validate.ValidationException("Does not exist or is not a Directory: '%s'" % ob["location"])
+
+
+OutputPorts = Dict[Text, Union[None, Text, List[Union[Dict[Text, Any], Text]], Dict[Text, Any]]]
+
+class CommandLineTool(Process):
+    def __init__(self, toolpath_object, loadingContext):
+        # type: (MutableMapping[Text, Any], LoadingContext) -> None
+        """Initialize this CommandLineTool."""
+        super(CommandLineTool, self).__init__(toolpath_object, loadingContext)
+        self.prov_obj = loadingContext.prov_obj
+
+    def make_job_runner(self,
+                        runtimeContext       # type: RuntimeContext
+                       ):  # type: (...) -> Type[JobBase]
+        dockerReq, _ = self.get_requirement("DockerRequirement")
+        if not dockerReq and runtimeContext.use_container:
+            if runtimeContext.find_default_container is not None:
+                default_container = runtimeContext.find_default_container(self)
+                if default_container is not None:
+                    self.requirements.insert(0, {
+                        "class": "DockerRequirement",
+                        "dockerPull": default_container
+                    })
+                    dockerReq = self.requirements[0]
+                    if default_container == windows_default_container_id \
+                            and runtimeContext.use_container and onWindows():
+                        _logger.warning(
+                            DEFAULT_CONTAINER_MSG, windows_default_container_id,
+                            windows_default_container_id)
+
+        if dockerReq is not None and runtimeContext.use_container:
+            if runtimeContext.singularity:
+                return SingularityCommandLineJob
+            return DockerCommandLineJob
+        for t in reversed(self.requirements):
+            if t["class"] == "DockerRequirement":
+                raise UnsupportedRequirement(
+                    "--no-container, but this CommandLineTool has "
+                    "DockerRequirement under 'requirements'.")
+        return CommandLineJob
+
+    def make_path_mapper(self, reffiles, stagedir, runtimeContext, separateDirs):
+        # type: (List[Any], Text, RuntimeContext, bool) -> PathMapper
+        return PathMapper(reffiles, runtimeContext.basedir, stagedir, separateDirs)
+
+    def updatePathmap(self, outdir, pathmap, fn):
+        # type: (Text, PathMapper, Dict[Text, Any]) -> None
+        if "location" in fn and fn["location"] in pathmap:
+            pathmap.update(fn["location"], pathmap.mapper(fn["location"]).resolved,
+                           os.path.join(outdir, fn["basename"]),
+                           ("Writable" if fn.get("writable") else "") + fn["class"], False)
+        for sf in fn.get("secondaryFiles", []):
+            self.updatePathmap(outdir, pathmap, sf)
+        for ls in fn.get("listing", []):
+            self.updatePathmap(os.path.join(outdir, fn["basename"]), pathmap, ls)
+
+    def job(self,
+            job_order,         # type: Mapping[Text, Text]
+            output_callbacks,  # type: Callable[[Any, Any], Any]
+            runtimeContext     # type: RuntimeContext
+           ):
+        # type: (...) -> Generator[Union[JobBase, CallbackJob], None, None]
+
+        workReuse, _ = self.get_requirement("WorkReuse")
+        enableReuse = workReuse.get("enableReuse", True) if workReuse else True
+
+        jobname = uniquename(runtimeContext.name or shortname(self.tool.get("id", "job")))
+        if runtimeContext.cachedir and enableReuse:
+            cachecontext = runtimeContext.copy()
+            cachecontext.outdir = "/out"
+            cachecontext.tmpdir = "/tmp"  # nosec
+            cachecontext.stagedir = "/stage"
+            cachebuilder = self._init_job(job_order, cachecontext)
+            cachebuilder.pathmapper = PathMapper(cachebuilder.files,
+                                                 runtimeContext.basedir,
+                                                 cachebuilder.stagedir,
+                                                 separateDirs=False)
+            _check_adjust = partial(check_adjust, cachebuilder)
+            visit_class([cachebuilder.files, cachebuilder.bindings],
+                        ("File", "Directory"), _check_adjust)
+
+            cmdline = flatten(list(map(cachebuilder.generate_arg, cachebuilder.bindings)))
+            docker_req, _ = self.get_requirement("DockerRequirement")
+            if docker_req is not None and runtimeContext.use_container:
+                dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull")
+            elif runtimeContext.default_container is not None and runtimeContext.use_container:
+                dockerimg = runtimeContext.default_container
+            else:
+                dockerimg = None
+
+            if dockerimg is not None:
+                cmdline = ["docker", "run", dockerimg] + cmdline
+                # not really run using docker, just for hashing purposes
+            keydict = {u"cmdline": cmdline}  # type: Dict[Text, Union[Dict[Text, Any], List[Any]]]
+
+            for shortcut in ["stdin", "stdout", "stderr"]:
+                if shortcut in self.tool:
+                    keydict[shortcut] = self.tool[shortcut]
+
+            for location, fobj in cachebuilder.pathmapper.items():
+                if fobj.type == "File":
+                    checksum = next(
+                        (e['checksum'] for e in cachebuilder.files
+                         if 'location' in e and e['location'] == location
+                         and 'checksum' in e
+                         and e['checksum'] != 'sha1$hash'), None)
+                    fobj_stat = os.stat(fobj.resolved)
+                    if checksum is not None:
+                        keydict[fobj.resolved] = [fobj_stat.st_size, checksum]
+                    else:
+                        keydict[fobj.resolved] = [fobj_stat.st_size,
+                                                  int(fobj_stat.st_mtime * 1000)]
+
+            interesting = {"DockerRequirement",
+                           "EnvVarRequirement",
+                           "InitialWorkDirRequirement",
+                           "ShellCommandRequirement",
+                           "NetworkAccess"}
+            for rh in (self.original_requirements, self.original_hints):
+                for r in reversed(rh):
+                    if r["class"] in interesting and r["class"] not in keydict:
+                        keydict[r["class"]] = r
+
+            keydictstr = json_dumps(keydict, separators=(',', ':'),
+                                    sort_keys=True)
+            cachekey = hashlib.md5(  # nosec
+                keydictstr.encode('utf-8')).hexdigest()
+
+            _logger.debug("[job %s] keydictstr is %s -> %s", jobname,
+                          keydictstr, cachekey)
+
+            jobcache = os.path.join(runtimeContext.cachedir, cachekey)
+
+            # Create a lockfile to manage cache status.
+            jobcachepending = "{}.status".format(jobcache)
+            jobcachelock = None
+            jobstatus = None
+
+            # Opens the file for read/write, or creates an empty file.
+            jobcachelock = open(jobcachepending, "a+")
+
+            # get the shared lock to ensure no other process is trying
+            # to write to this cache
+            shared_file_lock(jobcachelock)
+            jobcachelock.seek(0)
+            jobstatus = jobcachelock.read()
+
+            if os.path.isdir(jobcache) and jobstatus == "success":
+                if docker_req and runtimeContext.use_container:
+                    cachebuilder.outdir = runtimeContext.docker_outdir or random_outdir()
+                else:
+                    cachebuilder.outdir = jobcache
+
+                _logger.info("[job %s] Using cached output in %s", jobname, jobcache)
+                yield CallbackJob(self, output_callbacks, cachebuilder, jobcache)
+                # we're done with the cache so release lock
+                jobcachelock.close()
+                return
+            else:
+                _logger.info("[job %s] Output of job will be cached in %s", jobname, jobcache)
+
+                # turn shared lock into an exclusive lock since we'll
+                # be writing the cache directory
+                upgrade_lock(jobcachelock)
+
+                shutil.rmtree(jobcache, True)
+                os.makedirs(jobcache)
+                runtimeContext = runtimeContext.copy()
+                runtimeContext.outdir = jobcache
+
+                def update_status_output_callback(
+                        output_callbacks,  # type: Callable[[List[Dict[Text, Any]], Text], None]
+                        jobcachelock,      # type: IO[Any]
+                        outputs,           # type: List[Dict[Text, Any]]
+                        processStatus      # type: Text
+                        ):  # type: (...) -> None
+                    # save status to the lockfile then release the lock
+                    jobcachelock.seek(0)
+                    jobcachelock.truncate()
+                    jobcachelock.write(processStatus)
+                    jobcachelock.close()
+                    output_callbacks(outputs, processStatus)
+
+                output_callbacks = partial(
+                    update_status_output_callback, output_callbacks, jobcachelock)
+
+        builder = self._init_job(job_order, runtimeContext)
+
+        reffiles = copy.deepcopy(builder.files)
+
+        j = self.make_job_runner(runtimeContext)(
+            builder, builder.job, self.make_path_mapper, self.requirements,
+            self.hints, jobname)
+        j.prov_obj = self.prov_obj
+
+        j.successCodes = self.tool.get("successCodes", [])
+        j.temporaryFailCodes = self.tool.get("temporaryFailCodes", [])
+        j.permanentFailCodes = self.tool.get("permanentFailCodes", [])
+
+        debug = _logger.isEnabledFor(logging.DEBUG)
+
+        if debug:
+            _logger.debug(u"[job %s] initializing from %s%s",
+                          j.name,
+                          self.tool.get("id", ""),
+                          u" as part of %s" % runtimeContext.part_of
+                          if runtimeContext.part_of else "")
+            _logger.debug(u"[job %s] %s", j.name, json_dumps(builder.job,
+                                                             indent=4))
+
+        builder.pathmapper = self.make_path_mapper(
+            reffiles, builder.stagedir, runtimeContext, True)
+        builder.requirements = j.requirements
+
+        _check_adjust = partial(check_adjust, builder)
+
+        visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust)
+
+        initialWorkdir, _ = self.get_requirement("InitialWorkDirRequirement")
+        if initialWorkdir is not None:
+            ls = []  # type: List[Dict[Text, Any]]
+            if isinstance(initialWorkdir["listing"], string_types):
+                ls = builder.do_eval(initialWorkdir["listing"])
+            else:
+                for t in initialWorkdir["listing"]:
+                    if isinstance(t, Mapping) and "entry" in t:
+                        entry_exp = builder.do_eval(t["entry"], strip_whitespace=False)
+                        for entry in aslist(entry_exp):
+                            et = {u"entry": entry}
+                            if "entryname" in t:
+                                et["entryname"] = builder.do_eval(t["entryname"])
+                            else:
+                                et["entryname"] = None
+                            et["writable"] = t.get("writable", False)
+                            if et[u"entry"] is not None:
+                                ls.append(et)
+                    else:
+                        initwd_item = builder.do_eval(t)
+                        if not initwd_item:
+                            continue
+                        if isinstance(initwd_item, MutableSequence):
+                            ls.extend(initwd_item)
+                        else:
+                            ls.append(initwd_item)
+            for i, t in enumerate(ls):
+                if "entry" in t:
+                    if isinstance(t["entry"], string_types):
+                        ls[i] = {
+                            "class": "File",
+                            "basename": t["entryname"],
+                            "contents": t["entry"],
+                            "writable": t.get("writable")
+                        }
+                    else:
+                        if t.get("entryname") or t.get("writable"):
+                            t = copy.deepcopy(t)
+                            if t.get("entryname"):
+                                t["entry"]["basename"] = t["entryname"]
+                            t["entry"]["writable"] = t.get("writable")
+                        ls[i] = t["entry"]
+            j.generatefiles["listing"] = ls
+            for l in ls:
+                self.updatePathmap(builder.outdir, builder.pathmapper, l)
+            visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust)
+
+        if debug:
+            _logger.debug(u"[job %s] path mappings is %s", j.name,
+                          json_dumps({p: builder.pathmapper.mapper(p)
+                                      for p in builder.pathmapper.files()},
+                                     indent=4))
+
+        if self.tool.get("stdin"):
+            with SourceLine(self.tool, "stdin", validate.ValidationException, debug):
+                j.stdin = builder.do_eval(self.tool["stdin"])
+                if j.stdin:
+                    reffiles.append({"class": "File", "path": j.stdin})
+
+        if self.tool.get("stderr"):
+            with SourceLine(self.tool, "stderr", validate.ValidationException, debug):
+                j.stderr = builder.do_eval(self.tool["stderr"])
+                if j.stderr:
+                    if os.path.isabs(j.stderr) or ".." in j.stderr:
+                        raise validate.ValidationException(
+                            "stderr must be a relative path, got '%s'" % j.stderr)
+
+        if self.tool.get("stdout"):
+            with SourceLine(self.tool, "stdout", validate.ValidationException, debug):
+                j.stdout = builder.do_eval(self.tool["stdout"])
+                if j.stdout:
+                    if os.path.isabs(j.stdout) or ".." in j.stdout or not j.stdout:
+                        raise validate.ValidationException(
+                            "stdout must be a relative path, got '%s'" % j.stdout)
+
+        if debug:
+            _logger.debug(u"[job %s] command line bindings is %s", j.name,
+                          json_dumps(builder.bindings, indent=4))
+        dockerReq, _ = self.get_requirement("DockerRequirement")
+        if dockerReq is not None and runtimeContext.use_container:
+            out_dir, out_prefix = os.path.split(
+                runtimeContext.tmp_outdir_prefix)
+            j.outdir = runtimeContext.outdir or \
+                tempfile.mkdtemp(prefix=out_prefix, dir=out_dir)
+            tmpdir_dir, tmpdir_prefix = os.path.split(
+                runtimeContext.tmpdir_prefix)
+            j.tmpdir = runtimeContext.tmpdir or \
+                tempfile.mkdtemp(prefix=tmpdir_prefix, dir=tmpdir_dir)
+            j.stagedir = tempfile.mkdtemp(prefix=tmpdir_prefix, dir=tmpdir_dir)
+        else:
+            j.outdir = builder.outdir
+            j.tmpdir = builder.tmpdir
+            j.stagedir = builder.stagedir
+
+        inplaceUpdateReq, _ = self.get_requirement("InplaceUpdateRequirement")
+        if inplaceUpdateReq is not None:
+            j.inplace_update = inplaceUpdateReq["inplaceUpdate"]
+        normalizeFilesDirs(j.generatefiles)
+
+        readers = {}  # type: Dict[Text, Any]
+        muts = set()  # type: Set[Text]
+
+        if builder.mutation_manager is not None:
+            def register_mut(f):  # type: (Dict[Text, Any]) -> None
+                mm = cast(MutationManager, builder.mutation_manager)
+                muts.add(f["location"])
+                mm.register_mutation(j.name, f)
+
+            def register_reader(f):  # type: (Dict[Text, Any]) -> None
+                mm = cast(MutationManager, builder.mutation_manager)
+                if f["location"] not in muts:
+                    mm.register_reader(j.name, f)
+                    readers[f["location"]] = copy.deepcopy(f)
+
+            for li in j.generatefiles["listing"]:
+                li = cast(Dict[Text, Any], li)
+                if li.get("writable") and j.inplace_update:
+                    adjustFileObjs(li, register_mut)
+                    adjustDirObjs(li, register_mut)
+                else:
+                    adjustFileObjs(li, register_reader)
+                    adjustDirObjs(li, register_reader)
+
+            adjustFileObjs(builder.files, register_reader)
+            adjustFileObjs(builder.bindings, register_reader)
+            adjustDirObjs(builder.files, register_reader)
+            adjustDirObjs(builder.bindings, register_reader)
+
+        timelimit, _ = self.get_requirement("ToolTimeLimit")
+        if timelimit is not None:
+            with SourceLine(timelimit, "timelimit", validate.ValidationException, debug):
+                j.timelimit = builder.do_eval(timelimit["timelimit"])
+                if not isinstance(j.timelimit, int) or j.timelimit < 0:
+                    raise Exception("timelimit must be an integer >= 0, got: %s" % j.timelimit)
+
+        networkaccess, _ = self.get_requirement("NetworkAccess")
+        if networkaccess is not None:
+            with SourceLine(networkaccess, "networkAccess", validate.ValidationException, debug):
+                j.networkaccess = builder.do_eval(networkaccess["networkAccess"])
+                if not isinstance(j.networkaccess, bool):
+                    raise Exception("networkAccess must be a boolean, got: %s" % j.networkaccess)
+
+        j.environment = {}
+        evr, _ = self.get_requirement("EnvVarRequirement")
+        if evr is not None:
+            for t in evr["envDef"]:
+                j.environment[t["envName"]] = builder.do_eval(t["envValue"])
+
+        shellcmd, _ = self.get_requirement("ShellCommandRequirement")
+        if shellcmd is not None:
+            cmd = []  # type: List[Text]
+            for b in builder.bindings:
+                arg = builder.generate_arg(b)
+                if b.get("shellQuote", True):
+                    arg = [shellescape.quote(a) for a in aslist(arg)]
+                cmd.extend(aslist(arg))
+            j.command_line = ["/bin/sh", "-c", " ".join(cmd)]
+        else:
+            j.command_line = flatten(list(map(builder.generate_arg, builder.bindings)))
+
+        j.pathmapper = builder.pathmapper
+        j.collect_outputs = partial(
+            self.collect_output_ports, self.tool["outputs"], builder,
+            compute_checksum=getdefault(runtimeContext.compute_checksum, True),
+            jobname=jobname,
+            readers=readers)
+        j.output_callback = output_callbacks
+
+        yield j
+
+    def collect_output_ports(self,
+                             ports,                  # type: Set[Dict[Text, Any]]
+                             builder,                # type: Builder
+                             outdir,                 # type: Text
+                             rcode,                  # type: int
+                             compute_checksum=True,  # type: bool
+                             jobname="",             # type: Text
+                             readers=None            # type: Optional[Dict[Text, Any]]
+                            ):  # type: (...) -> OutputPorts
+        ret = {}  # type: OutputPorts
+        debug = _logger.isEnabledFor(logging.DEBUG)
+        cwl_version = self.metadata.get(
+            "http://commonwl.org/cwltool#original_cwlVersion", None)
+        if cwl_version != "v1.0":
+            builder.resources["exitCode"] = rcode
+        try:
+            fs_access = builder.make_fs_access(outdir)
+            custom_output = fs_access.join(outdir, "cwl.output.json")
+            if fs_access.exists(custom_output):
+                with fs_access.open(custom_output, "r") as f:
+                    ret = json.load(f)
+                if debug:
+                    _logger.debug(u"Raw output from %s: %s", custom_output,
+                                  json_dumps(ret, indent=4))
+            else:
+                for i, port in enumerate(ports):
+                    class ParameterOutputWorkflowException(WorkflowException):
+                        def __init__(self, msg, **kwargs):  # type: (Text, **Any) -> None
+                            super(ParameterOutputWorkflowException, self).__init__(
+                                u"Error collecting output for parameter '%s':\n%s"
+                                % (shortname(port["id"]), msg), kwargs)
+                    with SourceLine(ports, i, ParameterOutputWorkflowException, debug):
+                        fragment = shortname(port["id"])
+                        ret[fragment] = self.collect_output(port, builder, outdir, fs_access,
+                                                            compute_checksum=compute_checksum)
+            if ret:
+                revmap = partial(revmap_file, builder, outdir)
+                adjustDirObjs(ret, trim_listing)
+                visit_class(ret, ("File", "Directory"), cast(Callable[[Any], Any], revmap))
+                visit_class(ret, ("File", "Directory"), remove_path)
+                normalizeFilesDirs(ret)
+                visit_class(ret, ("File", "Directory"), partial(check_valid_locations, fs_access))
+
+                if compute_checksum:
+                    adjustFileObjs(ret, partial(compute_checksums, fs_access))
+            expected_schema = cast(Schema, self.names.get_name(
+                "outputs_record_schema", ""))
+            validate.validate_ex(expected_schema, ret,
+                strict=False, logger=_logger_validation_warnings)
+            if ret is not None and builder.mutation_manager is not None:
+                adjustFileObjs(ret, builder.mutation_manager.set_generation)
+            return ret if ret is not None else {}
+        except validate.ValidationException as e:
+            raise_from(WorkflowException(
+                "Error validating output record. " + Text(e) + "\n in "
+                + json_dumps(ret, indent=4)), e)
+        finally:
+            if builder.mutation_manager and readers:
+                for r in readers.values():
+                    builder.mutation_manager.release_reader(jobname, r)
+
+    def collect_output(self,
+                       schema,                # type: Dict[Text, Any]
+                       builder,               # type: Builder
+                       outdir,                # type: Text
+                       fs_access,             # type: StdFsAccess
+                       compute_checksum=True  # type: bool
+                      ):
+        # type: (...) -> Optional[Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]]]
+        r = []  # type: List[Any]
+        empty_and_optional = False
+        debug = _logger.isEnabledFor(logging.DEBUG)
+        if "outputBinding" in schema:
+            binding = schema["outputBinding"]
+            globpatterns = []  # type: List[Text]
+
+            revmap = partial(revmap_file, builder, outdir)
+
+            if "glob" in binding:
+                with SourceLine(binding, "glob", WorkflowException, debug):
+                    for gb in aslist(binding["glob"]):
+                        gb = builder.do_eval(gb)
+                        if gb:
+                            globpatterns.extend(aslist(gb))
+
+                    for gb in globpatterns:
+                        if gb.startswith(builder.outdir):
+                            gb = gb[len(builder.outdir) + 1:]
+                        elif gb == ".":
+                            gb = outdir
+                        elif gb.startswith("/"):
+                            raise WorkflowException(
+                                "glob patterns must not start with '/'")
+                        try:
+                            prefix = fs_access.glob(outdir)
+                            r.extend([{"location": g,
+                                       "path": fs_access.join(builder.outdir,
+                                           g[len(prefix[0])+1:]),
+                                       "basename": os.path.basename(g),
+                                       "nameroot": os.path.splitext(
+                                           os.path.basename(g))[0],
+                                       "nameext": os.path.splitext(
+                                           os.path.basename(g))[1],
+                                       "class": "File" if fs_access.isfile(g)
+                                       else "Directory"}
+                                      for g in sorted(fs_access.glob(
+                                          fs_access.join(outdir, gb)),
+                                          key=cmp_to_key(cast(
+                                              Callable[[Text, Text],
+                                                  int], locale.strcoll)))])
+                        except (OSError, IOError) as e:
+                            _logger.warning(Text(e))
+                        except Exception:
+                            _logger.error("Unexpected error from fs_access", exc_info=True)
+                            raise
+
+                for files in r:
+                    rfile = files.copy()
+                    revmap(rfile)
+                    if files["class"] == "Directory":
+                        ll = schema.get("loadListing") or builder.loadListing
+                        if ll and ll != "no_listing":
+                            get_listing(fs_access, files, (ll == "deep_listing"))
+                    else:
+                        if binding.get("loadContents"):
+                            with fs_access.open(rfile["location"], "rb") as f:
+                                files["contents"] = content_limit_respected_read_bytes(f).decode("utf-8")
+                        if compute_checksum:
+                            with fs_access.open(rfile["location"], "rb") as f:
+                                checksum = hashlib.sha1()  # nosec
+                                contents = f.read(1024 * 1024)
+                                while contents != b"":
+                                    checksum.update(contents)
+                                    contents = f.read(1024 * 1024)
+                                files["checksum"] = "sha1$%s" % checksum.hexdigest()
+                        files["size"] = fs_access.size(rfile["location"])
+
+            optional = False
+            single = False
+            if isinstance(schema["type"], MutableSequence):
+                if "null" in schema["type"]:
+                    optional = True
+                if "File" in schema["type"] or "Directory" in schema["type"]:
+                    single = True
+            elif schema["type"] == "File" or schema["type"] == "Directory":
+                single = True
+
+            if "outputEval" in binding:
+                with SourceLine(binding, "outputEval", WorkflowException, debug):
+                    r = builder.do_eval(binding["outputEval"], context=r)
+
+            if single:
+                if not r and not optional:
+                    with SourceLine(binding, "glob", WorkflowException, debug):
+                        raise WorkflowException("Did not find output file with glob pattern: '{}'".format(globpatterns))
+                elif not r and optional:
+                    pass
+                elif isinstance(r, MutableSequence):
+                    if len(r) > 1:
+                        raise WorkflowException("Multiple matches for output item that is a single file.")
+                    else:
+                        r = r[0]
+
+            if "secondaryFiles" in schema:
+                with SourceLine(schema, "secondaryFiles", WorkflowException, debug):
+                    for primary in aslist(r):
+                        if isinstance(primary, MutableMapping):
+                            primary.setdefault("secondaryFiles", [])
+                            pathprefix = primary["path"][0:primary["path"].rindex("/")+1]
+                            for sf in aslist(schema["secondaryFiles"]):
+                                if 'required' in sf:
+                                    sf_required = builder.do_eval(sf['required'], context=primary)
+                                else:
+                                    sf_required = False
+
+                                if "$(" in sf["pattern"] or "${" in sf["pattern"]:
+                                    sfpath = builder.do_eval(sf["pattern"], context=primary)
+                                else:
+                                    sfpath = substitute(primary["basename"], sf["pattern"])
+
+                                for sfitem in aslist(sfpath):
+                                    if not sfitem:
+                                        continue
+                                    if isinstance(sfitem, string_types):
+                                        sfitem = {"path": pathprefix+sfitem}
+                                    if not fs_access.exists(sfitem['path']) and sf_required:
+                                        raise WorkflowException(
+                                            "Missing required secondary file '%s'" % (
+                                                sfitem["path"]))
+                                    if "path" in sfitem and "location" not in sfitem:
+                                        revmap(sfitem)
+                                    if fs_access.isfile(sfitem["location"]):
+                                        sfitem["class"] = "File"
+                                        primary["secondaryFiles"].append(sfitem)
+                                    elif fs_access.isdir(sfitem["location"]):
+                                        sfitem["class"] = "Directory"
+                                        primary["secondaryFiles"].append(sfitem)
+
+            if "format" in schema:
+                for primary in aslist(r):
+                    primary["format"] = builder.do_eval(schema["format"], context=primary)
+
+            # Ensure files point to local references outside of the run environment
+            adjustFileObjs(r, revmap)
+
+            if not r and optional:
+                # Don't convert zero or empty string to None
+                if r in [0, '']:
+                    return r
+                # For [] or None, return None
+                else:
+                    return None
+
+        if (not empty_and_optional and isinstance(schema["type"], MutableMapping)
+                and schema["type"]["type"] == "record"):
+            out = {}
+            for f in schema["type"]["fields"]:
+                out[shortname(f["name"])] = self.collect_output(  # type: ignore
+                    f, builder, outdir, fs_access,
+                    compute_checksum=compute_checksum)
+            return out
+        return r