diff env/lib/python3.7/site-packages/cwltool/builder.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/cwltool/builder.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,451 +0,0 @@
-from __future__ import absolute_import
-
-import copy
-import os
-import logging
-from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence,
-                    Optional, Set, Tuple, Union)
-
-from typing_extensions import Text, Type, TYPE_CHECKING  # pylint: disable=unused-import
-# move to a regular typing import when Python 3.3-3.6 is no longer supported
-
-from rdflib import Graph, URIRef  # pylint: disable=unused-import
-from rdflib.namespace import OWL, RDFS
-from ruamel.yaml.comments import CommentedMap
-from schema_salad import validate
-from schema_salad.schema import Names, convert_to_dict
-from schema_salad.avro.schema import make_avsc_object, Schema
-from schema_salad.sourceline import SourceLine
-from schema_salad.ref_resolver import uri_file_path
-from six import iteritems, string_types
-from future.utils import raise_from
-from typing import IO
-from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
-                               Text, Type)
-# move to a regular typing import when Python 3.3-3.6 is no longer supported
-
-from . import expression
-from .errors import WorkflowException
-from .loghandler import _logger
-from .mutation import MutationManager  # pylint: disable=unused-import
-from .pathmapper import PathMapper  # pylint: disable=unused-import
-from .pathmapper import CONTENT_LIMIT, get_listing, normalizeFilesDirs, visit_class
-from .stdfsaccess import StdFsAccess  # pylint: disable=unused-import
-from .utils import aslist, docker_windows_path_adjust, json_dumps, onWindows
-
-
-
-if TYPE_CHECKING:
-    from .provenance import ProvenanceProfile  # pylint: disable=unused-import
-
-
-def content_limit_respected_read_bytes(f):  # type: (IO[bytes]) -> bytes
-    contents = f.read(CONTENT_LIMIT + 1)
-    if len(contents) > CONTENT_LIMIT:
-        raise WorkflowException("loadContents handling encountered buffer that is exceeds maximum lenght of %d bytes" % CONTENT_LIMIT)
-    return contents
-
-
-def content_limit_respected_read(f):  # type: (IO[bytes]) -> Text
-    return content_limit_respected_read_bytes(f).decode("utf-8")
-
-
-def substitute(value, replace):  # type: (Text, Text) -> Text
-    if replace.startswith("^"):
-        try:
-            return substitute(value[0:value.rindex('.')], replace[1:])
-        except ValueError:
-            # No extension to remove
-            return value + replace.lstrip("^")
-    return value + replace
-
-def formatSubclassOf(fmt, cls, ontology, visited):
-    # type: (Text, Text, Optional[Graph], Set[Text]) -> bool
-    """Determine if `fmt` is a subclass of `cls`."""
-    if URIRef(fmt) == URIRef(cls):
-        return True
-
-    if ontology is None:
-        return False
-
-    if fmt in visited:
-        return False
-
-    visited.add(fmt)
-
-    uriRefFmt = URIRef(fmt)
-
-    for s, p, o in ontology.triples((uriRefFmt, RDFS.subClassOf, None)):
-        # Find parent classes of `fmt` and search upward
-        if formatSubclassOf(o, cls, ontology, visited):
-            return True
-
-    for s, p, o in ontology.triples((uriRefFmt, OWL.equivalentClass, None)):
-        # Find equivalent classes of `fmt` and search horizontally
-        if formatSubclassOf(o, cls, ontology, visited):
-            return True
-
-    for s, p, o in ontology.triples((None, OWL.equivalentClass, uriRefFmt)):
-        # Find equivalent classes of `fmt` and search horizontally
-        if formatSubclassOf(s, cls, ontology, visited):
-            return True
-
-    return False
-
-def check_format(actual_file,    # type: Union[Dict[Text, Any], List[Dict[Text, Any]], Text]
-                 input_formats,  # type: Union[List[Text], Text]
-                 ontology        # type: Optional[Graph]
-                ):  # type: (...) -> None
-    """Confirm that the format present is valid for the allowed formats."""
-    for afile in aslist(actual_file):
-        if not afile:
-            continue
-        if "format" not in afile:
-            raise validate.ValidationException(
-                u"File has no 'format' defined: {}".format(
-                    json_dumps(afile, indent=4)))
-        for inpf in aslist(input_formats):
-            if afile["format"] == inpf or \
-                    formatSubclassOf(afile["format"], inpf, ontology, set()):
-                return
-        raise validate.ValidationException(
-            u"File has an incompatible format: {}".format(
-                json_dumps(afile, indent=4)))
-
-class HasReqsHints(object):
-    def __init__(self):  # type: () -> None
-        """Initialize this reqs decorator."""
-        self.requirements = []  # type: List[Dict[Text, Any]]
-        self.hints = []         # type: List[Dict[Text, Any]]
-
-    def get_requirement(self,
-                        feature  # type: Text
-                       ):  # type: (...) -> Tuple[Optional[Any], Optional[bool]]
-        for item in reversed(self.requirements):
-            if item["class"] == feature:
-                return (item, True)
-        for item in reversed(self.hints):
-            if item["class"] == feature:
-                return (item, False)
-        return (None, None)
-
-class Builder(HasReqsHints):
-    def __init__(self,
-                 job,                  # type: Dict[Text, expression.JSON]
-                 files,                # type: List[Dict[Text, Text]]
-                 bindings,             # type: List[Dict[Text, Any]]
-                 schemaDefs,           # type: Dict[Text, Dict[Text, Any]]
-                 names,                # type: Names
-                 requirements,         # type: List[Dict[Text, Any]]
-                 hints,                # type: List[Dict[Text, Any]]
-                 resources,            # type: Dict[str, int]
-                 mutation_manager,     # type: Optional[MutationManager]
-                 formatgraph,          # type: Optional[Graph]
-                 make_fs_access,       # type: Type[StdFsAccess]
-                 fs_access,            # type: StdFsAccess
-                 job_script_provider,  # type: Optional[Any]
-                 timeout,              # type: float
-                 debug,                # type: bool
-                 js_console,           # type: bool
-                 force_docker_pull,    # type: bool
-                 loadListing,          # type: Text
-                 outdir,               # type: Text
-                 tmpdir,               # type: Text
-                 stagedir             # type: Text
-                ):  # type: (...) -> None
-        """Initialize this Builder."""
-        self.job = job
-        self.files = files
-        self.bindings = bindings
-        self.schemaDefs = schemaDefs
-        self.names = names
-        self.requirements = requirements
-        self.hints = hints
-        self.resources = resources
-        self.mutation_manager = mutation_manager
-        self.formatgraph = formatgraph
-
-        self.make_fs_access = make_fs_access
-        self.fs_access = fs_access
-
-        self.job_script_provider = job_script_provider
-
-        self.timeout = timeout
-
-        self.debug = debug
-        self.js_console = js_console
-        self.force_docker_pull = force_docker_pull
-
-        # One of "no_listing", "shallow_listing", "deep_listing"
-        self.loadListing = loadListing
-
-        self.outdir = outdir
-        self.tmpdir = tmpdir
-        self.stagedir = stagedir
-
-        self.pathmapper = None  # type: Optional[PathMapper]
-        self.prov_obj = None  # type: Optional[ProvenanceProfile]
-        self.find_default_container = None  # type: Optional[Callable[[], Text]]
-
-    def build_job_script(self, commands):
-        # type: (List[Text]) -> Text
-        build_job_script_method = getattr(self.job_script_provider, "build_job_script", None)  # type: Callable[[Builder, Union[List[str],List[Text]]], Text]
-        if build_job_script_method is not None:
-            return build_job_script_method(self, commands)
-        return None
-
-    def bind_input(self,
-                   schema,                   # type: MutableMapping[Text, Any]
-                   datum,                    # type: Any
-                   discover_secondaryFiles,  # type: bool
-                   lead_pos=None,            # type: Optional[Union[int, List[int]]]
-                   tail_pos=None,            # type: Optional[List[int]]
-                  ):  # type: (...) -> List[MutableMapping[Text, Any]]
-
-        if tail_pos is None:
-            tail_pos = []
-        if lead_pos is None:
-            lead_pos = []
-
-        bindings = []  # type: List[MutableMapping[Text, Text]]
-        binding = {}  # type: Union[MutableMapping[Text, Text], CommentedMap]
-        value_from_expression = False
-        if "inputBinding" in schema and isinstance(schema["inputBinding"], MutableMapping):
-            binding = CommentedMap(schema["inputBinding"].items())
-
-            bp = list(aslist(lead_pos))
-            if "position" in binding:
-                position = binding["position"]
-                if isinstance(position, str):   # no need to test the CWL Version
-                                                # the schema for v1.0 only allow ints
-                    binding['position'] = self.do_eval(position, context=datum)
-                    bp.append(binding['position'])
-                else:
-                    bp.extend(aslist(binding['position']))
-            else:
-                bp.append(0)
-            bp.extend(aslist(tail_pos))
-            binding["position"] = bp
-
-            binding["datum"] = datum
-            if "valueFrom" in binding:
-                value_from_expression = True
-
-        # Handle union types
-        if isinstance(schema["type"], MutableSequence):
-            bound_input = False
-            for t in schema["type"]:
-                avsc = None  # type: Optional[Schema]
-                if isinstance(t, string_types) and self.names.has_name(t, ""):
-                    avsc = self.names.get_name(t, "")
-                elif isinstance(t, MutableMapping) and "name" in t and self.names.has_name(t["name"], ""):
-                    avsc = self.names.get_name(t["name"], "")
-                if not avsc:
-                    avsc = make_avsc_object(convert_to_dict(t), self.names)
-                if validate.validate(avsc, datum):
-                    schema = copy.deepcopy(schema)
-                    schema["type"] = t
-                    if not value_from_expression:
-                        return self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
-                    else:
-                        self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
-                        bound_input = True
-            if not bound_input:
-                raise validate.ValidationException(u"'%s' is not a valid union %s" % (datum, schema["type"]))
-        elif isinstance(schema["type"], MutableMapping):
-            st = copy.deepcopy(schema["type"])
-            if binding and "inputBinding" not in st\
-                    and "type" in st\
-                    and st["type"] == "array"\
-                    and "itemSeparator" not in binding:
-                st["inputBinding"] = {}
-            for k in ("secondaryFiles", "format", "streamable"):
-                if k in schema:
-                    st[k] = schema[k]
-            if value_from_expression:
-                self.bind_input(st, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
-            else:
-                bindings.extend(self.bind_input(st, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles))
-        else:
-            if schema["type"] in self.schemaDefs:
-                schema = self.schemaDefs[schema["type"]]
-
-            if schema["type"] == "record":
-                for f in schema["fields"]:
-                    if f["name"] in datum and datum[f["name"]] is not None:
-                        bindings.extend(self.bind_input(f, datum[f["name"]], lead_pos=lead_pos, tail_pos=f["name"], discover_secondaryFiles=discover_secondaryFiles))
-                    else:
-                        datum[f["name"]] = f.get("default")
-
-            if schema["type"] == "array":
-                for n, item in enumerate(datum):
-                    b2 = None
-                    if binding:
-                        b2 = copy.deepcopy(binding)
-                        b2["datum"] = item
-                    itemschema = {
-                        u"type": schema["items"],
-                        u"inputBinding": b2
-                    }
-                    for k in ("secondaryFiles", "format", "streamable"):
-                        if k in schema:
-                            itemschema[k] = schema[k]
-                    bindings.extend(
-                        self.bind_input(itemschema, item, lead_pos=n, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles))
-                binding = {}
-
-            def _capture_files(f):  # type: (Dict[Text, Text]) -> Dict[Text, Text]
-                self.files.append(f)
-                return f
-
-            if schema["type"] == "File":
-                self.files.append(datum)
-                if (binding and binding.get("loadContents")) or schema.get("loadContents"):
-                    with self.fs_access.open(datum["location"], "rb") as f:
-                        datum["contents"] = content_limit_respected_read(f)
-
-                if "secondaryFiles" in schema:
-                    if "secondaryFiles" not in datum:
-                        datum["secondaryFiles"] = []
-                    for sf in aslist(schema["secondaryFiles"]):
-                        if 'required' in sf:
-                            sf_required = self.do_eval(sf['required'], context=datum)
-                        else:
-                            sf_required = True
-
-
-                        if "$(" in sf["pattern"] or "${" in sf["pattern"]:
-                            sfpath = self.do_eval(sf["pattern"], context=datum)
-                        else:
-                            sfpath = substitute(datum["basename"], sf["pattern"])
-
-                        for sfname in aslist(sfpath):
-                            if not sfname:
-                                continue
-                            found = False
-                            for d in datum["secondaryFiles"]:
-                                if not d.get("basename"):
-                                    d["basename"] = d["location"][d["location"].rindex("/")+1:]
-                                if d["basename"] == sfname:
-                                    found = True
-                            if not found:
-                                sf_location = datum["location"][0:datum["location"].rindex("/")+1]+sfname
-                                if isinstance(sfname, MutableMapping):
-                                    datum["secondaryFiles"].append(sfname)
-                                elif discover_secondaryFiles and self.fs_access.exists(sf_location):
-                                    datum["secondaryFiles"].append({
-                                        "location": sf_location,
-                                        "basename": sfname,
-                                        "class": "File"})
-                                elif sf_required:
-                                    raise WorkflowException("Missing required secondary file '%s' from file object: %s" % (
-                                        sfname, json_dumps(datum, indent=4)))
-
-                    normalizeFilesDirs(datum["secondaryFiles"])
-
-                if "format" in schema:
-                    try:
-                        check_format(datum, self.do_eval(schema["format"]),
-                                     self.formatgraph)
-                    except validate.ValidationException as ve:
-                        raise_from(WorkflowException(
-                            "Expected value of '%s' to have format %s but\n "
-                            " %s" % (schema["name"], schema["format"], ve)), ve)
-
-                visit_class(datum.get("secondaryFiles", []), ("File", "Directory"), _capture_files)
-
-            if schema["type"] == "Directory":
-                ll = schema.get("loadListing") or self.loadListing
-                if ll and ll != "no_listing":
-                    get_listing(self.fs_access, datum, (ll == "deep_listing"))
-                self.files.append(datum)
-
-            if schema["type"] == "Any":
-                visit_class(datum, ("File", "Directory"), _capture_files)
-
-        # Position to front of the sort key
-        if binding:
-            for bi in bindings:
-                bi["position"] = binding["position"] + bi["position"]
-            bindings.append(binding)
-
-        return bindings
-
-    def tostr(self, value):  # type: (Union[MutableMapping[Text, Text], Any]) -> Text
-        if isinstance(value, MutableMapping) and value.get("class") in ("File", "Directory"):
-            if "path" not in value:
-                raise WorkflowException(u"%s object missing \"path\": %s" % (value["class"], value))
-
-            # Path adjust for windows file path when passing to docker, docker accepts unix like path only
-            (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
-            if onWindows() and docker_req is not None:
-                # docker_req is none only when there is no dockerRequirement
-                # mentioned in hints and Requirement
-                path = docker_windows_path_adjust(value["path"])
-                return path
-            return value["path"]
-        else:
-            return Text(value)
-
-    def generate_arg(self, binding):  # type: (Dict[Text, Any]) -> List[Text]
-        value = binding.get("datum")
-        if "valueFrom" in binding:
-            with SourceLine(binding, "valueFrom", WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
-                value = self.do_eval(binding["valueFrom"], context=value)
-
-        prefix = binding.get("prefix")  # type: Optional[Text]
-        sep = binding.get("separate", True)
-        if prefix is None and not sep:
-            with SourceLine(binding, "separate", WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
-                raise WorkflowException("'separate' option can not be specified without prefix")
-
-        argl = []  # type: MutableSequence[MutableMapping[Text, Text]]
-        if isinstance(value, MutableSequence):
-            if binding.get("itemSeparator") and value:
-                argl = [binding["itemSeparator"].join([self.tostr(v) for v in value])]
-            elif binding.get("valueFrom"):
-                value = [self.tostr(v) for v in value]
-                return ([prefix] if prefix else []) + value
-            elif prefix and value:
-                return [prefix]
-            else:
-                return []
-        elif isinstance(value, MutableMapping) and value.get("class") in ("File", "Directory"):
-            argl = [value]
-        elif isinstance(value, MutableMapping):
-            return [prefix] if prefix else []
-        elif value is True and prefix:
-            return [prefix]
-        elif value is False or value is None or (value is True and not prefix):
-            return []
-        else:
-            argl = [value]
-
-        args = []
-        for j in argl:
-            if sep:
-                args.extend([prefix, self.tostr(j)])
-            else:
-                args.append(self.tostr(j) if prefix is None else prefix + self.tostr(j))
-
-        return [a for a in args if a is not None]
-
-    def do_eval(self, ex, context=None, recursive=False, strip_whitespace=True):
-        # type: (Union[Dict[Text, Text], Text], Any, bool, bool) -> Any
-        if recursive:
-            if isinstance(ex, MutableMapping):
-                return {k: self.do_eval(v, context, recursive)
-                        for k, v in iteritems(ex)}
-            if isinstance(ex, MutableSequence):
-                return [self.do_eval(v, context, recursive)
-                        for v in ex]
-
-        return expression.do_eval(ex, self.job, self.requirements,
-                                  self.outdir, self.tmpdir,
-                                  self.resources,
-                                  context=context,
-                                  timeout=self.timeout,
-                                  debug=self.debug,
-                                  js_console=self.js_console,
-                                  force_docker_pull=self.force_docker_pull,
-                                  strip_whitespace=strip_whitespace)