diff env/lib/python3.7/site-packages/cwltool/process.py @ 2:6af9afd405e9 draft

"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author shellac
date Thu, 14 May 2020 14:56:58 -0400
parents 26e78fe6e8c4
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/cwltool/process.py	Thu May 14 14:56:58 2020 -0400
@@ -0,0 +1,1067 @@
+from __future__ import absolute_import
+
+import abc
+import copy
+import errno
+import functools
+import hashlib
+import json
+import logging
+import os
+import shutil
+import stat
+import tempfile
+import textwrap
+import uuid
+
+from io import open
+from typing import (Any, Callable, Dict, Generator, Iterator, List,
+                    Mapping, MutableMapping, MutableSequence, Optional, Set, Tuple,
+                    Type, Union, cast)
+
+from pkg_resources import resource_stream
+from rdflib import Graph  # pylint: disable=unused-import
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
+from six import PY3, iteritems, itervalues, string_types, with_metaclass
+from six.moves import urllib
+from future.utils import raise_from
+from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
+                               Text)
+from schema_salad import schema, validate
+from schema_salad.ref_resolver import Loader, file_uri, uri_file_path
+from schema_salad.sourceline import SourceLine, strip_dup_lineno
+
+from . import expression
+from .builder import Builder, HasReqsHints
+from .context import LoadingContext  # pylint: disable=unused-import
+from .context import RuntimeContext, getdefault
+from .errors import UnsupportedRequirement, WorkflowException
+from .loghandler import _logger
+from .mutation import MutationManager  # pylint: disable=unused-import
+from .pathmapper import (PathMapper, adjustDirObjs, ensure_writable,
+                         get_listing, normalizeFilesDirs, visit_class,
+                         MapperEnt)
+from .secrets import SecretStore  # pylint: disable=unused-import
+from .software_requirements import (  # pylint: disable=unused-import
+    DependenciesConfiguration)
+from .stdfsaccess import StdFsAccess
+from .utils import (DEFAULT_TMP_PREFIX, aslist, cmp_like_py2,
+                    copytree_with_merge, onWindows, random_outdir)
+from .validate_js import validate_js_expressions
+from .update import INTERNAL_VERSION
+
+try:
+    from os import scandir  # type: ignore
+except ImportError:
+    from scandir import scandir  # type: ignore
+if TYPE_CHECKING:
+    from .provenance import ProvenanceProfile  # pylint: disable=unused-import
+
+if PY3:
+    from collections.abc import Iterable # only works on python 3.3+
+else:
+    from collections import Iterable  # pylint: disable=unused-import
+
+class LogAsDebugFilter(logging.Filter):
+    def __init__(self, name, parent):  # type: (Text, logging.Logger) -> None
+        """Initialize."""
+        name = str(name)
+        super(LogAsDebugFilter, self).__init__(name)
+        self.parent = parent
+
+    def filter(self, record):  # type: (logging.LogRecord) -> bool
+        return self.parent.isEnabledFor(logging.DEBUG)
+
+
+_logger_validation_warnings = logging.getLogger("cwltool.validation_warnings")
+_logger_validation_warnings.setLevel(_logger.getEffectiveLevel())
+_logger_validation_warnings.addFilter(LogAsDebugFilter("cwltool.validation_warnings", _logger))
+
+supportedProcessRequirements = ["DockerRequirement",
+                                "SchemaDefRequirement",
+                                "EnvVarRequirement",
+                                "ScatterFeatureRequirement",
+                                "SubworkflowFeatureRequirement",
+                                "MultipleInputFeatureRequirement",
+                                "InlineJavascriptRequirement",
+                                "ShellCommandRequirement",
+                                "StepInputExpressionRequirement",
+                                "ResourceRequirement",
+                                "InitialWorkDirRequirement",
+                                "ToolTimeLimit",
+                                "WorkReuse",
+                                "NetworkAccess",
+                                "InplaceUpdateRequirement",
+                                "LoadListingRequirement",
+                                "http://commonwl.org/cwltool#TimeLimit",
+                                "http://commonwl.org/cwltool#WorkReuse",
+                                "http://commonwl.org/cwltool#NetworkAccess",
+                                "http://commonwl.org/cwltool#LoadListingRequirement",
+                                "http://commonwl.org/cwltool#InplaceUpdateRequirement"]
+
+cwl_files = (
+    "Workflow.yml",
+    "CommandLineTool.yml",
+    "CommonWorkflowLanguage.yml",
+    "Process.yml",
+    "concepts.md",
+    "contrib.md",
+    "intro.md",
+    "invocation.md")
+
+salad_files = ('metaschema.yml',
+               'metaschema_base.yml',
+               'salad.md',
+               'field_name.yml',
+               'import_include.md',
+               'link_res.yml',
+               'ident_res.yml',
+               'vocab_res.yml',
+               'vocab_res.yml',
+               'field_name_schema.yml',
+               'field_name_src.yml',
+               'field_name_proc.yml',
+               'ident_res_schema.yml',
+               'ident_res_src.yml',
+               'ident_res_proc.yml',
+               'link_res_schema.yml',
+               'link_res_src.yml',
+               'link_res_proc.yml',
+               'vocab_res_schema.yml',
+               'vocab_res_src.yml',
+               'vocab_res_proc.yml')
+
+SCHEMA_CACHE = {}  # type: Dict[Text, Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text, Any], Loader]]
+SCHEMA_FILE = None  # type: Optional[Dict[Text, Any]]
+SCHEMA_DIR = None  # type: Optional[Dict[Text, Any]]
+SCHEMA_ANY = None  # type: Optional[Dict[Text, Any]]
+
+custom_schemas = {}  # type: Dict[Text, Tuple[Text, Text]]
+
+def use_standard_schema(version):
+    # type: (Text) -> None
+    if version in custom_schemas:
+        del custom_schemas[version]
+    if version in SCHEMA_CACHE:
+        del SCHEMA_CACHE[version]
+
+def use_custom_schema(version, name, text):
+    # type: (Text, Text, Union[Text, bytes]) -> None
+    if isinstance(text, bytes):
+        text2 = text.decode()
+    else:
+        text2 = text
+    custom_schemas[version] = (name, text2)
+    if version in SCHEMA_CACHE:
+        del SCHEMA_CACHE[version]
+
+def get_schema(version):
+    # type: (Text) -> Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text,Any], Loader]
+
+    if version in SCHEMA_CACHE:
+        return SCHEMA_CACHE[version]
+
+    cache = {}  # type: Dict[Text, Any]
+    version = version.split("#")[-1]
+    if '.dev' in version:
+        version = ".".join(version.split(".")[:-1])
+    for f in cwl_files:
+        try:
+            res = resource_stream(__name__, 'schemas/%s/%s' % (version, f))
+            cache["https://w3id.org/cwl/" + f] = res.read()
+            res.close()
+        except IOError:
+            pass
+
+    for f in salad_files:
+        try:
+            res = resource_stream(
+                __name__, 'schemas/{}/salad/schema_salad/metaschema/{}'.format(
+                    version, f))
+            cache["https://w3id.org/cwl/salad/schema_salad/metaschema/"
+                  + f] = res.read()
+            res.close()
+        except IOError:
+            pass
+
+    if version in custom_schemas:
+        cache[custom_schemas[version][0]] = custom_schemas[version][1]
+        SCHEMA_CACHE[version] = schema.load_schema(
+            custom_schemas[version][0], cache=cache)
+    else:
+        SCHEMA_CACHE[version] = schema.load_schema(
+            "https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache)
+
+    return SCHEMA_CACHE[version]
+
+
+def shortname(inputid):
+    # type: (Text) -> Text
+    d = urllib.parse.urlparse(inputid)
+    if d.fragment:
+        return d.fragment.split(u"/")[-1]
+    return d.path.split(u"/")[-1]
+
+
+def checkRequirements(rec, supported_process_requirements):
+    # type: (Any, Iterable[Any]) -> None
+    if isinstance(rec, MutableMapping):
+        if "requirements" in rec:
+            for i, entry in enumerate(rec["requirements"]):
+                with SourceLine(rec["requirements"], i, UnsupportedRequirement):
+                    if entry["class"] not in supported_process_requirements:
+                        raise UnsupportedRequirement(
+                            u"Unsupported requirement {}".format(entry["class"]))
+        for key in rec:
+            checkRequirements(rec[key], supported_process_requirements)
+    if isinstance(rec, MutableSequence):
+        for entry in rec:
+            checkRequirements(entry, supported_process_requirements)
+
+
+def stage_files(pathmapper,             # type: PathMapper
+                stage_func=None,        # type: Optional[Callable[..., Any]]
+                ignore_writable=False,  # type: bool
+                symlink=True,           # type: bool
+                secret_store=None,      # type: Optional[SecretStore]
+                fix_conflicts=False     # type: bool
+               ):  # type: (...) -> None
+    """Link or copy files to their targets. Create them as needed."""
+
+    targets = {}  # type: Dict[Text, MapperEnt]
+    for key, entry in pathmapper.items():
+        if not 'File' in entry.type:
+            continue
+        if entry.target not in targets:
+            targets[entry.target] = entry
+        elif targets[entry.target].resolved != entry.resolved:
+            if fix_conflicts:
+                tgt = entry.target
+                i = 2
+                tgt = "%s_%s" % (tgt, i)
+                while tgt in targets:
+                    i += 1
+                    tgt = "%s_%s" % (tgt, i)
+                targets[tgt] = pathmapper.update(key, entry.resolved, tgt, entry.type, entry.staged)
+            else:
+                raise WorkflowException("File staging conflict, trying to stage both %s and %s to the same target %s" % (
+                    targets[entry.target].resolved, entry.resolved, entry.target))
+
+    for key, entry in pathmapper.items():
+        if not entry.staged:
+            continue
+        if not os.path.exists(os.path.dirname(entry.target)):
+            os.makedirs(os.path.dirname(entry.target))
+        if entry.type in ("File", "Directory") and os.path.exists(entry.resolved):
+            if symlink:  # Use symlink func if allowed
+                if onWindows():
+                    if entry.type == "File":
+                        shutil.copy(entry.resolved, entry.target)
+                    elif entry.type == "Directory":
+                        if os.path.exists(entry.target) \
+                                and os.path.isdir(entry.target):
+                            shutil.rmtree(entry.target)
+                        copytree_with_merge(entry.resolved, entry.target)
+                else:
+                    os.symlink(entry.resolved, entry.target)
+            elif stage_func is not None:
+                stage_func(entry.resolved, entry.target)
+        elif entry.type == "Directory" and not os.path.exists(entry.target) \
+                and entry.resolved.startswith("_:"):
+            os.makedirs(entry.target)
+        elif entry.type == "WritableFile" and not ignore_writable:
+            shutil.copy(entry.resolved, entry.target)
+            ensure_writable(entry.target)
+        elif entry.type == "WritableDirectory" and not ignore_writable:
+            if entry.resolved.startswith("_:"):
+                os.makedirs(entry.target)
+            else:
+                shutil.copytree(entry.resolved, entry.target)
+                ensure_writable(entry.target)
+        elif entry.type == "CreateFile" or entry.type == "CreateWritableFile":
+            with open(entry.target, "wb") as new:
+                if secret_store is not None:
+                    new.write(
+                        secret_store.retrieve(entry.resolved).encode("utf-8"))
+                else:
+                    new.write(entry.resolved.encode("utf-8"))
+            if entry.type == "CreateFile":
+                os.chmod(entry.target, stat.S_IRUSR)  # Read only
+            else:  # it is a "CreateWritableFile"
+                ensure_writable(entry.target)
+            pathmapper.update(
+                key, entry.target, entry.target, entry.type, entry.staged)
+
+
+def relocateOutputs(outputObj,              # type: Union[Dict[Text, Any], List[Dict[Text, Any]]]
+                    destination_path,       # type: Text
+                    source_directories,     # type: Set[Text]
+                    action,                 # type: Text
+                    fs_access,              # type: StdFsAccess
+                    compute_checksum=True,  # type: bool
+                    path_mapper=PathMapper  # type: Type[PathMapper]
+                    ):
+    # type: (...) -> Union[Dict[Text, Any], List[Dict[Text, Any]]]
+    adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True))
+
+    if action not in ("move", "copy"):
+        return outputObj
+
+    def _collectDirEntries(obj):
+        # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> Iterator[Dict[Text, Any]]
+        if isinstance(obj, dict):
+            if obj.get("class") in ("File", "Directory"):
+                yield obj
+            else:
+                for sub_obj in obj.values():
+                    for dir_entry in _collectDirEntries(sub_obj):
+                        yield dir_entry
+        elif isinstance(obj, MutableSequence):
+            for sub_obj in obj:
+                for dir_entry in _collectDirEntries(sub_obj):
+                    yield dir_entry
+
+    def _relocate(src, dst):  # type: (Text, Text) -> None
+        if src == dst:
+            return
+
+        # If the source is not contained in source_directories we're not allowed to delete it
+        src = fs_access.realpath(src)
+        src_can_deleted = any(os.path.commonprefix([p, src]) == p for p in source_directories)
+
+        _action = "move" if action == "move" and src_can_deleted else "copy"
+
+        if _action == "move":
+            _logger.debug("Moving %s to %s", src, dst)
+            if fs_access.isdir(src) and fs_access.isdir(dst):
+                # merge directories
+                for dir_entry in scandir(src):
+                    _relocate(dir_entry.path, fs_access.join(dst, dir_entry.name))
+            else:
+                shutil.move(src, dst)
+
+        elif _action == "copy":
+            _logger.debug("Copying %s to %s", src, dst)
+            if fs_access.isdir(src):
+                if os.path.isdir(dst):
+                    shutil.rmtree(dst)
+                elif os.path.isfile(dst):
+                    os.unlink(dst)
+                shutil.copytree(src, dst)
+            else:
+                shutil.copy2(src, dst)
+
+    def _realpath(ob):  # type: (Dict[Text, Any]) -> None
+        if ob["location"].startswith("file:"):
+            ob["location"] = file_uri(os.path.realpath(uri_file_path(ob["location"])))
+        if ob["location"].startswith("/"):
+            ob["location"] = os.path.realpath(ob["location"])
+
+    outfiles = list(_collectDirEntries(outputObj))
+    visit_class(outfiles, ("File", "Directory"), _realpath)
+    pm = path_mapper(outfiles, "", destination_path, separateDirs=False)
+    stage_files(pm, stage_func=_relocate, symlink=False, fix_conflicts=True)
+
+    def _check_adjust(a_file):  # type: (Dict[Text, Text]) -> Dict[Text, Text]
+        a_file["location"] = file_uri(pm.mapper(a_file["location"])[1])
+        if "contents" in a_file:
+            del a_file["contents"]
+        return a_file
+
+    visit_class(outputObj, ("File", "Directory"), _check_adjust)
+
+    if compute_checksum:
+        visit_class(outputObj, ("File",), functools.partial(
+            compute_checksums, fs_access))
+    return outputObj
+
+
+def cleanIntermediate(output_dirs):  # type: (Iterable[Text]) -> None
+    for a in output_dirs:
+        if os.path.exists(a):
+            _logger.debug(u"Removing intermediate output directory %s", a)
+            shutil.rmtree(a, True)
+
+def add_sizes(fsaccess, obj):  # type: (StdFsAccess, Dict[Text, Any]) -> None
+    if 'location' in obj:
+        try:
+            if "size" not in obj:
+                obj["size"] = fsaccess.size(obj["location"])
+        except OSError:
+            pass
+    elif 'contents' in obj:
+        obj["size"] = len(obj['contents'])
+    else:
+        return  # best effort
+
+def fill_in_defaults(inputs,   # type: List[Dict[Text, Text]]
+                     job,      # type: Dict[Text, expression.JSON]
+                     fsaccess  # type: StdFsAccess
+                    ):  # type: (...) -> None
+    for e, inp in enumerate(inputs):
+        with SourceLine(inputs, e, WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
+            fieldname = shortname(inp[u"id"])
+            if job.get(fieldname) is not None:
+                pass
+            elif job.get(fieldname) is None and u"default" in inp:
+                job[fieldname] = copy.deepcopy(inp[u"default"])
+            elif job.get(fieldname) is None and u"null" in aslist(inp[u"type"]):
+                job[fieldname] = None
+            else:
+                raise WorkflowException("Missing required input parameter '%s'" % shortname(inp["id"]))
+
+
+def avroize_type(field_type, name_prefix=""):
+    # type: (Union[List[Dict[Text, Any]], Dict[Text, Any]], Text) -> Any
+    """Add missing information to a type so that CWL types are valid."""
+    if isinstance(field_type, MutableSequence):
+        for field in field_type:
+            avroize_type(field, name_prefix)
+    elif isinstance(field_type, MutableMapping):
+        if field_type["type"] in ("enum", "record"):
+            if "name" not in field_type:
+                field_type["name"] = name_prefix + Text(uuid.uuid4())
+        if field_type["type"] == "record":
+            avroize_type(field_type["fields"], name_prefix)
+        if field_type["type"] == "array":
+            avroize_type(field_type["items"], name_prefix)
+        if isinstance(field_type["type"], MutableSequence):
+            for ctype in field_type["type"]:
+                avroize_type(ctype, name_prefix)
+    return field_type
+
+def get_overrides(overrides, toolid):  # type: (List[Dict[Text, Any]], Text) -> Dict[Text, Any]
+    req = {}  # type: Dict[Text, Any]
+    if not isinstance(overrides, MutableSequence):
+        raise validate.ValidationException("Expected overrides to be a list, but was %s" % type(overrides))
+    for ov in overrides:
+        if ov["overrideTarget"] == toolid:
+            req.update(ov)
+    return req
+
+
+_VAR_SPOOL_ERROR = textwrap.dedent(
+    """
+    Non-portable reference to /var/spool/cwl detected: '{}'.
+    To fix, replace /var/spool/cwl with $(runtime.outdir) or add
+    DockerRequirement to the 'requirements' section and declare
+    'dockerOutputDirectory: /var/spool/cwl'.
+    """)
+
+
+def var_spool_cwl_detector(obj,           # type: Union[MutableMapping[Text, Text], List[Dict[Text, Any]], Text]
+                           item=None,     # type: Optional[Any]
+                           obj_key=None,  # type: Optional[Any]
+                          ):              # type: (...)->bool
+    """Detect any textual reference to /var/spool/cwl."""
+    r = False
+    if isinstance(obj, string_types):
+        if "var/spool/cwl" in obj and obj_key != "dockerOutputDirectory":
+            _logger.warning(
+                SourceLine(item=item, key=obj_key, raise_type=Text).makeError(
+                    _VAR_SPOOL_ERROR.format(obj)))
+            r = True
+    elif isinstance(obj, MutableMapping):
+        for mkey, mvalue in iteritems(obj):
+            r = var_spool_cwl_detector(mvalue, obj, mkey) or r
+    elif isinstance(obj, MutableSequence):
+        for lkey, lvalue in enumerate(obj):
+            r = var_spool_cwl_detector(lvalue, obj, lkey) or r
+    return r
+
+def eval_resource(builder, resource_req):  # type: (Builder, Text) -> Any
+    if expression.needs_parsing(resource_req):
+        return builder.do_eval(resource_req)
+    return resource_req
+
+
+# Threshold where the "too many files" warning kicks in
+FILE_COUNT_WARNING = 5000
+
+class Process(with_metaclass(abc.ABCMeta, HasReqsHints)):
+    def __init__(self,
+                 toolpath_object,      # type: MutableMapping[Text, Any]
+                 loadingContext        # type: LoadingContext
+                ):  # type: (...) -> None
+        """Build a Process object from the provided dictionary."""
+        self.metadata = getdefault(loadingContext.metadata, {})  # type: Dict[Text,Any]
+        self.provenance_object = None  # type: Optional[ProvenanceProfile]
+        self.parent_wf = None          # type: Optional[ProvenanceProfile]
+        global SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY  # pylint: disable=global-statement
+        if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None:
+            get_schema("v1.0")
+            SCHEMA_ANY = cast(Dict[Text, Any],
+                              SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"])
+            SCHEMA_FILE = cast(Dict[Text, Any],
+                               SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"])
+            SCHEMA_DIR = cast(Dict[Text, Any],
+                              SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"])
+
+        self.names = schema.make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY],
+                                             Loader({}))
+        self.tool = toolpath_object
+        self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
+        self.requirements.extend(self.tool.get("requirements", []))
+        if "id" not in self.tool:
+            self.tool["id"] = "_:" + Text(uuid.uuid4())
+        self.requirements.extend(get_overrides(getdefault(loadingContext.overrides_list, []),
+                                               self.tool["id"]).get("requirements", []))
+        self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
+        self.hints.extend(self.tool.get("hints", []))
+        # Versions of requirements and hints which aren't mutated.
+        self.original_requirements = copy.deepcopy(self.requirements)
+        self.original_hints = copy.deepcopy(self.hints)
+        self.doc_loader = loadingContext.loader
+        self.doc_schema = loadingContext.avsc_names
+
+        self.formatgraph = None  # type: Optional[Graph]
+        if self.doc_loader is not None:
+            self.formatgraph = self.doc_loader.graph
+
+        checkRequirements(self.tool, supportedProcessRequirements)
+        self.validate_hints(loadingContext.avsc_names, self.tool.get("hints", []),
+                            strict=getdefault(loadingContext.strict, False))
+
+        self.schemaDefs = {}  # type: Dict[Text,Dict[Text, Any]]
+
+        sd, _ = self.get_requirement("SchemaDefRequirement")
+
+        if sd is not None:
+            sdtypes = avroize_type(sd["types"])
+            av = schema.make_valid_avro(sdtypes, {t["name"]: t for t in sdtypes}, set())
+            for i in av:
+                self.schemaDefs[i["name"]] = i  # type: ignore
+            schema.make_avsc_object(schema.convert_to_dict(av), self.names)
+
+        # Build record schema from inputs
+        self.inputs_record_schema = {
+            "name": "input_record_schema", "type": "record",
+            "fields": []}  # type: Dict[Text, Any]
+        self.outputs_record_schema = {
+            "name": "outputs_record_schema", "type": "record",
+            "fields": []}  # type: Dict[Text, Any]
+
+        for key in ("inputs", "outputs"):
+            for i in self.tool[key]:
+                c = copy.deepcopy(i)
+                c["name"] = shortname(c["id"])
+                del c["id"]
+
+                if "type" not in c:
+                    raise validate.ValidationException(
+                        u"Missing 'type' in parameter '{}'".format(c["name"]))
+
+                if "default" in c and "null" not in aslist(c["type"]):
+                    nullable = ["null"]
+                    nullable.extend(aslist(c["type"]))
+                    c["type"] = nullable
+                else:
+                    c["type"] = c["type"]
+                c["type"] = avroize_type(c["type"], c["name"])
+                if key == "inputs":
+                    self.inputs_record_schema["fields"].append(c)
+                elif key == "outputs":
+                    self.outputs_record_schema["fields"].append(c)
+
+        with SourceLine(toolpath_object, "inputs", validate.ValidationException):
+            self.inputs_record_schema = cast(
+                Dict[Text, Any], schema.make_valid_avro(
+                    self.inputs_record_schema, {}, set()))
+            schema.make_avsc_object(
+                schema.convert_to_dict(self.inputs_record_schema), self.names)
+        with SourceLine(toolpath_object, "outputs", validate.ValidationException):
+            self.outputs_record_schema = cast(
+                Dict[Text, Any],
+                schema.make_valid_avro(self.outputs_record_schema, {}, set()))
+            schema.make_avsc_object(
+                schema.convert_to_dict(self.outputs_record_schema), self.names)
+
+        if toolpath_object.get("class") is not None \
+                and not getdefault(loadingContext.disable_js_validation, False):
+            if loadingContext.js_hint_options_file is not None:
+                try:
+                    with open(loadingContext.js_hint_options_file) as options_file:
+                        validate_js_options = json.load(options_file)
+                except (OSError, ValueError) as err:
+                    _logger.error(
+                        "Failed to read options file %s",
+                        loadingContext.js_hint_options_file)
+                    raise
+            else:
+                validate_js_options = None
+            if self.doc_schema is not None:
+                validate_js_expressions(
+                    cast(CommentedMap, toolpath_object),
+                    self.doc_schema.names[toolpath_object["class"]],
+                    validate_js_options)
+
+        dockerReq, is_req = self.get_requirement("DockerRequirement")
+
+        if dockerReq is not None and "dockerOutputDirectory" in dockerReq\
+                and is_req is not None and not is_req:
+            _logger.warning(SourceLine(
+                item=dockerReq, raise_type=Text).makeError(
+                    "When 'dockerOutputDirectory' is declared, DockerRequirement "
+                    "should go in the 'requirements' section, not 'hints'."""))
+
+        if dockerReq is not None and is_req is not None\
+                and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl":
+            if is_req:
+                # In this specific case, it is legal to have /var/spool/cwl, so skip the check.
+                pass
+            else:
+                # Must be a requirement
+                var_spool_cwl_detector(self.tool)
+        else:
+            var_spool_cwl_detector(self.tool)
+
+    def _init_job(self, joborder, runtime_context):
+        # type: (Mapping[Text, Text], RuntimeContext) -> Builder
+
+        if self.metadata.get("cwlVersion") != INTERNAL_VERSION:
+            raise WorkflowException("Process object loaded with version '%s', must update to '%s' in order to execute." % (
+                self.metadata.get("cwlVersion"), INTERNAL_VERSION))
+
+        job = cast(Dict[Text, expression.JSON], copy.deepcopy(joborder))
+
+        make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess)
+        fs_access = make_fs_access(runtime_context.basedir)
+
+        load_listing_req, _ = self.get_requirement(
+            "LoadListingRequirement")
+
+        if load_listing_req is not None:
+            load_listing = load_listing_req.get("loadListing")
+        else:
+            load_listing = "no_listing"
+
+        # Validate job order
+        try:
+            fill_in_defaults(self.tool[u"inputs"], job, fs_access)
+
+            normalizeFilesDirs(job)
+            schema = self.names.get_name("input_record_schema", "")
+            if schema is None:
+                raise WorkflowException("Missing input record schema: "
+                    "{}".format(self.names))
+            validate.validate_ex(schema, job, strict=False,
+                                 logger=_logger_validation_warnings)
+
+            if load_listing and load_listing != "no_listing":
+                get_listing(fs_access, job, recursive=(load_listing == "deep_listing"))
+
+            visit_class(job, ("File",), functools.partial(add_sizes, fs_access))
+
+            if load_listing == "deep_listing":
+                for i, inparm in enumerate(self.tool["inputs"]):
+                    k = shortname(inparm["id"])
+                    if k not in job:
+                        continue
+                    v = job[k]
+                    dircount = [0]
+
+                    def inc(d):  # type: (List[int]) -> None
+                        d[0] += 1
+                    visit_class(v, ("Directory",), lambda x: inc(dircount))
+                    if dircount[0] == 0:
+                        continue
+                    filecount = [0]
+                    visit_class(v, ("File",), lambda x: inc(filecount))
+                    if filecount[0] > FILE_COUNT_WARNING:
+                        # Long lines in this message are okay, will be reflowed based on terminal columns.
+                        _logger.warning(strip_dup_lineno(SourceLine(self.tool["inputs"], i, Text).makeError(
+                    """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'.  This may negatively affect workflow performance and memory use.
+
+If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior:
+
+$namespaces:
+  cwltool: "http://commonwl.org/cwltool#"
+hints:
+  cwltool:LoadListingRequirement:
+    loadListing: shallow_listing
+
+""" % (filecount[0], k))))
+
+        except (validate.ValidationException, WorkflowException) as err:
+            raise_from(WorkflowException("Invalid job input record:\n" + Text(err)), err)
+
+        files = []  # type: List[Dict[Text, Text]]
+        bindings = CommentedSeq()
+        tmpdir = u""
+        stagedir = u""
+
+        docker_req, _ = self.get_requirement("DockerRequirement")
+        default_docker = None
+
+        if docker_req is None and runtime_context.default_container:
+            default_docker = runtime_context.default_container
+
+        if (docker_req or default_docker) and runtime_context.use_container:
+            if docker_req is not None:
+                # Check if docker output directory is absolute
+                if docker_req.get("dockerOutputDirectory") and \
+                        docker_req.get("dockerOutputDirectory").startswith('/'):
+                    outdir = docker_req.get("dockerOutputDirectory")
+                else:
+                    outdir = docker_req.get("dockerOutputDirectory") or \
+                        runtime_context.docker_outdir or random_outdir()
+            elif default_docker is not None:
+                outdir = runtime_context.docker_outdir or random_outdir()
+            tmpdir = runtime_context.docker_tmpdir or "/tmp"  # nosec
+            stagedir = runtime_context.docker_stagedir or "/var/lib/cwl"
+        else:
+            outdir = fs_access.realpath(
+                runtime_context.outdir or tempfile.mkdtemp(
+                    prefix=getdefault(runtime_context.tmp_outdir_prefix,
+                                      DEFAULT_TMP_PREFIX)))
+            if self.tool[u"class"] != 'Workflow':
+                tmpdir = fs_access.realpath(runtime_context.tmpdir
+                                            or tempfile.mkdtemp())
+                stagedir = fs_access.realpath(runtime_context.stagedir
+                                              or tempfile.mkdtemp())
+
+        builder = Builder(job,
+                          files,
+                          bindings,
+                          self.schemaDefs,
+                          self.names,
+                          self.requirements,
+                          self.hints,
+                          {},
+                          runtime_context.mutation_manager,
+                          self.formatgraph,
+                          make_fs_access,
+                          fs_access,
+                          runtime_context.job_script_provider,
+                          runtime_context.eval_timeout,
+                          runtime_context.debug,
+                          runtime_context.js_console,
+                          runtime_context.force_docker_pull,
+                          load_listing,
+                          outdir,
+                          tmpdir,
+                          stagedir)
+
+        bindings.extend(builder.bind_input(
+            self.inputs_record_schema, job,
+            discover_secondaryFiles=getdefault(runtime_context.toplevel, False)))
+
+        if self.tool.get("baseCommand"):
+            for index, command in enumerate(aslist(self.tool["baseCommand"])):
+                bindings.append({
+                    "position": [-1000000, index],
+                    "datum": command
+                })
+
+        if self.tool.get("arguments"):
+            for i, arg in enumerate(self.tool["arguments"]):
+                lc = self.tool["arguments"].lc.data[i]
+                filename = self.tool["arguments"].lc.filename
+                bindings.lc.add_kv_line_col(len(bindings), lc)
+                if isinstance(arg, MutableMapping):
+                    arg = copy.deepcopy(arg)
+                    if arg.get("position"):
+                        position = arg.get("position")
+                        if isinstance(position, str):  # no need to test the
+                                                       # CWLVersion as the v1.0
+                                                       # schema only allows ints
+                            position = builder.do_eval(position)
+                            if position is None:
+                                position = 0
+                        arg["position"] = [position, i]
+                    else:
+                        arg["position"] = [0, i]
+                    bindings.append(arg)
+                elif ("$(" in arg) or ("${" in arg):
+                    cm = CommentedMap((
+                        ("position", [0, i]),
+                        ("valueFrom", arg)
+                    ))
+                    cm.lc.add_kv_line_col("valueFrom", lc)
+                    cm.lc.filename = filename
+                    bindings.append(cm)
+                else:
+                    cm = CommentedMap((
+                        ("position", [0, i]),
+                        ("datum", arg)
+                    ))
+                    cm.lc.add_kv_line_col("datum", lc)
+                    cm.lc.filename = filename
+                    bindings.append(cm)
+
+        # use python2 like sorting of heterogeneous lists
+        # (containing str and int types),
+        if PY3:
+            key = functools.cmp_to_key(cmp_like_py2)
+        else:  # PY2
+            key = lambda d: d["position"]
+
+        # This awkward construction replaces the contents of
+        # "bindings" in place (because Builder expects it to be
+        # mutated in place, sigh, I'm sorry) with its contents sorted,
+        # supporting different versions of Python and ruamel.yaml with
+        # different behaviors/bugs in CommentedSeq.
+        bindings_copy = copy.deepcopy(bindings)
+        del bindings[:]
+        bindings.extend(sorted(bindings_copy, key=key))
+
+        if self.tool[u"class"] != 'Workflow':
+            builder.resources = self.evalResources(builder, runtime_context)
+        return builder
+
+    def evalResources(self, builder, runtimeContext):
+        # type: (Builder, RuntimeContext) -> Dict[str, int]
+        resourceReq, _ = self.get_requirement("ResourceRequirement")
+        if resourceReq is None:
+            resourceReq = {}
+        cwl_version = self.metadata.get(
+            "http://commonwl.org/cwltool#original_cwlVersion", None)
+        if cwl_version == "v1.0":
+            ram = 1024
+        else:
+            ram = 256
+        request = {
+            "coresMin": 1,
+            "coresMax": 1,
+            "ramMin": ram,
+            "ramMax": ram,
+            "tmpdirMin": 1024,
+            "tmpdirMax": 1024,
+            "outdirMin": 1024,
+            "outdirMax": 1024
+        }  # type: Dict[str, int]
+        for a in ("cores", "ram", "tmpdir", "outdir"):
+            mn = None
+            mx = None
+            if resourceReq.get(a + "Min"):
+                mn = eval_resource(builder, resourceReq[a + "Min"])
+            if resourceReq.get(a + "Max"):
+                mx = eval_resource(builder, resourceReq[a + "Max"])
+            if mn is None:
+                mn = mx
+            elif mx is None:
+                mx = mn
+
+            if mn is not None:
+                request[a + "Min"] = cast(int, mn)
+                request[a + "Max"] = cast(int, mx)
+
+        if runtimeContext.select_resources is not None:
+            return runtimeContext.select_resources(request, runtimeContext)
+        return {
+            "cores": request["coresMin"],
+            "ram": request["ramMin"],
+            "tmpdirSize": request["tmpdirMin"],
+            "outdirSize": request["outdirMin"],
+        }
+
+    def validate_hints(self, avsc_names, hints, strict):
+        # type: (Any, List[Dict[Text, Any]], bool) -> None
+        for i, r in enumerate(hints):
+            sl = SourceLine(hints, i, validate.ValidationException)
+            with sl:
+                if avsc_names.get_name(r["class"], "") is not None and self.doc_loader is not None:
+                    plain_hint = dict((key, r[key]) for key in r if key not in
+                                      self.doc_loader.identifiers)  # strip identifiers
+                    validate.validate_ex(
+                        avsc_names.get_name(plain_hint["class"], ""),
+                        plain_hint, strict=strict)
+                elif r["class"] in ("NetworkAccess", "LoadListingRequirement"):
+                    pass
+                else:
+                    _logger.info(Text(sl.makeError(u"Unknown hint %s" % (r["class"]))))
+
+    def visit(self, op):  # type: (Callable[[MutableMapping[Text, Any]], None]) -> None
+        op(self.tool)
+
+    @abc.abstractmethod
+    def job(self,
+            job_order,         # type: Mapping[Text, Text]
+            output_callbacks,  # type: Callable[[Any, Any], Any]
+            runtimeContext     # type: RuntimeContext
+           ):  # type: (...) -> Generator[Any, None, None]
+        # FIXME: Declare base type for what Generator yields
+        pass
+
+
+_names = set()  # type: Set[Text]
+
+
+def uniquename(stem, names=None):  # type: (Text, Optional[Set[Text]]) -> Text
+    global _names
+    if names is None:
+        names = _names
+    c = 1
+    u = stem
+    while u in names:
+        c += 1
+        u = u"%s_%s" % (stem, c)
+    names.add(u)
+    return u
+
+
+def nestdir(base, deps):
+    # type: (Text, Dict[Text, Any]) -> Dict[Text, Any]
+    dirname = os.path.dirname(base) + "/"
+    subid = deps["location"]
+    if subid.startswith(dirname):
+        s2 = subid[len(dirname):]
+        sp = s2.split('/')
+        sp.pop()
+        while sp:
+            nx = sp.pop()
+            deps = {
+                "class": "Directory",
+                "basename": nx,
+                "listing": [deps]
+            }
+    return deps
+
+
+def mergedirs(listing):
+    # type: (List[Dict[Text, Any]]) -> List[Dict[Text, Any]]
+    r = []  # type: List[Dict[Text, Any]]
+    ents = {}  # type: Dict[Text, Any]
+    collided = set()  # type: Set[Text]
+    for e in listing:
+        if e["basename"] not in ents:
+            ents[e["basename"]] = e
+        elif e["class"] == "Directory":
+            if e.get("listing"):
+                ents[e["basename"]].setdefault("listing", []).extend(e["listing"])
+            if ents[e["basename"]]["location"].startswith("_:"):
+                ents[e["basename"]]["location"] = e["location"]
+        elif e["location"] != ents[e["basename"]]["location"]:
+            # same basename, different location, collision,
+            # rename both.
+            collided.add(e["basename"])
+            e2 = ents[e["basename"]]
+
+            e["basename"] = urllib.parse.quote(e["location"], safe="")
+            e2["basename"] = urllib.parse.quote(e2["location"], safe="")
+
+            e["nameroot"], e["nameext"] = os.path.splitext(e["basename"])
+            e2["nameroot"], e2["nameext"] = os.path.splitext(e2["basename"])
+
+            ents[e["basename"]] = e
+            ents[e2["basename"]] = e2
+    for c in collided:
+        del ents[c]
+    for e in itervalues(ents):
+        if e["class"] == "Directory" and "listing" in e:
+            e["listing"] = mergedirs(e["listing"])
+    r.extend(itervalues(ents))
+    return r
+
+
+CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
+
+def scandeps(base,                          # type: Text
+             doc,                           # type: Any
+             reffields,                     # type: Set[Text]
+             urlfields,                     # type: Set[Text]
+             loadref,                       # type: Callable[[Text, Text], Text]
+             urljoin=urllib.parse.urljoin,  # type: Callable[[Text, Text], Text]
+             nestdirs=True                  # type: bool
+            ):  # type: (...) -> List[Dict[Text, Text]]
+    r = []  # type: List[Dict[Text, Text]]
+    if isinstance(doc, MutableMapping):
+        if "id" in doc:
+            if doc["id"].startswith("file://"):
+                df, _ = urllib.parse.urldefrag(doc["id"])
+                if base != df:
+                    r.append({
+                        "class": "File",
+                        "location": df,
+                        "format": CWL_IANA
+                    })
+                    base = df
+
+        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
+            u = doc.get("location", doc.get("path"))
+            if u and not u.startswith("_:"):
+                deps = {"class": doc["class"],
+                        "location": urljoin(base, u)
+                       }  # type: Dict[Text, Any]
+                if "basename" in doc:
+                    deps["basename"] = doc["basename"]
+                if doc["class"] == "Directory" and "listing" in doc:
+                    deps["listing"] = doc["listing"]
+                if doc["class"] == "File" and "secondaryFiles" in doc:
+                    deps["secondaryFiles"] = doc["secondaryFiles"]
+                if nestdirs:
+                    deps = nestdir(base, deps)
+                r.append(deps)
+            else:
+                if doc["class"] == "Directory" and "listing" in doc:
+                    r.extend(scandeps(
+                        base, doc["listing"], reffields, urlfields, loadref,
+                        urljoin=urljoin, nestdirs=nestdirs))
+                elif doc["class"] == "File" and "secondaryFiles" in doc:
+                    r.extend(scandeps(
+                        base, doc["secondaryFiles"], reffields, urlfields,
+                        loadref, urljoin=urljoin, nestdirs=nestdirs))
+
+        for k, v in iteritems(doc):
+            if k in reffields:
+                for u in aslist(v):
+                    if isinstance(u, MutableMapping):
+                        r.extend(scandeps(
+                            base, u, reffields, urlfields, loadref,
+                            urljoin=urljoin, nestdirs=nestdirs))
+                    else:
+                        subid = urljoin(base, u)
+                        basedf, _ = urllib.parse.urldefrag(base)
+                        subiddf, _ = urllib.parse.urldefrag(subid)
+                        if basedf == subiddf:
+                            continue
+                        sub = loadref(base, u)
+                        deps = {
+                            "class": "File",
+                            "location": subid,
+                            "format": CWL_IANA
+                        }
+                        sf = scandeps(
+                            subid, sub, reffields, urlfields, loadref,
+                            urljoin=urljoin, nestdirs=nestdirs)
+                        if sf:
+                            deps["secondaryFiles"] = sf
+                        if nestdirs:
+                            deps = nestdir(base, deps)
+                        r.append(deps)
+            elif k in urlfields and k != "location":
+                for u in aslist(v):
+                    deps = {
+                        "class": "File",
+                        "location": urljoin(base, u)
+                    }
+                    if nestdirs:
+                        deps = nestdir(base, deps)
+                    r.append(deps)
+            elif k not in ("listing", "secondaryFiles"):
+                r.extend(scandeps(
+                    base, v, reffields, urlfields, loadref, urljoin=urljoin,
+                    nestdirs=nestdirs))
+    elif isinstance(doc, MutableSequence):
+        for d in doc:
+            r.extend(scandeps(
+                base, d, reffields, urlfields, loadref, urljoin=urljoin,
+                nestdirs=nestdirs))
+
+    if r:
+        normalizeFilesDirs(r)
+        r = mergedirs(r)
+
+    return r
+
+
+def compute_checksums(fs_access, fileobj):  # type: (StdFsAccess, Dict[Text, Any]) -> None
+    if "checksum" not in fileobj:
+        checksum = hashlib.sha1()  # nosec
+        with fs_access.open(fileobj["location"], "rb") as f:
+            contents = f.read(1024 * 1024)
+            while contents != b"":
+                checksum.update(contents)
+                contents = f.read(1024 * 1024)
+        fileobj["checksum"] = "sha1$%s" % checksum.hexdigest()
+        fileobj["size"] = fs_access.size(fileobj["location"])