diff env/lib/python3.7/site-packages/cwltool/process.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
--- a/env/lib/python3.7/site-packages/cwltool/process.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1067 +0,0 @@
-from __future__ import absolute_import
-
-import abc
-import copy
-import errno
-import functools
-import hashlib
-import json
-import logging
-import os
-import shutil
-import stat
-import tempfile
-import textwrap
-import uuid
-
-from io import open
-from typing import (Any, Callable, Dict, Generator, Iterator, List,
-                    Mapping, MutableMapping, MutableSequence, Optional, Set, Tuple,
-                    Type, Union, cast)
-
-from pkg_resources import resource_stream
-from rdflib import Graph  # pylint: disable=unused-import
-from ruamel.yaml.comments import CommentedMap, CommentedSeq
-from six import PY3, iteritems, itervalues, string_types, with_metaclass
-from six.moves import urllib
-from future.utils import raise_from
-from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
-                               Text)
-from schema_salad import schema, validate
-from schema_salad.ref_resolver import Loader, file_uri, uri_file_path
-from schema_salad.sourceline import SourceLine, strip_dup_lineno
-
-from . import expression
-from .builder import Builder, HasReqsHints
-from .context import LoadingContext  # pylint: disable=unused-import
-from .context import RuntimeContext, getdefault
-from .errors import UnsupportedRequirement, WorkflowException
-from .loghandler import _logger
-from .mutation import MutationManager  # pylint: disable=unused-import
-from .pathmapper import (PathMapper, adjustDirObjs, ensure_writable,
-                         get_listing, normalizeFilesDirs, visit_class,
-                         MapperEnt)
-from .secrets import SecretStore  # pylint: disable=unused-import
-from .software_requirements import (  # pylint: disable=unused-import
-    DependenciesConfiguration)
-from .stdfsaccess import StdFsAccess
-from .utils import (DEFAULT_TMP_PREFIX, aslist, cmp_like_py2,
-                    copytree_with_merge, onWindows, random_outdir)
-from .validate_js import validate_js_expressions
-from .update import INTERNAL_VERSION
-
-try:
-    from os import scandir  # type: ignore
-except ImportError:
-    from scandir import scandir  # type: ignore
-if TYPE_CHECKING:
-    from .provenance import ProvenanceProfile  # pylint: disable=unused-import
-
-if PY3:
-    from collections.abc import Iterable # only works on python 3.3+
-else:
-    from collections import Iterable  # pylint: disable=unused-import
-
-class LogAsDebugFilter(logging.Filter):
-    def __init__(self, name, parent):  # type: (Text, logging.Logger) -> None
-        """Initialize."""
-        name = str(name)
-        super(LogAsDebugFilter, self).__init__(name)
-        self.parent = parent
-
-    def filter(self, record):  # type: (logging.LogRecord) -> bool
-        return self.parent.isEnabledFor(logging.DEBUG)
-
-
-_logger_validation_warnings = logging.getLogger("cwltool.validation_warnings")
-_logger_validation_warnings.setLevel(_logger.getEffectiveLevel())
-_logger_validation_warnings.addFilter(LogAsDebugFilter("cwltool.validation_warnings", _logger))
-
-supportedProcessRequirements = ["DockerRequirement",
-                                "SchemaDefRequirement",
-                                "EnvVarRequirement",
-                                "ScatterFeatureRequirement",
-                                "SubworkflowFeatureRequirement",
-                                "MultipleInputFeatureRequirement",
-                                "InlineJavascriptRequirement",
-                                "ShellCommandRequirement",
-                                "StepInputExpressionRequirement",
-                                "ResourceRequirement",
-                                "InitialWorkDirRequirement",
-                                "ToolTimeLimit",
-                                "WorkReuse",
-                                "NetworkAccess",
-                                "InplaceUpdateRequirement",
-                                "LoadListingRequirement",
-                                "http://commonwl.org/cwltool#TimeLimit",
-                                "http://commonwl.org/cwltool#WorkReuse",
-                                "http://commonwl.org/cwltool#NetworkAccess",
-                                "http://commonwl.org/cwltool#LoadListingRequirement",
-                                "http://commonwl.org/cwltool#InplaceUpdateRequirement"]
-
-cwl_files = (
-    "Workflow.yml",
-    "CommandLineTool.yml",
-    "CommonWorkflowLanguage.yml",
-    "Process.yml",
-    "concepts.md",
-    "contrib.md",
-    "intro.md",
-    "invocation.md")
-
-salad_files = ('metaschema.yml',
-               'metaschema_base.yml',
-               'salad.md',
-               'field_name.yml',
-               'import_include.md',
-               'link_res.yml',
-               'ident_res.yml',
-               'vocab_res.yml',
-               'field_name_schema.yml',
-               'field_name_src.yml',
-               'field_name_proc.yml',
-               'ident_res_schema.yml',
-               'ident_res_src.yml',
-               'ident_res_proc.yml',
-               'link_res_schema.yml',
-               'link_res_src.yml',
-               'link_res_proc.yml',
-               'vocab_res_schema.yml',
-               'vocab_res_src.yml',
-               'vocab_res_proc.yml')
-
-SCHEMA_CACHE = {}  # type: Dict[Text, Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text, Any], Loader]]
-SCHEMA_FILE = None  # type: Optional[Dict[Text, Any]]
-SCHEMA_DIR = None  # type: Optional[Dict[Text, Any]]
-SCHEMA_ANY = None  # type: Optional[Dict[Text, Any]]
-
-custom_schemas = {}  # type: Dict[Text, Tuple[Text, Text]]
-
-def use_standard_schema(version):
-    # type: (Text) -> None
-    if version in custom_schemas:
-        del custom_schemas[version]
-    if version in SCHEMA_CACHE:
-        del SCHEMA_CACHE[version]
-
-def use_custom_schema(version, name, text):
-    # type: (Text, Text, Union[Text, bytes]) -> None
-    if isinstance(text, bytes):
-        text2 = text.decode()
-    else:
-        text2 = text
-    custom_schemas[version] = (name, text2)
-    if version in SCHEMA_CACHE:
-        del SCHEMA_CACHE[version]
-
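-# Example (illustrative): use_custom_schema("v1.0", "http://example.com/schema",
-# schema_text) registers a replacement schema document and drops any cached
-# entry, so the next get_schema("v1.0") call loads the custom text; here
-# "http://example.com/schema" and schema_text are placeholder values.
-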
-def get_schema(version):
-    # type: (Text) -> Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text,Any], Loader]
-
-    if version in SCHEMA_CACHE:
-        return SCHEMA_CACHE[version]
-
-    cache = {}  # type: Dict[Text, Any]
-    version = version.split("#")[-1]
-    if '.dev' in version:
-        version = ".".join(version.split(".")[:-1])
-    for f in cwl_files:
-        try:
-            res = resource_stream(__name__, 'schemas/%s/%s' % (version, f))
-            cache["https://w3id.org/cwl/" + f] = res.read()
-            res.close()
-        except IOError:
-            pass
-
-    for f in salad_files:
-        try:
-            res = resource_stream(
-                __name__, 'schemas/{}/salad/schema_salad/metaschema/{}'.format(
-                    version, f))
-            cache["https://w3id.org/cwl/salad/schema_salad/metaschema/"
-                  + f] = res.read()
-            res.close()
-        except IOError:
-            pass
-
-    if version in custom_schemas:
-        cache[custom_schemas[version][0]] = custom_schemas[version][1]
-        SCHEMA_CACHE[version] = schema.load_schema(
-            custom_schemas[version][0], cache=cache)
-    else:
-        SCHEMA_CACHE[version] = schema.load_schema(
-            "https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache)
-
-    return SCHEMA_CACHE[version]
-
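-# Example (illustrative): per the annotation above, get_schema("v1.0") returns
-# the cached 4-tuple (document loader, Avro names, schema metadata, metaschema
-# loader); repeated calls for the same version are served from SCHEMA_CACHE.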
-
-def shortname(inputid):
-    # type: (Text) -> Text
-    d = urllib.parse.urlparse(inputid)
-    if d.fragment:
-        return d.fragment.split(u"/")[-1]
-    return d.path.split(u"/")[-1]
-
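-# Example (illustrative): shortname("file:///tools/echo.cwl#main/message")
-# returns "message"; with no fragment, shortname("file:///tools/echo.cwl")
-# falls back to the last path segment, "echo.cwl".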
-
-def checkRequirements(rec, supported_process_requirements):
-    # type: (Any, Iterable[Any]) -> None
-    if isinstance(rec, MutableMapping):
-        if "requirements" in rec:
-            for i, entry in enumerate(rec["requirements"]):
-                with SourceLine(rec["requirements"], i, UnsupportedRequirement):
-                    if entry["class"] not in supported_process_requirements:
-                        raise UnsupportedRequirement(
-                            u"Unsupported requirement {}".format(entry["class"]))
-        for key in rec:
-            checkRequirements(rec[key], supported_process_requirements)
-    if isinstance(rec, MutableSequence):
-        for entry in rec:
-            checkRequirements(entry, supported_process_requirements)
-
-
-def stage_files(pathmapper,             # type: PathMapper
-                stage_func=None,        # type: Optional[Callable[..., Any]]
-                ignore_writable=False,  # type: bool
-                symlink=True,           # type: bool
-                secret_store=None,      # type: Optional[SecretStore]
-                fix_conflicts=False     # type: bool
-               ):  # type: (...) -> None
-    """Link or copy files to their targets. Create them as needed."""
-
-    targets = {}  # type: Dict[Text, MapperEnt]
-    for key, entry in pathmapper.items():
-        if 'File' not in entry.type:
-            continue
-        if entry.target not in targets:
-            targets[entry.target] = entry
-        elif targets[entry.target].resolved != entry.resolved:
-            if fix_conflicts:
-                tgt = entry.target
-                i = 2
-                tgt = "%s_%s" % (tgt, i)
-                while tgt in targets:
-                    i += 1
-                    tgt = "%s_%s" % (tgt, i)
-                targets[tgt] = pathmapper.update(key, entry.resolved, tgt, entry.type, entry.staged)
-            else:
-                raise WorkflowException("File staging conflict, trying to stage both %s and %s to the same target %s" % (
-                    targets[entry.target].resolved, entry.resolved, entry.target))
-
-    for key, entry in pathmapper.items():
-        if not entry.staged:
-            continue
-        if not os.path.exists(os.path.dirname(entry.target)):
-            os.makedirs(os.path.dirname(entry.target))
-        if entry.type in ("File", "Directory") and os.path.exists(entry.resolved):
-            if symlink:  # Use symlink func if allowed
-                if onWindows():
-                    if entry.type == "File":
-                        shutil.copy(entry.resolved, entry.target)
-                    elif entry.type == "Directory":
-                        if os.path.exists(entry.target) \
-                                and os.path.isdir(entry.target):
-                            shutil.rmtree(entry.target)
-                        copytree_with_merge(entry.resolved, entry.target)
-                else:
-                    os.symlink(entry.resolved, entry.target)
-            elif stage_func is not None:
-                stage_func(entry.resolved, entry.target)
-        elif entry.type == "Directory" and not os.path.exists(entry.target) \
-                and entry.resolved.startswith("_:"):
-            os.makedirs(entry.target)
-        elif entry.type == "WritableFile" and not ignore_writable:
-            shutil.copy(entry.resolved, entry.target)
-            ensure_writable(entry.target)
-        elif entry.type == "WritableDirectory" and not ignore_writable:
-            if entry.resolved.startswith("_:"):
-                os.makedirs(entry.target)
-            else:
-                shutil.copytree(entry.resolved, entry.target)
-                ensure_writable(entry.target)
-        elif entry.type == "CreateFile" or entry.type == "CreateWritableFile":
-            with open(entry.target, "wb") as new:
-                if secret_store is not None:
-                    new.write(
-                        secret_store.retrieve(entry.resolved).encode("utf-8"))
-                else:
-                    new.write(entry.resolved.encode("utf-8"))
-            if entry.type == "CreateFile":
-                os.chmod(entry.target, stat.S_IRUSR)  # Read only
-            else:  # it is a "CreateWritableFile"
-                ensure_writable(entry.target)
-            pathmapper.update(
-                key, entry.target, entry.target, entry.type, entry.staged)
-
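-# Example (illustrative): a pathmapper entry of type "CreateFile" has its
-# literal contents (entry.resolved) written to entry.target and made
-# read-only, while a "WritableDirectory" backed by a real path is copied
-# with shutil.copytree and then made writable via ensure_writable.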
-
-def relocateOutputs(outputObj,              # type: Union[Dict[Text, Any], List[Dict[Text, Any]]]
-                    destination_path,       # type: Text
-                    source_directories,     # type: Set[Text]
-                    action,                 # type: Text
-                    fs_access,              # type: StdFsAccess
-                    compute_checksum=True,  # type: bool
-                    path_mapper=PathMapper  # type: Type[PathMapper]
-                    ):
-    # type: (...) -> Union[Dict[Text, Any], List[Dict[Text, Any]]]
-    adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True))
-
-    if action not in ("move", "copy"):
-        return outputObj
-
-    def _collectDirEntries(obj):
-        # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> Iterator[Dict[Text, Any]]
-        if isinstance(obj, dict):
-            if obj.get("class") in ("File", "Directory"):
-                yield obj
-            else:
-                for sub_obj in obj.values():
-                    for dir_entry in _collectDirEntries(sub_obj):
-                        yield dir_entry
-        elif isinstance(obj, MutableSequence):
-            for sub_obj in obj:
-                for dir_entry in _collectDirEntries(sub_obj):
-                    yield dir_entry
-
-    def _relocate(src, dst):  # type: (Text, Text) -> None
-        if src == dst:
-            return
-
-        # If the source is not contained in source_directories, we're not allowed to delete it
-        src = fs_access.realpath(src)
-        src_can_be_deleted = any(os.path.commonprefix([p, src]) == p for p in source_directories)
-
-        _action = "move" if action == "move" and src_can_be_deleted else "copy"
-
-        if _action == "move":
-            _logger.debug("Moving %s to %s", src, dst)
-            if fs_access.isdir(src) and fs_access.isdir(dst):
-                # merge directories
-                for dir_entry in scandir(src):
-                    _relocate(dir_entry.path, fs_access.join(dst, dir_entry.name))
-            else:
-                shutil.move(src, dst)
-
-        elif _action == "copy":
-            _logger.debug("Copying %s to %s", src, dst)
-            if fs_access.isdir(src):
-                if os.path.isdir(dst):
-                    shutil.rmtree(dst)
-                elif os.path.isfile(dst):
-                    os.unlink(dst)
-                shutil.copytree(src, dst)
-            else:
-                shutil.copy2(src, dst)
-
-    def _realpath(ob):  # type: (Dict[Text, Any]) -> None
-        if ob["location"].startswith("file:"):
-            ob["location"] = file_uri(os.path.realpath(uri_file_path(ob["location"])))
-        if ob["location"].startswith("/"):
-            ob["location"] = os.path.realpath(ob["location"])
-
-    outfiles = list(_collectDirEntries(outputObj))
-    visit_class(outfiles, ("File", "Directory"), _realpath)
-    pm = path_mapper(outfiles, "", destination_path, separateDirs=False)
-    stage_files(pm, stage_func=_relocate, symlink=False, fix_conflicts=True)
-
-    def _check_adjust(a_file):  # type: (Dict[Text, Text]) -> Dict[Text, Text]
-        a_file["location"] = file_uri(pm.mapper(a_file["location"])[1])
-        if "contents" in a_file:
-            del a_file["contents"]
-        return a_file
-
-    visit_class(outputObj, ("File", "Directory"), _check_adjust)
-
-    if compute_checksum:
-        visit_class(outputObj, ("File",), functools.partial(
-            compute_checksums, fs_access))
-    return outputObj
-
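-# Example (illustrative): relocateOutputs(out, "/results", {"/tmp/out_xyz"},
-# "move", fs_access) moves File/Directory outputs whose real paths lie under
-# a source directory into /results and rewrites each "location" to the new
-# file:// URI; with action="copy" the intermediate sources are left in place.
-# Here "/results" and "/tmp/out_xyz" are placeholder paths.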
-
-def cleanIntermediate(output_dirs):  # type: (Iterable[Text]) -> None
-    for a in output_dirs:
-        if os.path.exists(a):
-            _logger.debug(u"Removing intermediate output directory %s", a)
-            shutil.rmtree(a, True)
-
-def add_sizes(fsaccess, obj):  # type: (StdFsAccess, Dict[Text, Any]) -> None
-    if 'location' in obj:
-        try:
-            if "size" not in obj:
-                obj["size"] = fsaccess.size(obj["location"])
-        except OSError:
-            pass
-    elif 'contents' in obj:
-        obj["size"] = len(obj['contents'])
-    else:
-        return  # best effort
-
-def fill_in_defaults(inputs,   # type: List[Dict[Text, Text]]
-                     job,      # type: Dict[Text, expression.JSON]
-                     fsaccess  # type: StdFsAccess
-                    ):  # type: (...) -> None
-    for e, inp in enumerate(inputs):
-        with SourceLine(inputs, e, WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
-            fieldname = shortname(inp[u"id"])
-            if job.get(fieldname) is not None:
-                pass
-            elif job.get(fieldname) is None and u"default" in inp:
-                job[fieldname] = copy.deepcopy(inp[u"default"])
-            elif job.get(fieldname) is None and u"null" in aslist(inp[u"type"]):
-                job[fieldname] = None
-            else:
-                raise WorkflowException("Missing required input parameter '%s'" % shortname(inp["id"]))
-
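-# Example (illustrative): for an input
-# {"id": "#main/threads", "type": "int", "default": 1} absent from the job
-# order, fill_in_defaults sets job["threads"] = 1; an absent input whose type
-# includes "null" is filled with None, and anything else raises
-# WorkflowException.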
-
-def avroize_type(field_type, name_prefix=""):
-    # type: (Union[List[Dict[Text, Any]], Dict[Text, Any]], Text) -> Any
-    """Add missing information to a type so that CWL types are valid."""
-    if isinstance(field_type, MutableSequence):
-        for field in field_type:
-            avroize_type(field, name_prefix)
-    elif isinstance(field_type, MutableMapping):
-        if field_type["type"] in ("enum", "record"):
-            if "name" not in field_type:
-                field_type["name"] = name_prefix + Text(uuid.uuid4())
-        if field_type["type"] == "record":
-            avroize_type(field_type["fields"], name_prefix)
-        if field_type["type"] == "array":
-            avroize_type(field_type["items"], name_prefix)
-        if isinstance(field_type["type"], MutableSequence):
-            for ctype in field_type["type"]:
-                avroize_type(ctype, name_prefix)
-    return field_type
-
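-# Example (illustrative): an anonymous enum such as
-# {"type": "enum", "symbols": ["a", "b"]} gains a generated "name" (prefixed
-# with name_prefix) so it can be registered as a valid Avro type.
-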
-def get_overrides(overrides, toolid):  # type: (List[Dict[Text, Any]], Text) -> Dict[Text, Any]
-    req = {}  # type: Dict[Text, Any]
-    if not isinstance(overrides, MutableSequence):
-        raise validate.ValidationException("Expected overrides to be a list, but was %s" % type(overrides))
-    for ov in overrides:
-        if ov["overrideTarget"] == toolid:
-            req.update(ov)
-    return req
-
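-# Example (illustrative): given
-# overrides = [{"overrideTarget": "#tool.cwl", "requirements": [...]}],
-# get_overrides(overrides, "#tool.cwl") merges every matching entry into one
-# dict whose "requirements" the caller can consume; "#tool.cwl" is a
-# placeholder id.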
-
-_VAR_SPOOL_ERROR = textwrap.dedent(
-    """
-    Non-portable reference to /var/spool/cwl detected: '{}'.
-    To fix, replace /var/spool/cwl with $(runtime.outdir) or add
-    DockerRequirement to the 'requirements' section and declare
-    'dockerOutputDirectory: /var/spool/cwl'.
-    """)
-
-
-def var_spool_cwl_detector(obj,           # type: Union[MutableMapping[Text, Text], List[Dict[Text, Any]], Text]
-                           item=None,     # type: Optional[Any]
-                           obj_key=None,  # type: Optional[Any]
-                          ):              # type: (...)->bool
-    """Detect any textual reference to /var/spool/cwl."""
-    r = False
-    if isinstance(obj, string_types):
-        if "var/spool/cwl" in obj and obj_key != "dockerOutputDirectory":
-            _logger.warning(
-                SourceLine(item=item, key=obj_key, raise_type=Text).makeError(
-                    _VAR_SPOOL_ERROR.format(obj)))
-            r = True
-    elif isinstance(obj, MutableMapping):
-        for mkey, mvalue in iteritems(obj):
-            r = var_spool_cwl_detector(mvalue, obj, mkey) or r
-    elif isinstance(obj, MutableSequence):
-        for lkey, lvalue in enumerate(obj):
-            r = var_spool_cwl_detector(lvalue, obj, lkey) or r
-    return r
-
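-# Example (illustrative): a tool containing
-# {"arguments": ["--out", "/var/spool/cwl/result"]} logs the warning above and
-# makes var_spool_cwl_detector return True, whereas the same path under a
-# "dockerOutputDirectory" key is permitted and contributes False.
-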
-def eval_resource(builder, resource_req):  # type: (Builder, Text) -> Any
-    if expression.needs_parsing(resource_req):
-        return builder.do_eval(resource_req)
-    return resource_req
-
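-# Example (illustrative): eval_resource(builder, "$(inputs.threads * 2)") runs
-# the expression through builder.do_eval, while a plain value such as "4" is
-# returned unchanged.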
-
-# Threshold where the "too many files" warning kicks in
-FILE_COUNT_WARNING = 5000
-
-class Process(with_metaclass(abc.ABCMeta, HasReqsHints)):
-    def __init__(self,
-                 toolpath_object,      # type: MutableMapping[Text, Any]
-                 loadingContext        # type: LoadingContext
-                ):  # type: (...) -> None
-        """Build a Process object from the provided dictionary."""
-        self.metadata = getdefault(loadingContext.metadata, {})  # type: Dict[Text,Any]
-        self.provenance_object = None  # type: Optional[ProvenanceProfile]
-        self.parent_wf = None          # type: Optional[ProvenanceProfile]
-        global SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY  # pylint: disable=global-statement
-        if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None:
-            get_schema("v1.0")
-            SCHEMA_ANY = cast(Dict[Text, Any],
-                              SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"])
-            SCHEMA_FILE = cast(Dict[Text, Any],
-                               SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"])
-            SCHEMA_DIR = cast(Dict[Text, Any],
-                              SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"])
-
-        self.names = schema.make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY],
-                                             Loader({}))
-        self.tool = toolpath_object
-        self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
-        self.requirements.extend(self.tool.get("requirements", []))
-        if "id" not in self.tool:
-            self.tool["id"] = "_:" + Text(uuid.uuid4())
-        self.requirements.extend(get_overrides(getdefault(loadingContext.overrides_list, []),
-                                               self.tool["id"]).get("requirements", []))
-        self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
-        self.hints.extend(self.tool.get("hints", []))
-        # Versions of requirements and hints which aren't mutated.
-        self.original_requirements = copy.deepcopy(self.requirements)
-        self.original_hints = copy.deepcopy(self.hints)
-        self.doc_loader = loadingContext.loader
-        self.doc_schema = loadingContext.avsc_names
-
-        self.formatgraph = None  # type: Optional[Graph]
-        if self.doc_loader is not None:
-            self.formatgraph = self.doc_loader.graph
-
-        checkRequirements(self.tool, supportedProcessRequirements)
-        self.validate_hints(loadingContext.avsc_names, self.tool.get("hints", []),
-                            strict=getdefault(loadingContext.strict, False))
-
-        self.schemaDefs = {}  # type: Dict[Text,Dict[Text, Any]]
-
-        sd, _ = self.get_requirement("SchemaDefRequirement")
-
-        if sd is not None:
-            sdtypes = avroize_type(sd["types"])
-            av = schema.make_valid_avro(sdtypes, {t["name"]: t for t in sdtypes}, set())
-            for i in av:
-                self.schemaDefs[i["name"]] = i  # type: ignore
-            schema.make_avsc_object(schema.convert_to_dict(av), self.names)
-
-        # Build record schema from inputs
-        self.inputs_record_schema = {
-            "name": "input_record_schema", "type": "record",
-            "fields": []}  # type: Dict[Text, Any]
-        self.outputs_record_schema = {
-            "name": "outputs_record_schema", "type": "record",
-            "fields": []}  # type: Dict[Text, Any]
-
-        for key in ("inputs", "outputs"):
-            for i in self.tool[key]:
-                c = copy.deepcopy(i)
-                c["name"] = shortname(c["id"])
-                del c["id"]
-
-                if "type" not in c:
-                    raise validate.ValidationException(
-                        u"Missing 'type' in parameter '{}'".format(c["name"]))
-
-                if "default" in c and "null" not in aslist(c["type"]):
-                    nullable = ["null"]
-                    nullable.extend(aslist(c["type"]))
-                    c["type"] = nullable
-                c["type"] = avroize_type(c["type"], c["name"])
-                if key == "inputs":
-                    self.inputs_record_schema["fields"].append(c)
-                elif key == "outputs":
-                    self.outputs_record_schema["fields"].append(c)
-
-        with SourceLine(toolpath_object, "inputs", validate.ValidationException):
-            self.inputs_record_schema = cast(
-                Dict[Text, Any], schema.make_valid_avro(
-                    self.inputs_record_schema, {}, set()))
-            schema.make_avsc_object(
-                schema.convert_to_dict(self.inputs_record_schema), self.names)
-        with SourceLine(toolpath_object, "outputs", validate.ValidationException):
-            self.outputs_record_schema = cast(
-                Dict[Text, Any],
-                schema.make_valid_avro(self.outputs_record_schema, {}, set()))
-            schema.make_avsc_object(
-                schema.convert_to_dict(self.outputs_record_schema), self.names)
-
-        if toolpath_object.get("class") is not None \
-                and not getdefault(loadingContext.disable_js_validation, False):
-            if loadingContext.js_hint_options_file is not None:
-                try:
-                    with open(loadingContext.js_hint_options_file) as options_file:
-                        validate_js_options = json.load(options_file)
-                except (OSError, ValueError) as err:
-                    _logger.error(
-                        "Failed to read options file %s",
-                        loadingContext.js_hint_options_file)
-                    raise
-            else:
-                validate_js_options = None
-            if self.doc_schema is not None:
-                validate_js_expressions(
-                    cast(CommentedMap, toolpath_object),
-                    self.doc_schema.names[toolpath_object["class"]],
-                    validate_js_options)
-
-        dockerReq, is_req = self.get_requirement("DockerRequirement")
-
-        if dockerReq is not None and "dockerOutputDirectory" in dockerReq\
-                and is_req is not None and not is_req:
-            _logger.warning(SourceLine(
-                item=dockerReq, raise_type=Text).makeError(
-                    "When 'dockerOutputDirectory' is declared, DockerRequirement "
-                    "should go in the 'requirements' section, not 'hints'."""))
-
-        if dockerReq is not None and is_req is not None\
-                and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl":
-            if is_req:
-                # In this specific case, it is legal to have /var/spool/cwl, so skip the check.
-                pass
-            else:
-                # Declared as a hint, so the non-portable path check still applies
-                var_spool_cwl_detector(self.tool)
-        else:
-            var_spool_cwl_detector(self.tool)
-
-    def _init_job(self, joborder, runtime_context):
-        # type: (Mapping[Text, Text], RuntimeContext) -> Builder
-
-        if self.metadata.get("cwlVersion") != INTERNAL_VERSION:
-            raise WorkflowException("Process object loaded with version '%s', must update to '%s' in order to execute." % (
-                self.metadata.get("cwlVersion"), INTERNAL_VERSION))
-
-        job = cast(Dict[Text, expression.JSON], copy.deepcopy(joborder))
-
-        make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess)
-        fs_access = make_fs_access(runtime_context.basedir)
-
-        load_listing_req, _ = self.get_requirement(
-            "LoadListingRequirement")
-
-        if load_listing_req is not None:
-            load_listing = load_listing_req.get("loadListing")
-        else:
-            load_listing = "no_listing"
-
-        # Validate job order
-        try:
-            fill_in_defaults(self.tool[u"inputs"], job, fs_access)
-
-            normalizeFilesDirs(job)
-            input_schema = self.names.get_name("input_record_schema", "")
-            if input_schema is None:
-                raise WorkflowException(
-                    "Missing input record schema: {}".format(self.names))
-            validate.validate_ex(input_schema, job, strict=False,
-                                 logger=_logger_validation_warnings)
-
-            if load_listing and load_listing != "no_listing":
-                get_listing(fs_access, job, recursive=(load_listing == "deep_listing"))
-
-            visit_class(job, ("File",), functools.partial(add_sizes, fs_access))
-
-            if load_listing == "deep_listing":
-                for i, inparm in enumerate(self.tool["inputs"]):
-                    k = shortname(inparm["id"])
-                    if k not in job:
-                        continue
-                    v = job[k]
-                    dircount = [0]
-
-                    def inc(d):  # type: (List[int]) -> None
-                        d[0] += 1
-                    visit_class(v, ("Directory",), lambda x: inc(dircount))
-                    if dircount[0] == 0:
-                        continue
-                    filecount = [0]
-                    visit_class(v, ("File",), lambda x: inc(filecount))
-                    if filecount[0] > FILE_COUNT_WARNING:
-                        # Long lines in this message are okay, will be reflowed based on terminal columns.
-                        _logger.warning(strip_dup_lineno(SourceLine(self.tool["inputs"], i, Text).makeError(
-                    """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'.  This may negatively affect workflow performance and memory use.
-
-If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior:
-
-$namespaces:
-  cwltool: "http://commonwl.org/cwltool#"
-hints:
-  cwltool:LoadListingRequirement:
-    loadListing: shallow_listing
-
-""" % (filecount[0], k))))
-
-        except (validate.ValidationException, WorkflowException) as err:
-            raise_from(WorkflowException("Invalid job input record:\n" + Text(err)), err)
-
-        files = []  # type: List[Dict[Text, Text]]
-        bindings = CommentedSeq()
-        tmpdir = u""
-        stagedir = u""
-
-        docker_req, _ = self.get_requirement("DockerRequirement")
-        default_docker = None
-
-        if docker_req is None and runtime_context.default_container:
-            default_docker = runtime_context.default_container
-
-        if (docker_req or default_docker) and runtime_context.use_container:
-            if docker_req is not None:
-                # Check if docker output directory is absolute
-                if docker_req.get("dockerOutputDirectory") and \
-                        docker_req.get("dockerOutputDirectory").startswith('/'):
-                    outdir = docker_req.get("dockerOutputDirectory")
-                else:
-                    outdir = docker_req.get("dockerOutputDirectory") or \
-                        runtime_context.docker_outdir or random_outdir()
-            elif default_docker is not None:
-                outdir = runtime_context.docker_outdir or random_outdir()
-            tmpdir = runtime_context.docker_tmpdir or "/tmp"  # nosec
-            stagedir = runtime_context.docker_stagedir or "/var/lib/cwl"
-        else:
-            outdir = fs_access.realpath(
-                runtime_context.outdir or tempfile.mkdtemp(
-                    prefix=getdefault(runtime_context.tmp_outdir_prefix,
-                                      DEFAULT_TMP_PREFIX)))
-            if self.tool[u"class"] != 'Workflow':
-                tmpdir = fs_access.realpath(runtime_context.tmpdir
-                                            or tempfile.mkdtemp())
-                stagedir = fs_access.realpath(runtime_context.stagedir
-                                              or tempfile.mkdtemp())
-
-        builder = Builder(job,
-                          files,
-                          bindings,
-                          self.schemaDefs,
-                          self.names,
-                          self.requirements,
-                          self.hints,
-                          {},
-                          runtime_context.mutation_manager,
-                          self.formatgraph,
-                          make_fs_access,
-                          fs_access,
-                          runtime_context.job_script_provider,
-                          runtime_context.eval_timeout,
-                          runtime_context.debug,
-                          runtime_context.js_console,
-                          runtime_context.force_docker_pull,
-                          load_listing,
-                          outdir,
-                          tmpdir,
-                          stagedir)
-
-        bindings.extend(builder.bind_input(
-            self.inputs_record_schema, job,
-            discover_secondaryFiles=getdefault(runtime_context.toplevel, False)))
-
-        if self.tool.get("baseCommand"):
-            for index, command in enumerate(aslist(self.tool["baseCommand"])):
-                bindings.append({
-                    "position": [-1000000, index],
-                    "datum": command
-                })
-
-        if self.tool.get("arguments"):
-            for i, arg in enumerate(self.tool["arguments"]):
-                lc = self.tool["arguments"].lc.data[i]
-                filename = self.tool["arguments"].lc.filename
-                bindings.lc.add_kv_line_col(len(bindings), lc)
-                if isinstance(arg, MutableMapping):
-                    arg = copy.deepcopy(arg)
-                    if arg.get("position"):
-                        position = arg.get("position")
-                        if isinstance(position, str):  # no need to test the
-                                                       # CWLVersion as the v1.0
-                                                       # schema only allows ints
-                            position = builder.do_eval(position)
-                            if position is None:
-                                position = 0
-                        arg["position"] = [position, i]
-                    else:
-                        arg["position"] = [0, i]
-                    bindings.append(arg)
-                elif ("$(" in arg) or ("${" in arg):
-                    cm = CommentedMap((
-                        ("position", [0, i]),
-                        ("valueFrom", arg)
-                    ))
-                    cm.lc.add_kv_line_col("valueFrom", lc)
-                    cm.lc.filename = filename
-                    bindings.append(cm)
-                else:
-                    cm = CommentedMap((
-                        ("position", [0, i]),
-                        ("datum", arg)
-                    ))
-                    cm.lc.add_kv_line_col("datum", lc)
-                    cm.lc.filename = filename
-                    bindings.append(cm)
-
-        # Use Python 2-style sorting of heterogeneous lists
-        # (containing str and int types).
-        if PY3:
-            key = functools.cmp_to_key(cmp_like_py2)
-        else:  # PY2
-            key = lambda d: d["position"]
-
-        # This awkward construction replaces the contents of
-        # "bindings" in place (because Builder expects it to be
-        # mutated in place, sigh, I'm sorry) with its contents sorted,
-        # supporting different versions of Python and ruamel.yaml with
-        # different behaviors/bugs in CommentedSeq.
-        bindings_copy = copy.deepcopy(bindings)
-        del bindings[:]
-        bindings.extend(sorted(bindings_copy, key=key))
-
-        if self.tool[u"class"] != 'Workflow':
-            builder.resources = self.evalResources(builder, runtime_context)
-        return builder
-
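-    # Illustrative summary: _init_job fills defaults and validates the job
-    # order against the input record schema, picks outdir/tmpdir/stagedir
-    # (container paths when a DockerRequirement or default container is in
-    # effect), and returns a Builder whose "bindings" end up sorted into
-    # final command-line order.
-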
-    def evalResources(self, builder, runtimeContext):
-        # type: (Builder, RuntimeContext) -> Dict[str, int]
-        resourceReq, _ = self.get_requirement("ResourceRequirement")
-        if resourceReq is None:
-            resourceReq = {}
-        cwl_version = self.metadata.get(
-            "http://commonwl.org/cwltool#original_cwlVersion", None)
-        if cwl_version == "v1.0":
-            ram = 1024
-        else:
-            ram = 256
-        request = {
-            "coresMin": 1,
-            "coresMax": 1,
-            "ramMin": ram,
-            "ramMax": ram,
-            "tmpdirMin": 1024,
-            "tmpdirMax": 1024,
-            "outdirMin": 1024,
-            "outdirMax": 1024
-        }  # type: Dict[str, int]
-        for a in ("cores", "ram", "tmpdir", "outdir"):
-            mn = None
-            mx = None
-            if resourceReq.get(a + "Min"):
-                mn = eval_resource(builder, resourceReq[a + "Min"])
-            if resourceReq.get(a + "Max"):
-                mx = eval_resource(builder, resourceReq[a + "Max"])
-            if mn is None:
-                mn = mx
-            elif mx is None:
-                mx = mn
-
-            if mn is not None:
-                request[a + "Min"] = cast(int, mn)
-                request[a + "Max"] = cast(int, mx)
-
-        if runtimeContext.select_resources is not None:
-            return runtimeContext.select_resources(request, runtimeContext)
-        return {
-            "cores": request["coresMin"],
-            "ram": request["ramMin"],
-            "tmpdirSize": request["tmpdirMin"],
-            "outdirSize": request["outdirMin"],
-        }
-
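-    # Example (illustrative): with no ResourceRequirement and an original
-    # cwlVersion of "v1.0", this returns {"cores": 1, "ram": 1024,
-    # "tmpdirSize": 1024, "outdirSize": 1024}; an expression such as
-    # ramMin: $(inputs.n * 256) is evaluated via eval_resource and applied
-    # to both ramMin and ramMax.
-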
-    def validate_hints(self, avsc_names, hints, strict):
-        # type: (Any, List[Dict[Text, Any]], bool) -> None
-        for i, r in enumerate(hints):
-            sl = SourceLine(hints, i, validate.ValidationException)
-            with sl:
-                if avsc_names.get_name(r["class"], "") is not None and self.doc_loader is not None:
-                    plain_hint = dict((key, r[key]) for key in r if key not in
-                                      self.doc_loader.identifiers)  # strip identifiers
-                    validate.validate_ex(
-                        avsc_names.get_name(plain_hint["class"], ""),
-                        plain_hint, strict=strict)
-                elif r["class"] in ("NetworkAccess", "LoadListingRequirement"):
-                    pass
-                else:
-                    _logger.info(Text(sl.makeError(u"Unknown hint %s" % (r["class"]))))
-
-    def visit(self, op):  # type: (Callable[[MutableMapping[Text, Any]], None]) -> None
-        op(self.tool)
-
-    @abc.abstractmethod
-    def job(self,
-            job_order,         # type: Mapping[Text, Text]
-            output_callbacks,  # type: Callable[[Any, Any], Any]
-            runtimeContext     # type: RuntimeContext
-           ):  # type: (...) -> Generator[Any, None, None]
-        # FIXME: Declare base type for what Generator yields
-        pass
-
-
-_names = set()  # type: Set[Text]
-
-
-def uniquename(stem, names=None):  # type: (Text, Optional[Set[Text]]) -> Text
-    global _names
-    if names is None:
-        names = _names
-    c = 1
-    u = stem
-    while u in names:
-        c += 1
-        u = u"%s_%s" % (stem, c)
-    names.add(u)
-    return u
-
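-# Example (illustrative): uniquename("step") returns "step" the first time,
-# then "step_2", "step_3", ... on later calls against the same name set.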
-
-def nestdir(base, deps):
-    # type: (Text, Dict[Text, Any]) -> Dict[Text, Any]
-    dirname = os.path.dirname(base) + "/"
-    subid = deps["location"]
-    if subid.startswith(dirname):
-        s2 = subid[len(dirname):]
-        sp = s2.split('/')
-        sp.pop()
-        while sp:
-            nx = sp.pop()
-            deps = {
-                "class": "Directory",
-                "basename": nx,
-                "listing": [deps]
-            }
-    return deps
-
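-# Example (illustrative): with base "file:///a/wf.cwl" and a dependency at
-# "file:///a/data/x.txt", nestdir wraps the File in a synthetic
-# {"class": "Directory", "basename": "data", "listing": [...]} entry so the
-# layout relative to the base document is preserved.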
-
-def mergedirs(listing):
-    # type: (List[Dict[Text, Any]]) -> List[Dict[Text, Any]]
-    r = []  # type: List[Dict[Text, Any]]
-    ents = {}  # type: Dict[Text, Any]
-    collided = set()  # type: Set[Text]
-    for e in listing:
-        if e["basename"] not in ents:
-            ents[e["basename"]] = e
-        elif e["class"] == "Directory":
-            if e.get("listing"):
-                ents[e["basename"]].setdefault("listing", []).extend(e["listing"])
-            if ents[e["basename"]]["location"].startswith("_:"):
-                ents[e["basename"]]["location"] = e["location"]
-        elif e["location"] != ents[e["basename"]]["location"]:
-            # same basename, different location, collision,
-            # rename both.
-            collided.add(e["basename"])
-            e2 = ents[e["basename"]]
-
-            e["basename"] = urllib.parse.quote(e["location"], safe="")
-            e2["basename"] = urllib.parse.quote(e2["location"], safe="")
-
-            e["nameroot"], e["nameext"] = os.path.splitext(e["basename"])
-            e2["nameroot"], e2["nameext"] = os.path.splitext(e2["basename"])
-
-            ents[e["basename"]] = e
-            ents[e2["basename"]] = e2
-    for c in collided:
-        del ents[c]
-    for e in itervalues(ents):
-        if e["class"] == "Directory" and "listing" in e:
-            e["listing"] = mergedirs(e["listing"])
-    r.extend(itervalues(ents))
-    return r
-
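-# Example (illustrative): two Directory entries sharing basename "data" are
-# merged into one with their listings concatenated, while two Files with the
-# same basename but different locations are both kept under new basenames
-# derived from their URL-quoted locations.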
-
-CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
-
-def scandeps(base,                          # type: Text
-             doc,                           # type: Any
-             reffields,                     # type: Set[Text]
-             urlfields,                     # type: Set[Text]
-             loadref,                       # type: Callable[[Text, Text], Text]
-             urljoin=urllib.parse.urljoin,  # type: Callable[[Text, Text], Text]
-             nestdirs=True                  # type: bool
-            ):  # type: (...) -> List[Dict[Text, Text]]
-    r = []  # type: List[Dict[Text, Text]]
-    if isinstance(doc, MutableMapping):
-        if "id" in doc:
-            if doc["id"].startswith("file://"):
-                df, _ = urllib.parse.urldefrag(doc["id"])
-                if base != df:
-                    r.append({
-                        "class": "File",
-                        "location": df,
-                        "format": CWL_IANA
-                    })
-                    base = df
-
-        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
-            u = doc.get("location", doc.get("path"))
-            if u and not u.startswith("_:"):
-                deps = {"class": doc["class"],
-                        "location": urljoin(base, u)
-                       }  # type: Dict[Text, Any]
-                if "basename" in doc:
-                    deps["basename"] = doc["basename"]
-                if doc["class"] == "Directory" and "listing" in doc:
-                    deps["listing"] = doc["listing"]
-                if doc["class"] == "File" and "secondaryFiles" in doc:
-                    deps["secondaryFiles"] = doc["secondaryFiles"]
-                if nestdirs:
-                    deps = nestdir(base, deps)
-                r.append(deps)
-            else:
-                if doc["class"] == "Directory" and "listing" in doc:
-                    r.extend(scandeps(
-                        base, doc["listing"], reffields, urlfields, loadref,
-                        urljoin=urljoin, nestdirs=nestdirs))
-                elif doc["class"] == "File" and "secondaryFiles" in doc:
-                    r.extend(scandeps(
-                        base, doc["secondaryFiles"], reffields, urlfields,
-                        loadref, urljoin=urljoin, nestdirs=nestdirs))
-
-        for k, v in iteritems(doc):
-            if k in reffields:
-                for u in aslist(v):
-                    if isinstance(u, MutableMapping):
-                        r.extend(scandeps(
-                            base, u, reffields, urlfields, loadref,
-                            urljoin=urljoin, nestdirs=nestdirs))
-                    else:
-                        subid = urljoin(base, u)
-                        basedf, _ = urllib.parse.urldefrag(base)
-                        subiddf, _ = urllib.parse.urldefrag(subid)
-                        if basedf == subiddf:
-                            continue
-                        sub = loadref(base, u)
-                        deps = {
-                            "class": "File",
-                            "location": subid,
-                            "format": CWL_IANA
-                        }
-                        sf = scandeps(
-                            subid, sub, reffields, urlfields, loadref,
-                            urljoin=urljoin, nestdirs=nestdirs)
-                        if sf:
-                            deps["secondaryFiles"] = sf
-                        if nestdirs:
-                            deps = nestdir(base, deps)
-                        r.append(deps)
-            elif k in urlfields and k != "location":
-                for u in aslist(v):
-                    deps = {
-                        "class": "File",
-                        "location": urljoin(base, u)
-                    }
-                    if nestdirs:
-                        deps = nestdir(base, deps)
-                    r.append(deps)
-            elif k not in ("listing", "secondaryFiles"):
-                r.extend(scandeps(
-                    base, v, reffields, urlfields, loadref, urljoin=urljoin,
-                    nestdirs=nestdirs))
-    elif isinstance(doc, MutableSequence):
-        for d in doc:
-            r.extend(scandeps(
-                base, d, reffields, urlfields, loadref, urljoin=urljoin,
-                nestdirs=nestdirs))
-
-    if r:
-        normalizeFilesDirs(r)
-        r = mergedirs(r)
-
-    return r
-
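-# Example (illustrative): scanning a workflow with reffields={"run"} and
-# urlfields={"location"} turns a step's "run: tool.cwl" into
-# {"class": "File", "location": <resolved URI>, "format": CWL_IANA}, with the
-# referenced tool's own dependencies attached as "secondaryFiles".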
-
-def compute_checksums(fs_access, fileobj):  # type: (StdFsAccess, Dict[Text, Any]) -> None
-    if "checksum" not in fileobj:
-        checksum = hashlib.sha1()  # nosec
-        with fs_access.open(fileobj["location"], "rb") as f:
-            contents = f.read(1024 * 1024)
-            while contents != b"":
-                checksum.update(contents)
-                contents = f.read(1024 * 1024)
-        fileobj["checksum"] = "sha1$%s" % checksum.hexdigest()
-        fileobj["size"] = fs_access.size(fileobj["location"])