diff env/lib/python3.7/site-packages/cwltool/builder.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/cwltool/builder.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,451 @@
+from __future__ import absolute_import
+
+import copy
+import logging
+import os
+from typing import (IO, Any, Callable, Dict, List, MutableMapping,
+                    MutableSequence, Optional, Set, Tuple, Union)
+
+from typing_extensions import TYPE_CHECKING, Text, Type  # pylint: disable=unused-import
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+from future.utils import raise_from
+from rdflib import Graph, URIRef  # pylint: disable=unused-import
+from rdflib.namespace import OWL, RDFS
+from ruamel.yaml.comments import CommentedMap
+from schema_salad import validate
+from schema_salad.avro.schema import Schema, make_avsc_object
+from schema_salad.ref_resolver import uri_file_path
+from schema_salad.schema import Names, convert_to_dict
+from schema_salad.sourceline import SourceLine
+from six import iteritems, string_types
+
+from . import expression
+from .errors import WorkflowException
+from .loghandler import _logger
+from .mutation import MutationManager  # pylint: disable=unused-import
+from .pathmapper import PathMapper  # pylint: disable=unused-import
+from .pathmapper import CONTENT_LIMIT, get_listing, normalizeFilesDirs, visit_class
+from .stdfsaccess import StdFsAccess  # pylint: disable=unused-import
+from .utils import aslist, docker_windows_path_adjust, json_dumps, onWindows
+
+
+if TYPE_CHECKING:
+    from .provenance import ProvenanceProfile  # pylint: disable=unused-import
+
+
+def content_limit_respected_read_bytes(f):  # type: (IO[bytes]) -> bytes
+    contents = f.read(CONTENT_LIMIT + 1)
+    if len(contents) > CONTENT_LIMIT:
+        raise WorkflowException(
+            "loadContents handling encountered a buffer that exceeds the maximum length of %d bytes" % CONTENT_LIMIT)
+    return contents
+
+
+def content_limit_respected_read(f):  # type: (IO[bytes]) -> Text
+    return content_limit_respected_read_bytes(f).decode("utf-8")
+
+
+def substitute(value, replace):  # type: (Text, Text) -> Text
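+    # Illustrative examples of the secondaryFiles suffix rules:
+    #   substitute("reads.bam", ".bai")  -> "reads.bam.bai"  (plain append)
+    #   substitute("reads.bam", "^.bai") -> "reads.bai"      (each "^" strips one extension)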
+    if replace.startswith("^"):
+        try:
+            return substitute(value[0:value.rindex('.')], replace[1:])
+        except ValueError:
+            # No extension to remove
+            return value + replace.lstrip("^")
+    return value + replace
+
+def formatSubclassOf(fmt, cls, ontology, visited):
+    # type: (Text, Text, Optional[Graph], Set[Text]) -> bool
+    """Determine if `fmt` is a subclass of `cls`."""
+    if URIRef(fmt) == URIRef(cls):
+        return True
+
+    if ontology is None:
+        return False
+
+    if fmt in visited:
+        return False
+
+    visited.add(fmt)
+
+    uriRefFmt = URIRef(fmt)
+
+    for s, p, o in ontology.triples((uriRefFmt, RDFS.subClassOf, None)):
+        # Find parent classes of `fmt` and search upward
+        if formatSubclassOf(o, cls, ontology, visited):
+            return True
+
+    for s, p, o in ontology.triples((uriRefFmt, OWL.equivalentClass, None)):
+        # Find equivalent classes of `fmt` and search horizontally
+        if formatSubclassOf(o, cls, ontology, visited):
+            return True
+
+    for s, p, o in ontology.triples((None, OWL.equivalentClass, uriRefFmt)):
+        # Find equivalent classes of `fmt` and search horizontally
+        if formatSubclassOf(s, cls, ontology, visited):
+            return True
+
+    return False
+
+def check_format(actual_file,    # type: Union[Dict[Text, Any], List[Dict[Text, Any]], Text]
+                 input_formats,  # type: Union[List[Text], Text]
+                 ontology        # type: Optional[Graph]
+                ):  # type: (...) -> None
+    """Confirm that the format present is valid for the allowed formats."""
+    for afile in aslist(actual_file):
+        if not afile:
+            continue
+        if "format" not in afile:
+            raise validate.ValidationException(
+                u"File has no 'format' defined: {}".format(
+                    json_dumps(afile, indent=4)))
+        for inpf in aslist(input_formats):
+            if afile["format"] == inpf or \
+                    formatSubclassOf(afile["format"], inpf, ontology, set()):
+                return
+        raise validate.ValidationException(
+            u"File has an incompatible format: {}".format(
+                json_dumps(afile, indent=4)))
+
+class HasReqsHints(object):
+    def __init__(self):  # type: () -> None
+        """Initialize this reqs decorator."""
+        self.requirements = []  # type: List[Dict[Text, Any]]
+        self.hints = []         # type: List[Dict[Text, Any]]
+
+    def get_requirement(self,
+                        feature  # type: Text
+                       ):  # type: (...) -> Tuple[Optional[Any], Optional[bool]]
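+        # Requirements take precedence over hints; within each list the last
+        # matching entry wins, hence the reversed() iteration.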
+        for item in reversed(self.requirements):
+            if item["class"] == feature:
+                return (item, True)
+        for item in reversed(self.hints):
+            if item["class"] == feature:
+                return (item, False)
+        return (None, None)
+
+class Builder(HasReqsHints):
+    def __init__(self,
+                 job,                  # type: Dict[Text, expression.JSON]
+                 files,                # type: List[Dict[Text, Text]]
+                 bindings,             # type: List[Dict[Text, Any]]
+                 schemaDefs,           # type: Dict[Text, Dict[Text, Any]]
+                 names,                # type: Names
+                 requirements,         # type: List[Dict[Text, Any]]
+                 hints,                # type: List[Dict[Text, Any]]
+                 resources,            # type: Dict[str, int]
+                 mutation_manager,     # type: Optional[MutationManager]
+                 formatgraph,          # type: Optional[Graph]
+                 make_fs_access,       # type: Type[StdFsAccess]
+                 fs_access,            # type: StdFsAccess
+                 job_script_provider,  # type: Optional[Any]
+                 timeout,              # type: float
+                 debug,                # type: bool
+                 js_console,           # type: bool
+                 force_docker_pull,    # type: bool
+                 loadListing,          # type: Text
+                 outdir,               # type: Text
+                 tmpdir,               # type: Text
+                 stagedir             # type: Text
+                ):  # type: (...) -> None
+        """Initialize this Builder."""
+        self.job = job
+        self.files = files
+        self.bindings = bindings
+        self.schemaDefs = schemaDefs
+        self.names = names
+        self.requirements = requirements
+        self.hints = hints
+        self.resources = resources
+        self.mutation_manager = mutation_manager
+        self.formatgraph = formatgraph
+
+        self.make_fs_access = make_fs_access
+        self.fs_access = fs_access
+
+        self.job_script_provider = job_script_provider
+
+        self.timeout = timeout
+
+        self.debug = debug
+        self.js_console = js_console
+        self.force_docker_pull = force_docker_pull
+
+        # One of "no_listing", "shallow_listing", "deep_listing"
+        self.loadListing = loadListing
+
+        self.outdir = outdir
+        self.tmpdir = tmpdir
+        self.stagedir = stagedir
+
+        self.pathmapper = None  # type: Optional[PathMapper]
+        self.prov_obj = None  # type: Optional[ProvenanceProfile]
+        self.find_default_container = None  # type: Optional[Callable[[], Text]]
+
+    def build_job_script(self, commands):
+        # type: (List[Text]) -> Optional[Text]
+        build_job_script_method = getattr(
+            self.job_script_provider, "build_job_script", None
+        )  # type: Optional[Callable[[Builder, Union[List[str], List[Text]]], Text]]
+        if build_job_script_method is not None:
+            return build_job_script_method(self, commands)
+        return None
+
+    def bind_input(self,
+                   schema,                   # type: MutableMapping[Text, Any]
+                   datum,                    # type: Any
+                   discover_secondaryFiles,  # type: bool
+                   lead_pos=None,            # type: Optional[Union[int, List[int]]]
+                   tail_pos=None,            # type: Optional[List[int]]
+                  ):  # type: (...) -> List[MutableMapping[Text, Any]]
+
+        if tail_pos is None:
+            tail_pos = []
+        if lead_pos is None:
+            lead_pos = []
+
+        bindings = []  # type: List[MutableMapping[Text, Any]]
+        binding = {}  # type: Union[MutableMapping[Text, Text], CommentedMap]
+        value_from_expression = False
+        if "inputBinding" in schema and isinstance(schema["inputBinding"], MutableMapping):
+            binding = CommentedMap(schema["inputBinding"].items())
+
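+            # Build the hierarchical sort key: positions inherited from
+            # enclosing schemas (lead_pos), then this binding's own
+            # "position" (default 0), then nested positions (tail_pos).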
+            bp = list(aslist(lead_pos))
+            if "position" in binding:
+                position = binding["position"]
+                if isinstance(position, str):   # no need to test the CWL version;
+                                                # the schema for v1.0 only allows ints
+                    binding['position'] = self.do_eval(position, context=datum)
+                    bp.append(binding['position'])
+                else:
+                    bp.extend(aslist(binding['position']))
+            else:
+                bp.append(0)
+            bp.extend(aslist(tail_pos))
+            binding["position"] = bp
+
+            binding["datum"] = datum
+            if "valueFrom" in binding:
+                value_from_expression = True
+
+        # Handle union types
+        if isinstance(schema["type"], MutableSequence):
+            bound_input = False
+            for t in schema["type"]:
+                avsc = None  # type: Optional[Schema]
+                if isinstance(t, string_types) and self.names.has_name(t, ""):
+                    avsc = self.names.get_name(t, "")
+                elif isinstance(t, MutableMapping) and "name" in t and self.names.has_name(t["name"], ""):
+                    avsc = self.names.get_name(t["name"], "")
+                if not avsc:
+                    avsc = make_avsc_object(convert_to_dict(t), self.names)
+                if validate.validate(avsc, datum):
+                    schema = copy.deepcopy(schema)
+                    schema["type"] = t
+                    if not value_from_expression:
+                        return self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
+                    else:
+                        self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
+                        bound_input = True
+            if not bound_input:
+                raise validate.ValidationException(u"'%s' is not a valid union %s" % (datum, schema["type"]))
+        elif isinstance(schema["type"], MutableMapping):
+            st = copy.deepcopy(schema["type"])
+            if binding and "inputBinding" not in st\
+                    and "type" in st\
+                    and st["type"] == "array"\
+                    and "itemSeparator" not in binding:
+                st["inputBinding"] = {}
+            for k in ("secondaryFiles", "format", "streamable"):
+                if k in schema:
+                    st[k] = schema[k]
+            if value_from_expression:
+                self.bind_input(st, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
+            else:
+                bindings.extend(self.bind_input(st, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles))
+        else:
+            if schema["type"] in self.schemaDefs:
+                schema = self.schemaDefs[schema["type"]]
+
+            if schema["type"] == "record":
+                for f in schema["fields"]:
+                    if f["name"] in datum and datum[f["name"]] is not None:
+                        bindings.extend(self.bind_input(f, datum[f["name"]], lead_pos=lead_pos, tail_pos=f["name"], discover_secondaryFiles=discover_secondaryFiles))
+                    else:
+                        datum[f["name"]] = f.get("default")
+
+            if schema["type"] == "array":
+                for n, item in enumerate(datum):
+                    b2 = None
+                    if binding:
+                        b2 = copy.deepcopy(binding)
+                        b2["datum"] = item
+                    itemschema = {
+                        u"type": schema["items"],
+                        u"inputBinding": b2
+                    }
+                    for k in ("secondaryFiles", "format", "streamable"):
+                        if k in schema:
+                            itemschema[k] = schema[k]
+                    bindings.extend(
+                        self.bind_input(itemschema, item, lead_pos=n, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles))
+                binding = {}
+
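+            # visit_class callback: collects every File/Directory object it
+            # is handed into self.files (and returns it unchanged).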
+            def _capture_files(f):  # type: (Dict[Text, Text]) -> Dict[Text, Text]
+                self.files.append(f)
+                return f
+
+            if schema["type"] == "File":
+                self.files.append(datum)
+                if (binding and binding.get("loadContents")) or schema.get("loadContents"):
+                    with self.fs_access.open(datum["location"], "rb") as f:
+                        datum["contents"] = content_limit_respected_read(f)
+
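+                # Resolve each secondaryFiles entry: a pattern may be a CWL
+                # expression ("$(...)"/"${...}") or a suffix string, where a
+                # leading "^" strips one extension from the primary basename.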
+                if "secondaryFiles" in schema:
+                    if "secondaryFiles" not in datum:
+                        datum["secondaryFiles"] = []
+                    for sf in aslist(schema["secondaryFiles"]):
+                        if 'required' in sf:
+                            sf_required = self.do_eval(sf['required'], context=datum)
+                        else:
+                            sf_required = True
+
+                        if "$(" in sf["pattern"] or "${" in sf["pattern"]:
+                            sfpath = self.do_eval(sf["pattern"], context=datum)
+                        else:
+                            sfpath = substitute(datum["basename"], sf["pattern"])
+
+                        for sfname in aslist(sfpath):
+                            if not sfname:
+                                continue
+                            found = False
+                            for d in datum["secondaryFiles"]:
+                                if not d.get("basename"):
+                                    d["basename"] = d["location"][d["location"].rindex("/")+1:]
+                                if d["basename"] == sfname:
+                                    found = True
+                            if not found:
+                                sf_location = datum["location"][0:datum["location"].rindex("/")+1]+sfname
+                                if isinstance(sfname, MutableMapping):
+                                    datum["secondaryFiles"].append(sfname)
+                                elif discover_secondaryFiles and self.fs_access.exists(sf_location):
+                                    datum["secondaryFiles"].append({
+                                        "location": sf_location,
+                                        "basename": sfname,
+                                        "class": "File"})
+                                elif sf_required:
+                                    raise WorkflowException("Missing required secondary file '%s' from file object: %s" % (
+                                        sfname, json_dumps(datum, indent=4)))
+
+                    normalizeFilesDirs(datum["secondaryFiles"])
+
+                if "format" in schema:
+                    try:
+                        check_format(datum, self.do_eval(schema["format"]),
+                                     self.formatgraph)
+                    except validate.ValidationException as ve:
+                        raise_from(WorkflowException(
+                            "Expected value of '%s' to have format %s but\n"
+                            " %s" % (schema["name"], schema["format"], ve)), ve)
+
+                visit_class(datum.get("secondaryFiles", []), ("File", "Directory"), _capture_files)
+
+            if schema["type"] == "Directory":
+                ll = schema.get("loadListing") or self.loadListing
+                if ll and ll != "no_listing":
+                    get_listing(self.fs_access, datum, (ll == "deep_listing"))
+                self.files.append(datum)
+
+            if schema["type"] == "Any":
+                visit_class(datum, ("File", "Directory"), _capture_files)
+
+        # Prepend this binding's position to the sort key of each nested binding
+        if binding:
+            for bi in bindings:
+                bi["position"] = binding["position"] + bi["position"]
+            bindings.append(binding)
+
+        return bindings
+
+    def tostr(self, value):  # type: (Union[MutableMapping[Text, Text], Any]) -> Text
+        if isinstance(value, MutableMapping) and value.get("class") in ("File", "Directory"):
+            if "path" not in value:
+                raise WorkflowException(u"%s object missing \"path\": %s" % (value["class"], value))
+
+            # Adjust Windows file paths when passing them to Docker, which
+            # accepts Unix-style paths only.
+            (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
+            if onWindows() and docker_req is not None:
+                # docker_req is None only when no DockerRequirement is
+                # present in either hints or requirements
+                path = docker_windows_path_adjust(value["path"])
+                return path
+            return value["path"]
+        else:
+            return Text(value)
+
+    def generate_arg(self, binding):  # type: (Dict[Text, Any]) -> List[Text]
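+        # A sketch of the prefix/separate handling (bindings illustrative):
+        #   {"prefix": "-o", "datum": "out.txt"}                        -> ["-o", "out.txt"]
+        #   {"prefix": "-o", "separate": False, "datum": "out.txt"}     -> ["-oout.txt"]
+        #   {"prefix": "-f", "itemSeparator": ",", "datum": ["a", "b"]} -> ["-f", "a,b"]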
+        value = binding.get("datum")
+        if "valueFrom" in binding:
+            with SourceLine(binding, "valueFrom", WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
+                value = self.do_eval(binding["valueFrom"], context=value)
+
+        prefix = binding.get("prefix")  # type: Optional[Text]
+        sep = binding.get("separate", True)
+        if prefix is None and not sep:
+            with SourceLine(binding, "separate", WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
+                raise WorkflowException("the 'separate' option cannot be specified without a prefix")
+
+        argl = []  # type: MutableSequence[Any]
+        if isinstance(value, MutableSequence):
+            if binding.get("itemSeparator") and value:
+                argl = [binding["itemSeparator"].join([self.tostr(v) for v in value])]
+            elif binding.get("valueFrom"):
+                value = [self.tostr(v) for v in value]
+                return ([prefix] if prefix else []) + value
+            elif prefix and value:
+                return [prefix]
+            else:
+                return []
+        elif isinstance(value, MutableMapping) and value.get("class") in ("File", "Directory"):
+            argl = [value]
+        elif isinstance(value, MutableMapping):
+            return [prefix] if prefix else []
+        elif value is True and prefix:
+            return [prefix]
+        elif value is False or value is None or (value is True and not prefix):
+            return []
+        else:
+            argl = [value]
+
+        args = []
+        for j in argl:
+            if sep:
+                args.extend([prefix, self.tostr(j)])
+            else:
+                args.append(self.tostr(j) if prefix is None else prefix + self.tostr(j))
+
+        return [a for a in args if a is not None]
+
+    def do_eval(self, ex, context=None, recursive=False, strip_whitespace=True):
+        # type: (Union[Dict[Text, Text], Text], Any, bool, bool) -> Any
+        if recursive:
+            if isinstance(ex, MutableMapping):
+                return {k: self.do_eval(v, context, recursive)
+                        for k, v in iteritems(ex)}
+            if isinstance(ex, MutableSequence):
+                return [self.do_eval(v, context, recursive)
+                        for v in ex]
+
+        return expression.do_eval(ex, self.job, self.requirements,
+                                  self.outdir, self.tmpdir,
+                                  self.resources,
+                                  context=context,
+                                  timeout=self.timeout,
+                                  debug=self.debug,
+                                  js_console=self.js_console,
+                                  force_docker_pull=self.force_docker_pull,
+                                  strip_whitespace=strip_whitespace)