Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/cwltool/load_tool.py @ 3:758bc20232e8 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:20:52 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
"""Loads a CWL document.""" from __future__ import absolute_import import hashlib import logging import os import re import uuid from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence, Optional, Tuple, Union, cast) import requests.sessions from ruamel.yaml.comments import CommentedMap, CommentedSeq from schema_salad import schema from schema_salad.ref_resolver import (ContextType, # pylint: disable=unused-import Fetcher, Loader, file_uri, SubLoader) from schema_salad.sourceline import SourceLine, cmap from schema_salad.validate import ValidationException from six import itervalues, string_types from six.moves import urllib from typing_extensions import Text # pylint: disable=unused-import # move to a regular typing import when Python 3.3-3.6 is no longer supported from . import process, update from .context import LoadingContext # pylint: disable=unused-import from .errors import WorkflowException from .loghandler import _logger from .process import (Process, get_schema, # pylint: disable=unused-import shortname) from .software_requirements import ( # pylint: disable=unused-import DependenciesConfiguration) from .update import ALLUPDATES from .utils import json_dumps jobloaderctx = { u"cwl": "https://w3id.org/cwl/cwl#", u"cwltool": "http://commonwl.org/cwltool#", u"path": {u"@type": u"@id"}, u"location": {u"@type": u"@id"}, u"id": u"@id" } # type: ContextType overrides_ctx = { u"overrideTarget": {u"@type": u"@id"}, u"cwltool": "http://commonwl.org/cwltool#", u"http://commonwl.org/cwltool#overrides": { "@id": "cwltool:overrides", "mapSubject": "overrideTarget", }, "requirements": { "@id": "https://w3id.org/cwl/cwl#requirements", "mapSubject": "class" } } # type: ContextType FetcherConstructorType = Callable[ [Dict[Text, Union[Text, bool]], requests.sessions.Session], Fetcher] ResolverType = Callable[[Loader, Union[Text, Dict[Text, Any]]], Text] def default_loader(fetcher_constructor=None, enable_dev=False): # type: (Optional[FetcherConstructorType], bool) -> Loader return Loader(jobloaderctx, fetcher_constructor=fetcher_constructor, allow_attachments=lambda r: enable_dev) def resolve_tool_uri(argsworkflow, # type: Text resolver=None, # type: Optional[ResolverType] fetcher_constructor=None, # type: Optional[FetcherConstructorType] document_loader=None # type: Optional[Loader] ): # type: (...) -> Tuple[Text, Text] uri = None # type: Optional[Text] split = urllib.parse.urlsplit(argsworkflow) # In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that if split.scheme and split.scheme in [u'http', u'https', u'file']: uri = argsworkflow elif os.path.exists(os.path.abspath(argsworkflow)): uri = file_uri(str(os.path.abspath(argsworkflow))) elif resolver is not None: if document_loader is None: document_loader = default_loader(fetcher_constructor) uri = resolver(document_loader, argsworkflow) if uri is None: raise ValidationException("Not found: '%s'" % argsworkflow) if argsworkflow != uri: _logger.info("Resolved '%s' to '%s'", argsworkflow, uri) fileuri = urllib.parse.urldefrag(uri)[0] return uri, fileuri def fetch_document(argsworkflow, # type: Union[Text, Dict[Text, Any]] loadingContext=None # type: Optional[LoadingContext] ): # type: (...) -> Tuple[LoadingContext, CommentedMap, Text] """Retrieve a CWL document.""" if loadingContext is None: loadingContext = LoadingContext() loadingContext.loader = default_loader() else: loadingContext = loadingContext.copy() if loadingContext.loader is None: loadingContext.loader = default_loader(loadingContext.fetcher_constructor) if isinstance(argsworkflow, string_types): uri, fileuri = resolve_tool_uri(argsworkflow, resolver=loadingContext.resolver, document_loader=loadingContext.loader) workflowobj = loadingContext.loader.fetch(fileuri) return loadingContext, workflowobj, uri if isinstance(argsworkflow, dict): uri = argsworkflow["id"] if argsworkflow.get("id") else "_:" + Text(uuid.uuid4()) workflowobj = cast(CommentedMap, cmap(argsworkflow, fn=uri)) loadingContext.loader.idx[uri] = workflowobj return loadingContext, workflowobj, uri raise ValidationException("Must be URI or object: '%s'" % argsworkflow) def _convert_stdstreams_to_files(workflowobj): # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> None if isinstance(workflowobj, MutableMapping): if workflowobj.get('class') == 'CommandLineTool': with SourceLine(workflowobj, "outputs", ValidationException, _logger.isEnabledFor(logging.DEBUG)): outputs = workflowobj.get('outputs', []) if not isinstance(outputs, CommentedSeq): raise ValidationException('"outputs" section is not ' 'valid.') for out in workflowobj.get('outputs', []): if not isinstance(out, CommentedMap): raise ValidationException( "Output '{}' is not a valid " "OutputParameter.".format(out)) for streamtype in ['stdout', 'stderr']: if out.get('type') == streamtype: if 'outputBinding' in out: raise ValidationException( "Not allowed to specify outputBinding when" " using %s shortcut." % streamtype) if streamtype in workflowobj: filename = workflowobj[streamtype] else: filename = Text( hashlib.sha1( # nosec json_dumps(workflowobj, sort_keys=True ).encode('utf-8')).hexdigest()) workflowobj[streamtype] = filename out['type'] = 'File' out['outputBinding'] = cmap({'glob': filename}) for inp in workflowobj.get('inputs', []): if inp.get('type') == 'stdin': if 'inputBinding' in inp: raise ValidationException( "Not allowed to specify inputBinding when" " using stdin shortcut.") if 'stdin' in workflowobj: raise ValidationException( "Not allowed to specify stdin path when" " using stdin type shortcut.") else: workflowobj['stdin'] = \ "$(inputs.%s.path)" % \ inp['id'].rpartition('#')[2] inp['type'] = 'File' else: for entry in itervalues(workflowobj): _convert_stdstreams_to_files(entry) if isinstance(workflowobj, MutableSequence): for entry in workflowobj: _convert_stdstreams_to_files(entry) def _add_blank_ids(workflowobj): # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> None if isinstance(workflowobj, MutableMapping): if ("run" in workflowobj and isinstance(workflowobj["run"], MutableMapping) and "id" not in workflowobj["run"] and "$import" not in workflowobj["run"]): workflowobj["run"]["id"] = Text(uuid.uuid4()) for entry in itervalues(workflowobj): _add_blank_ids(entry) if isinstance(workflowobj, MutableSequence): for entry in workflowobj: _add_blank_ids(entry) def resolve_and_validate_document( loadingContext, # type: LoadingContext workflowobj, # type: Union[CommentedMap, CommentedSeq] uri, # type: Text preprocess_only=False, # type: bool skip_schemas=None, # type: Optional[bool] ): # type: (...) -> Tuple[LoadingContext, Text] """Validate a CWL document.""" if not loadingContext.loader: raise ValueError("loadingContext must have a loader.") else: loader = loadingContext.loader loadingContext = loadingContext.copy() if not isinstance(workflowobj, MutableMapping): raise ValueError("workflowjobj must be a dict, got '{}': {}".format( type(workflowobj), workflowobj)) jobobj = None if "cwl:tool" in workflowobj: jobobj, _ = loader.resolve_all(workflowobj, uri) uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"]) del cast(Dict[Text, Any], jobobj)["https://w3id.org/cwl/cwl#tool"] workflowobj = fetch_document(uri, loadingContext)[1] fileuri = urllib.parse.urldefrag(uri)[0] cwlVersion = loadingContext.metadata.get("cwlVersion") if not cwlVersion: cwlVersion = workflowobj.get("cwlVersion") if not cwlVersion and fileuri != uri: # The tool we're loading is a fragment of a bigger file. Get # the document root element and look for cwlVersion there. metadata = fetch_document(fileuri, loadingContext)[1] # type: Dict[Text, Any] cwlVersion = metadata.get("cwlVersion") if not cwlVersion: raise ValidationException( "No cwlVersion found. " "Use the following syntax in your CWL document to declare " "the version: cwlVersion: <version>.\n" "Note: if this is a CWL draft-2 (pre v1.0) document then it " "will need to be upgraded first.") if not isinstance(cwlVersion, string_types): with SourceLine(workflowobj, "cwlVersion", ValidationException): raise ValidationException("'cwlVersion' must be a string, " "got {}".format( type(cwlVersion))) # strip out version cwlVersion = re.sub( r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "", cwlVersion) if cwlVersion not in list(ALLUPDATES): # print out all the Supported Versions of cwlVersion versions = [] for version in list(ALLUPDATES): if "dev" in version: version += " (with --enable-dev flag only)" versions.append(version) versions.sort() raise ValidationException( "The CWL reference runner no longer supports pre CWL v1.0 " "documents. Supported versions are: " "\n{}".format("\n".join(versions))) if isinstance(jobobj, CommentedMap) and "http://commonwl.org/cwltool#overrides" in jobobj: loadingContext.overrides_list.extend(resolve_overrides(jobobj, uri, uri)) del jobobj["http://commonwl.org/cwltool#overrides"] if isinstance(jobobj, CommentedMap) and "https://w3id.org/cwl/cwl#requirements" in jobobj: if cwlVersion not in ("v1.1.0-dev1","v1.1"): raise ValidationException( "`cwl:requirements` in the input object is not part of CWL " "v1.0. You can adjust to use `cwltool:overrides` instead; or you " "can set the cwlVersion to v1.1 or greater.") loadingContext.overrides_list.append({"overrideTarget": uri, "requirements": jobobj["https://w3id.org/cwl/cwl#requirements"]}) del jobobj["https://w3id.org/cwl/cwl#requirements"] (sch_document_loader, avsc_names) = \ process.get_schema(cwlVersion)[:2] if isinstance(avsc_names, Exception): raise avsc_names processobj = None # type: Union[CommentedMap, CommentedSeq, Text, None] document_loader = Loader(sch_document_loader.ctx, schemagraph=sch_document_loader.graph, idx=loader.idx, cache=sch_document_loader.cache, fetcher_constructor=loadingContext.fetcher_constructor, skip_schemas=skip_schemas) if cwlVersion == "v1.0": _add_blank_ids(workflowobj) processobj, metadata = document_loader.resolve_all(workflowobj, fileuri) if loadingContext.metadata: metadata = loadingContext.metadata if not isinstance(processobj, (CommentedMap, CommentedSeq)): raise ValidationException("Workflow must be a CommentedMap or CommentedSeq.") if not isinstance(metadata, CommentedMap): raise ValidationException("metadata must be a CommentedMap, was %s" % type(metadata)) if isinstance(processobj, CommentedMap): uri = processobj["id"] _convert_stdstreams_to_files(workflowobj) if preprocess_only: return loadingContext, uri if loadingContext.do_validate: schema.validate_doc(avsc_names, processobj, document_loader, loadingContext.strict) # None means default behavior (do update) if loadingContext.do_update in (True, None): if "cwlVersion" not in metadata: metadata["cwlVersion"] = cwlVersion processobj = update.update( processobj, document_loader, fileuri, loadingContext.enable_dev, metadata) document_loader.idx[processobj["id"]] = processobj if jobobj is not None: loadingContext.jobdefaults = jobobj loadingContext.loader = document_loader loadingContext.avsc_names = avsc_names loadingContext.metadata = metadata return loadingContext, uri def make_tool(uri, # type: Union[Text, CommentedMap, CommentedSeq] loadingContext # type: LoadingContext ): # type: (...) -> Process """Make a Python CWL object.""" if loadingContext.loader is None: raise ValueError("loadingContext must have a loader") resolveduri, metadata = loadingContext.loader.resolve_ref(uri) processobj = None if isinstance(resolveduri, MutableSequence): for obj in resolveduri: if obj['id'].endswith('#main'): processobj = obj break if not processobj: raise WorkflowException( u"Tool file contains graph of multiple objects, must specify " "one of #%s" % ", #".join( urllib.parse.urldefrag(i["id"])[1] for i in resolveduri if "id" in i)) elif isinstance(resolveduri, MutableMapping): processobj = resolveduri else: raise Exception("Must resolve to list or dict") tool = loadingContext.construct_tool_object(processobj, loadingContext) if loadingContext.jobdefaults: jobobj = loadingContext.jobdefaults for inp in tool.tool["inputs"]: if shortname(inp["id"]) in jobobj: inp["default"] = jobobj[shortname(inp["id"])] return tool def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]] loadingContext=None # type: Optional[LoadingContext] ): # type: (...) -> Process loadingContext, workflowobj, uri = fetch_document( argsworkflow, loadingContext) loadingContext, uri = resolve_and_validate_document( loadingContext, workflowobj, uri) return make_tool(uri, loadingContext) def resolve_overrides(ov, # type: CommentedMap ov_uri, # type: Text baseurl # type: Text ): # type: (...) -> List[Dict[Text, Any]] ovloader = Loader(overrides_ctx) ret, _ = ovloader.resolve_all(ov, baseurl) if not isinstance(ret, CommentedMap): raise Exception("Expected CommentedMap, got %s" % type(ret)) cwl_docloader = get_schema("v1.0")[0] cwl_docloader.resolve_all(ret, ov_uri) return cast(List[Dict[Text, Any]], ret["http://commonwl.org/cwltool#overrides"]) def load_overrides(ov, base_url): # type: (Text, Text) -> List[Dict[Text, Any]] ovloader = Loader(overrides_ctx) return resolve_overrides(ovloader.fetch(ov), ov, base_url)