diff env/lib/python3.7/site-packages/cwltool/builder.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author:   shellac
date:     Sat, 02 May 2020 07:14:21 -0400
parents:
children:
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/cwltool/builder.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,451 @@
+from __future__ import absolute_import
+
+import copy
+import os
+import logging
+from typing import (IO, Any, Callable, Dict, List, MutableMapping,
+                    MutableSequence, Optional, Set, Tuple, Union)
+
+from typing_extensions import Text, Type, TYPE_CHECKING  # pylint: disable=unused-import
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+from rdflib import Graph, URIRef  # pylint: disable=unused-import
+from rdflib.namespace import OWL, RDFS
+from ruamel.yaml.comments import CommentedMap
+from schema_salad import validate
+from schema_salad.schema import Names, convert_to_dict
+from schema_salad.avro.schema import make_avsc_object, Schema
+from schema_salad.sourceline import SourceLine
+from schema_salad.ref_resolver import uri_file_path
+from six import iteritems, string_types
+from future.utils import raise_from
+
+from . import expression
+from .errors import WorkflowException
+from .loghandler import _logger
+from .mutation import MutationManager  # pylint: disable=unused-import
+from .pathmapper import PathMapper  # pylint: disable=unused-import
+from .pathmapper import (CONTENT_LIMIT, get_listing, normalizeFilesDirs,
+                         visit_class)
+from .stdfsaccess import StdFsAccess  # pylint: disable=unused-import
+from .utils import aslist, docker_windows_path_adjust, json_dumps, onWindows
+
+if TYPE_CHECKING:
+    from .provenance import ProvenanceProfile  # pylint: disable=unused-import
+
+
+def content_limit_respected_read_bytes(f):  # type: (IO[bytes]) -> bytes
+    contents = f.read(CONTENT_LIMIT + 1)
+    if len(contents) > CONTENT_LIMIT:
+        raise WorkflowException(
+            "loadContents handling encountered buffer that exceeds maximum "
+            "length of %d bytes" % CONTENT_LIMIT)
+    return contents
+
+
+def content_limit_respected_read(f):  # type: (IO[bytes]) -> Text
+    return content_limit_respected_read_bytes(f).decode("utf-8")
+
+
+def substitute(value, replace):  # type: (Text, Text) -> Text
+    if replace.startswith("^"):
+        try:
+            return substitute(value[0:value.rindex('.')], replace[1:])
+        except ValueError:
+            # No extension to remove
+            return value + replace.lstrip("^")
+    return value + replace
+
+
+def formatSubclassOf(fmt, cls, ontology, visited):
+    # type: (Text, Text, Optional[Graph], Set[Text]) -> bool
+    """Determine if `fmt` is a subclass of `cls`."""
+    if URIRef(fmt) == URIRef(cls):
+        return True
+
+    if ontology is None:
+        return False
+
+    if fmt in visited:
+        return False
+
+    visited.add(fmt)
+
+    uriRefFmt = URIRef(fmt)
+
+    for s, p, o in ontology.triples((uriRefFmt, RDFS.subClassOf, None)):
+        # Find parent classes of `fmt` and search upward
+        if formatSubclassOf(o, cls, ontology, visited):
+            return True
+
+    for s, p, o in ontology.triples((uriRefFmt, OWL.equivalentClass, None)):
+        # Find equivalent classes of `fmt` and search horizontally
+        if formatSubclassOf(o, cls, ontology, visited):
+            return True
+
+    for s, p, o in ontology.triples((None, OWL.equivalentClass, uriRefFmt)):
+        # Find equivalent classes of `fmt` and search horizontally
+        if formatSubclassOf(s, cls, ontology, visited):
+            return True
+
+    return False
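The `^` prefix in `substitute`'s replacement string strips one trailing extension from the value before appending; without it the replacement is simply concatenated. A minimal sketch (file names are illustrative only):

```python
from cwltool.builder import substitute

substitute("reads.bam", ".bai")   # -> "reads.bam.bai" (plain append)
substitute("reads.bam", "^.bai")  # -> "reads.bai" (one extension stripped)
substitute("reads", "^.bai")      # -> "reads.bai" (no extension; carets dropped)
```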
+
+
+def check_format(actual_file,    # type: Union[Dict[Text, Any], List[Dict[Text, Any]], Text]
+                 input_formats,  # type: Union[List[Text], Text]
+                 ontology        # type: Optional[Graph]
+                ):  # type: (...) -> None
+    """Confirm that the format present is valid for the allowed formats."""
+    for afile in aslist(actual_file):
+        if not afile:
+            continue
+        if "format" not in afile:
+            raise validate.ValidationException(
+                u"File has no 'format' defined: {}".format(
+                    json_dumps(afile, indent=4)))
+        for inpf in aslist(input_formats):
+            if afile["format"] == inpf or \
+                    formatSubclassOf(afile["format"], inpf, ontology, set()):
+                return
+        raise validate.ValidationException(
+            u"File has an incompatible format: {}".format(
+                json_dumps(afile, indent=4)))
+
+
+class HasReqsHints(object):
+    def __init__(self):  # type: () -> None
+        """Initialize this reqs decorator."""
+        self.requirements = []  # type: List[Dict[Text, Any]]
+        self.hints = []  # type: List[Dict[Text, Any]]
+
+    def get_requirement(self,
+                        feature  # type: Text
+                       ):  # type: (...) -> Tuple[Optional[Any], Optional[bool]]
+        for item in reversed(self.requirements):
+            if item["class"] == feature:
+                return (item, True)
+        for item in reversed(self.hints):
+            if item["class"] == feature:
+                return (item, False)
+        return (None, None)
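`get_requirement` consults `requirements` before `hints` and scans each list in reverse, so a hard requirement shadows a hint of the same class and later entries win over earlier ones. A minimal sketch (image tags assumed for illustration):

```python
from cwltool.builder import HasReqsHints

t = HasReqsHints()
t.hints = [{"class": "DockerRequirement", "dockerPull": "python:3.6"}]
t.requirements = [{"class": "DockerRequirement", "dockerPull": "python:3.7"}]

req, is_required = t.get_requirement("DockerRequirement")
# req["dockerPull"] == "python:3.7": the requirement shadows the hint,
# and is_required is True because the match came from `requirements`.
```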
+
+
+class Builder(HasReqsHints):
+    def __init__(self,
+                 job,                  # type: Dict[Text, expression.JSON]
+                 files,                # type: List[Dict[Text, Text]]
+                 bindings,             # type: List[Dict[Text, Any]]
+                 schemaDefs,           # type: Dict[Text, Dict[Text, Any]]
+                 names,                # type: Names
+                 requirements,         # type: List[Dict[Text, Any]]
+                 hints,                # type: List[Dict[Text, Any]]
+                 resources,            # type: Dict[str, int]
+                 mutation_manager,     # type: Optional[MutationManager]
+                 formatgraph,          # type: Optional[Graph]
+                 make_fs_access,       # type: Type[StdFsAccess]
+                 fs_access,            # type: StdFsAccess
+                 job_script_provider,  # type: Optional[Any]
+                 timeout,              # type: float
+                 debug,                # type: bool
+                 js_console,           # type: bool
+                 force_docker_pull,    # type: bool
+                 loadListing,          # type: Text
+                 outdir,               # type: Text
+                 tmpdir,               # type: Text
+                 stagedir              # type: Text
+                ):  # type: (...) -> None
+        """Initialize this Builder."""
+        self.job = job
+        self.files = files
+        self.bindings = bindings
+        self.schemaDefs = schemaDefs
+        self.names = names
+        self.requirements = requirements
+        self.hints = hints
+        self.resources = resources
+        self.mutation_manager = mutation_manager
+        self.formatgraph = formatgraph
+
+        self.make_fs_access = make_fs_access
+        self.fs_access = fs_access
+
+        self.job_script_provider = job_script_provider
+
+        self.timeout = timeout
+
+        self.debug = debug
+        self.js_console = js_console
+        self.force_docker_pull = force_docker_pull
+
+        # One of "no_listing", "shallow_listing", "deep_listing"
+        self.loadListing = loadListing
+
+        self.outdir = outdir
+        self.tmpdir = tmpdir
+        self.stagedir = stagedir
+
+        self.pathmapper = None  # type: Optional[PathMapper]
+        self.prov_obj = None  # type: Optional[ProvenanceProfile]
+        self.find_default_container = None  # type: Optional[Callable[[], Text]]
+
+    def build_job_script(self, commands):
+        # type: (List[Text]) -> Optional[Text]
+        build_job_script_method = getattr(
+            self.job_script_provider, "build_job_script", None
+        )  # type: Optional[Callable[[Builder, Union[List[str], List[Text]]], Text]]
+        if build_job_script_method is not None:
+            return build_job_script_method(self, commands)
+        return None
+
+    def bind_input(self,
+                   schema,                   # type: MutableMapping[Text, Any]
+                   datum,                    # type: Any
+                   discover_secondaryFiles,  # type: bool
+                   lead_pos=None,            # type: Optional[Union[int, List[int]]]
+                   tail_pos=None,            # type: Optional[List[int]]
+                  ):  # type: (...) -> List[MutableMapping[Text, Any]]
+        if tail_pos is None:
+            tail_pos = []
+        if lead_pos is None:
+            lead_pos = []
+
+        bindings = []  # type: List[MutableMapping[Text, Text]]
+        binding = {}  # type: Union[MutableMapping[Text, Text], CommentedMap]
+        value_from_expression = False
+        if "inputBinding" in schema and isinstance(schema["inputBinding"], MutableMapping):
+            binding = CommentedMap(schema["inputBinding"].items())
+
+            bp = list(aslist(lead_pos))
+            if "position" in binding:
+                position = binding["position"]
+                if isinstance(position, str):
+                    # no need to check the CWL version: the v1.0 schema only
+                    # allows ints here, so a string must be an expression
+                    binding['position'] = self.do_eval(position, context=datum)
+                    bp.append(binding['position'])
+                else:
+                    bp.extend(aslist(binding['position']))
+            else:
+                bp.append(0)
+            bp.extend(aslist(tail_pos))
+            binding["position"] = bp
+
+            binding["datum"] = datum
+            if "valueFrom" in binding:
+                value_from_expression = True
+
+        # Handle union types
+        if isinstance(schema["type"], MutableSequence):
+            bound_input = False
+            for t in schema["type"]:
+                avsc = None  # type: Optional[Schema]
+                if isinstance(t, string_types) and self.names.has_name(t, ""):
+                    avsc = self.names.get_name(t, "")
+                elif (isinstance(t, MutableMapping) and "name" in t
+                        and self.names.has_name(t["name"], "")):
+                    avsc = self.names.get_name(t["name"], "")
+                if not avsc:
+                    avsc = make_avsc_object(convert_to_dict(t), self.names)
+                if validate.validate(avsc, datum):
+                    schema = copy.deepcopy(schema)
+                    schema["type"] = t
+                    if not value_from_expression:
+                        return self.bind_input(
+                            schema, datum, lead_pos=lead_pos, tail_pos=tail_pos,
+                            discover_secondaryFiles=discover_secondaryFiles)
+                    else:
+                        self.bind_input(
+                            schema, datum, lead_pos=lead_pos, tail_pos=tail_pos,
+                            discover_secondaryFiles=discover_secondaryFiles)
+                        bound_input = True
+            if not bound_input:
+                raise validate.ValidationException(
+                    u"'%s' is not a valid union %s" % (datum, schema["type"]))
+        elif isinstance(schema["type"], MutableMapping):
+            st = copy.deepcopy(schema["type"])
+            if (binding and "inputBinding" not in st
+                    and "type" in st
+                    and st["type"] == "array"
+                    and "itemSeparator" not in binding):
+                st["inputBinding"] = {}
+            for k in ("secondaryFiles", "format", "streamable"):
+                if k in schema:
+                    st[k] = schema[k]
+            if value_from_expression:
+                self.bind_input(
+                    st, datum, lead_pos=lead_pos, tail_pos=tail_pos,
+                    discover_secondaryFiles=discover_secondaryFiles)
+            else:
+                bindings.extend(self.bind_input(
+                    st, datum, lead_pos=lead_pos, tail_pos=tail_pos,
+                    discover_secondaryFiles=discover_secondaryFiles))
+        else:
+            if schema["type"] in self.schemaDefs:
+                schema = self.schemaDefs[schema["type"]]
+
+            if schema["type"] == "record":
+                for f in schema["fields"]:
+                    if f["name"] in datum and datum[f["name"]] is not None:
+                        bindings.extend(self.bind_input(
+                            f, datum[f["name"]],
+                            lead_pos=lead_pos, tail_pos=f["name"],
+                            discover_secondaryFiles=discover_secondaryFiles))
+                    else:
+                        datum[f["name"]] = f.get("default")
+
+            if schema["type"] == "array":
+                for n, item in enumerate(datum):
+                    b2 = None
+                    if binding:
+                        b2 = copy.deepcopy(binding)
+                        b2["datum"] = item
+                    itemschema = {
+                        u"type": schema["items"],
+                        u"inputBinding": b2
+                    }
+                    for k in ("secondaryFiles", "format", "streamable"):
+                        if k in schema:
+                            itemschema[k] = schema[k]
+                    bindings.extend(
+                        self.bind_input(
+                            itemschema, item, lead_pos=n, tail_pos=tail_pos,
+                            discover_secondaryFiles=discover_secondaryFiles))
+                binding = {}
+
+            def _capture_files(f):  # type: (Dict[Text, Text]) -> Dict[Text, Text]
+                self.files.append(f)
+                return f
+
+            if schema["type"] == "File":
+                self.files.append(datum)
+                if (binding and binding.get("loadContents")) or schema.get("loadContents"):
+                    with self.fs_access.open(datum["location"], "rb") as f:
+                        datum["contents"] = content_limit_respected_read(f)
+
+                if "secondaryFiles" in schema:
+                    if "secondaryFiles" not in datum:
+                        datum["secondaryFiles"] = []
+                    for sf in aslist(schema["secondaryFiles"]):
+                        if 'required' in sf:
+                            sf_required = self.do_eval(sf['required'], context=datum)
+                        else:
+                            sf_required = True
+
+                        if "$(" in sf["pattern"] or "${" in sf["pattern"]:
+                            sfpath = self.do_eval(sf["pattern"], context=datum)
+                        else:
+                            sfpath = substitute(datum["basename"], sf["pattern"])
+
+                        for sfname in aslist(sfpath):
+                            if not sfname:
+                                continue
+                            found = False
+                            for d in datum["secondaryFiles"]:
+                                if not d.get("basename"):
+                                    d["basename"] = d["location"][d["location"].rindex("/") + 1:]
+                                if d["basename"] == sfname:
+                                    found = True
+                            if not found:
+                                sf_location = datum["location"][0:datum["location"].rindex("/") + 1] + sfname
+                                if isinstance(sfname, MutableMapping):
+                                    datum["secondaryFiles"].append(sfname)
+                                elif discover_secondaryFiles and self.fs_access.exists(sf_location):
+                                    datum["secondaryFiles"].append({
+                                        "location": sf_location,
+                                        "basename": sfname,
+                                        "class": "File"})
+                                elif sf_required:
+                                    raise WorkflowException(
+                                        "Missing required secondary file '%s' from file object: %s"
+                                        % (sfname, json_dumps(datum, indent=4)))
+
+                    normalizeFilesDirs(datum["secondaryFiles"])
+
+                if "format" in schema:
+                    try:
+                        check_format(datum, self.do_eval(schema["format"]),
+                                     self.formatgraph)
+                    except validate.ValidationException as ve:
+                        raise_from(WorkflowException(
+                            "Expected value of '%s' to have format %s but\n "
+                            " %s" % (schema["name"], schema["format"], ve)), ve)
+
+                visit_class(datum.get("secondaryFiles", []), ("File", "Directory"),
+                            _capture_files)
+
+            if schema["type"] == "Directory":
+                ll = schema.get("loadListing") or self.loadListing
+                if ll and ll != "no_listing":
+                    get_listing(self.fs_access, datum, (ll == "deep_listing"))
+                self.files.append(datum)
+
+            if schema["type"] == "Any":
+                visit_class(datum, ("File", "Directory"), _capture_files)
+
+        # Position to front of the sort key
+        if binding:
+            for bi in bindings:
+                bi["position"] = binding["position"] + bi["position"]
+            bindings.append(binding)
+
+        return bindings
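For `File` inputs, a secondaryFiles `pattern` containing no `$(`/`${` is resolved with `substitute` and the candidate is looked up next to the primary file. A hedged walk-through of the lookup for pattern `.bai` (paths assumed for illustration):

```python
from cwltool.builder import substitute

datum = {"class": "File",
         "location": "file:///data/reads.bam",
         "basename": "reads.bam",
         "secondaryFiles": []}

sfname = substitute(datum["basename"], ".bai")  # -> "reads.bam.bai"
sf_location = datum["location"][0:datum["location"].rindex("/") + 1] + sfname
# -> "file:///data/reads.bam.bai"; with discover_secondaryFiles=True the
# builder appends a File entry for sf_location when fs_access.exists() is
# true, and raises WorkflowException when the pattern is required but absent.
```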
"File": + self.files.append(datum) + if (binding and binding.get("loadContents")) or schema.get("loadContents"): + with self.fs_access.open(datum["location"], "rb") as f: + datum["contents"] = content_limit_respected_read(f) + + if "secondaryFiles" in schema: + if "secondaryFiles" not in datum: + datum["secondaryFiles"] = [] + for sf in aslist(schema["secondaryFiles"]): + if 'required' in sf: + sf_required = self.do_eval(sf['required'], context=datum) + else: + sf_required = True + + + if "$(" in sf["pattern"] or "${" in sf["pattern"]: + sfpath = self.do_eval(sf["pattern"], context=datum) + else: + sfpath = substitute(datum["basename"], sf["pattern"]) + + for sfname in aslist(sfpath): + if not sfname: + continue + found = False + for d in datum["secondaryFiles"]: + if not d.get("basename"): + d["basename"] = d["location"][d["location"].rindex("/")+1:] + if d["basename"] == sfname: + found = True + if not found: + sf_location = datum["location"][0:datum["location"].rindex("/")+1]+sfname + if isinstance(sfname, MutableMapping): + datum["secondaryFiles"].append(sfname) + elif discover_secondaryFiles and self.fs_access.exists(sf_location): + datum["secondaryFiles"].append({ + "location": sf_location, + "basename": sfname, + "class": "File"}) + elif sf_required: + raise WorkflowException("Missing required secondary file '%s' from file object: %s" % ( + sfname, json_dumps(datum, indent=4))) + + normalizeFilesDirs(datum["secondaryFiles"]) + + if "format" in schema: + try: + check_format(datum, self.do_eval(schema["format"]), + self.formatgraph) + except validate.ValidationException as ve: + raise_from(WorkflowException( + "Expected value of '%s' to have format %s but\n " + " %s" % (schema["name"], schema["format"], ve)), ve) + + visit_class(datum.get("secondaryFiles", []), ("File", "Directory"), _capture_files) + + if schema["type"] == "Directory": + ll = schema.get("loadListing") or self.loadListing + if ll and ll != "no_listing": + get_listing(self.fs_access, datum, (ll == "deep_listing")) + self.files.append(datum) + + if schema["type"] == "Any": + visit_class(datum, ("File", "Directory"), _capture_files) + + # Position to front of the sort key + if binding: + for bi in bindings: + bi["position"] = binding["position"] + bi["position"] + bindings.append(binding) + + return bindings + + def tostr(self, value): # type: (Union[MutableMapping[Text, Text], Any]) -> Text + if isinstance(value, MutableMapping) and value.get("class") in ("File", "Directory"): + if "path" not in value: + raise WorkflowException(u"%s object missing \"path\": %s" % (value["class"], value)) + + # Path adjust for windows file path when passing to docker, docker accepts unix like path only + (docker_req, docker_is_req) = self.get_requirement("DockerRequirement") + if onWindows() and docker_req is not None: + # docker_req is none only when there is no dockerRequirement + # mentioned in hints and Requirement + path = docker_windows_path_adjust(value["path"]) + return path + return value["path"] + else: + return Text(value) + + def generate_arg(self, binding): # type: (Dict[Text, Any]) -> List[Text] + value = binding.get("datum") + if "valueFrom" in binding: + with SourceLine(binding, "valueFrom", WorkflowException, _logger.isEnabledFor(logging.DEBUG)): + value = self.do_eval(binding["valueFrom"], context=value) + + prefix = binding.get("prefix") # type: Optional[Text] + sep = binding.get("separate", True) + if prefix is None and not sep: + with SourceLine(binding, "separate", WorkflowException, 
+
+    def do_eval(self, ex, context=None, recursive=False, strip_whitespace=True):
+        # type: (Union[Dict[Text, Text], Text], Any, bool, bool) -> Any
+        if recursive:
+            if isinstance(ex, MutableMapping):
+                return {k: self.do_eval(v, context, recursive)
+                        for k, v in iteritems(ex)}
+            if isinstance(ex, MutableSequence):
+                return [self.do_eval(v, context, recursive)
+                        for v in ex]
+
+        return expression.do_eval(ex, self.job, self.requirements,
+                                  self.outdir, self.tmpdir,
+                                  self.resources,
+                                  context=context,
+                                  timeout=self.timeout,
+                                  debug=self.debug,
+                                  js_console=self.js_console,
+                                  force_docker_pull=self.force_docker_pull,
+                                  strip_whitespace=strip_whitespace)
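`do_eval` with `recursive=True` maps itself over mappings and sequences before handing leaf values to `expression.do_eval`, which resolves CWL parameter references against the job object. A hedged sketch, assuming a fully constructed builder `b` whose `job` is `{"threads": 4}`:

```python
# b is a Builder instance; its construction is omitted here.
b.do_eval("$(inputs.threads)")                              # -> 4
b.do_eval({"t": "$(inputs.threads)"}, recursive=True)       # -> {"t": 4}
b.do_eval(["$(inputs.threads)", "fixed"], recursive=True)   # -> [4, "fixed"]
```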