diff env/lib/python3.7/site-packages/cwltool/load_tool.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/cwltool/load_tool.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,392 +0,0 @@
-"""Loads a CWL document."""
-from __future__ import absolute_import
-
-import hashlib
-import logging
-import os
-import re
-import uuid
-from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence,
-                    Optional, Tuple, Union, cast)
-
-import requests.sessions
-from ruamel.yaml.comments import CommentedMap, CommentedSeq
-from schema_salad import schema
-from schema_salad.ref_resolver import (ContextType,  # pylint: disable=unused-import
-                                       Fetcher, Loader, file_uri, SubLoader)
-from schema_salad.sourceline import SourceLine, cmap
-from schema_salad.validate import ValidationException
-from six import itervalues, string_types
-from six.moves import urllib
-from typing_extensions import Text  # pylint: disable=unused-import
-# move to a regular typing import when Python 3.3-3.6 is no longer supported
-
-from . import process, update
-from .context import LoadingContext  # pylint: disable=unused-import
-from .errors import WorkflowException
-from .loghandler import _logger
-from .process import (Process, get_schema,  # pylint: disable=unused-import
-                      shortname)
-from .software_requirements import (  # pylint: disable=unused-import
-    DependenciesConfiguration)
-from .update import ALLUPDATES
-from .utils import json_dumps
-
-
-
-
-jobloaderctx = {
-    u"cwl": "https://w3id.org/cwl/cwl#",
-    u"cwltool": "http://commonwl.org/cwltool#",
-    u"path": {u"@type": u"@id"},
-    u"location": {u"@type": u"@id"},
-    u"id": u"@id"
-}  # type: ContextType
-
-
-overrides_ctx = {
-    u"overrideTarget": {u"@type": u"@id"},
-    u"cwltool": "http://commonwl.org/cwltool#",
-    u"http://commonwl.org/cwltool#overrides": {
-        "@id": "cwltool:overrides",
-        "mapSubject": "overrideTarget",
-    },
-    "requirements": {
-        "@id": "https://w3id.org/cwl/cwl#requirements",
-        "mapSubject": "class"
-    }
-}  # type: ContextType
-
-
-FetcherConstructorType = Callable[
-    [Dict[Text, Union[Text, bool]], requests.sessions.Session], Fetcher]
-ResolverType = Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
-
-def default_loader(fetcher_constructor=None, enable_dev=False):
-    # type: (Optional[FetcherConstructorType], bool) -> Loader
-    return Loader(jobloaderctx, fetcher_constructor=fetcher_constructor,
-                  allow_attachments=lambda r: enable_dev)
-
-def resolve_tool_uri(argsworkflow,              # type: Text
-                     resolver=None,             # type: Optional[ResolverType]
-                     fetcher_constructor=None,  # type: Optional[FetcherConstructorType]
-                     document_loader=None       # type: Optional[Loader]
-                    ):  # type: (...) -> Tuple[Text, Text]
-
-    uri = None  # type: Optional[Text]
-    split = urllib.parse.urlsplit(argsworkflow)
-    # In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that
-    if split.scheme and split.scheme in [u'http', u'https', u'file']:
-        uri = argsworkflow
-    elif os.path.exists(os.path.abspath(argsworkflow)):
-        uri = file_uri(str(os.path.abspath(argsworkflow)))
-    elif resolver is not None:
-        if document_loader is None:
-            document_loader = default_loader(fetcher_constructor)
-        uri = resolver(document_loader, argsworkflow)
-
-    if uri is None:
-        raise ValidationException("Not found: '%s'" % argsworkflow)
-
-    if argsworkflow != uri:
-        _logger.info("Resolved '%s' to '%s'", argsworkflow, uri)
-
-    fileuri = urllib.parse.urldefrag(uri)[0]
-    return uri, fileuri
-
-
-def fetch_document(argsworkflow,        # type: Union[Text, Dict[Text, Any]]
-                   loadingContext=None  # type: Optional[LoadingContext]
-                  ):  # type: (...) -> Tuple[LoadingContext, CommentedMap, Text]
-    """Retrieve a CWL document."""
-    if loadingContext is None:
-        loadingContext = LoadingContext()
-        loadingContext.loader = default_loader()
-    else:
-        loadingContext = loadingContext.copy()
-        if loadingContext.loader is None:
-            loadingContext.loader = default_loader(loadingContext.fetcher_constructor)
-
-    if isinstance(argsworkflow, string_types):
-        uri, fileuri = resolve_tool_uri(argsworkflow,
-                                        resolver=loadingContext.resolver,
-                                        document_loader=loadingContext.loader)
-        workflowobj = loadingContext.loader.fetch(fileuri)
-        return loadingContext, workflowobj, uri
-    if isinstance(argsworkflow, dict):
-        uri = argsworkflow["id"] if argsworkflow.get("id") else "_:" + Text(uuid.uuid4())
-        workflowobj = cast(CommentedMap, cmap(argsworkflow, fn=uri))
-        loadingContext.loader.idx[uri] = workflowobj
-        return loadingContext, workflowobj, uri
-    raise ValidationException("Must be URI or object: '%s'" % argsworkflow)
-
-
-def _convert_stdstreams_to_files(workflowobj):
-    # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> None
-
-    if isinstance(workflowobj, MutableMapping):
-        if workflowobj.get('class') == 'CommandLineTool':
-            with SourceLine(workflowobj, "outputs", ValidationException,
-                            _logger.isEnabledFor(logging.DEBUG)):
-                outputs = workflowobj.get('outputs', [])
-                if not isinstance(outputs, CommentedSeq):
-                    raise ValidationException('"outputs" section is not '
-                                              'valid.')
-                for out in workflowobj.get('outputs', []):
-                    if not isinstance(out, CommentedMap):
-                        raise ValidationException(
-                            "Output '{}' is not a valid "
-                            "OutputParameter.".format(out))
-                    for streamtype in ['stdout', 'stderr']:
-                        if out.get('type') == streamtype:
-                            if 'outputBinding' in out:
-                                raise ValidationException(
-                                    "Not allowed to specify outputBinding when"
-                                    " using %s shortcut." % streamtype)
-                            if streamtype in workflowobj:
-                                filename = workflowobj[streamtype]
-                            else:
-                                filename = Text(
-                                    hashlib.sha1(  # nosec
-                                        json_dumps(workflowobj, sort_keys=True
-                                                  ).encode('utf-8')).hexdigest())
-                                workflowobj[streamtype] = filename
-                            out['type'] = 'File'
-                            out['outputBinding'] = cmap({'glob': filename})
-            for inp in workflowobj.get('inputs', []):
-                if inp.get('type') == 'stdin':
-                    if 'inputBinding' in inp:
-                        raise ValidationException(
-                            "Not allowed to specify inputBinding when"
-                            " using stdin shortcut.")
-                    if 'stdin' in workflowobj:
-                        raise ValidationException(
-                            "Not allowed to specify stdin path when"
-                            " using stdin type shortcut.")
-                    else:
-                        workflowobj['stdin'] = \
-                            "$(inputs.%s.path)" % \
-                            inp['id'].rpartition('#')[2]
-                        inp['type'] = 'File'
-        else:
-            for entry in itervalues(workflowobj):
-                _convert_stdstreams_to_files(entry)
-    if isinstance(workflowobj, MutableSequence):
-        for entry in workflowobj:
-            _convert_stdstreams_to_files(entry)
-
-def _add_blank_ids(workflowobj):
-    # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> None
-
-    if isinstance(workflowobj, MutableMapping):
-        if ("run" in workflowobj and
-                isinstance(workflowobj["run"], MutableMapping) and
-                "id" not in workflowobj["run"] and
-                "$import" not in workflowobj["run"]):
-            workflowobj["run"]["id"] = Text(uuid.uuid4())
-        for entry in itervalues(workflowobj):
-            _add_blank_ids(entry)
-    if isinstance(workflowobj, MutableSequence):
-        for entry in workflowobj:
-            _add_blank_ids(entry)
-
-def resolve_and_validate_document(
-        loadingContext,            # type: LoadingContext
-        workflowobj,               # type: Union[CommentedMap, CommentedSeq]
-        uri,                       # type: Text
-        preprocess_only=False,     # type: bool
-        skip_schemas=None,         # type: Optional[bool]
-                                 ):
-    # type: (...) -> Tuple[LoadingContext, Text]
-    """Validate a CWL document."""
-    if not loadingContext.loader:
-        raise ValueError("loadingContext must have a loader.")
-    else:
-        loader = loadingContext.loader
-    loadingContext = loadingContext.copy()
-
-    if not isinstance(workflowobj, MutableMapping):
-        raise ValueError("workflowjobj must be a dict, got '{}': {}".format(
-            type(workflowobj), workflowobj))
-
-    jobobj = None
-    if "cwl:tool" in workflowobj:
-        jobobj, _ = loader.resolve_all(workflowobj, uri)
-        uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"])
-        del cast(Dict[Text, Any], jobobj)["https://w3id.org/cwl/cwl#tool"]
-
-        workflowobj = fetch_document(uri, loadingContext)[1]
-
-    fileuri = urllib.parse.urldefrag(uri)[0]
-
-    cwlVersion = loadingContext.metadata.get("cwlVersion")
-    if not cwlVersion:
-        cwlVersion = workflowobj.get("cwlVersion")
-    if not cwlVersion and fileuri != uri:
-        # The tool we're loading is a fragment of a bigger file.  Get
-        # the document root element and look for cwlVersion there.
-        metadata = fetch_document(fileuri, loadingContext)[1]  # type: Dict[Text, Any]
-        cwlVersion = metadata.get("cwlVersion")
-    if not cwlVersion:
-        raise ValidationException(
-            "No cwlVersion found. "
-            "Use the following syntax in your CWL document to declare "
-            "the version: cwlVersion: <version>.\n"
-            "Note: if this is a CWL draft-2 (pre v1.0) document then it "
-            "will need to be upgraded first.")
-
-    if not isinstance(cwlVersion, string_types):
-        with SourceLine(workflowobj, "cwlVersion", ValidationException):
-            raise ValidationException("'cwlVersion' must be a string, "
-                                      "got {}".format(
-                                          type(cwlVersion)))
-    # strip out version
-    cwlVersion = re.sub(
-        r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "",
-        cwlVersion)
-    if cwlVersion not in list(ALLUPDATES):
-        # print out all the Supported Versions of cwlVersion
-        versions = []
-        for version in list(ALLUPDATES):
-            if "dev" in version:
-                version += " (with --enable-dev flag only)"
-            versions.append(version)
-        versions.sort()
-        raise ValidationException(
-            "The CWL reference runner no longer supports pre CWL v1.0 "
-            "documents. Supported versions are: "
-            "\n{}".format("\n".join(versions)))
-
-    if isinstance(jobobj, CommentedMap) and "http://commonwl.org/cwltool#overrides" in jobobj:
-        loadingContext.overrides_list.extend(resolve_overrides(jobobj, uri, uri))
-        del jobobj["http://commonwl.org/cwltool#overrides"]
-
-    if isinstance(jobobj, CommentedMap) and "https://w3id.org/cwl/cwl#requirements" in jobobj:
-        if cwlVersion not in ("v1.1.0-dev1","v1.1"):
-            raise ValidationException(
-                    "`cwl:requirements` in the input object is not part of CWL "
-                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
-                    "can set the cwlVersion to v1.1 or greater.")
-        loadingContext.overrides_list.append({"overrideTarget": uri,
-                                              "requirements": jobobj["https://w3id.org/cwl/cwl#requirements"]})
-        del jobobj["https://w3id.org/cwl/cwl#requirements"]
-
-    (sch_document_loader, avsc_names) = \
-        process.get_schema(cwlVersion)[:2]
-
-    if isinstance(avsc_names, Exception):
-        raise avsc_names
-
-    processobj = None  # type: Union[CommentedMap, CommentedSeq, Text, None]
-    document_loader = Loader(sch_document_loader.ctx,
-                             schemagraph=sch_document_loader.graph,
-                             idx=loader.idx,
-                             cache=sch_document_loader.cache,
-                             fetcher_constructor=loadingContext.fetcher_constructor,
-                             skip_schemas=skip_schemas)
-
-    if cwlVersion == "v1.0":
-        _add_blank_ids(workflowobj)
-
-    processobj, metadata = document_loader.resolve_all(workflowobj, fileuri)
-    if loadingContext.metadata:
-        metadata = loadingContext.metadata
-    if not isinstance(processobj, (CommentedMap, CommentedSeq)):
-        raise ValidationException("Workflow must be a CommentedMap or CommentedSeq.")
-    if not isinstance(metadata, CommentedMap):
-        raise ValidationException("metadata must be a CommentedMap, was %s" % type(metadata))
-
-    if isinstance(processobj, CommentedMap):
-        uri = processobj["id"]
-
-    _convert_stdstreams_to_files(workflowobj)
-
-    if preprocess_only:
-        return loadingContext, uri
-
-    if loadingContext.do_validate:
-        schema.validate_doc(avsc_names, processobj, document_loader, loadingContext.strict)
-
-    # None means default behavior (do update)
-    if loadingContext.do_update in (True, None):
-        if "cwlVersion" not in metadata:
-            metadata["cwlVersion"] = cwlVersion
-        processobj = update.update(
-            processobj, document_loader, fileuri, loadingContext.enable_dev, metadata)
-        document_loader.idx[processobj["id"]] = processobj
-
-    if jobobj is not None:
-        loadingContext.jobdefaults = jobobj
-
-    loadingContext.loader = document_loader
-    loadingContext.avsc_names = avsc_names
-    loadingContext.metadata = metadata
-
-    return loadingContext, uri
-
-
-def make_tool(uri,                # type: Union[Text, CommentedMap, CommentedSeq]
-              loadingContext      # type: LoadingContext
-             ):  # type: (...) -> Process
-    """Make a Python CWL object."""
-    if loadingContext.loader is None:
-        raise ValueError("loadingContext must have a loader")
-    resolveduri, metadata = loadingContext.loader.resolve_ref(uri)
-
-    processobj = None
-    if isinstance(resolveduri, MutableSequence):
-        for obj in resolveduri:
-            if obj['id'].endswith('#main'):
-                processobj = obj
-                break
-        if not processobj:
-            raise WorkflowException(
-                u"Tool file contains graph of multiple objects, must specify "
-                "one of #%s" % ", #".join(
-                    urllib.parse.urldefrag(i["id"])[1] for i in resolveduri
-                    if "id" in i))
-    elif isinstance(resolveduri, MutableMapping):
-        processobj = resolveduri
-    else:
-        raise Exception("Must resolve to list or dict")
-
-    tool = loadingContext.construct_tool_object(processobj, loadingContext)
-
-    if loadingContext.jobdefaults:
-        jobobj = loadingContext.jobdefaults
-        for inp in tool.tool["inputs"]:
-            if shortname(inp["id"]) in jobobj:
-                inp["default"] = jobobj[shortname(inp["id"])]
-
-    return tool
-
-
-def load_tool(argsworkflow,         # type: Union[Text, Dict[Text, Any]]
-              loadingContext=None   # type: Optional[LoadingContext]
-             ):  # type: (...) -> Process
-
-    loadingContext, workflowobj, uri = fetch_document(
-        argsworkflow, loadingContext)
-
-    loadingContext, uri = resolve_and_validate_document(
-        loadingContext, workflowobj, uri)
-
-    return make_tool(uri,
-                     loadingContext)
-
-def resolve_overrides(ov,      # type: CommentedMap
-                      ov_uri,  # type: Text
-                      baseurl  # type: Text
-                     ):  # type: (...) -> List[Dict[Text, Any]]
-    ovloader = Loader(overrides_ctx)
-    ret, _ = ovloader.resolve_all(ov, baseurl)
-    if not isinstance(ret, CommentedMap):
-        raise Exception("Expected CommentedMap, got %s" % type(ret))
-    cwl_docloader = get_schema("v1.0")[0]
-    cwl_docloader.resolve_all(ret, ov_uri)
-    return cast(List[Dict[Text, Any]],
-                ret["http://commonwl.org/cwltool#overrides"])
-
-def load_overrides(ov, base_url):  # type: (Text, Text) -> List[Dict[Text, Any]]
-    ovloader = Loader(overrides_ctx)
-    return resolve_overrides(ovloader.fetch(ov), ov, base_url)