diff env/lib/python3.7/site-packages/schema_salad/ref_resolver.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/schema_salad/ref_resolver.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1441 +0,0 @@
-from __future__ import absolute_import
-import copy
-import logging
-import os
-import re
-import sys
-import xml.sax
-from io import open
-from typing import Callable  # pylint: disable=unused-import
-from typing import (
-    Any,
-    Dict,
-    Iterable,
-    List,
-    MutableMapping,
-    MutableSequence,
-    Optional,
-    Set,
-    Tuple,
-    TypeVar,
-    Union,
-    cast,
-import requests
-from cachecontrol.caches import FileCache
-from cachecontrol.wrapper import CacheControl
-from future.utils import raise_from
-from rdflib.graph import Graph
-from rdflib.namespace import OWL, RDF, RDFS
-from rdflib.plugins.parsers.notation3 import BadSyntax
-from six import StringIO, iteritems, string_types
-from six.moves import range, urllib
-from typing_extensions import Text  # pylint: disable=unused-import
-from ruamel import yaml
-from ruamel.yaml.comments import CommentedMap, CommentedSeq, LineCol
-from .exceptions import ValidationException, SchemaSaladException
-from .sourceline import SourceLine, add_lc_filename, relname
-from .utils import aslist, onWindows
-# move to a regular typing import when Python 3.3-3.6 is no longer supported
-_logger = logging.getLogger("salad")
-ContextType = Dict[Text, Union[Dict[Text, Any], Text, Iterable[Text]]]
-DocumentType = TypeVar("DocumentType", CommentedSeq, CommentedMap)
-DocumentOrStrType = TypeVar("DocumentOrStrType", CommentedSeq, CommentedMap, Text)
-_re_drive = re.compile(r"/([a-zA-Z]):")
-def file_uri(path, split_frag=False):  # type: (str, bool) -> str
-    if path.startswith("file://"):
-        return path
-    if split_frag:
-        pathsp = path.split("#", 2)
-        frag = "#" + urllib.parse.quote(str(pathsp[1])) if len(pathsp) == 2 else ""
-        urlpath = urllib.request.pathname2url(str(pathsp[0]))
-    else:
-        urlpath = urllib.request.pathname2url(path)
-        frag = ""
-    if urlpath.startswith("//"):
-        return "file:{}{}".format(urlpath, frag)
-    return "file://{}{}".format(urlpath, frag)
-def uri_file_path(url):  # type: (str) -> str
-    split = urllib.parse.urlsplit(url)
-    if split.scheme == "file":
-        return urllib.request.url2pathname(str(split.path)) + (
-            "#" + urllib.parse.unquote(str(split.fragment))
-            if bool(split.fragment)
-            else ""
-        )
-    raise ValidationException("Not a file URI: {}".format(url))
-def to_validation_exception(
-    e,
-):  # type: (yaml.error.MarkedYAMLError) -> ValidationException
-    fname_regex = re.compile(r"^file://" + re.escape(os.getcwd()) + "/")
-    exc = ValidationException(e.problem)
-    mark = e.problem_mark
-    exc.file = re.sub(fname_regex, "", mark.name)
-    exc.start = (mark.line + 1, mark.column + 1)
-    exc.end = None
-    if e.context:
-        parent = ValidationException(e.context)
-        mark = e.context_mark
-        parent.file = re.sub(fname_regex, "", mark.name)
-        parent.start = (mark.line + 1, mark.column + 1)
-        parent.end = None
-        parent.children = [exc]
-        return parent
-    else:
-        return exc
-class NormDict(CommentedMap):
-    """A Dict where all keys are normalized using the provided function."""
-    def __init__(self, normalize=Text):  # type: (Callable[[Text], Text]) -> None
-        super(NormDict, self).__init__()
-        self.normalize = normalize
-    def __getitem__(self, key):  # type: (Any) -> Any
-        return super(NormDict, self).__getitem__(self.normalize(key))
-    def __setitem__(self, key, value):  # type: (Any, Any) -> Any
-        return super(NormDict, self).__setitem__(self.normalize(key), value)
-    def __delitem__(self, key):  # type: (Any) -> Any
-        return super(NormDict, self).__delitem__(self.normalize(key))
-    def __contains__(self, key):  # type: (Any) -> Any
-        return super(NormDict, self).__contains__(self.normalize(key))
-def merge_properties(a, b):  # type: (List[Any], List[Any]) -> Dict[Any, Any]
-    c = {}
-    for i in a:
-        if i not in b:
-            c[i] = a[i]
-    for i in b:
-        if i not in a:
-            c[i] = b[i]
-    for i in a:
-        if i in b:
-            c[i] = aslist(a[i]) + aslist(b[i])  # type: ignore
-    return c
-def SubLoader(loader):  # type: (Loader) -> Loader
-    return Loader(
-        loader.ctx,
-        schemagraph=loader.graph,
-        foreign_properties=loader.foreign_properties,
-        idx=loader.idx,
-        cache=loader.cache,
-        fetcher_constructor=loader.fetcher_constructor,
-        skip_schemas=loader.skip_schemas,
-        url_fields=loader.url_fields,
-        allow_attachments=loader.allow_attachments,
-    )
-class Fetcher(object):
-    def fetch_text(self, url):  # type: (Text) -> Text
-        raise NotImplementedError()
-    def check_exists(self, url):  # type: (Text) -> bool
-        raise NotImplementedError()
-    def urljoin(self, base_url, url):  # type: (Text, Text) -> Text
-        raise NotImplementedError()
-    schemes = [u"file", u"http", u"https", u"mailto"]
-    def supported_schemes(self):  # type: () -> List[Text]
-        return self.schemes
-class DefaultFetcher(Fetcher):
-    def __init__(
-        self,
-        cache,  # type: Dict[Text, Union[Text, bool]]
-        session,  # type: Optional[requests.sessions.Session]
-    ):  # type: (...) -> None
-        self.cache = cache
-        self.session = session
-    def fetch_text(self, url):
-        # type: (Text) -> Text
-        if url in self.cache and self.cache[url] is not True:
-            # treat "True" as a placeholder that indicates something exists but
-            # not necessarily what its contents is.
-            return cast(Text, self.cache[url])
-        split = urllib.parse.urlsplit(url)
-        scheme, path = split.scheme, split.path
-        if scheme in [u"http", u"https"] and self.session is not None:
-            try:
-                resp = self.session.get(url)
-                resp.raise_for_status()
-            except Exception as e:
-                raise_from(
-                    ValidationException("Error fetching {}: {}".format(url, e)), e
-                )
-            return resp.text
-        if scheme == "file":
-            try:
-                # On Windows, url.path will be /drive:/path ; on Unix systems,
-                # /path. As we want drive:/path instead of /drive:/path on Windows,
-                # remove the leading /.
-                if os.path.isabs(
-                    path[1:]
-                ):  # checking if pathis valid after removing front / or not
-                    path = path[1:]
-                with open(
-                    urllib.request.url2pathname(str(path)), encoding="utf-8"
-                ) as fp:
-                    return Text(fp.read())
-            except (OSError, IOError) as err:
-                if err.filename == path:
-                    raise_from(ValidationException(Text(err)), err)
-                else:
-                    raise_from(
-                        ValidationException("Error reading {}: {}".format(url, err)),
-                        err,
-                    )
-        raise ValidationException("Unsupported scheme in url: {}".format(url))
-    def check_exists(self, url):  # type: (Text) -> bool
-        if url in self.cache:
-            return True
-        split = urllib.parse.urlsplit(url)
-        scheme, path = split.scheme, split.path
-        if scheme in [u"http", u"https"] and self.session is not None:
-            try:
-                resp = self.session.head(url)
-                resp.raise_for_status()
-            except Exception:
-                return False
-            self.cache[url] = True
-            return True
-        if scheme == "file":
-            return os.path.exists(urllib.request.url2pathname(str(path)))
-        if scheme == "mailto":
-            return True
-        raise ValidationException("Unsupported scheme in url: {}".format(url))
-    def urljoin(self, base_url, url):  # type: (Text, Text) -> Text
-        if url.startswith("_:"):
-            return url
-        basesplit = urllib.parse.urlsplit(base_url)
-        split = urllib.parse.urlsplit(url)
-        if basesplit.scheme and basesplit.scheme != "file" and split.scheme == "file":
-            raise ValidationException(
-                "Not resolving potential remote exploit {} from base {}".format(
-                    url, base_url
-                )
-            )
-        if sys.platform == "win32":
-            if base_url == url:
-                return url
-            basesplit = urllib.parse.urlsplit(base_url)
-            # note that below might split
-            # "C:" with "C" as URI scheme
-            split = urllib.parse.urlsplit(url)
-            has_drive = split.scheme and len(split.scheme) == 1
-            if basesplit.scheme == "file":
-                # Special handling of relative file references on Windows
-                # as urllib seems to not be quite up to the job
-                # netloc MIGHT appear in equivalents of UNC Strings
-                # \\server1.example.com\path as
-                # file:///server1.example.com/path
-                # https://tools.ietf.org/html/rfc8089#appendix-E.3.2
-                # (TODO: test this)
-                netloc = split.netloc or basesplit.netloc
-                # Check if url is a local path like "C:/Users/fred"
-                # or actually an absolute URI like http://example.com/fred
-                if has_drive:
-                    # Assume split.scheme is actually a drive, e.g. "C:"
-                    # so we'll recombine into a path
-                    path_with_drive = urllib.parse.urlunsplit(
-                        (split.scheme, "", split.path, "", "")
-                    )
-                    # Compose new file:/// URI with path_with_drive
-                    # .. carrying over any #fragment (?query just in case..)
-                    return urllib.parse.urlunsplit(
-                        ("file", netloc, path_with_drive, split.query, split.fragment)
-                    )
-                if (
-                    not split.scheme
-                    and not netloc
-                    and split.path
-                    and split.path.startswith("/")
-                ):
-                    # Relative - but does it have a drive?
-                    base_drive = _re_drive.match(basesplit.path)
-                    drive = _re_drive.match(split.path)
-                    if base_drive and not drive:
-                        # Keep drive letter from base_url
-                        # https://tools.ietf.org/html/rfc8089#appendix-E.2.1
-                        # e.g. urljoin("file:///D:/bar/a.txt", "/foo/b.txt")
-                        #          == file:///D:/foo/b.txt
-                        path_with_drive = "/{}:{}".format(
-                            base_drive.group(1), split.path
-                        )
-                        return urllib.parse.urlunsplit(
-                            (
-                                "file",
-                                netloc,
-                                path_with_drive,
-                                split.query,
-                                split.fragment,
-                            )
-                        )
-                # else: fall-through to resolve as relative URI
-            elif has_drive:
-                # Base is http://something but url is C:/something - which urllib
-                # would wrongly resolve as an absolute path that could later be used
-                # to access local files
-                raise ValidationException(
-                    "Not resolving potential remote exploit {} from base {}".format(
-                        url, base_url
-                    )
-                )
-        return urllib.parse.urljoin(base_url, url)
-idx_type = Dict[Text, Union[CommentedMap, CommentedSeq, Text, None]]
-fetcher_sig = Callable[
-    [Dict[Text, Union[Text, bool]], requests.sessions.Session], Fetcher
-attachements_sig = Callable[[Union[CommentedMap, CommentedSeq]], bool]
-class Loader(object):
-    def __init__(
-        self,
-        ctx,  # type: ContextType
-        schemagraph=None,  # type: Optional[Graph]
-        foreign_properties=None,  # type: Optional[Set[Text]]
-        idx=None,  # type: Optional[idx_type]
-        cache=None,  # type: Optional[Dict[Text, Any]]
-        session=None,  # type: Optional[requests.sessions.Session]
-        fetcher_constructor=None,  # type: Optional[fetcher_sig]
-        skip_schemas=None,  # type: Optional[bool]
-        url_fields=None,  # type: Optional[Set[Text]]
-        allow_attachments=None,  # type: Optional[attachements_sig]
-    ):
-        # type: (...) -> None
-        if idx is not None:
-            self.idx = idx
-        else:
-            self.idx = NormDict(lambda url: urllib.parse.urlsplit(url).geturl())
-        self.ctx = {}  # type: ContextType
-        if schemagraph is not None:
-            self.graph = schemagraph
-        else:
-            self.graph = Graph()
-        if foreign_properties is not None:
-            self.foreign_properties = set(foreign_properties)
-        else:
-            self.foreign_properties = set()
-        if cache is not None:
-            self.cache = cache
-        else:
-            self.cache = {}
-        if skip_schemas is not None:
-            self.skip_schemas = skip_schemas
-        else:
-            self.skip_schemas = False
-        if session is None:
-            if "HOME" in os.environ:
-                self.session = CacheControl(
-                    requests.Session(),
-                    cache=FileCache(
-                        os.path.join(os.environ["HOME"], ".cache", "salad")
-                    ),
-                )
-            elif "TMP" in os.environ:
-                self.session = CacheControl(
-                    requests.Session(),
-                    cache=FileCache(os.path.join(os.environ["TMP"], ".cache", "salad")),
-                )
-            else:
-                self.session = CacheControl(
-                    requests.Session(),
-                    cache=FileCache(os.path.join("/tmp", ".cache", "salad")),
-                )
-        else:
-            self.session = session
-        if fetcher_constructor is not None:
-            self.fetcher_constructor = fetcher_constructor
-        else:
-            self.fetcher_constructor = DefaultFetcher
-        self.fetcher = self.fetcher_constructor(self.cache, self.session)
-        self.fetch_text = self.fetcher.fetch_text
-        self.check_exists = self.fetcher.check_exists
-        if url_fields is None:
-            self.url_fields = set()  # type: Set[Text]
-        else:
-            self.url_fields = set(url_fields)
-        self.scoped_ref_fields = {}  # type: Dict[Text, int]
-        self.vocab_fields = set()  # type: Set[Text]
-        self.identifiers = []  # type: List[Text]
-        self.identity_links = set()  # type: Set[Text]
-        self.standalone = None  # type: Optional[Set[Text]]
-        self.nolinkcheck = set()  # type: Set[Text]
-        self.vocab = {}  # type: Dict[Text, Text]
-        self.rvocab = {}  # type: Dict[Text, Text]
-        self.idmap = {}  # type: Dict[Text, Any]
-        self.mapPredicate = {}  # type: Dict[Text, Text]
-        self.type_dsl_fields = set()  # type: Set[Text]
-        self.subscopes = {}  # type: Dict[Text, Text]
-        self.secondaryFile_dsl_fields = set()  # type: Set[Text]
-        self.allow_attachments = allow_attachments
-        self.add_context(ctx)
-    def expand_url(
-        self,
-        url,  # type: Text
-        base_url,  # type: Text
-        scoped_id=False,  # type: bool
-        vocab_term=False,  # type: bool
-        scoped_ref=None,  # type: Optional[int]
-    ):
-        # type: (...) -> Text
-        if url in (u"@id", u"@type") or url is None:
-            return url
-        if vocab_term and url in self.vocab:
-            return url
-        if url.startswith("_:"):
-            return url
-        if bool(self.vocab) and u":" in url:
-            prefix = url.split(u":")[0]
-            if prefix in self.vocab:
-                url = self.vocab[prefix] + url[len(prefix) + 1 :]
-            elif prefix not in self.fetcher.supported_schemes():
-                _logger.warning(
-                    "URI prefix '%s' of '%s' not recognized, are you missing a "
-                    "$namespaces section?",
-                    prefix,
-                    url,
-                )
-        split = urllib.parse.urlsplit(url)
-        if (
-            (bool(split.scheme) and split.scheme in [u"http", u"https", u"file"])
-            or url.startswith(u"$(")
-            or url.startswith(u"${")
-        ):
-            pass
-        elif scoped_id and not bool(split.fragment):
-            splitbase = urllib.parse.urlsplit(base_url)
-            frg = u""
-            if bool(splitbase.fragment):
-                frg = splitbase.fragment + u"/" + split.path
-            else:
-                frg = split.path
-            pt = splitbase.path if splitbase.path != "" else "/"
-            url = urllib.parse.urlunsplit(
-                (splitbase.scheme, splitbase.netloc, pt, splitbase.query, frg)
-            )
-        elif scoped_ref is not None and not split.fragment:
-            pass
-        else:
-            url = self.fetcher.urljoin(base_url, url)
-        if vocab_term and url in self.rvocab:
-            return self.rvocab[url]
-        else:
-            return url
-    def _add_properties(self, s):  # type: (Text) -> None
-        for _, _, rng in self.graph.triples((s, RDFS.range, None)):
-            literal = (
-                Text(rng).startswith(u"http://www.w3.org/2001/XMLSchema#")
-                and not Text(rng) == u"http://www.w3.org/2001/XMLSchema#anyURI"
-            ) or Text(rng) == u"http://www.w3.org/2000/01/rdf-schema#Literal"
-            if not literal:
-                self.url_fields.add(Text(s))
-        self.foreign_properties.add(Text(s))
-    def add_namespaces(self, ns):  # type: (Dict[Text, Text]) -> None
-        self.vocab.update(ns)
-    def add_schemas(self, ns, base_url):
-        # type: (Union[List[Text], Text], Text) -> None
-        if self.skip_schemas:
-            return
-        for sch in aslist(ns):
-            try:
-                fetchurl = self.fetcher.urljoin(base_url, sch)
-                if fetchurl not in self.cache or self.cache[fetchurl] is True:
-                    _logger.debug("Getting external schema %s", fetchurl)
-                    content = self.fetch_text(fetchurl)
-                    self.cache[fetchurl] = Graph()
-                    for fmt in ["xml", "turtle", "rdfa"]:
-                        try:
-                            self.cache[fetchurl].parse(
-                                data=content, format=fmt, publicID=str(fetchurl)
-                            )
-                            self.graph += self.cache[fetchurl]
-                            break
-                        except xml.sax.SAXParseException:
-                            pass
-                        except TypeError:
-                            pass
-                        except BadSyntax:
-                            pass
-            except Exception as e:
-                _logger.warning(
-                    "Could not load extension schema %s: %s", fetchurl, Text(e)
-                )
-        for s, _, _ in self.graph.triples((None, RDF.type, RDF.Property)):
-            self._add_properties(s)
-        for s, _, o in self.graph.triples((None, RDFS.subPropertyOf, None)):
-            self._add_properties(s)
-            self._add_properties(o)
-        for s, _, _ in self.graph.triples((None, RDFS.range, None)):
-            self._add_properties(s)
-        for s, _, _ in self.graph.triples((None, RDF.type, OWL.ObjectProperty)):
-            self._add_properties(s)
-        for s, _, _ in self.graph.triples((None, None, None)):
-            self.idx[Text(s)] = None
-    def add_context(self, newcontext, baseuri=""):
-        # type: (ContextType, Text) -> None
-        if bool(self.vocab):
-            raise ValidationException("Refreshing context that already has stuff in it")
-        self.url_fields = set(("$schemas",))
-        self.scoped_ref_fields = {}
-        self.vocab_fields = set()
-        self.identifiers = []
-        self.identity_links = set()
-        self.standalone = set()
-        self.nolinkcheck = set()
-        self.idmap = {}
-        self.mapPredicate = {}
-        self.vocab = {}
-        self.rvocab = {}
-        self.type_dsl_fields = set()
-        self.secondaryFile_dsl_fields = set()
-        self.subscopes = {}
-        self.ctx.update(_copy_dict_without_key(newcontext, u"@context"))
-        _logger.debug("ctx is %s", self.ctx)
-        for key, value in self.ctx.items():
-            if value == u"@id":
-                self.identifiers.append(key)
-                self.identity_links.add(key)
-            elif isinstance(value, MutableMapping):
-                if value.get(u"@type") == u"@id":
-                    self.url_fields.add(key)
-                    if u"refScope" in value:
-                        self.scoped_ref_fields[key] = value[u"refScope"]
-                    if value.get(u"identity", False):
-                        self.identity_links.add(key)
-                if value.get(u"@type") == u"@vocab":
-                    self.url_fields.add(key)
-                    self.vocab_fields.add(key)
-                    if u"refScope" in value:
-                        self.scoped_ref_fields[key] = value[u"refScope"]
-                    if value.get(u"typeDSL"):
-                        self.type_dsl_fields.add(key)
-                if value.get(u"secondaryFilesDSL"):
-                    self.secondaryFile_dsl_fields.add(key)
-                if value.get(u"noLinkCheck"):
-                    self.nolinkcheck.add(key)
-                if value.get(u"mapSubject"):
-                    self.idmap[key] = value[u"mapSubject"]
-                if value.get(u"mapPredicate"):
-                    self.mapPredicate[key] = value[u"mapPredicate"]
-                if value.get(u"@id"):
-                    self.vocab[key] = value[u"@id"]
-                if value.get(u"subscope"):
-                    self.subscopes[key] = value[u"subscope"]
-            elif isinstance(value, string_types):
-                self.vocab[key] = value
-        for k, v in self.vocab.items():
-            self.rvocab[self.expand_url(v, u"", scoped_id=False)] = k
-        self.identifiers.sort()
-        _logger.debug("identifiers is %s", self.identifiers)
-        _logger.debug("identity_links is %s", self.identity_links)
-        _logger.debug("url_fields is %s", self.url_fields)
-        _logger.debug("vocab_fields is %s", self.vocab_fields)
-        _logger.debug("vocab is %s", self.vocab)
-    resolved_ref_type = Tuple[
-        Optional[Union[CommentedMap, CommentedSeq, Text]], CommentedMap
-    ]
-    def resolve_ref(
-        self,
-        ref,  # type: Union[CommentedMap, CommentedSeq, Text]
-        base_url=None,  # type: Optional[Text]
-        checklinks=True,  # type: bool
-        strict_foreign_properties=False,  # type: bool
-    ):
-        # type: (...) -> Loader.resolved_ref_type
-        lref = ref  # type: Union[CommentedMap, CommentedSeq, Text, None]
-        obj = None  # type: Optional[CommentedMap]
-        resolved_obj = None  # type: Optional[Union[CommentedMap, CommentedSeq, Text]]
-        inc = False
-        mixin = None  # type: Optional[MutableMapping[Text, Any]]
-        if not base_url:
-            base_url = file_uri(os.getcwd()) + "/"
-        sl = SourceLine(obj, None)
-        # If `ref` is a dict, look for special directives.
-        if isinstance(lref, CommentedMap):
-            obj = lref
-            if "$import" in obj:
-                sl = SourceLine(obj, "$import")
-                if len(obj) == 1:
-                    lref = obj[u"$import"]
-                    obj = None
-                else:
-                    raise ValidationException(
-                        u"'$import' must be the only field in {}".format(obj), sl
-                    )
-            elif "$include" in obj:
-                sl = SourceLine(obj, "$include")
-                if len(obj) == 1:
-                    lref = obj[u"$include"]
-                    inc = True
-                    obj = None
-                else:
-                    raise ValidationException(
-                        u"'$include' must be the only field in {}".format(obj), sl
-                    )
-            elif "$mixin" in obj:
-                sl = SourceLine(obj, "$mixin")
-                lref = obj[u"$mixin"]
-                mixin = obj
-                obj = None
-            else:
-                lref = None
-                for identifier in self.identifiers:
-                    if identifier in obj:
-                        lref = obj[identifier]
-                        break
-                if not lref:
-                    raise ValidationException(
-                        u"Object `{}` does not have identifier field in {}".format(
-                            obj, self.identifiers
-                        ),
-                        sl,
-                    )
-        if not isinstance(lref, string_types):
-            raise ValidationException(
-                u"Expected CommentedMap or string, got {}: `{}`".format(
-                    type(lref), lref
-                )
-            )
-        if isinstance(lref, string_types) and os.sep == "\\":
-            # Convert Windows path separator in ref
-            lref = lref.replace("\\", "/")
-        url = self.expand_url(lref, base_url, scoped_id=(obj is not None))
-        # Has this reference been loaded already?
-        if url in self.idx and (not mixin):
-            resolved_obj = self.idx[url]
-            if isinstance(resolved_obj, MutableMapping):
-                metadata = self.idx.get(urllib.parse.urldefrag(url)[0], CommentedMap())
-                if isinstance(metadata, MutableMapping):
-                    if u"$graph" in resolved_obj:
-                        metadata = _copy_dict_without_key(resolved_obj, u"$graph")
-                        return resolved_obj[u"$graph"], metadata
-                    else:
-                        return resolved_obj, metadata
-                else:
-                    raise ValidationException(
-                        u"Expected CommentedMap, got {}: `{}`".format(
-                            type(metadata), metadata
-                        )
-                    )
-            elif isinstance(resolved_obj, MutableSequence):
-                metadata = self.idx.get(urllib.parse.urldefrag(url)[0], CommentedMap())
-                if isinstance(metadata, MutableMapping):
-                    return resolved_obj, metadata
-                else:
-                    return resolved_obj, CommentedMap()
-            elif isinstance(resolved_obj, string_types):
-                return resolved_obj, CommentedMap()
-            else:
-                raise ValidationException(
-                    u"Expected MutableMapping or MutableSequence, got {}: `{}`".format(
-                        type(resolved_obj), resolved_obj
-                    )
-                )
-        # "$include" directive means load raw text
-        if inc:
-            return self.fetch_text(url), CommentedMap()
-        doc = None
-        if isinstance(obj, MutableMapping):
-            for identifier in self.identifiers:
-                obj[identifier] = url
-            doc_url = url
-        else:
-            # Load structured document
-            doc_url, frg = urllib.parse.urldefrag(url)
-            if doc_url in self.idx and (not mixin):
-                # If the base document is in the index, it was already loaded,
-                # so if we didn't find the reference earlier then it must not
-                # exist.
-                raise ValidationException(
-                    u"Reference `#{}` not found in file `{}`.".format(frg, doc_url), sl
-                )
-            doc = self.fetch(doc_url, inject_ids=(not mixin))
-        # Recursively expand urls and resolve directives
-        if bool(mixin):
-            doc = copy.deepcopy(doc)
-            if doc is not None and mixin is not None:
-                doc.update(mixin)
-                del doc["$mixin"]
-            resolved_obj, metadata = self.resolve_all(
-                doc,
-                base_url,
-                file_base=doc_url,
-                checklinks=checklinks,
-                strict_foreign_properties=strict_foreign_properties,
-            )
-        else:
-            if doc:
-                resolve_target = doc
-            else:
-                resolve_target = obj
-            resolved_obj, metadata = self.resolve_all(
-                resolve_target,
-                doc_url,
-                checklinks=checklinks,
-                strict_foreign_properties=strict_foreign_properties,
-            )
-        # Requested reference should be in the index now, otherwise it's a bad
-        # reference
-        if not bool(mixin):
-            if url in self.idx:
-                resolved_obj = self.idx[url]
-            else:
-                raise ValidationException(
-                    "Reference `{}` is not in the index. Index contains: {}".format(
-                        url, ", ".join(self.idx)
-                    )
-                )
-        if isinstance(resolved_obj, CommentedMap):
-            if u"$graph" in resolved_obj:
-                metadata = _copy_dict_without_key(resolved_obj, u"$graph")
-                return resolved_obj[u"$graph"], metadata
-            else:
-                return resolved_obj, metadata
-        else:
-            return resolved_obj, metadata
-    def _resolve_idmap(
-        self,
-        document,  # type: CommentedMap
-        loader,  # type: Loader
-    ):
-        # type: (...) -> None
-        # Convert fields with mapSubject into lists
-        # use mapPredicate if the mapped value isn't a dict.
-        for idmapField in loader.idmap:
-            if idmapField in document:
-                idmapFieldValue = document[idmapField]
-                if (
-                    isinstance(idmapFieldValue, MutableMapping)
-                    and "$import" not in idmapFieldValue
-                    and "$include" not in idmapFieldValue
-                ):
-                    ls = CommentedSeq()
-                    for k in sorted(idmapFieldValue.keys()):
-                        val = idmapFieldValue[k]
-                        v = None  # type: Optional[CommentedMap]
-                        if not isinstance(val, CommentedMap):
-                            if idmapField in loader.mapPredicate:
-                                v = CommentedMap(
-                                    ((loader.mapPredicate[idmapField], val),)
-                                )
-                                v.lc.add_kv_line_col(
-                                    loader.mapPredicate[idmapField],
-                                    document[idmapField].lc.data[k],
-                                )
-                                v.lc.filename = document.lc.filename
-                            else:
-                                raise ValidationException(
-                                    "mapSubject '{}' value '{}' is not a dict "
-                                    "and does not have a mapPredicate.".format(k, v)
-                                )
-                        else:
-                            v = val
-                        v[loader.idmap[idmapField]] = k
-                        v.lc.add_kv_line_col(
-                            loader.idmap[idmapField], document[idmapField].lc.data[k]
-                        )
-                        v.lc.filename = document.lc.filename
-                        ls.lc.add_kv_line_col(len(ls), document[idmapField].lc.data[k])
-                        ls.lc.filename = document.lc.filename
-                        ls.append(v)
-                    document[idmapField] = ls
-    typeDSLregex = re.compile(Text(r"^([^[?]+)(\[\])?(\?)?$"))
-    def _type_dsl(
-        self,
-        t,  # type: Union[Text, Dict[Text, Text], List[Text]]
-        lc,  # type: LineCol
-        filename,  # type: Text
-    ):  # type: (...) -> Union[Text, Dict[Text, Text], List[Text]]
-        if not isinstance(t, string_types):
-            return t
-        m = Loader.typeDSLregex.match(t)
-        if not m:
-            return t
-        first = m.group(1)
-        second = third = None
-        if bool(m.group(2)):
-            second = CommentedMap((("type", "array"), ("items", first)))
-            second.lc.add_kv_line_col("type", lc)
-            second.lc.add_kv_line_col("items", lc)
-            second.lc.filename = filename
-        if bool(m.group(3)):
-            third = CommentedSeq([u"null", second or first])
-            third.lc.add_kv_line_col(0, lc)
-            third.lc.add_kv_line_col(1, lc)
-            third.lc.filename = filename
-        return third or second or first
-    def _secondaryFile_dsl(
-        self,
-        t,  # type: Union[Text, Dict[Text, Text], List[Text]]
-        lc,  # type: LineCol
-        filename,  # type: Text
-    ):  # type: (...) -> Union[Text, Dict[Text, Text], List[Text]]
-        if not isinstance(t, string_types):
-            return t
-        pat = t
-        req = None
-        if t.endswith("?"):
-            pat = t[0:-1]
-            req = False
-        second = CommentedMap((("pattern", pat), ("required", req)))
-        second.lc.add_kv_line_col("pattern", lc)
-        second.lc.add_kv_line_col("required", lc)
-        second.lc.filename = filename
-        return second
-    def _apply_dsl(
-        self,
-        datum,  # type: Union[Text, Dict[Any, Any], List[Any]]
-        d,  # type: Text
-        loader,  # type: Loader
-        lc,  # type: LineCol
-        filename,  # type: Text
-    ):
-        # type: (...) -> Union[Text, Dict[Any, Any], List[Any]]
-        if d in loader.type_dsl_fields:
-            return self._type_dsl(datum, lc, filename)
-        elif d in loader.secondaryFile_dsl_fields:
-            return self._secondaryFile_dsl(datum, lc, filename)
-        else:
-            return datum
-    def _resolve_dsl(
-        self,
-        document,  # type: CommentedMap
-        loader,  # type: Loader
-    ):
-        # type: (...) -> None
-        fields = list(loader.type_dsl_fields)
-        fields.extend(loader.secondaryFile_dsl_fields)
-        for d in fields:
-            if d in document:
-                datum2 = datum = document[d]
-                if isinstance(datum, string_types):
-                    datum2 = self._apply_dsl(
-                        datum, d, loader, document.lc.data[d], document.lc.filename
-                    )
-                elif isinstance(datum, CommentedSeq):
-                    datum2 = CommentedSeq()
-                    for n, t in enumerate(datum):
-                        if datum.lc and datum.lc.data:
-                            datum2.lc.add_kv_line_col(len(datum2), datum.lc.data[n])
-                            datum2.append(
-                                self._apply_dsl(
-                                    t, d, loader, datum.lc.data[n], document.lc.filename
-                                )
-                            )
-                        else:
-                            datum2.append(self._apply_dsl(t, d, loader, LineCol(), ""))
-                if isinstance(datum2, CommentedSeq):
-                    datum3 = CommentedSeq()
-                    seen = []  # type: List[Text]
-                    for i, item in enumerate(datum2):
-                        if isinstance(item, CommentedSeq):
-                            for j, v in enumerate(item):
-                                if v not in seen:
-                                    datum3.lc.add_kv_line_col(
-                                        len(datum3), item.lc.data[j]
-                                    )
-                                    datum3.append(v)
-                                    seen.append(v)
-                        else:
-                            if item not in seen:
-                                if datum2.lc and datum2.lc.data:
-                                    datum3.lc.add_kv_line_col(
-                                        len(datum3), datum2.lc.data[i]
-                                    )
-                                datum3.append(item)
-                                seen.append(item)
-                    document[d] = datum3
-                else:
-                    document[d] = datum2
-    def _resolve_identifier(self, document, loader, base_url):
-        # type: (CommentedMap, Loader, Text) -> Text
-        # Expand identifier field (usually 'id') to resolve scope
-        for identifer in loader.identifiers:
-            if identifer in document:
-                if isinstance(document[identifer], string_types):
-                    document[identifer] = loader.expand_url(
-                        document[identifer], base_url, scoped_id=True
-                    )
-                    if document[identifer] not in loader.idx or isinstance(
-                        loader.idx[document[identifer]], string_types
-                    ):
-                        loader.idx[document[identifer]] = document
-                    base_url = document[identifer]
-                else:
-                    raise ValidationException(
-                        "identifier field '{}' must be a string".format(
-                            document[identifer]
-                        )
-                    )
-        return base_url
-    def _resolve_identity(self, document, loader, base_url):
-        # type: (Dict[Text, List[Text]], Loader, Text) -> None
-        # Resolve scope for identity fields (fields where the value is the
-        # identity of a standalone node, such as enum symbols)
-        for identifer in loader.identity_links:
-            if identifer in document and isinstance(
-                document[identifer], MutableSequence
-            ):
-                for n, _v in enumerate(document[identifer]):
-                    if isinstance(document[identifer][n], string_types):
-                        document[identifer][n] = loader.expand_url(
-                            document[identifer][n], base_url, scoped_id=True
-                        )
-                        if document[identifer][n] not in loader.idx:
-                            loader.idx[document[identifer][n]] = document[identifer][n]
-    def _normalize_fields(self, document, loader):
-        # type: (CommentedMap, Loader) -> None
-        # Normalize fields which are prefixed or full URIn to vocabulary terms
-        for d in list(document.keys()):
-            d2 = loader.expand_url(d, u"", scoped_id=False, vocab_term=True)
-            if d != d2:
-                document[d2] = document[d]
-                document.lc.add_kv_line_col(d2, document.lc.data[d])
-                del document[d]
-    def _resolve_uris(
-        self,
-        document,  # type: Dict[Text, Union[Text, List[Text]]]
-        loader,  # type: Loader
-        base_url,  # type: Text
-    ):
-        # type: (...) -> None
-        # Resolve remaining URLs based on document base
-        for d in loader.url_fields:
-            if d in document:
-                datum = document[d]
-                if isinstance(datum, string_types):
-                    document[d] = loader.expand_url(
-                        datum,
-                        base_url,
-                        scoped_id=False,
-                        vocab_term=(d in loader.vocab_fields),
-                        scoped_ref=loader.scoped_ref_fields.get(d),
-                    )
-                elif isinstance(datum, MutableSequence):
-                    for i, url in enumerate(datum):
-                        if isinstance(url, string_types):
-                            datum[i] = loader.expand_url(
-                                url,
-                                base_url,
-                                scoped_id=False,
-                                vocab_term=(d in loader.vocab_fields),
-                                scoped_ref=loader.scoped_ref_fields.get(d),
-                            )
-    def resolve_all(
-        self,
-        document,  # type: Union[CommentedMap, CommentedSeq]
-        base_url,  # type: Text
-        file_base=None,  # type: Optional[Text]
-        checklinks=True,  # type: bool
-        strict_foreign_properties=False,  # type: bool
-    ):
-        # type: (...) -> Loader.resolved_ref_type
-        loader = self
-        metadata = CommentedMap()  # type: CommentedMap
-        if file_base is None:
-            file_base = base_url
-        if isinstance(document, CommentedMap):
-            # Handle $import and $include
-            if u"$import" in document or u"$include" in document:
-                return self.resolve_ref(
-                    document,
-                    base_url=file_base,
-                    checklinks=checklinks,
-                    strict_foreign_properties=strict_foreign_properties,
-                )
-            elif u"$mixin" in document:
-                return self.resolve_ref(
-                    document,
-                    base_url=base_url,
-                    checklinks=checklinks,
-                    strict_foreign_properties=strict_foreign_properties,
-                )
-        elif isinstance(document, CommentedSeq):
-            pass
-        elif isinstance(document, (list, dict)):
-            raise ValidationException(
-                "Expected CommentedMap or CommentedSeq, got {}: `{}`".format(
-                    type(document), document
-                )
-            )
-        else:
-            return (document, metadata)
-        newctx = None  # type: Optional[Loader]
-        if isinstance(document, CommentedMap):
-            # Handle $base, $profile, $namespaces, $schemas and $graph
-            if u"$base" in document:
-                base_url = document[u"$base"]
-            if u"$profile" in document:
-                if newctx is None:
-                    newctx = SubLoader(self)
-                newctx.add_namespaces(document.get(u"$namespaces", CommentedMap()))
-                newctx.add_schemas(document.get(u"$schemas", []), document[u"$profile"])
-            if u"$namespaces" in document:
-                if newctx is None:
-                    newctx = SubLoader(self)
-                newctx.add_namespaces(document[u"$namespaces"])
-            if u"$schemas" in document:
-                if newctx is None:
-                    newctx = SubLoader(self)
-                newctx.add_schemas(document[u"$schemas"], file_base)
-            if newctx is not None:
-                loader = newctx
-            for identifer in loader.identity_links:
-                if identifer in document:
-                    if isinstance(document[identifer], string_types):
-                        document[identifer] = loader.expand_url(
-                            document[identifer], base_url, scoped_id=True
-                        )
-                        loader.idx[document[identifer]] = document
-            metadata = document
-            if u"$graph" in document:
-                document = document[u"$graph"]
-        if isinstance(document, CommentedMap):
-            self._normalize_fields(document, loader)
-            self._resolve_idmap(document, loader)
-            self._resolve_dsl(document, loader)
-            base_url = self._resolve_identifier(document, loader, base_url)
-            self._resolve_identity(document, loader, base_url)
-            self._resolve_uris(document, loader, base_url)
-            try:
-                for key, val in document.items():
-                    subscope = ""  # type: Text
-                    if key in loader.subscopes:
-                        subscope = "/" + loader.subscopes[key]
-                    document[key], _ = loader.resolve_all(
-                        val, base_url + subscope, file_base=file_base, checklinks=False
-                    )
-            except ValidationException as v:
-                _logger.warning("loader is %s", id(loader), exc_info=True)
-                raise_from(
-                    ValidationException(
-                        "({}) ({}) Validation error in field {}:".format(
-                            id(loader), file_base, key
-                        ),
-                        None,
-                        [v],
-                    ),
-                    v,
-                )
-        elif isinstance(document, CommentedSeq):
-            i = 0
-            try:
-                while i < len(document):
-                    val = document[i]
-                    if isinstance(val, CommentedMap) and (
-                        u"$import" in val or u"$mixin" in val
-                    ):
-                        l, import_metadata = loader.resolve_ref(
-                            val, base_url=file_base, checklinks=False
-                        )
-                        metadata.setdefault("$import_metadata", {})
-                        for identifier in loader.identifiers:
-                            if identifier in import_metadata:
-                                metadata["$import_metadata"][
-                                    import_metadata[identifier]
-                                ] = import_metadata
-                        if isinstance(l, CommentedSeq):
-                            lc = document.lc.data[i]
-                            del document[i]
-                            llen = len(l)
-                            for j in range(len(document) + llen, i + llen, -1):
-                                document.lc.data[j - 1] = document.lc.data[j - llen]
-                            for item in l:
-                                document.insert(i, item)
-                                document.lc.data[i] = lc
-                                i += 1
-                        else:
-                            document[i] = l
-                            i += 1
-                    else:
-                        document[i], _ = loader.resolve_all(
-                            val, base_url, file_base=file_base, checklinks=False
-                        )
-                        i += 1
-            except ValidationException as v:
-                _logger.warning("failed", exc_info=True)
-                raise_from(
-                    ValidationException(
-                        "({}) ({}) Validation error in position {}:".format(
-                            id(loader), file_base, i
-                        ),
-                        None,
-                        [v],
-                    ),
-                    v,
-                )
-        if checklinks:
-            all_doc_ids = {}  # type: Dict[Text, Text]
-            loader.validate_links(
-                document,
-                u"",
-                all_doc_ids,
-                strict_foreign_properties=strict_foreign_properties,
-            )
-        return document, metadata
-    def fetch(self, url, inject_ids=True):  # type: (Text, bool) -> Any
-        if url in self.idx:
-            return self.idx[url]
-        try:
-            text = self.fetch_text(url)
-            if isinstance(text, bytes):
-                textIO = StringIO(text.decode("utf-8"))
-            else:
-                textIO = StringIO(text)
-            textIO.name = str(url)
-            attachments = yaml.round_trip_load_all(textIO, preserve_quotes=True)
-            result = next(attachments)
-            if self.allow_attachments is not None and self.allow_attachments(result):
-                i = 1
-                for a in attachments:
-                    self.idx["{}#attachment-{}".format(url, i)] = a
-                    i += 1
-            add_lc_filename(result, url)
-        except yaml.error.MarkedYAMLError as e:
-            raise_from(to_validation_exception(e), e)
-        if isinstance(result, CommentedMap) and inject_ids and bool(self.identifiers):
-            for identifier in self.identifiers:
-                if identifier not in result:
-                    result[identifier] = url
-                self.idx[
-                    self.expand_url(result[identifier], url, scoped_id=True)
-                ] = result
-        self.idx[url] = result
-        return result
-    FieldType = TypeVar("FieldType", Text, CommentedSeq, CommentedMap)
-    def validate_scoped(self, field, link, docid):
-        # type: (Text, Text, Text) -> Text
-        split = urllib.parse.urlsplit(docid)
-        sp = split.fragment.split(u"/")
-        n = self.scoped_ref_fields[field]
-        while n > 0 and len(sp) > 0:
-            sp.pop()
-            n -= 1
-        tried = []
-        while True:
-            sp.append(link)
-            url = urllib.parse.urlunsplit(
-                (split.scheme, split.netloc, split.path, split.query, u"/".join(sp))
-            )
-            tried.append(url)
-            if url in self.idx:
-                return url
-            sp.pop()
-            if len(sp) == 0:
-                break
-            sp.pop()
-        if onWindows() and link.startswith("file:"):
-            link = link.lower()
-        raise ValidationException(
-            "Field `{}` references unknown identifier `{}`, tried {}".format(
-                field, link, ", ".join(tried)
-            )
-        )
-    def validate_link(self, field, link, docid, all_doc_ids):
-        # type: (Text, Loader.FieldType, Text, Dict[Text, Text]) -> Loader.FieldType
-        if field in self.nolinkcheck:
-            return link
-        if isinstance(link, string_types):
-            if field in self.vocab_fields:
-                if (
-                    link not in self.vocab
-                    and link not in self.idx
-                    and link not in self.rvocab
-                ):
-                    if field in self.scoped_ref_fields:
-                        return self.validate_scoped(field, link, docid)
-                    elif not self.check_exists(link):
-                        raise ValidationException(
-                            "Field `{}` contains undefined reference to `{}`".format(
-                                field, link
-                            )
-                        )
-            elif link not in self.idx and link not in self.rvocab:
-                if field in self.scoped_ref_fields:
-                    return self.validate_scoped(field, link, docid)
-                elif not self.check_exists(link):
-                    raise ValidationException(
-                        "Field `{}` contains undefined reference to `{}`".format(
-                            field, link
-                        )
-                    )
-        elif isinstance(link, CommentedSeq):
-            errors = []
-            for n, i in enumerate(link):
-                try:
-                    link[n] = self.validate_link(field, i, docid, all_doc_ids)
-                except ValidationException as v:
-                    errors.append(v)
-            if bool(errors):
-                raise ValidationException("", None, errors)
-        elif isinstance(link, CommentedMap):
-            self.validate_links(link, docid, all_doc_ids)
-        else:
-            raise ValidationException(
-                "`{}` field is {}, expected string, list, or a dict.".format(
-                    field, type(link).__name__
-                )
-            )
-        return link
-    def getid(self, d):  # type: (Any) -> Optional[Text]
-        if isinstance(d, MutableMapping):
-            for i in self.identifiers:
-                if i in d:
-                    idd = d[i]
-                    if isinstance(idd, string_types):
-                        return idd
-        return None
-    def validate_links(
-        self,
-        document,  # type: Union[CommentedMap, CommentedSeq, Text, None]
-        base_url,  # type: Text
-        all_doc_ids,  # type: Dict[Text, Text]
-        strict_foreign_properties=False,  # type: bool
-    ):  # type: (...) -> None
-        docid = self.getid(document)
-        if not docid:
-            docid = base_url
-        errors = []  # type: List[SchemaSaladException]
-        iterator = None  # type: Any
-        if isinstance(document, MutableSequence):
-            iterator = enumerate(document)
-        elif isinstance(document, MutableMapping):
-            for d in self.url_fields:
-                sl = SourceLine(document, d, Text)
-                try:
-                    if d in document and d not in self.identity_links:
-                        document[d] = self.validate_link(
-                            d, document[d], docid, all_doc_ids
-                        )
-                except SchemaSaladException as v:
-                    v = v.with_sourceline(sl)
-                    if d == "$schemas" or (
-                        d in self.foreign_properties and not strict_foreign_properties
-                    ):
-                        _logger.warning(v)
-                    else:
-                        errors.append(v)
-            # TODO: Validator should local scope only in which
-            # duplicated keys are prohibited.
-            # See also https://github.com/common-workflow-language/common-workflow-language/issues/734  # noqa: B950
-            # In the future, it should raise
-            # ValidationException instead of _logger.warn
-            try:
-                for (
-                    identifier
-                ) in self.identifiers:  # validate that each id is defined uniquely
-                    if identifier in document:
-                        sl = SourceLine(document, identifier, Text)
-                        if (
-                            document[identifier] in all_doc_ids
-                            and sl.makeLead() != all_doc_ids[document[identifier]]
-                        ):
-                            _logger.warning(
-                                "%s object %s `%s` previously defined",
-                                all_doc_ids[document[identifier]],
-                                identifier,
-                                relname(document[identifier]),
-                            )
-                        else:
-                            all_doc_ids[document[identifier]] = sl.makeLead()
-                            break
-            except ValidationException as v:
-                errors.append(v.with_sourceline(sl))
-            if hasattr(document, "iteritems"):
-                iterator = iteritems(document)
-            else:
-                iterator = list(document.items())
-        else:
-            return
-        for key, val in iterator:
-            sl = SourceLine(document, key, Text)
-            try:
-                self.validate_links(
-                    val,
-                    docid,
-                    all_doc_ids,
-                    strict_foreign_properties=strict_foreign_properties,
-                )
-            except ValidationException as v:
-                if key in self.nolinkcheck or (
-                    isinstance(key, string_types) and ":" in key
-                ):
-                    _logger.warning(v)
-                else:
-                    docid2 = self.getid(val)
-                    if docid2 is not None:
-                        errors.append(
-                            ValidationException(
-                                "checking object `{}`".format(relname(docid2)), sl, [v]
-                            )
-                        )
-                    else:
-                        if isinstance(key, string_types):
-                            errors.append(
-                                ValidationException(
-                                    "checking field `{}`".format(key), sl, [v]
-                                )
-                            )
-                        else:
-                            errors.append(ValidationException("checking item", sl, [v]))
-        if bool(errors):
-            if len(errors) > 1:
-                raise ValidationException("", None, errors)
-            else:
-                raise errors[0]
-        return
-D = TypeVar("D", CommentedMap, ContextType)
-def _copy_dict_without_key(from_dict, filtered_key):
-    # type: (D, Any) -> D
-    new_dict = CommentedMap(from_dict.items())
-    if filtered_key in new_dict:
-        del new_dict[filtered_key]
-    if isinstance(from_dict, CommentedMap):
-        new_dict.lc.data = copy.copy(from_dict.lc.data)
-        new_dict.lc.filename = from_dict.lc.filename
-    return new_dict