Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/cwltool/load_tool.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 (2021-03-22) |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/cwltool/load_tool.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,520 @@ +"""Loads a CWL document.""" + +import hashlib +import logging +import os +import re +import urllib +import uuid +from typing import ( + Any, + Dict, + List, + MutableMapping, + MutableSequence, + Optional, + Tuple, + Union, + cast, +) + +from ruamel.yaml.comments import CommentedMap, CommentedSeq +from schema_salad.exceptions import ValidationException +from schema_salad.ref_resolver import Loader, file_uri +from schema_salad.schema import validate_doc +from schema_salad.sourceline import SourceLine, cmap +from schema_salad.utils import ( + ContextType, + FetcherCallableType, + IdxResultType, + ResolveType, + json_dumps, +) + +from . import CWL_CONTENT_TYPES, process, update +from .context import LoadingContext +from .errors import WorkflowException +from .loghandler import _logger +from .process import Process, get_schema, shortname +from .update import ALLUPDATES +from .utils import CWLObjectType, ResolverType, visit_class + +jobloaderctx = { + "cwl": "https://w3id.org/cwl/cwl#", + "cwltool": "http://commonwl.org/cwltool#", + "path": {"@type": "@id"}, + "location": {"@type": "@id"}, + "id": "@id", +} # type: ContextType + + +overrides_ctx = { + "overrideTarget": {"@type": "@id"}, + "cwltool": "http://commonwl.org/cwltool#", + "http://commonwl.org/cwltool#overrides": { + "@id": "cwltool:overrides", + "mapSubject": "overrideTarget", + }, + "requirements": { + "@id": "https://w3id.org/cwl/cwl#requirements", + "mapSubject": "class", + }, +} # type: ContextType + + +def default_loader( + fetcher_constructor: Optional[FetcherCallableType] = None, + enable_dev: bool = False, + doc_cache: bool = True, +) -> Loader: + return Loader( + jobloaderctx, + fetcher_constructor=fetcher_constructor, + allow_attachments=lambda r: enable_dev, + doc_cache=doc_cache, + ) + + +def resolve_tool_uri( + argsworkflow: str, + resolver: Optional[ResolverType] = None, + fetcher_constructor: Optional[FetcherCallableType] = None, + document_loader: Optional[Loader] = None, +) -> Tuple[str, str]: + + uri = None # type: Optional[str] + split = urllib.parse.urlsplit(argsworkflow) + # In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that + if split.scheme and split.scheme in ["http", "https", "file"]: + uri = argsworkflow + elif os.path.exists(os.path.abspath(argsworkflow)): + uri = file_uri(str(os.path.abspath(argsworkflow))) + elif resolver is not None: + uri = resolver( + document_loader or default_loader(fetcher_constructor), argsworkflow + ) + + if uri is None: + raise ValidationException("Not found: '%s'" % argsworkflow) + + if argsworkflow != uri: + _logger.info("Resolved '%s' to '%s'", argsworkflow, uri) + + fileuri = urllib.parse.urldefrag(uri)[0] + return uri, fileuri + + +def fetch_document( + argsworkflow: Union[str, CWLObjectType], + loadingContext: Optional[LoadingContext] = None, +) -> Tuple[LoadingContext, CommentedMap, str]: + """Retrieve a CWL document.""" + if loadingContext is None: + loadingContext = LoadingContext() + loadingContext.loader = default_loader() + else: + loadingContext = loadingContext.copy() + if loadingContext.loader is None: + loadingContext.loader = default_loader( + loadingContext.fetcher_constructor, + enable_dev=loadingContext.enable_dev, + doc_cache=loadingContext.doc_cache, + ) + + if isinstance(argsworkflow, str): + uri, fileuri = resolve_tool_uri( + argsworkflow, + resolver=loadingContext.resolver, + document_loader=loadingContext.loader, + ) + workflowobj = cast( + CommentedMap, + loadingContext.loader.fetch(fileuri, content_types=CWL_CONTENT_TYPES), + ) + return loadingContext, workflowobj, uri + if isinstance(argsworkflow, MutableMapping): + uri = ( + cast(str, argsworkflow["id"]) + if argsworkflow.get("id") + else "_:" + str(uuid.uuid4()) + ) + workflowobj = cast( + CommentedMap, cmap(cast(Dict[str, Any], argsworkflow), fn=uri) + ) + loadingContext.loader.idx[uri] = workflowobj + return loadingContext, workflowobj, uri + raise ValidationException("Must be URI or object: '%s'" % argsworkflow) + + +def _convert_stdstreams_to_files( + workflowobj: Union[ + CWLObjectType, MutableSequence[Union[CWLObjectType, str, int]], str + ] +) -> None: + if isinstance(workflowobj, MutableMapping): + if workflowobj.get("class") == "CommandLineTool": + with SourceLine( + workflowobj, + "outputs", + ValidationException, + _logger.isEnabledFor(logging.DEBUG), + ): + outputs = workflowobj.get("outputs", []) + if not isinstance(outputs, CommentedSeq): + raise ValidationException('"outputs" section is not ' "valid.") + for out in cast( + MutableSequence[CWLObjectType], workflowobj.get("outputs", []) + ): + if not isinstance(out, CommentedMap): + raise ValidationException( + f"Output '{out}' is not a valid OutputParameter." + ) + for streamtype in ["stdout", "stderr"]: + if out.get("type") == streamtype: + if "outputBinding" in out: + raise ValidationException( + "Not allowed to specify outputBinding when" + " using %s shortcut." % streamtype + ) + if streamtype in workflowobj: + filename = workflowobj[streamtype] + else: + filename = str( + hashlib.sha1( # nosec + json_dumps(workflowobj, sort_keys=True).encode( + "utf-8" + ) + ).hexdigest() + ) + workflowobj[streamtype] = filename + out["type"] = "File" + out["outputBinding"] = cmap({"glob": filename}) + for inp in cast( + MutableSequence[CWLObjectType], workflowobj.get("inputs", []) + ): + if inp.get("type") == "stdin": + if "inputBinding" in inp: + raise ValidationException( + "Not allowed to specify inputBinding when" + " using stdin shortcut." + ) + if "stdin" in workflowobj: + raise ValidationException( + "Not allowed to specify stdin path when" + " using stdin type shortcut." + ) + else: + workflowobj["stdin"] = ( + "$(inputs.%s.path)" + % cast(str, inp["id"]).rpartition("#")[2] + ) + inp["type"] = "File" + else: + for entry in workflowobj.values(): + _convert_stdstreams_to_files( + cast( + Union[ + CWLObjectType, + MutableSequence[Union[CWLObjectType, str, int]], + str, + ], + entry, + ) + ) + if isinstance(workflowobj, MutableSequence): + for entry in workflowobj: + _convert_stdstreams_to_files( + cast( + Union[ + CWLObjectType, + MutableSequence[Union[CWLObjectType, str, int]], + str, + ], + entry, + ) + ) + + +def _add_blank_ids( + workflowobj: Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]] +) -> None: + if isinstance(workflowobj, MutableMapping): + if ( + "run" in workflowobj + and isinstance(workflowobj["run"], MutableMapping) + and "id" not in workflowobj["run"] + and "$import" not in workflowobj["run"] + ): + workflowobj["run"]["id"] = str(uuid.uuid4()) + for entry in workflowobj.values(): + _add_blank_ids( + cast( + Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]], + entry, + ) + ) + if isinstance(workflowobj, MutableSequence): + for entry in workflowobj: + _add_blank_ids( + cast( + Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]], + entry, + ) + ) + + +def resolve_and_validate_document( + loadingContext: LoadingContext, + workflowobj: Union[CommentedMap, CommentedSeq], + uri: str, + preprocess_only: bool = False, + skip_schemas: Optional[bool] = None, +) -> Tuple[LoadingContext, str]: + """Validate a CWL document.""" + if not loadingContext.loader: + raise ValueError("loadingContext must have a loader.") + else: + loader = loadingContext.loader + loadingContext = loadingContext.copy() + + if not isinstance(workflowobj, MutableMapping): + raise ValueError( + "workflowjobj must be a dict, got '{}': {}".format( + type(workflowobj), workflowobj + ) + ) + + jobobj = None + if "cwl:tool" in workflowobj: + jobobj, _ = loader.resolve_all(workflowobj, uri) + uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"]) + del cast(Dict[str, Any], jobobj)["https://w3id.org/cwl/cwl#tool"] + + workflowobj = fetch_document(uri, loadingContext)[1] + + fileuri = urllib.parse.urldefrag(uri)[0] + + cwlVersion = loadingContext.metadata.get("cwlVersion") + if not cwlVersion: + cwlVersion = workflowobj.get("cwlVersion") + if not cwlVersion and fileuri != uri: + # The tool we're loading is a fragment of a bigger file. Get + # the document root element and look for cwlVersion there. + metadata = cast(CWLObjectType, fetch_document(fileuri, loadingContext)[1]) + cwlVersion = cast(str, metadata.get("cwlVersion")) + if not cwlVersion: + raise ValidationException( + "No cwlVersion found. " + "Use the following syntax in your CWL document to declare " + "the version: cwlVersion: <version>.\n" + "Note: if this is a CWL draft-2 (pre v1.0) document then it " + "will need to be upgraded first." + ) + + if not isinstance(cwlVersion, str): + with SourceLine(workflowobj, "cwlVersion", ValidationException): + raise ValidationException( + "'cwlVersion' must be a string, got {}".format(type(cwlVersion)) + ) + # strip out version + cwlVersion = re.sub(r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "", cwlVersion) + if cwlVersion not in list(ALLUPDATES): + # print out all the Supported Versions of cwlVersion + versions = [] + for version in list(ALLUPDATES): + if "dev" in version: + version += " (with --enable-dev flag only)" + versions.append(version) + versions.sort() + raise ValidationException( + "The CWL reference runner no longer supports pre CWL v1.0 " + "documents. Supported versions are: " + "\n{}".format("\n".join(versions)) + ) + + if ( + isinstance(jobobj, CommentedMap) + and "http://commonwl.org/cwltool#overrides" in jobobj + ): + loadingContext.overrides_list.extend(resolve_overrides(jobobj, uri, uri)) + del jobobj["http://commonwl.org/cwltool#overrides"] + + if ( + isinstance(jobobj, CommentedMap) + and "https://w3id.org/cwl/cwl#requirements" in jobobj + ): + if cwlVersion not in ("v1.1.0-dev1", "v1.1"): + raise ValidationException( + "`cwl:requirements` in the input object is not part of CWL " + "v1.0. You can adjust to use `cwltool:overrides` instead; or you " + "can set the cwlVersion to v1.1 or greater." + ) + loadingContext.overrides_list.append( + { + "overrideTarget": uri, + "requirements": jobobj["https://w3id.org/cwl/cwl#requirements"], + } + ) + del jobobj["https://w3id.org/cwl/cwl#requirements"] + + (sch_document_loader, avsc_names) = process.get_schema(cwlVersion)[:2] + + if isinstance(avsc_names, Exception): + raise avsc_names + + processobj = None # type: Optional[ResolveType] + document_loader = Loader( + sch_document_loader.ctx, + schemagraph=sch_document_loader.graph, + idx=loader.idx, + cache=sch_document_loader.cache, + fetcher_constructor=loadingContext.fetcher_constructor, + skip_schemas=skip_schemas, + doc_cache=loadingContext.doc_cache, + ) + + if cwlVersion == "v1.0": + _add_blank_ids(workflowobj) + + document_loader.resolve_all(workflowobj, fileuri) + processobj, metadata = document_loader.resolve_ref(uri) + if not isinstance(processobj, (CommentedMap, CommentedSeq)): + raise ValidationException("Workflow must be a CommentedMap or CommentedSeq.") + + if not hasattr(processobj.lc, "filename"): + processobj.lc.filename = fileuri + + if loadingContext.metadata: + metadata = loadingContext.metadata + + if not isinstance(metadata, CommentedMap): + raise ValidationException( + "metadata must be a CommentedMap, was %s" % type(metadata) + ) + + if isinstance(processobj, CommentedMap): + uri = processobj["id"] + + _convert_stdstreams_to_files(workflowobj) + + if isinstance(jobobj, CommentedMap): + loadingContext.jobdefaults = jobobj + + loadingContext.loader = document_loader + loadingContext.avsc_names = avsc_names + loadingContext.metadata = metadata + + if preprocess_only: + return loadingContext, uri + + if loadingContext.do_validate: + validate_doc(avsc_names, processobj, document_loader, loadingContext.strict) + + # None means default behavior (do update) + if loadingContext.do_update in (True, None): + if "cwlVersion" not in metadata: + metadata["cwlVersion"] = cwlVersion + processobj = update.update( + processobj, document_loader, fileuri, loadingContext.enable_dev, metadata + ) + document_loader.idx[processobj["id"]] = processobj + + def update_index(pr: CommentedMap) -> None: + if "id" in pr: + document_loader.idx[pr["id"]] = pr + + visit_class( + processobj, ("CommandLineTool", "Workflow", "ExpressionTool"), update_index + ) + + return loadingContext, uri + + +def make_tool( + uri: Union[str, CommentedMap, CommentedSeq], loadingContext: LoadingContext +) -> Process: + """Make a Python CWL object.""" + if loadingContext.loader is None: + raise ValueError("loadingContext must have a loader") + resolveduri, metadata = loadingContext.loader.resolve_ref(uri) + + processobj = None + if isinstance(resolveduri, MutableSequence): + for obj in resolveduri: + if obj["id"].endswith("#main"): + processobj = obj + break + if not processobj: + raise WorkflowException( + "Tool file contains graph of multiple objects, must specify " + "one of #%s" + % ", #".join( + urllib.parse.urldefrag(i["id"])[1] for i in resolveduri if "id" in i + ) + ) + elif isinstance(resolveduri, MutableMapping): + processobj = resolveduri + else: + raise Exception("Must resolve to list or dict") + + tool = loadingContext.construct_tool_object(processobj, loadingContext) + + if loadingContext.jobdefaults: + jobobj = loadingContext.jobdefaults + for inp in tool.tool["inputs"]: + if shortname(inp["id"]) in jobobj: + inp["default"] = jobobj[shortname(inp["id"])] + + return tool + + +def load_tool( + argsworkflow: Union[str, CWLObjectType], + loadingContext: Optional[LoadingContext] = None, +) -> Process: + + loadingContext, workflowobj, uri = fetch_document(argsworkflow, loadingContext) + + loadingContext, uri = resolve_and_validate_document( + loadingContext, workflowobj, uri + ) + + return make_tool(uri, loadingContext) + + +def resolve_overrides( + ov: IdxResultType, + ov_uri: str, + baseurl: str, +) -> List[CWLObjectType]: + ovloader = Loader(overrides_ctx) + ret, _ = ovloader.resolve_all(ov, baseurl) + if not isinstance(ret, CommentedMap): + raise Exception("Expected CommentedMap, got %s" % type(ret)) + cwl_docloader = get_schema("v1.0")[0] + cwl_docloader.resolve_all(ret, ov_uri) + return cast(List[CWLObjectType], ret["http://commonwl.org/cwltool#overrides"]) + + +def load_overrides(ov: str, base_url: str) -> List[CWLObjectType]: + ovloader = Loader(overrides_ctx) + return resolve_overrides(ovloader.fetch(ov), ov, base_url) + + +def recursive_resolve_and_validate_document( + loadingContext: LoadingContext, + workflowobj: Union[CommentedMap, CommentedSeq], + uri: str, + preprocess_only: bool = False, + skip_schemas: Optional[bool] = None, +) -> Tuple[LoadingContext, str, Process]: + """Validate a CWL document, checking that a tool object can be built.""" + loadingContext, uri = resolve_and_validate_document( + loadingContext, + workflowobj, + uri, + preprocess_only=preprocess_only, + skip_schemas=skip_schemas, + ) + tool = make_tool(uri, loadingContext) + return loadingContext, uri, tool