view env/lib/python3.7/site-packages/cwltool/command_line_tool.py @ 4:79f47841a781 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:47:39 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

"""Implementation of CommandLineTool."""
from __future__ import absolute_import

import copy
import hashlib
import json
import locale
import logging
import os
import re
import shutil
import tempfile
import threading
from functools import cmp_to_key, partial
from typing import (Any, Callable, Dict, Generator, IO, List, Mapping,
                    MutableMapping, MutableSequence, Optional, Set, Union, cast)

from typing_extensions import Text, Type, TYPE_CHECKING  # pylint: disable=unused-import
# move to a regular typing import when Python 3.3-3.6 is no longer supported

import shellescape
from schema_salad import validate
from schema_salad.avro.schema import Schema
from schema_salad.ref_resolver import file_uri, uri_file_path
from schema_salad.sourceline import SourceLine
from six import string_types
from future.utils import raise_from

from six.moves import map, urllib
from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
                               Text, Type)
# move to a regular typing import when Python 3.3-3.6 is no longer supported

from .builder import (Builder, content_limit_respected_read_bytes, # pylint: disable=unused-import
                      substitute)
from .context import LoadingContext  # pylint: disable=unused-import
from .context import RuntimeContext, getdefault
from .docker import DockerCommandLineJob
from .errors import WorkflowException
from .flatten import flatten
from .job import CommandLineJob, JobBase  # pylint: disable=unused-import
from .loghandler import _logger
from .mutation import MutationManager  # pylint: disable=unused-import
from .pathmapper import (PathMapper, adjustDirObjs, adjustFileObjs,
                         get_listing, trim_listing, visit_class)
from .process import (Process, UnsupportedRequirement,
                      _logger_validation_warnings, compute_checksums,
                      normalizeFilesDirs, shortname, uniquename)
from .singularity import SingularityCommandLineJob
from .software_requirements import (  # pylint: disable=unused-import
    DependenciesConfiguration)
from .stdfsaccess import StdFsAccess  # pylint: disable=unused-import
from .utils import (aslist, convert_pathsep_to_unix,
                    docker_windows_path_adjust, json_dumps, onWindows,
                    random_outdir, windows_default_container_id,
                    shared_file_lock, upgrade_lock)
if TYPE_CHECKING:
    from .provenance import ProvenanceProfile  # pylint: disable=unused-import

# Strict pattern for names: one or more ASCII letters, digits, '.', '_', '+' or '-'.
ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")  # Accept anything
# Pattern that check_adjust() matches every File/Directory basename against;
# bound to the strict pattern here.
ACCEPTLIST_RE = ACCEPTLIST_EN_STRICT_RE
# Warning template logged by CommandLineTool.make_job_runner(); both %s
# placeholders are filled with windows_default_container_id.
DEFAULT_CONTAINER_MSG = """
We are on Microsoft Windows and not all components of this CWL description have a
container specified. This means that these steps will be executed in the default container,
which is %s.

Note, this could affect portability if this CWL description relies on non-POSIX features
or commands in this container. For best results add the following to your CWL
description's hints section:

hints:
  DockerRequirement:
    dockerPull: %s
"""


class ExpressionTool(Process):
    """Process whose single job evaluates the tool's ``expression`` entry."""

    class ExpressionJob(object):
        """Runnable unit that evaluates one ExpressionTool script."""

        def __init__(self,
                     builder,          # type: Builder
                     script,           # type: Dict[Text, Text]
                     output_callback,  # type: Callable[[Any, Any], Any]
                     requirements,     # type: List[Dict[Text, Text]]
                     hints,            # type: List[Dict[Text, Text]]
                     outdir=None,      # type: Optional[Text]
                     tmpdir=None,      # type: Optional[Text]
                     ):
            # type: (...) -> None
            """Record everything needed for evaluation; no work happens here."""
            # What to evaluate and with which builder.
            self.builder = builder
            self.script = script
            # Where the result (or the failure) is reported.
            self.output_callback = output_callback
            self.collect_outputs = None  # type: Optional[Callable[[Any], Any]]
            # Tool metadata and working directories, stored as given.
            self.requirements = requirements
            self.hints = hints
            self.outdir = outdir
            self.tmpdir = tmpdir
            self.prov_obj = None  # type: Optional[ProvenanceProfile]

        def run(self,
                runtimeContext,   # type: RuntimeContext
                tmpdir_lock=None  # type: Optional[threading.Lock]
                ):
            # type: (...) -> None
            """
            Evaluate the script and report the outcome through the callback.

            On success the callback receives the evaluated value and "success".
            Any exception raised along the way (including one raised by the
            success callback itself, which is inside the ``try``) is logged as
            a warning and reported as ``({}, "permanentFail")``.
            """
            try:
                normalizeFilesDirs(self.builder.job)
                result = self.builder.do_eval(self.script)
                normalizeFilesDirs(result)
                self.output_callback(result, "success")
            except Exception as exc:
                _logger.warning(u"Failed to evaluate expression:\n%s",
                                Text(exc), exc_info=runtimeContext.debug)
                self.output_callback({}, "permanentFail")

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):
        # type: (...) -> Generator[ExpressionTool.ExpressionJob, None, None]
        """Yield exactly one ExpressionJob built for ``job_order``."""
        job_builder = self._init_job(job_order, runtimeContext)

        expression_job = ExpressionTool.ExpressionJob(
            builder=job_builder,
            script=self.tool["expression"],
            output_callback=output_callbacks,
            requirements=self.requirements,
            hints=self.hints)
        # Propagate the provenance object carried by the runtime context.
        expression_job.prov_obj = runtimeContext.prov_obj
        yield expression_job


def remove_path(f):  # type: (Dict[Text, Any]) -> None
    """Drop the ``path`` entry from *f* in place, if it has one."""
    f.pop("path", None)


def revmap_file(builder, outdir, f):
    # type: (Builder, Text, Dict[Text, Any]) -> Union[Dict[Text, Any], None]
    """
    Remap a file from internal path to external path.

    For Docker, this maps from the path inside the container to the path
    outside the container. Recognizes files in the pathmapper or remaps
    internal output directories to the external directory.

    *f* is modified in place (its "path" is removed, "location" and possibly
    "basename" are set) and returned. Raises WorkflowException when *f* has
    neither "location" nor "path", or when its path is absolute and lies
    outside every recognised directory; raises ValueError when the builder
    has no pathmapper.
    """
    # Treat a scheme-less outdir as a local path and turn it into a file URI.
    if not urllib.parse.urlsplit(outdir).scheme:
        outdir = file_uri(str(outdir))

    # builder.outdir is the inner (container/compute node) output directory
    # outdir is the outer (host/storage system) output directory

    if "path" not in f:
        if "location" not in f:
            raise WorkflowException(u"Output File object is missing both 'location' "
                                    "and 'path' fields: %s" % f)
        if not f["location"].startswith("file://"):
            # Non-file locations are handed back untouched.
            return f
        f["path"] = convert_pathsep_to_unix(uri_file_path(f["location"]))

    path = f["path"]
    uripath = file_uri(path)
    del f["path"]

    if "basename" not in f:
        f["basename"] = os.path.basename(path)

    mapper = builder.pathmapper
    if not mapper:
        raise ValueError("Do not call revmap_file using a builder that doesn't have a pathmapper.")
    reverse_entry = mapper.reversemap(path)

    if reverse_entry and not mapper.mapper(reverse_entry[0]).type.startswith("Writable"):
        # Known, non-writable pathmapper entry: use the reverse-mapped value.
        new_location = reverse_entry[1]
    elif uripath == outdir or uripath.startswith((outdir+os.sep, outdir+'/')):
        # Already located in the outer output directory.
        new_location = file_uri(path)
    elif path == builder.outdir or path.startswith((builder.outdir+os.sep, builder.outdir+'/')):
        # Inside the inner output directory: re-root onto the outer one.
        new_location = builder.fs_access.join(outdir, path[len(builder.outdir) + 1:])
    elif not os.path.isabs(path):
        # Relative paths are resolved against the outer output directory.
        new_location = builder.fs_access.join(outdir, path)
    else:
        raise WorkflowException(u"Output file path %s must be within designated output directory (%s) or an input "
                                u"file pass through." % (path, builder.outdir))

    f["location"] = new_location
    return f


class CallbackJob(object):
    """Job that reports outputs collected from a cache directory without running the tool."""

    def __init__(self, job, output_callback, cachebuilder, jobcache):
        # type: (CommandLineTool, Callable[[Any, Any], Any], Builder, Text) -> None
        """
        Initialize this CallbackJob.

        :param job: tool whose ``collect_output_ports`` gathers the outputs
        :param output_callback: called with ``(outputs, "success")`` by run()
        :param cachebuilder: builder handed to ``collect_output_ports``
        :param jobcache: directory the outputs are collected from
        """
        self.job = job
        self.output_callback = output_callback
        self.cachebuilder = cachebuilder
        self.outdir = jobcache
        self.prov_obj = None  # type: Optional[ProvenanceProfile]

    def run(self,
            runtimeContext,   # type: RuntimeContext
            tmpdir_lock=None  # type: Optional[threading.Lock]
            ):  # type: (...) -> None
        """Collect the outputs from ``self.outdir`` and report them as "success"."""
        # collect_output_ports takes (ports, builder, outdir, rcode, ...), so
        # the checksum flag has to be passed by keyword; passed positionally
        # it would land in ``rcode`` and the flag itself would be ignored.
        # No exit code is stored alongside the cached outputs; 0 is used
        # because this job is only reported as "success".
        # NOTE(review): confirm 0 is acceptable for tools whose successCodes
        # contain non-zero values.
        outputs = self.job.collect_output_ports(
            self.job.tool["outputs"],
            self.cachebuilder,
            self.outdir,
            0,
            compute_checksum=getdefault(runtimeContext.compute_checksum, True))
        self.output_callback(outputs, "success")


def check_adjust(builder, file_o):
    # type: (Builder, Dict[Text, Any]) -> Dict[Text, Any]
    """
    Map files to assigned path inside a container.

    We need to also explicitly walk over input, as implicit reassignment
    doesn't reach everything in builder.bindings

    Sets "path" from the pathmapper entry for the object's "location" and
    brings "dirname"/"basename" (plus "nameroot"/"nameext" for Files) in line
    with that path. Raises ValueError if the builder has no pathmapper and
    WorkflowException if the basename does not match ACCEPTLIST_RE.
    """
    if not builder.pathmapper:
        raise ValueError("Do not call check_adjust using a builder that doesn't have a pathmapper.")

    mapped_target = builder.pathmapper.mapper(file_o["location"])[1]
    file_o["path"] = docker_windows_path_adjust(mapped_target)

    # Only overwrite a derived field when it disagrees with the new path.
    directory, leaf = os.path.split(file_o["path"])
    for key, value in (("dirname", directory), ("basename", leaf)):
        if file_o.get(key) != value:
            file_o[key] = Text(value)

    if file_o["class"] == "File":
        root, extension = os.path.splitext(file_o["basename"])
        for key, value in (("nameroot", root), ("nameext", extension)):
            if file_o.get(key) != value:
                file_o[key] = Text(value)

    if ACCEPTLIST_RE.match(file_o["basename"]):
        return file_o
    raise WorkflowException(
        "Invalid filename: '{}' contains illegal characters".format(
            file_o["basename"]))

def check_valid_locations(fs_access, ob):  # type: (StdFsAccess, Dict[Text, Any]) -> None
    """
    Verify that a File or Directory object points at something that exists.

    Objects whose location starts with "_:" are skipped, since such a
    location is not looked up through ``fs_access``. For every other object
    a ValidationException is raised when a "File" is not a file or a
    "Directory" is not a directory according to ``fs_access``.
    """
    location = ob["location"]
    if location.startswith("_:"):
        # Previously a no-op ``pass`` that fell through to the checks below.
        return
    if ob["class"] == "File" and not fs_access.isfile(location):
        raise validate.ValidationException("Does not exist or is not a File: '%s'" % location)
    if ob["class"] == "Directory" and not fs_access.isdir(location):
        raise validate.ValidationException("Does not exist or is not a Directory: '%s'" % location)


# Type alias for the mapping returned by CommandLineTool.collect_output_ports():
# keys are text, values are None, text, a dict, or a list of dicts/text.
OutputPorts = Dict[Text, Union[None, Text, List[Union[Dict[Text, Any], Text]], Dict[Text, Any]]]

class CommandLineTool(Process):
    def __init__(self, toolpath_object, loadingContext):
        # type: (MutableMapping[Text, Any], LoadingContext) -> None
        """
        Set up the tool through Process and remember the provenance object.

        ``prov_obj`` is copied from the loading context after the base class
        has finished initializing.
        """
        super(CommandLineTool, self).__init__(
            toolpath_object, loadingContext)
        provenance = loadingContext.prov_obj
        self.prov_obj = provenance

    def make_job_runner(self,
                        runtimeContext       # type: RuntimeContext
                       ):  # type: (...) -> Type[JobBase]
        """
        Choose the job class used to execute this tool.

        When the tool has no DockerRequirement but containers are enabled,
        a default container (if ``find_default_container`` supplies one) is
        inserted at the front of ``self.requirements``. Returns the
        Singularity or Docker job class when a DockerRequirement applies and
        containers are enabled, otherwise CommandLineJob. Raises
        UnsupportedRequirement if the non-container path is taken while
        ``self.requirements`` contains a DockerRequirement.
        """
        docker_req, _ = self.get_requirement("DockerRequirement")

        may_use_default = (not docker_req and runtimeContext.use_container
                           and runtimeContext.find_default_container is not None)
        if may_use_default:
            fallback_image = runtimeContext.find_default_container(self)
            if fallback_image is not None:
                self.requirements.insert(0, {
                    "class": "DockerRequirement",
                    "dockerPull": fallback_image
                })
                docker_req = self.requirements[0]
                is_windows_default = (
                    fallback_image == windows_default_container_id
                    and runtimeContext.use_container and onWindows())
                if is_windows_default:
                    _logger.warning(
                        DEFAULT_CONTAINER_MSG, windows_default_container_id,
                        windows_default_container_id)

        if docker_req is not None and runtimeContext.use_container:
            if runtimeContext.singularity:
                return SingularityCommandLineJob
            return DockerCommandLineJob

        # Running without a container: a DockerRequirement listed under
        # 'requirements' cannot be satisfied.
        if any(req["class"] == "DockerRequirement"
               for req in reversed(self.requirements)):
            raise UnsupportedRequirement(
                "--no-container, but this CommandLineTool has "
                "DockerRequirement under 'requirements'.")
        return CommandLineJob

    def make_path_mapper(self, reffiles, stagedir, runtimeContext, separateDirs):
        # type: (List[Any], Text, RuntimeContext, bool) -> PathMapper
        """Build a PathMapper for ``reffiles`` rooted at the runtime base directory."""
        base_directory = runtimeContext.basedir
        return PathMapper(reffiles, base_directory, stagedir, separateDirs)

    def updatePathmap(self, outdir, pathmap, fn):
        # type: (Text, PathMapper, Dict[Text, Any]) -> None
        """
        Retarget the pathmap entry of ``fn`` (and its children) under ``outdir``.

        An object whose location is already in ``pathmap`` is updated to
        target ``outdir/basename``, keeping its resolved value; the type is
        its class, prefixed with "Writable" when ``fn`` is marked writable.
        Secondary files are processed with the same ``outdir``, listing
        entries with ``outdir/basename`` as their directory.
        """
        if "location" in fn and fn["location"] in pathmap:
            location = fn["location"]
            resolved = pathmap.mapper(location).resolved
            target = os.path.join(outdir, fn["basename"])
            prefix = "Writable" if fn.get("writable") else ""
            pathmap.update(location, resolved, target, prefix + fn["class"], False)

        for secondary in fn.get("secondaryFiles", []):
            self.updatePathmap(outdir, pathmap, secondary)

        for child in fn.get("listing", []):
            child_outdir = os.path.join(outdir, fn["basename"])
            self.updatePathmap(child_outdir, pathmap, child)

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):
        # type: (...) -> Generator[Union[JobBase, CallbackJob], None, None]
        """
        Yield the single job that produces this tool's outputs for ``job_order``.

        When ``runtimeContext.cachedir`` is set and work reuse is enabled, a
        cache key is derived from the command line, the stdin/stdout/stderr
        settings, the input files and selected requirements. If a cache
        directory for that key exists and its status file reads "success", a
        CallbackJob over that directory is yielded and nothing else happens.
        Otherwise the cache directory is recreated, used as the output
        directory, and ``output_callbacks`` is wrapped so that the final
        status is written to the status file.

        In every other case a job runner (chosen by make_job_runner) is
        configured from the tool description and yielded.
        """

        # Work reuse is on unless a WorkReuse requirement disables it.
        workReuse, _ = self.get_requirement("WorkReuse")
        enableReuse = workReuse.get("enableReuse", True) if workReuse else True

        jobname = uniquename(runtimeContext.name or shortname(self.tool.get("id", "job")))
        if runtimeContext.cachedir and enableReuse:
            # Build a throwaway builder with fixed directory names.
            # NOTE(review): presumably so the hashed command line does not
            # depend on per-run temporary paths - confirm.
            cachecontext = runtimeContext.copy()
            cachecontext.outdir = "/out"
            cachecontext.tmpdir = "/tmp"  # nosec
            cachecontext.stagedir = "/stage"
            cachebuilder = self._init_job(job_order, cachecontext)
            cachebuilder.pathmapper = PathMapper(cachebuilder.files,
                                                 runtimeContext.basedir,
                                                 cachebuilder.stagedir,
                                                 separateDirs=False)
            _check_adjust = partial(check_adjust, cachebuilder)
            visit_class([cachebuilder.files, cachebuilder.bindings],
                        ("File", "Directory"), _check_adjust)

            cmdline = flatten(list(map(cachebuilder.generate_arg, cachebuilder.bindings)))
            # Pick the container image (if any) that takes part in the key.
            docker_req, _ = self.get_requirement("DockerRequirement")
            if docker_req is not None and runtimeContext.use_container:
                dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull")
            elif runtimeContext.default_container is not None and runtimeContext.use_container:
                dockerimg = runtimeContext.default_container
            else:
                dockerimg = None

            if dockerimg is not None:
                cmdline = ["docker", "run", dockerimg] + cmdline
                # not really run using docker, just for hashing purposes
            keydict = {u"cmdline": cmdline}  # type: Dict[Text, Union[Dict[Text, Any], List[Any]]]

            for shortcut in ["stdin", "stdout", "stderr"]:
                if shortcut in self.tool:
                    keydict[shortcut] = self.tool[shortcut]

            # Every mapped File contributes its size plus either its recorded
            # checksum (ignoring the 'sha1$hash' value) or, when there is
            # none, its modification time in milliseconds.
            for location, fobj in cachebuilder.pathmapper.items():
                if fobj.type == "File":
                    checksum = next(
                        (e['checksum'] for e in cachebuilder.files
                         if 'location' in e and e['location'] == location
                         and 'checksum' in e
                         and e['checksum'] != 'sha1$hash'), None)
                    fobj_stat = os.stat(fobj.resolved)
                    if checksum is not None:
                        keydict[fobj.resolved] = [fobj_stat.st_size, checksum]
                    else:
                        keydict[fobj.resolved] = [fobj_stat.st_size,
                                                  int(fobj_stat.st_mtime * 1000)]

            # Requirement/hint classes that take part in the cache key.
            # Requirements are scanned before hints, each in reverse order,
            # and the first entry found for a class is the one recorded.
            interesting = {"DockerRequirement",
                           "EnvVarRequirement",
                           "InitialWorkDirRequirement",
                           "ShellCommandRequirement",
                           "NetworkAccess"}
            for rh in (self.original_requirements, self.original_hints):
                for r in reversed(rh):
                    if r["class"] in interesting and r["class"] not in keydict:
                        keydict[r["class"]] = r

            # The key is the MD5 hex digest of the compact, key-sorted JSON.
            keydictstr = json_dumps(keydict, separators=(',', ':'),
                                    sort_keys=True)
            cachekey = hashlib.md5(  # nosec
                keydictstr.encode('utf-8')).hexdigest()

            _logger.debug("[job %s] keydictstr is %s -> %s", jobname,
                          keydictstr, cachekey)

            jobcache = os.path.join(runtimeContext.cachedir, cachekey)

            # Create a lockfile to manage cache status.
            jobcachepending = "{}.status".format(jobcache)
            jobcachelock = None
            jobstatus = None

            # Opens the file for read/write, or creates an empty file.
            jobcachelock = open(jobcachepending, "a+")

            # get the shared lock to ensure no other process is trying
            # to write to this cache
            shared_file_lock(jobcachelock)
            jobcachelock.seek(0)
            jobstatus = jobcachelock.read()

            if os.path.isdir(jobcache) and jobstatus == "success":
                # Cache hit: report the stored outputs instead of running.
                if docker_req and runtimeContext.use_container:
                    cachebuilder.outdir = runtimeContext.docker_outdir or random_outdir()
                else:
                    cachebuilder.outdir = jobcache

                _logger.info("[job %s] Using cached output in %s", jobname, jobcache)
                yield CallbackJob(self, output_callbacks, cachebuilder, jobcache)
                # we're done with the cache so release lock
                # (this statement only runs once the generator is resumed
                # after the yield above)
                jobcachelock.close()
                return
            else:
                _logger.info("[job %s] Output of job will be cached in %s", jobname, jobcache)

                # turn shared lock into an exclusive lock since we'll
                # be writing the cache directory
                upgrade_lock(jobcachelock)

                # Start from an empty cache directory and make it the job's
                # output directory (on a copy of the runtime context).
                shutil.rmtree(jobcache, True)
                os.makedirs(jobcache)
                runtimeContext = runtimeContext.copy()
                runtimeContext.outdir = jobcache

                def update_status_output_callback(
                        output_callbacks,  # type: Callable[[List[Dict[Text, Any]], Text], None]
                        jobcachelock,      # type: IO[Any]
                        outputs,           # type: List[Dict[Text, Any]]
                        processStatus      # type: Text
                        ):  # type: (...) -> None
                    # save status to the lockfile then release the lock
                    jobcachelock.seek(0)
                    jobcachelock.truncate()
                    jobcachelock.write(processStatus)
                    jobcachelock.close()
                    output_callbacks(outputs, processStatus)

                # From here on the job reports through the wrapper above.
                output_callbacks = partial(
                    update_status_output_callback, output_callbacks, jobcachelock)

        builder = self._init_job(job_order, runtimeContext)

        # Independent copy of the file list, used to build the path mapper.
        reffiles = copy.deepcopy(builder.files)

        # make_job_runner() returns a job class; instantiate it right away.
        j = self.make_job_runner(runtimeContext)(
            builder, builder.job, self.make_path_mapper, self.requirements,
            self.hints, jobname)
        j.prov_obj = self.prov_obj

        j.successCodes = self.tool.get("successCodes", [])
        j.temporaryFailCodes = self.tool.get("temporaryFailCodes", [])
        j.permanentFailCodes = self.tool.get("permanentFailCodes", [])

        debug = _logger.isEnabledFor(logging.DEBUG)

        if debug:
            _logger.debug(u"[job %s] initializing from %s%s",
                          j.name,
                          self.tool.get("id", ""),
                          u" as part of %s" % runtimeContext.part_of
                          if runtimeContext.part_of else "")
            _logger.debug(u"[job %s] %s", j.name, json_dumps(builder.job,
                                                             indent=4))

        builder.pathmapper = self.make_path_mapper(
            reffiles, builder.stagedir, runtimeContext, True)
        builder.requirements = j.requirements

        _check_adjust = partial(check_adjust, builder)

        visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust)

        initialWorkdir, _ = self.get_requirement("InitialWorkDirRequirement")
        if initialWorkdir is not None:
            ls = []  # type: List[Dict[Text, Any]]
            if isinstance(initialWorkdir["listing"], string_types):
                # The whole listing is a single expression.
                ls = builder.do_eval(initialWorkdir["listing"])
            else:
                for t in initialWorkdir["listing"]:
                    if isinstance(t, Mapping) and "entry" in t:
                        # Entry with optional "entryname"/"writable": evaluate
                        # it and keep every non-None result.
                        entry_exp = builder.do_eval(t["entry"], strip_whitespace=False)
                        for entry in aslist(entry_exp):
                            et = {u"entry": entry}
                            if "entryname" in t:
                                et["entryname"] = builder.do_eval(t["entryname"])
                            else:
                                et["entryname"] = None
                            et["writable"] = t.get("writable", False)
                            if et[u"entry"] is not None:
                                ls.append(et)
                    else:
                        # Anything else is evaluated as is; falsy results are
                        # skipped and sequences are flattened into the list.
                        initwd_item = builder.do_eval(t)
                        if not initwd_item:
                            continue
                        if isinstance(initwd_item, MutableSequence):
                            ls.extend(initwd_item)
                        else:
                            ls.append(initwd_item)
            # Second pass: replace each {"entry": ...} wrapper by a plain
            # File/Directory object.
            for i, t in enumerate(ls):
                if "entry" in t:
                    if isinstance(t["entry"], string_types):
                        # String entries become File literals with contents.
                        ls[i] = {
                            "class": "File",
                            "basename": t["entryname"],
                            "contents": t["entry"],
                            "writable": t.get("writable")
                        }
                    else:
                        if t.get("entryname") or t.get("writable"):
                            # Work on a copy before overriding basename/writable.
                            t = copy.deepcopy(t)
                            if t.get("entryname"):
                                t["entry"]["basename"] = t["entryname"]
                            t["entry"]["writable"] = t.get("writable")
                        ls[i] = t["entry"]
            j.generatefiles["listing"] = ls
            for l in ls:
                self.updatePathmap(builder.outdir, builder.pathmapper, l)
            # Re-run the adjustment now that the path map has been updated.
            visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust)

        if debug:
            _logger.debug(u"[job %s] path mappings is %s", j.name,
                          json_dumps({p: builder.pathmapper.mapper(p)
                                      for p in builder.pathmapper.files()},
                                     indent=4))

        if self.tool.get("stdin"):
            with SourceLine(self.tool, "stdin", validate.ValidationException, debug):
                j.stdin = builder.do_eval(self.tool["stdin"])
                if j.stdin:
                    reffiles.append({"class": "File", "path": j.stdin})

        # stderr and stdout must be relative names without "..".
        if self.tool.get("stderr"):
            with SourceLine(self.tool, "stderr", validate.ValidationException, debug):
                j.stderr = builder.do_eval(self.tool["stderr"])
                if j.stderr:
                    if os.path.isabs(j.stderr) or ".." in j.stderr:
                        raise validate.ValidationException(
                            "stderr must be a relative path, got '%s'" % j.stderr)

        if self.tool.get("stdout"):
            with SourceLine(self.tool, "stdout", validate.ValidationException, debug):
                j.stdout = builder.do_eval(self.tool["stdout"])
                if j.stdout:
                    # NOTE(review): "not j.stdout" can never be true inside
                    # this "if j.stdout:" branch.
                    if os.path.isabs(j.stdout) or ".." in j.stdout or not j.stdout:
                        raise validate.ValidationException(
                            "stdout must be a relative path, got '%s'" % j.stdout)

        if debug:
            _logger.debug(u"[job %s] command line bindings is %s", j.name,
                          json_dumps(builder.bindings, indent=4))
        dockerReq, _ = self.get_requirement("DockerRequirement")
        if dockerReq is not None and runtimeContext.use_container:
            # Container run: use the directories from the runtime context
            # when given, otherwise create fresh temporary directories; the
            # staging directory is always newly created.
            out_dir, out_prefix = os.path.split(
                runtimeContext.tmp_outdir_prefix)
            j.outdir = runtimeContext.outdir or \
                tempfile.mkdtemp(prefix=out_prefix, dir=out_dir)
            tmpdir_dir, tmpdir_prefix = os.path.split(
                runtimeContext.tmpdir_prefix)
            j.tmpdir = runtimeContext.tmpdir or \
                tempfile.mkdtemp(prefix=tmpdir_prefix, dir=tmpdir_dir)
            j.stagedir = tempfile.mkdtemp(prefix=tmpdir_prefix, dir=tmpdir_dir)
        else:
            # Otherwise the job uses the builder's directories directly.
            j.outdir = builder.outdir
            j.tmpdir = builder.tmpdir
            j.stagedir = builder.stagedir

        inplaceUpdateReq, _ = self.get_requirement("InplaceUpdateRequirement")
        if inplaceUpdateReq is not None:
            j.inplace_update = inplaceUpdateReq["inplaceUpdate"]
        normalizeFilesDirs(j.generatefiles)

        # Locations registered as readers (with a copy of the object) and
        # locations registered as mutations, for the mutation manager.
        readers = {}  # type: Dict[Text, Any]
        muts = set()  # type: Set[Text]

        if builder.mutation_manager is not None:
            def register_mut(f):  # type: (Dict[Text, Any]) -> None
                mm = cast(MutationManager, builder.mutation_manager)
                muts.add(f["location"])
                mm.register_mutation(j.name, f)

            def register_reader(f):  # type: (Dict[Text, Any]) -> None
                mm = cast(MutationManager, builder.mutation_manager)
                # Locations already registered as mutations are not readers.
                if f["location"] not in muts:
                    mm.register_reader(j.name, f)
                    readers[f["location"]] = copy.deepcopy(f)

            # Writable listing entries count as mutations only when in-place
            # update is enabled; everything else is registered as a reader.
            for li in j.generatefiles["listing"]:
                li = cast(Dict[Text, Any], li)
                if li.get("writable") and j.inplace_update:
                    adjustFileObjs(li, register_mut)
                    adjustDirObjs(li, register_mut)
                else:
                    adjustFileObjs(li, register_reader)
                    adjustDirObjs(li, register_reader)

            adjustFileObjs(builder.files, register_reader)
            adjustFileObjs(builder.bindings, register_reader)
            adjustDirObjs(builder.files, register_reader)
            adjustDirObjs(builder.bindings, register_reader)

        timelimit, _ = self.get_requirement("ToolTimeLimit")
        if timelimit is not None:
            with SourceLine(timelimit, "timelimit", validate.ValidationException, debug):
                j.timelimit = builder.do_eval(timelimit["timelimit"])
                if not isinstance(j.timelimit, int) or j.timelimit < 0:
                    raise Exception("timelimit must be an integer >= 0, got: %s" % j.timelimit)

        networkaccess, _ = self.get_requirement("NetworkAccess")
        if networkaccess is not None:
            with SourceLine(networkaccess, "networkAccess", validate.ValidationException, debug):
                j.networkaccess = builder.do_eval(networkaccess["networkAccess"])
                if not isinstance(j.networkaccess, bool):
                    raise Exception("networkAccess must be a boolean, got: %s" % j.networkaccess)

        # Environment variables come from EnvVarRequirement, each value
        # evaluated through the builder.
        j.environment = {}
        evr, _ = self.get_requirement("EnvVarRequirement")
        if evr is not None:
            for t in evr["envDef"]:
                j.environment[t["envName"]] = builder.do_eval(t["envValue"])

        shellcmd, _ = self.get_requirement("ShellCommandRequirement")
        if shellcmd is not None:
            # Shell mode: join all arguments into one string for /bin/sh -c,
            # quoting each argument unless its binding sets shellQuote false.
            cmd = []  # type: List[Text]
            for b in builder.bindings:
                arg = builder.generate_arg(b)
                if b.get("shellQuote", True):
                    arg = [shellescape.quote(a) for a in aslist(arg)]
                cmd.extend(aslist(arg))
            j.command_line = ["/bin/sh", "-c", " ".join(cmd)]
        else:
            j.command_line = flatten(list(map(builder.generate_arg, builder.bindings)))

        j.pathmapper = builder.pathmapper
        # Bind everything except outdir and rcode, which are left for the
        # caller of j.collect_outputs to supply.
        j.collect_outputs = partial(
            self.collect_output_ports, self.tool["outputs"], builder,
            compute_checksum=getdefault(runtimeContext.compute_checksum, True),
            jobname=jobname,
            readers=readers)
        j.output_callback = output_callbacks

        yield j

    def collect_output_ports(self,
                             ports,                  # type: Set[Dict[Text, Any]]
                             builder,                # type: Builder
                             outdir,                 # type: Text
                             rcode,                  # type: int
                             compute_checksum=True,  # type: bool
                             jobname="",             # type: Text
                             readers=None            # type: Optional[Dict[Text, Any]]
                            ):  # type: (...) -> OutputPorts
        """
        Gather, post-process and validate the outputs found in ``outdir``.

        If ``outdir`` contains a ``cwl.output.json`` file, its parsed content
        is used as the output object; otherwise every entry of ``ports`` is
        collected through collect_output(). A non-empty result then has its
        File/Directory objects remapped, stripped of "path", normalized,
        checked for existence and, when ``compute_checksum`` is true,
        checksummed. The result is validated against the
        "outputs_record_schema" schema before being returned.

        :param rcode: stored in ``builder.resources["exitCode"]`` unless the
            original CWL version is "v1.0"
        :param jobname: name used when releasing ``readers``
        :param readers: objects released from the mutation manager on exit,
            whether or not collection succeeded
        :raises WorkflowException: when validating the output record fails
        """
        ret = {}  # type: OutputPorts
        debug = _logger.isEnabledFor(logging.DEBUG)
        cwl_version = self.metadata.get(
            "http://commonwl.org/cwltool#original_cwlVersion", None)
        if cwl_version != "v1.0":
            builder.resources["exitCode"] = rcode
        try:
            fs_access = builder.make_fs_access(outdir)
            custom_output = fs_access.join(outdir, "cwl.output.json")
            if fs_access.exists(custom_output):
                # The tool wrote its own output object; use it as is.
                with fs_access.open(custom_output, "r") as f:
                    ret = json.load(f)
                if debug:
                    _logger.debug(u"Raw output from %s: %s", custom_output,
                                  json_dumps(ret, indent=4))
            else:
                for i, port in enumerate(ports):
                    # Exception type defined per port so that its message
                    # names the parameter being collected.
                    class ParameterOutputWorkflowException(WorkflowException):
                        def __init__(self, msg, **kwargs):  # type: (Text, **Any) -> None
                            # NOTE(review): kwargs is forwarded as a single
                            # positional dict, not expanded - confirm intended.
                            super(ParameterOutputWorkflowException, self).__init__(
                                u"Error collecting output for parameter '%s':\n%s"
                                % (shortname(port["id"]), msg), kwargs)
                    with SourceLine(ports, i, ParameterOutputWorkflowException, debug):
                        fragment = shortname(port["id"])
                        ret[fragment] = self.collect_output(port, builder, outdir, fs_access,
                                                            compute_checksum=compute_checksum)
            if ret:
                # Post-process every File/Directory object in the result.
                revmap = partial(revmap_file, builder, outdir)
                adjustDirObjs(ret, trim_listing)
                visit_class(ret, ("File", "Directory"), cast(Callable[[Any], Any], revmap))
                visit_class(ret, ("File", "Directory"), remove_path)
                normalizeFilesDirs(ret)
                visit_class(ret, ("File", "Directory"), partial(check_valid_locations, fs_access))

                if compute_checksum:
                    adjustFileObjs(ret, partial(compute_checksums, fs_access))
            expected_schema = cast(Schema, self.names.get_name(
                "outputs_record_schema", ""))
            validate.validate_ex(expected_schema, ret,
                strict=False, logger=_logger_validation_warnings)
            if ret is not None and builder.mutation_manager is not None:
                adjustFileObjs(ret, builder.mutation_manager.set_generation)
            # json.load() may have produced None; callers always get a dict.
            return ret if ret is not None else {}
        except validate.ValidationException as e:
            raise_from(WorkflowException(
                "Error validating output record. " + Text(e) + "\n in "
                + json_dumps(ret, indent=4)), e)
        finally:
            # Always release the readers passed in for this job.
            if builder.mutation_manager and readers:
                for r in readers.values():
                    builder.mutation_manager.release_reader(jobname, r)

    def collect_output(self,
                       schema,                # type: Dict[Text, Any]
                       builder,               # type: Builder
                       outdir,                # type: Text
                       fs_access,             # type: StdFsAccess
                       compute_checksum=True  # type: bool
                      ):
        # type: (...) -> Optional[Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]]]
        """Collect the value of one output parameter.

        When ``schema`` has an ``outputBinding`` the steps are, in order:

        1. evaluate the ``glob`` patterns and turn every match under
           ``outdir`` into a File or Directory object (optionally with
           listing, contents, checksum and size);
        2. apply ``outputEval`` to the list of matches, if present;
        3. for File/Directory typed outputs, reduce the list to one item;
        4. attach ``secondaryFiles`` and ``format``;
        5. remap remaining file objects with ``revmap_file``.

        When the type of ``schema`` is a record, every field is collected
        recursively and a dict keyed by the short field name is returned.

        :param schema: output parameter description; ``"type"`` is required,
            ``"outputBinding"``, ``"secondaryFiles"``, ``"format"`` and
            ``"loadListing"`` are read when present.
        :param builder: used to evaluate expressions (``do_eval``), and for
            ``builder.outdir`` (prefix of the generated ``"path"`` fields)
            and ``builder.loadListing`` (default listing mode).
        :param outdir: directory that is globbed through ``fs_access``.
        :param fs_access: filesystem abstraction used for every probe/read.
        :param compute_checksum: add a ``sha1$...`` checksum to every
            globbed file.
        :return: a list of matches, a single File/Directory object, whatever
            ``outputEval`` produced, a dict for record types, or ``None``
            for an optional output that is empty.
        :raises WorkflowException: for a glob pattern starting with ``/``,
            a missing non-optional single output, several matches for a
            single output, or a missing required secondary file.
        """
        r = []  # type: List[Any]
        debug = _logger.isEnabledFor(logging.DEBUG)
        if "outputBinding" in schema:
            binding = schema["outputBinding"]
            globpatterns = []  # type: List[Text]

            revmap = partial(revmap_file, builder, outdir)

            if "glob" in binding:
                with SourceLine(binding, "glob", WorkflowException, debug):
                    # Each glob entry may be an expression that itself
                    # evaluates to one pattern or to a list of patterns.
                    for gb in aslist(binding["glob"]):
                        gb = builder.do_eval(gb)
                        if gb:
                            globpatterns.extend(aslist(gb))

                    for gb in globpatterns:
                        if gb.startswith(builder.outdir):
                            # Make the pattern relative to the output dir.
                            gb = gb[len(builder.outdir) + 1:]
                        elif gb == ".":
                            gb = outdir
                        elif gb.startswith("/"):
                            raise WorkflowException(
                                "glob patterns must not start with '/'")
                        try:
                            # prefix[0] is outdir as reported by fs_access;
                            # its length is used to cut each match down to a
                            # path relative to the output directory.
                            prefix = fs_access.glob(outdir)
                            r.extend([{"location": g,
                                       "path": fs_access.join(builder.outdir,
                                           g[len(prefix[0])+1:]),
                                       "basename": os.path.basename(g),
                                       "nameroot": os.path.splitext(
                                           os.path.basename(g))[0],
                                       "nameext": os.path.splitext(
                                           os.path.basename(g))[1],
                                       "class": "File" if fs_access.isfile(g)
                                       else "Directory"}
                                      for g in sorted(fs_access.glob(
                                          fs_access.join(outdir, gb)),
                                          key=cmp_to_key(cast(
                                              Callable[[Text, Text],
                                                  int], locale.strcoll)))])
                        except (OSError, IOError) as e:
                            # Best effort: a failing pattern is only logged.
                            _logger.warning(Text(e))
                        except Exception:
                            _logger.error("Unexpected error from fs_access", exc_info=True)
                            raise

                for files in r:
                    # Remap a copy so that the entry in r keeps its "path",
                    # which the secondaryFiles handling below still needs.
                    rfile = files.copy()
                    revmap(rfile)
                    if files["class"] == "Directory":
                        ll = schema.get("loadListing") or builder.loadListing
                        if ll and ll != "no_listing":
                            get_listing(fs_access, files, (ll == "deep_listing"))
                    else:
                        if binding.get("loadContents"):
                            with fs_access.open(rfile["location"], "rb") as f:
                                files["contents"] = content_limit_respected_read_bytes(f).decode("utf-8")
                        if compute_checksum:
                            with fs_access.open(rfile["location"], "rb") as f:
                                checksum = hashlib.sha1()  # nosec
                                # Hash in 1 MiB chunks to bound memory use.
                                contents = f.read(1024 * 1024)
                                while contents != b"":
                                    checksum.update(contents)
                                    contents = f.read(1024 * 1024)
                                files["checksum"] = "sha1$%s" % checksum.hexdigest()
                        files["size"] = fs_access.size(rfile["location"])

            # "optional": the type union contains "null".
            # "single": the output is one File/Directory rather than a list.
            optional = False
            single = False
            if isinstance(schema["type"], MutableSequence):
                if "null" in schema["type"]:
                    optional = True
                if "File" in schema["type"] or "Directory" in schema["type"]:
                    single = True
            elif schema["type"] == "File" or schema["type"] == "Directory":
                single = True

            if "outputEval" in binding:
                with SourceLine(binding, "outputEval", WorkflowException, debug):
                    # The glob matches are exposed to the expression as its
                    # context; the result replaces them entirely.
                    r = builder.do_eval(binding["outputEval"], context=r)

            if single:
                if not r and not optional:
                    with SourceLine(binding, "glob", WorkflowException, debug):
                        raise WorkflowException("Did not find output file with glob pattern: '{}'".format(globpatterns))
                elif not r and optional:
                    pass
                elif isinstance(r, MutableSequence):
                    if len(r) > 1:
                        raise WorkflowException("Multiple matches for output item that is a single file.")
                    else:
                        r = r[0]

            if "secondaryFiles" in schema:
                with SourceLine(schema, "secondaryFiles", WorkflowException, debug):
                    for primary in aslist(r):
                        if isinstance(primary, MutableMapping):
                            primary.setdefault("secondaryFiles", [])
                            # Directory part of the primary path, including
                            # the trailing slash.
                            pathprefix = primary["path"][0:primary["path"].rindex("/")+1]
                            for sf in aslist(schema["secondaryFiles"]):
                                if 'required' in sf:
                                    sf_required = builder.do_eval(sf['required'], context=primary)
                                else:
                                    sf_required = False

                                if "$(" in sf["pattern"] or "${" in sf["pattern"]:
                                    sfpath = builder.do_eval(sf["pattern"], context=primary)
                                else:
                                    sfpath = substitute(primary["basename"], sf["pattern"])

                                for sfitem in aslist(sfpath):
                                    if not sfitem:
                                        continue
                                    if isinstance(sfitem, string_types):
                                        sfitem = {"path": pathprefix+sfitem}
                                    # Keep a printable name for the error
                                    # message: the item may only carry a
                                    # "location", and remapping may replace
                                    # its "path".
                                    sfname = sfitem.get("path", sfitem.get("location"))
                                    if "path" in sfitem and "location" not in sfitem:
                                        revmap(sfitem)
                                    # Probe the location, like every other
                                    # fs_access call in this method; "path"
                                    # is relative to builder.outdir and need
                                    # not be resolvable through fs_access.
                                    if sf_required and not fs_access.exists(sfitem["location"]):
                                        raise WorkflowException(
                                            "Missing required secondary file '%s'" % (
                                                sfname))
                                    if fs_access.isfile(sfitem["location"]):
                                        sfitem["class"] = "File"
                                        primary["secondaryFiles"].append(sfitem)
                                    elif fs_access.isdir(sfitem["location"]):
                                        sfitem["class"] = "Directory"
                                        primary["secondaryFiles"].append(sfitem)

            if "format" in schema:
                for primary in aslist(r):
                    # Same guard as for secondaryFiles: r may be None or a
                    # scalar (e.g. from outputEval), which cannot take a
                    # "format" entry.
                    if isinstance(primary, MutableMapping):
                        primary["format"] = builder.do_eval(schema["format"], context=primary)

            # Ensure files point to local references outside of the run environment
            adjustFileObjs(r, revmap)

            if not r and optional:
                # Don't convert zero or empty string to None
                if r in [0, '']:
                    return r
                # For [] or None, return None
                else:
                    return None

        if (isinstance(schema["type"], MutableMapping)
                and schema["type"]["type"] == "record"):
            # Record outputs: collect every field with its own binding.
            out = {}
            for f in schema["type"]["fields"]:
                out[shortname(f["name"])] = self.collect_output(  # type: ignore
                    f, builder, outdir, fs_access,
                    compute_checksum=compute_checksum)
            return out
        return r