view env/lib/python3.7/site-packages/schema_salad/ref_resolver.py @ 4:79f47841a781 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:47:39 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

from __future__ import absolute_import

import copy
import logging
import os
import re
import sys
import xml.sax
from io import open
from typing import Callable  # pylint: disable=unused-import
from typing import (
    Any,
    Dict,
    Iterable,
    List,
    MutableMapping,
    MutableSequence,
    Optional,
    Set,
    Tuple,
    TypeVar,
    Union,
    cast,
)

import requests
from cachecontrol.caches import FileCache
from cachecontrol.wrapper import CacheControl
from future.utils import raise_from
from rdflib.graph import Graph
from rdflib.namespace import OWL, RDF, RDFS
from rdflib.plugins.parsers.notation3 import BadSyntax
from six import StringIO, iteritems, string_types
from six.moves import range, urllib
from typing_extensions import Text  # pylint: disable=unused-import

from ruamel import yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq, LineCol

from .exceptions import ValidationException, SchemaSaladException
from .sourceline import SourceLine, add_lc_filename, relname
from .utils import aslist, onWindows

# move to a regular typing import when Python 3.3-3.6 is no longer supported


_logger = logging.getLogger("salad")
ContextType = Dict[Text, Union[Dict[Text, Any], Text, Iterable[Text]]]
DocumentType = TypeVar("DocumentType", CommentedSeq, CommentedMap)
DocumentOrStrType = TypeVar("DocumentOrStrType", CommentedSeq, CommentedMap, Text)

_re_drive = re.compile(r"/([a-zA-Z]):")


def file_uri(path, split_frag=False):  # type: (str, bool) -> str
    if path.startswith("file://"):
        return path
    if split_frag:
        pathsp = path.split("#", 2)
        frag = "#" + urllib.parse.quote(str(pathsp[1])) if len(pathsp) == 2 else ""
        urlpath = urllib.request.pathname2url(str(pathsp[0]))
    else:
        urlpath = urllib.request.pathname2url(path)
        frag = ""
    if urlpath.startswith("//"):
        return "file:{}{}".format(urlpath, frag)
    return "file://{}{}".format(urlpath, frag)


def uri_file_path(url):  # type: (str) -> str
    split = urllib.parse.urlsplit(url)
    if split.scheme == "file":
        return urllib.request.url2pathname(str(split.path)) + (
            "#" + urllib.parse.unquote(str(split.fragment))
            if bool(split.fragment)
            else ""
        )
    raise ValidationException("Not a file URI: {}".format(url))


def to_validation_exception(
    e,
):  # type: (yaml.error.MarkedYAMLError) -> ValidationException
    fname_regex = re.compile(r"^file://" + re.escape(os.getcwd()) + "/")

    exc = ValidationException(e.problem)
    mark = e.problem_mark
    exc.file = re.sub(fname_regex, "", mark.name)
    exc.start = (mark.line + 1, mark.column + 1)
    exc.end = None

    if e.context:
        parent = ValidationException(e.context)
        mark = e.context_mark
        parent.file = re.sub(fname_regex, "", mark.name)
        parent.start = (mark.line + 1, mark.column + 1)
        parent.end = None
        parent.children = [exc]
        return parent
    else:
        return exc


class NormDict(CommentedMap):
    """A Dict where all keys are normalized using the provided function."""

    def __init__(self, normalize=Text):  # type: (Callable[[Text], Text]) -> None
        super(NormDict, self).__init__()
        self.normalize = normalize

    def __getitem__(self, key):  # type: (Any) -> Any
        return super(NormDict, self).__getitem__(self.normalize(key))

    def __setitem__(self, key, value):  # type: (Any, Any) -> Any
        return super(NormDict, self).__setitem__(self.normalize(key), value)

    def __delitem__(self, key):  # type: (Any) -> Any
        return super(NormDict, self).__delitem__(self.normalize(key))

    def __contains__(self, key):  # type: (Any) -> Any
        return super(NormDict, self).__contains__(self.normalize(key))


def merge_properties(a, b):  # type: (List[Any], List[Any]) -> Dict[Any, Any]
    c = {}
    for i in a:
        if i not in b:
            c[i] = a[i]
    for i in b:
        if i not in a:
            c[i] = b[i]
    for i in a:
        if i in b:
            c[i] = aslist(a[i]) + aslist(b[i])  # type: ignore

    return c


def SubLoader(loader):  # type: (Loader) -> Loader
    return Loader(
        loader.ctx,
        schemagraph=loader.graph,
        foreign_properties=loader.foreign_properties,
        idx=loader.idx,
        cache=loader.cache,
        fetcher_constructor=loader.fetcher_constructor,
        skip_schemas=loader.skip_schemas,
        url_fields=loader.url_fields,
        allow_attachments=loader.allow_attachments,
    )


class Fetcher(object):
    def fetch_text(self, url):  # type: (Text) -> Text
        raise NotImplementedError()

    def check_exists(self, url):  # type: (Text) -> bool
        raise NotImplementedError()

    def urljoin(self, base_url, url):  # type: (Text, Text) -> Text
        raise NotImplementedError()

    schemes = [u"file", u"http", u"https", u"mailto"]

    def supported_schemes(self):  # type: () -> List[Text]
        return self.schemes


class DefaultFetcher(Fetcher):
    def __init__(
        self,
        cache,  # type: Dict[Text, Union[Text, bool]]
        session,  # type: Optional[requests.sessions.Session]
    ):  # type: (...) -> None
        self.cache = cache
        self.session = session

    def fetch_text(self, url):
        # type: (Text) -> Text
        if url in self.cache and self.cache[url] is not True:
            # treat "True" as a placeholder that indicates something exists but
            # not necessarily what its contents is.
            return cast(Text, self.cache[url])

        split = urllib.parse.urlsplit(url)
        scheme, path = split.scheme, split.path

        if scheme in [u"http", u"https"] and self.session is not None:
            try:
                resp = self.session.get(url)
                resp.raise_for_status()
            except Exception as e:
                raise_from(
                    ValidationException("Error fetching {}: {}".format(url, e)), e
                )
            return resp.text
        if scheme == "file":
            try:
                # On Windows, url.path will be /drive:/path ; on Unix systems,
                # /path. As we want drive:/path instead of /drive:/path on Windows,
                # remove the leading /.
                if os.path.isabs(
                    path[1:]
                ):  # checking if pathis valid after removing front / or not
                    path = path[1:]
                with open(
                    urllib.request.url2pathname(str(path)), encoding="utf-8"
                ) as fp:
                    return Text(fp.read())

            except (OSError, IOError) as err:
                if err.filename == path:
                    raise_from(ValidationException(Text(err)), err)
                else:
                    raise_from(
                        ValidationException("Error reading {}: {}".format(url, err)),
                        err,
                    )
        raise ValidationException("Unsupported scheme in url: {}".format(url))

    def check_exists(self, url):  # type: (Text) -> bool
        if url in self.cache:
            return True

        split = urllib.parse.urlsplit(url)
        scheme, path = split.scheme, split.path

        if scheme in [u"http", u"https"] and self.session is not None:
            try:
                resp = self.session.head(url)
                resp.raise_for_status()
            except Exception:
                return False
            self.cache[url] = True
            return True
        if scheme == "file":
            return os.path.exists(urllib.request.url2pathname(str(path)))
        if scheme == "mailto":
            return True
        raise ValidationException("Unsupported scheme in url: {}".format(url))

    def urljoin(self, base_url, url):  # type: (Text, Text) -> Text
        if url.startswith("_:"):
            return url

        basesplit = urllib.parse.urlsplit(base_url)
        split = urllib.parse.urlsplit(url)
        if basesplit.scheme and basesplit.scheme != "file" and split.scheme == "file":
            raise ValidationException(
                "Not resolving potential remote exploit {} from base {}".format(
                    url, base_url
                )
            )

        if sys.platform == "win32":
            if base_url == url:
                return url
            basesplit = urllib.parse.urlsplit(base_url)
            # note that below might split
            # "C:" with "C" as URI scheme
            split = urllib.parse.urlsplit(url)

            has_drive = split.scheme and len(split.scheme) == 1

            if basesplit.scheme == "file":
                # Special handling of relative file references on Windows
                # as urllib seems to not be quite up to the job

                # netloc MIGHT appear in equivalents of UNC Strings
                # \\server1.example.com\path as
                # file:///server1.example.com/path
                # https://tools.ietf.org/html/rfc8089#appendix-E.3.2
                # (TODO: test this)
                netloc = split.netloc or basesplit.netloc

                # Check if url is a local path like "C:/Users/fred"
                # or actually an absolute URI like http://example.com/fred
                if has_drive:
                    # Assume split.scheme is actually a drive, e.g. "C:"
                    # so we'll recombine into a path
                    path_with_drive = urllib.parse.urlunsplit(
                        (split.scheme, "", split.path, "", "")
                    )
                    # Compose new file:/// URI with path_with_drive
                    # .. carrying over any #fragment (?query just in case..)
                    return urllib.parse.urlunsplit(
                        ("file", netloc, path_with_drive, split.query, split.fragment)
                    )
                if (
                    not split.scheme
                    and not netloc
                    and split.path
                    and split.path.startswith("/")
                ):
                    # Relative - but does it have a drive?
                    base_drive = _re_drive.match(basesplit.path)
                    drive = _re_drive.match(split.path)
                    if base_drive and not drive:
                        # Keep drive letter from base_url
                        # https://tools.ietf.org/html/rfc8089#appendix-E.2.1
                        # e.g. urljoin("file:///D:/bar/a.txt", "/foo/b.txt")
                        #          == file:///D:/foo/b.txt
                        path_with_drive = "/{}:{}".format(
                            base_drive.group(1), split.path
                        )
                        return urllib.parse.urlunsplit(
                            (
                                "file",
                                netloc,
                                path_with_drive,
                                split.query,
                                split.fragment,
                            )
                        )

                # else: fall-through to resolve as relative URI
            elif has_drive:
                # Base is http://something but url is C:/something - which urllib
                # would wrongly resolve as an absolute path that could later be used
                # to access local files
                raise ValidationException(
                    "Not resolving potential remote exploit {} from base {}".format(
                        url, base_url
                    )
                )

        return urllib.parse.urljoin(base_url, url)


idx_type = Dict[Text, Union[CommentedMap, CommentedSeq, Text, None]]
fetcher_sig = Callable[
    [Dict[Text, Union[Text, bool]], requests.sessions.Session], Fetcher
]
attachements_sig = Callable[[Union[CommentedMap, CommentedSeq]], bool]


class Loader(object):
    def __init__(
        self,
        ctx,  # type: ContextType
        schemagraph=None,  # type: Optional[Graph]
        foreign_properties=None,  # type: Optional[Set[Text]]
        idx=None,  # type: Optional[idx_type]
        cache=None,  # type: Optional[Dict[Text, Any]]
        session=None,  # type: Optional[requests.sessions.Session]
        fetcher_constructor=None,  # type: Optional[fetcher_sig]
        skip_schemas=None,  # type: Optional[bool]
        url_fields=None,  # type: Optional[Set[Text]]
        allow_attachments=None,  # type: Optional[attachements_sig]
    ):
        # type: (...) -> None

        if idx is not None:
            self.idx = idx
        else:
            self.idx = NormDict(lambda url: urllib.parse.urlsplit(url).geturl())

        self.ctx = {}  # type: ContextType
        if schemagraph is not None:
            self.graph = schemagraph
        else:
            self.graph = Graph()

        if foreign_properties is not None:
            self.foreign_properties = set(foreign_properties)
        else:
            self.foreign_properties = set()

        if cache is not None:
            self.cache = cache
        else:
            self.cache = {}

        if skip_schemas is not None:
            self.skip_schemas = skip_schemas
        else:
            self.skip_schemas = False

        if session is None:
            if "HOME" in os.environ:
                self.session = CacheControl(
                    requests.Session(),
                    cache=FileCache(
                        os.path.join(os.environ["HOME"], ".cache", "salad")
                    ),
                )
            elif "TMP" in os.environ:
                self.session = CacheControl(
                    requests.Session(),
                    cache=FileCache(os.path.join(os.environ["TMP"], ".cache", "salad")),
                )
            else:
                self.session = CacheControl(
                    requests.Session(),
                    cache=FileCache(os.path.join("/tmp", ".cache", "salad")),
                )
        else:
            self.session = session

        if fetcher_constructor is not None:
            self.fetcher_constructor = fetcher_constructor
        else:
            self.fetcher_constructor = DefaultFetcher
        self.fetcher = self.fetcher_constructor(self.cache, self.session)
        self.fetch_text = self.fetcher.fetch_text
        self.check_exists = self.fetcher.check_exists

        if url_fields is None:
            self.url_fields = set()  # type: Set[Text]
        else:
            self.url_fields = set(url_fields)

        self.scoped_ref_fields = {}  # type: Dict[Text, int]
        self.vocab_fields = set()  # type: Set[Text]
        self.identifiers = []  # type: List[Text]
        self.identity_links = set()  # type: Set[Text]
        self.standalone = None  # type: Optional[Set[Text]]
        self.nolinkcheck = set()  # type: Set[Text]
        self.vocab = {}  # type: Dict[Text, Text]
        self.rvocab = {}  # type: Dict[Text, Text]
        self.idmap = {}  # type: Dict[Text, Any]
        self.mapPredicate = {}  # type: Dict[Text, Text]
        self.type_dsl_fields = set()  # type: Set[Text]
        self.subscopes = {}  # type: Dict[Text, Text]
        self.secondaryFile_dsl_fields = set()  # type: Set[Text]
        self.allow_attachments = allow_attachments

        self.add_context(ctx)

    def expand_url(
        self,
        url,  # type: Text
        base_url,  # type: Text
        scoped_id=False,  # type: bool
        vocab_term=False,  # type: bool
        scoped_ref=None,  # type: Optional[int]
    ):
        # type: (...) -> Text
        if url in (u"@id", u"@type") or url is None:
            return url

        if vocab_term and url in self.vocab:
            return url

        if url.startswith("_:"):
            return url

        if bool(self.vocab) and u":" in url:
            prefix = url.split(u":")[0]
            if prefix in self.vocab:
                url = self.vocab[prefix] + url[len(prefix) + 1 :]
            elif prefix not in self.fetcher.supported_schemes():
                _logger.warning(
                    "URI prefix '%s' of '%s' not recognized, are you missing a "
                    "$namespaces section?",
                    prefix,
                    url,
                )

        split = urllib.parse.urlsplit(url)

        if (
            (bool(split.scheme) and split.scheme in [u"http", u"https", u"file"])
            or url.startswith(u"$(")
            or url.startswith(u"${")
        ):
            pass
        elif scoped_id and not bool(split.fragment):
            splitbase = urllib.parse.urlsplit(base_url)
            frg = u""
            if bool(splitbase.fragment):
                frg = splitbase.fragment + u"/" + split.path
            else:
                frg = split.path
            pt = splitbase.path if splitbase.path != "" else "/"
            url = urllib.parse.urlunsplit(
                (splitbase.scheme, splitbase.netloc, pt, splitbase.query, frg)
            )
        elif scoped_ref is not None and not split.fragment:
            pass
        else:
            url = self.fetcher.urljoin(base_url, url)

        if vocab_term and url in self.rvocab:
            return self.rvocab[url]
        else:
            return url

    def _add_properties(self, s):  # type: (Text) -> None
        for _, _, rng in self.graph.triples((s, RDFS.range, None)):
            literal = (
                Text(rng).startswith(u"http://www.w3.org/2001/XMLSchema#")
                and not Text(rng) == u"http://www.w3.org/2001/XMLSchema#anyURI"
            ) or Text(rng) == u"http://www.w3.org/2000/01/rdf-schema#Literal"
            if not literal:
                self.url_fields.add(Text(s))
        self.foreign_properties.add(Text(s))

    def add_namespaces(self, ns):  # type: (Dict[Text, Text]) -> None
        self.vocab.update(ns)

    def add_schemas(self, ns, base_url):
        # type: (Union[List[Text], Text], Text) -> None
        if self.skip_schemas:
            return
        for sch in aslist(ns):
            try:
                fetchurl = self.fetcher.urljoin(base_url, sch)
                if fetchurl not in self.cache or self.cache[fetchurl] is True:
                    _logger.debug("Getting external schema %s", fetchurl)
                    content = self.fetch_text(fetchurl)
                    self.cache[fetchurl] = Graph()
                    for fmt in ["xml", "turtle", "rdfa"]:
                        try:
                            self.cache[fetchurl].parse(
                                data=content, format=fmt, publicID=str(fetchurl)
                            )
                            self.graph += self.cache[fetchurl]
                            break
                        except xml.sax.SAXParseException:
                            pass
                        except TypeError:
                            pass
                        except BadSyntax:
                            pass
            except Exception as e:
                _logger.warning(
                    "Could not load extension schema %s: %s", fetchurl, Text(e)
                )

        for s, _, _ in self.graph.triples((None, RDF.type, RDF.Property)):
            self._add_properties(s)
        for s, _, o in self.graph.triples((None, RDFS.subPropertyOf, None)):
            self._add_properties(s)
            self._add_properties(o)
        for s, _, _ in self.graph.triples((None, RDFS.range, None)):
            self._add_properties(s)
        for s, _, _ in self.graph.triples((None, RDF.type, OWL.ObjectProperty)):
            self._add_properties(s)

        for s, _, _ in self.graph.triples((None, None, None)):
            self.idx[Text(s)] = None

    def add_context(self, newcontext, baseuri=""):
        # type: (ContextType, Text) -> None
        if bool(self.vocab):
            raise ValidationException("Refreshing context that already has stuff in it")

        self.url_fields = set(("$schemas",))
        self.scoped_ref_fields = {}
        self.vocab_fields = set()
        self.identifiers = []
        self.identity_links = set()
        self.standalone = set()
        self.nolinkcheck = set()
        self.idmap = {}
        self.mapPredicate = {}
        self.vocab = {}
        self.rvocab = {}
        self.type_dsl_fields = set()
        self.secondaryFile_dsl_fields = set()
        self.subscopes = {}

        self.ctx.update(_copy_dict_without_key(newcontext, u"@context"))

        _logger.debug("ctx is %s", self.ctx)

        for key, value in self.ctx.items():
            if value == u"@id":
                self.identifiers.append(key)
                self.identity_links.add(key)
            elif isinstance(value, MutableMapping):
                if value.get(u"@type") == u"@id":
                    self.url_fields.add(key)
                    if u"refScope" in value:
                        self.scoped_ref_fields[key] = value[u"refScope"]
                    if value.get(u"identity", False):
                        self.identity_links.add(key)

                if value.get(u"@type") == u"@vocab":
                    self.url_fields.add(key)
                    self.vocab_fields.add(key)
                    if u"refScope" in value:
                        self.scoped_ref_fields[key] = value[u"refScope"]
                    if value.get(u"typeDSL"):
                        self.type_dsl_fields.add(key)

                if value.get(u"secondaryFilesDSL"):
                    self.secondaryFile_dsl_fields.add(key)

                if value.get(u"noLinkCheck"):
                    self.nolinkcheck.add(key)

                if value.get(u"mapSubject"):
                    self.idmap[key] = value[u"mapSubject"]

                if value.get(u"mapPredicate"):
                    self.mapPredicate[key] = value[u"mapPredicate"]

                if value.get(u"@id"):
                    self.vocab[key] = value[u"@id"]

                if value.get(u"subscope"):
                    self.subscopes[key] = value[u"subscope"]

            elif isinstance(value, string_types):
                self.vocab[key] = value

        for k, v in self.vocab.items():
            self.rvocab[self.expand_url(v, u"", scoped_id=False)] = k

        self.identifiers.sort()

        _logger.debug("identifiers is %s", self.identifiers)
        _logger.debug("identity_links is %s", self.identity_links)
        _logger.debug("url_fields is %s", self.url_fields)
        _logger.debug("vocab_fields is %s", self.vocab_fields)
        _logger.debug("vocab is %s", self.vocab)

    resolved_ref_type = Tuple[
        Optional[Union[CommentedMap, CommentedSeq, Text]], CommentedMap
    ]

    def resolve_ref(
        self,
        ref,  # type: Union[CommentedMap, CommentedSeq, Text]
        base_url=None,  # type: Optional[Text]
        checklinks=True,  # type: bool
        strict_foreign_properties=False,  # type: bool
    ):
        # type: (...) -> Loader.resolved_ref_type

        lref = ref  # type: Union[CommentedMap, CommentedSeq, Text, None]
        obj = None  # type: Optional[CommentedMap]
        resolved_obj = None  # type: Optional[Union[CommentedMap, CommentedSeq, Text]]
        inc = False
        mixin = None  # type: Optional[MutableMapping[Text, Any]]

        if not base_url:
            base_url = file_uri(os.getcwd()) + "/"

        sl = SourceLine(obj, None)
        # If `ref` is a dict, look for special directives.
        if isinstance(lref, CommentedMap):
            obj = lref
            if "$import" in obj:
                sl = SourceLine(obj, "$import")
                if len(obj) == 1:
                    lref = obj[u"$import"]
                    obj = None
                else:
                    raise ValidationException(
                        u"'$import' must be the only field in {}".format(obj), sl
                    )
            elif "$include" in obj:
                sl = SourceLine(obj, "$include")
                if len(obj) == 1:
                    lref = obj[u"$include"]
                    inc = True
                    obj = None
                else:
                    raise ValidationException(
                        u"'$include' must be the only field in {}".format(obj), sl
                    )
            elif "$mixin" in obj:
                sl = SourceLine(obj, "$mixin")
                lref = obj[u"$mixin"]
                mixin = obj
                obj = None
            else:
                lref = None
                for identifier in self.identifiers:
                    if identifier in obj:
                        lref = obj[identifier]
                        break
                if not lref:
                    raise ValidationException(
                        u"Object `{}` does not have identifier field in {}".format(
                            obj, self.identifiers
                        ),
                        sl,
                    )

        if not isinstance(lref, string_types):
            raise ValidationException(
                u"Expected CommentedMap or string, got {}: `{}`".format(
                    type(lref), lref
                )
            )

        if isinstance(lref, string_types) and os.sep == "\\":
            # Convert Windows path separator in ref
            lref = lref.replace("\\", "/")

        url = self.expand_url(lref, base_url, scoped_id=(obj is not None))
        # Has this reference been loaded already?
        if url in self.idx and (not mixin):
            resolved_obj = self.idx[url]
            if isinstance(resolved_obj, MutableMapping):
                metadata = self.idx.get(urllib.parse.urldefrag(url)[0], CommentedMap())
                if isinstance(metadata, MutableMapping):
                    if u"$graph" in resolved_obj:
                        metadata = _copy_dict_without_key(resolved_obj, u"$graph")
                        return resolved_obj[u"$graph"], metadata
                    else:
                        return resolved_obj, metadata
                else:
                    raise ValidationException(
                        u"Expected CommentedMap, got {}: `{}`".format(
                            type(metadata), metadata
                        )
                    )
            elif isinstance(resolved_obj, MutableSequence):
                metadata = self.idx.get(urllib.parse.urldefrag(url)[0], CommentedMap())
                if isinstance(metadata, MutableMapping):
                    return resolved_obj, metadata
                else:
                    return resolved_obj, CommentedMap()
            elif isinstance(resolved_obj, string_types):
                return resolved_obj, CommentedMap()
            else:
                raise ValidationException(
                    u"Expected MutableMapping or MutableSequence, got {}: `{}`".format(
                        type(resolved_obj), resolved_obj
                    )
                )

        # "$include" directive means load raw text
        if inc:
            return self.fetch_text(url), CommentedMap()

        doc = None
        if isinstance(obj, MutableMapping):
            for identifier in self.identifiers:
                obj[identifier] = url
            doc_url = url
        else:
            # Load structured document
            doc_url, frg = urllib.parse.urldefrag(url)
            if doc_url in self.idx and (not mixin):
                # If the base document is in the index, it was already loaded,
                # so if we didn't find the reference earlier then it must not
                # exist.
                raise ValidationException(
                    u"Reference `#{}` not found in file `{}`.".format(frg, doc_url), sl
                )
            doc = self.fetch(doc_url, inject_ids=(not mixin))

        # Recursively expand urls and resolve directives
        if bool(mixin):
            doc = copy.deepcopy(doc)
            if doc is not None and mixin is not None:
                doc.update(mixin)
                del doc["$mixin"]
            resolved_obj, metadata = self.resolve_all(
                doc,
                base_url,
                file_base=doc_url,
                checklinks=checklinks,
                strict_foreign_properties=strict_foreign_properties,
            )
        else:
            if doc:
                resolve_target = doc
            else:
                resolve_target = obj
            resolved_obj, metadata = self.resolve_all(
                resolve_target,
                doc_url,
                checklinks=checklinks,
                strict_foreign_properties=strict_foreign_properties,
            )

        # Requested reference should be in the index now, otherwise it's a bad
        # reference
        if not bool(mixin):
            if url in self.idx:
                resolved_obj = self.idx[url]
            else:
                raise ValidationException(
                    "Reference `{}` is not in the index. Index contains: {}".format(
                        url, ", ".join(self.idx)
                    )
                )

        if isinstance(resolved_obj, CommentedMap):
            if u"$graph" in resolved_obj:
                metadata = _copy_dict_without_key(resolved_obj, u"$graph")
                return resolved_obj[u"$graph"], metadata
            else:
                return resolved_obj, metadata
        else:
            return resolved_obj, metadata

    def _resolve_idmap(
        self,
        document,  # type: CommentedMap
        loader,  # type: Loader
    ):
        # type: (...) -> None
        # Convert fields with mapSubject into lists
        # use mapPredicate if the mapped value isn't a dict.
        for idmapField in loader.idmap:
            if idmapField in document:
                idmapFieldValue = document[idmapField]
                if (
                    isinstance(idmapFieldValue, MutableMapping)
                    and "$import" not in idmapFieldValue
                    and "$include" not in idmapFieldValue
                ):
                    ls = CommentedSeq()
                    for k in sorted(idmapFieldValue.keys()):
                        val = idmapFieldValue[k]
                        v = None  # type: Optional[CommentedMap]
                        if not isinstance(val, CommentedMap):
                            if idmapField in loader.mapPredicate:
                                v = CommentedMap(
                                    ((loader.mapPredicate[idmapField], val),)
                                )
                                v.lc.add_kv_line_col(
                                    loader.mapPredicate[idmapField],
                                    document[idmapField].lc.data[k],
                                )
                                v.lc.filename = document.lc.filename
                            else:
                                raise ValidationException(
                                    "mapSubject '{}' value '{}' is not a dict "
                                    "and does not have a mapPredicate.".format(k, v)
                                )
                        else:
                            v = val

                        v[loader.idmap[idmapField]] = k
                        v.lc.add_kv_line_col(
                            loader.idmap[idmapField], document[idmapField].lc.data[k]
                        )
                        v.lc.filename = document.lc.filename

                        ls.lc.add_kv_line_col(len(ls), document[idmapField].lc.data[k])

                        ls.lc.filename = document.lc.filename
                        ls.append(v)

                    document[idmapField] = ls

    typeDSLregex = re.compile(Text(r"^([^[?]+)(\[\])?(\?)?$"))

    def _type_dsl(
        self,
        t,  # type: Union[Text, Dict[Text, Text], List[Text]]
        lc,  # type: LineCol
        filename,  # type: Text
    ):  # type: (...) -> Union[Text, Dict[Text, Text], List[Text]]

        if not isinstance(t, string_types):
            return t

        m = Loader.typeDSLregex.match(t)
        if not m:
            return t
        first = m.group(1)
        second = third = None
        if bool(m.group(2)):
            second = CommentedMap((("type", "array"), ("items", first)))
            second.lc.add_kv_line_col("type", lc)
            second.lc.add_kv_line_col("items", lc)
            second.lc.filename = filename
        if bool(m.group(3)):
            third = CommentedSeq([u"null", second or first])
            third.lc.add_kv_line_col(0, lc)
            third.lc.add_kv_line_col(1, lc)
            third.lc.filename = filename
        return third or second or first

    def _secondaryFile_dsl(
        self,
        t,  # type: Union[Text, Dict[Text, Text], List[Text]]
        lc,  # type: LineCol
        filename,  # type: Text
    ):  # type: (...) -> Union[Text, Dict[Text, Text], List[Text]]

        if not isinstance(t, string_types):
            return t
        pat = t
        req = None
        if t.endswith("?"):
            pat = t[0:-1]
            req = False

        second = CommentedMap((("pattern", pat), ("required", req)))
        second.lc.add_kv_line_col("pattern", lc)
        second.lc.add_kv_line_col("required", lc)
        second.lc.filename = filename
        return second

    def _apply_dsl(
        self,
        datum,  # type: Union[Text, Dict[Any, Any], List[Any]]
        d,  # type: Text
        loader,  # type: Loader
        lc,  # type: LineCol
        filename,  # type: Text
    ):
        # type: (...) -> Union[Text, Dict[Any, Any], List[Any]]
        if d in loader.type_dsl_fields:
            return self._type_dsl(datum, lc, filename)
        elif d in loader.secondaryFile_dsl_fields:
            return self._secondaryFile_dsl(datum, lc, filename)
        else:
            return datum

    def _resolve_dsl(
        self,
        document,  # type: CommentedMap
        loader,  # type: Loader
    ):
        # type: (...) -> None
        fields = list(loader.type_dsl_fields)
        fields.extend(loader.secondaryFile_dsl_fields)

        for d in fields:
            if d in document:
                datum2 = datum = document[d]
                if isinstance(datum, string_types):
                    datum2 = self._apply_dsl(
                        datum, d, loader, document.lc.data[d], document.lc.filename
                    )
                elif isinstance(datum, CommentedSeq):
                    datum2 = CommentedSeq()
                    for n, t in enumerate(datum):
                        if datum.lc and datum.lc.data:
                            datum2.lc.add_kv_line_col(len(datum2), datum.lc.data[n])
                            datum2.append(
                                self._apply_dsl(
                                    t, d, loader, datum.lc.data[n], document.lc.filename
                                )
                            )
                        else:
                            datum2.append(self._apply_dsl(t, d, loader, LineCol(), ""))
                if isinstance(datum2, CommentedSeq):
                    datum3 = CommentedSeq()
                    seen = []  # type: List[Text]
                    for i, item in enumerate(datum2):
                        if isinstance(item, CommentedSeq):
                            for j, v in enumerate(item):
                                if v not in seen:
                                    datum3.lc.add_kv_line_col(
                                        len(datum3), item.lc.data[j]
                                    )
                                    datum3.append(v)
                                    seen.append(v)
                        else:
                            if item not in seen:
                                if datum2.lc and datum2.lc.data:
                                    datum3.lc.add_kv_line_col(
                                        len(datum3), datum2.lc.data[i]
                                    )
                                datum3.append(item)
                                seen.append(item)
                    document[d] = datum3
                else:
                    document[d] = datum2

    def _resolve_identifier(self, document, loader, base_url):
        # type: (CommentedMap, Loader, Text) -> Text
        # Expand identifier field (usually 'id') to resolve scope
        for identifer in loader.identifiers:
            if identifer in document:
                if isinstance(document[identifer], string_types):
                    document[identifer] = loader.expand_url(
                        document[identifer], base_url, scoped_id=True
                    )
                    if document[identifer] not in loader.idx or isinstance(
                        loader.idx[document[identifer]], string_types
                    ):
                        loader.idx[document[identifer]] = document
                    base_url = document[identifer]
                else:
                    raise ValidationException(
                        "identifier field '{}' must be a string".format(
                            document[identifer]
                        )
                    )
        return base_url

    def _resolve_identity(self, document, loader, base_url):
        # type: (Dict[Text, List[Text]], Loader, Text) -> None
        # Resolve scope for identity fields (fields where the value is the
        # identity of a standalone node, such as enum symbols)
        for identifer in loader.identity_links:
            if identifer in document and isinstance(
                document[identifer], MutableSequence
            ):
                for n, _v in enumerate(document[identifer]):
                    if isinstance(document[identifer][n], string_types):
                        document[identifer][n] = loader.expand_url(
                            document[identifer][n], base_url, scoped_id=True
                        )
                        if document[identifer][n] not in loader.idx:
                            loader.idx[document[identifer][n]] = document[identifer][n]

    def _normalize_fields(self, document, loader):
        # type: (CommentedMap, Loader) -> None
        # Normalize fields which are prefixed or full URIn to vocabulary terms
        for d in list(document.keys()):
            d2 = loader.expand_url(d, u"", scoped_id=False, vocab_term=True)
            if d != d2:
                document[d2] = document[d]
                document.lc.add_kv_line_col(d2, document.lc.data[d])
                del document[d]

    def _resolve_uris(
        self,
        document,  # type: Dict[Text, Union[Text, List[Text]]]
        loader,  # type: Loader
        base_url,  # type: Text
    ):
        # type: (...) -> None
        # Resolve remaining URLs based on document base
        for d in loader.url_fields:
            if d in document:
                datum = document[d]
                if isinstance(datum, string_types):
                    document[d] = loader.expand_url(
                        datum,
                        base_url,
                        scoped_id=False,
                        vocab_term=(d in loader.vocab_fields),
                        scoped_ref=loader.scoped_ref_fields.get(d),
                    )
                elif isinstance(datum, MutableSequence):
                    for i, url in enumerate(datum):
                        if isinstance(url, string_types):
                            datum[i] = loader.expand_url(
                                url,
                                base_url,
                                scoped_id=False,
                                vocab_term=(d in loader.vocab_fields),
                                scoped_ref=loader.scoped_ref_fields.get(d),
                            )

    def resolve_all(
        self,
        document,  # type: Union[CommentedMap, CommentedSeq]
        base_url,  # type: Text
        file_base=None,  # type: Optional[Text]
        checklinks=True,  # type: bool
        strict_foreign_properties=False,  # type: bool
    ):
        # type: (...) -> Loader.resolved_ref_type
        loader = self
        metadata = CommentedMap()  # type: CommentedMap
        if file_base is None:
            file_base = base_url

        if isinstance(document, CommentedMap):
            # Handle $import and $include
            if u"$import" in document or u"$include" in document:
                return self.resolve_ref(
                    document,
                    base_url=file_base,
                    checklinks=checklinks,
                    strict_foreign_properties=strict_foreign_properties,
                )
            elif u"$mixin" in document:
                return self.resolve_ref(
                    document,
                    base_url=base_url,
                    checklinks=checklinks,
                    strict_foreign_properties=strict_foreign_properties,
                )
        elif isinstance(document, CommentedSeq):
            pass
        elif isinstance(document, (list, dict)):
            raise ValidationException(
                "Expected CommentedMap or CommentedSeq, got {}: `{}`".format(
                    type(document), document
                )
            )
        else:
            return (document, metadata)

        newctx = None  # type: Optional[Loader]
        if isinstance(document, CommentedMap):
            # Handle $base, $profile, $namespaces, $schemas and $graph
            if u"$base" in document:
                base_url = document[u"$base"]

            if u"$profile" in document:
                if newctx is None:
                    newctx = SubLoader(self)
                newctx.add_namespaces(document.get(u"$namespaces", CommentedMap()))
                newctx.add_schemas(document.get(u"$schemas", []), document[u"$profile"])

            if u"$namespaces" in document:
                if newctx is None:
                    newctx = SubLoader(self)
                newctx.add_namespaces(document[u"$namespaces"])

            if u"$schemas" in document:
                if newctx is None:
                    newctx = SubLoader(self)
                newctx.add_schemas(document[u"$schemas"], file_base)

            if newctx is not None:
                loader = newctx

            for identifer in loader.identity_links:
                if identifer in document:
                    if isinstance(document[identifer], string_types):
                        document[identifer] = loader.expand_url(
                            document[identifer], base_url, scoped_id=True
                        )
                        loader.idx[document[identifer]] = document

            metadata = document
            if u"$graph" in document:
                document = document[u"$graph"]

        if isinstance(document, CommentedMap):
            self._normalize_fields(document, loader)
            self._resolve_idmap(document, loader)
            self._resolve_dsl(document, loader)
            base_url = self._resolve_identifier(document, loader, base_url)
            self._resolve_identity(document, loader, base_url)
            self._resolve_uris(document, loader, base_url)

            try:
                for key, val in document.items():
                    subscope = ""  # type: Text
                    if key in loader.subscopes:
                        subscope = "/" + loader.subscopes[key]
                    document[key], _ = loader.resolve_all(
                        val, base_url + subscope, file_base=file_base, checklinks=False
                    )
            except ValidationException as v:
                _logger.warning("loader is %s", id(loader), exc_info=True)
                raise_from(
                    ValidationException(
                        "({}) ({}) Validation error in field {}:".format(
                            id(loader), file_base, key
                        ),
                        None,
                        [v],
                    ),
                    v,
                )

        elif isinstance(document, CommentedSeq):
            i = 0
            try:
                while i < len(document):
                    val = document[i]
                    if isinstance(val, CommentedMap) and (
                        u"$import" in val or u"$mixin" in val
                    ):
                        l, import_metadata = loader.resolve_ref(
                            val, base_url=file_base, checklinks=False
                        )
                        metadata.setdefault("$import_metadata", {})
                        for identifier in loader.identifiers:
                            if identifier in import_metadata:
                                metadata["$import_metadata"][
                                    import_metadata[identifier]
                                ] = import_metadata
                        if isinstance(l, CommentedSeq):
                            lc = document.lc.data[i]
                            del document[i]
                            llen = len(l)
                            for j in range(len(document) + llen, i + llen, -1):
                                document.lc.data[j - 1] = document.lc.data[j - llen]
                            for item in l:
                                document.insert(i, item)
                                document.lc.data[i] = lc
                                i += 1
                        else:
                            document[i] = l
                            i += 1
                    else:
                        document[i], _ = loader.resolve_all(
                            val, base_url, file_base=file_base, checklinks=False
                        )
                        i += 1
            except ValidationException as v:
                _logger.warning("failed", exc_info=True)
                raise_from(
                    ValidationException(
                        "({}) ({}) Validation error in position {}:".format(
                            id(loader), file_base, i
                        ),
                        None,
                        [v],
                    ),
                    v,
                )

        if checklinks:
            all_doc_ids = {}  # type: Dict[Text, Text]
            loader.validate_links(
                document,
                u"",
                all_doc_ids,
                strict_foreign_properties=strict_foreign_properties,
            )

        return document, metadata

    def fetch(self, url, inject_ids=True):  # type: (Text, bool) -> Any
        if url in self.idx:
            return self.idx[url]
        try:
            text = self.fetch_text(url)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode("utf-8"))
            else:
                textIO = StringIO(text)
            textIO.name = str(url)
            attachments = yaml.round_trip_load_all(textIO, preserve_quotes=True)
            result = next(attachments)

            if self.allow_attachments is not None and self.allow_attachments(result):
                i = 1
                for a in attachments:
                    self.idx["{}#attachment-{}".format(url, i)] = a
                    i += 1
            add_lc_filename(result, url)
        except yaml.error.MarkedYAMLError as e:
            raise_from(to_validation_exception(e), e)
        if isinstance(result, CommentedMap) and inject_ids and bool(self.identifiers):
            for identifier in self.identifiers:
                if identifier not in result:
                    result[identifier] = url
                self.idx[
                    self.expand_url(result[identifier], url, scoped_id=True)
                ] = result
        self.idx[url] = result
        return result

    FieldType = TypeVar("FieldType", Text, CommentedSeq, CommentedMap)

    def validate_scoped(self, field, link, docid):
        # type: (Text, Text, Text) -> Text
        split = urllib.parse.urlsplit(docid)
        sp = split.fragment.split(u"/")
        n = self.scoped_ref_fields[field]
        while n > 0 and len(sp) > 0:
            sp.pop()
            n -= 1
        tried = []
        while True:
            sp.append(link)
            url = urllib.parse.urlunsplit(
                (split.scheme, split.netloc, split.path, split.query, u"/".join(sp))
            )
            tried.append(url)
            if url in self.idx:
                return url
            sp.pop()
            if len(sp) == 0:
                break
            sp.pop()
        if onWindows() and link.startswith("file:"):
            link = link.lower()
        raise ValidationException(
            "Field `{}` references unknown identifier `{}`, tried {}".format(
                field, link, ", ".join(tried)
            )
        )

    def validate_link(self, field, link, docid, all_doc_ids):
        # type: (Text, Loader.FieldType, Text, Dict[Text, Text]) -> Loader.FieldType
        if field in self.nolinkcheck:
            return link
        if isinstance(link, string_types):
            if field in self.vocab_fields:
                if (
                    link not in self.vocab
                    and link not in self.idx
                    and link not in self.rvocab
                ):
                    if field in self.scoped_ref_fields:
                        return self.validate_scoped(field, link, docid)
                    elif not self.check_exists(link):
                        raise ValidationException(
                            "Field `{}` contains undefined reference to `{}`".format(
                                field, link
                            )
                        )
            elif link not in self.idx and link not in self.rvocab:
                if field in self.scoped_ref_fields:
                    return self.validate_scoped(field, link, docid)
                elif not self.check_exists(link):
                    raise ValidationException(
                        "Field `{}` contains undefined reference to `{}`".format(
                            field, link
                        )
                    )
        elif isinstance(link, CommentedSeq):
            errors = []
            for n, i in enumerate(link):
                try:
                    link[n] = self.validate_link(field, i, docid, all_doc_ids)
                except ValidationException as v:
                    errors.append(v)
            if bool(errors):
                raise ValidationException("", None, errors)
        elif isinstance(link, CommentedMap):
            self.validate_links(link, docid, all_doc_ids)
        else:
            raise ValidationException(
                "`{}` field is {}, expected string, list, or a dict.".format(
                    field, type(link).__name__
                )
            )
        return link

    def getid(self, d):  # type: (Any) -> Optional[Text]
        if isinstance(d, MutableMapping):
            for i in self.identifiers:
                if i in d:
                    idd = d[i]
                    if isinstance(idd, string_types):
                        return idd
        return None

    def validate_links(
        self,
        document,  # type: Union[CommentedMap, CommentedSeq, Text, None]
        base_url,  # type: Text
        all_doc_ids,  # type: Dict[Text, Text]
        strict_foreign_properties=False,  # type: bool
    ):  # type: (...) -> None
        docid = self.getid(document)
        if not docid:
            docid = base_url

        errors = []  # type: List[SchemaSaladException]
        iterator = None  # type: Any
        if isinstance(document, MutableSequence):
            iterator = enumerate(document)
        elif isinstance(document, MutableMapping):
            for d in self.url_fields:
                sl = SourceLine(document, d, Text)
                try:
                    if d in document and d not in self.identity_links:
                        document[d] = self.validate_link(
                            d, document[d], docid, all_doc_ids
                        )
                except SchemaSaladException as v:
                    v = v.with_sourceline(sl)
                    if d == "$schemas" or (
                        d in self.foreign_properties and not strict_foreign_properties
                    ):
                        _logger.warning(v)
                    else:
                        errors.append(v)
            # TODO: Validator should local scope only in which
            # duplicated keys are prohibited.
            # See also https://github.com/common-workflow-language/common-workflow-language/issues/734  # noqa: B950
            # In the future, it should raise
            # ValidationException instead of _logger.warn
            try:
                for (
                    identifier
                ) in self.identifiers:  # validate that each id is defined uniquely
                    if identifier in document:
                        sl = SourceLine(document, identifier, Text)
                        if (
                            document[identifier] in all_doc_ids
                            and sl.makeLead() != all_doc_ids[document[identifier]]
                        ):
                            _logger.warning(
                                "%s object %s `%s` previously defined",
                                all_doc_ids[document[identifier]],
                                identifier,
                                relname(document[identifier]),
                            )
                        else:
                            all_doc_ids[document[identifier]] = sl.makeLead()
                            break
            except ValidationException as v:
                errors.append(v.with_sourceline(sl))

            if hasattr(document, "iteritems"):
                iterator = iteritems(document)
            else:
                iterator = list(document.items())
        else:
            return

        for key, val in iterator:
            sl = SourceLine(document, key, Text)
            try:
                self.validate_links(
                    val,
                    docid,
                    all_doc_ids,
                    strict_foreign_properties=strict_foreign_properties,
                )
            except ValidationException as v:
                if key in self.nolinkcheck or (
                    isinstance(key, string_types) and ":" in key
                ):
                    _logger.warning(v)
                else:
                    docid2 = self.getid(val)
                    if docid2 is not None:
                        errors.append(
                            ValidationException(
                                "checking object `{}`".format(relname(docid2)), sl, [v]
                            )
                        )
                    else:
                        if isinstance(key, string_types):
                            errors.append(
                                ValidationException(
                                    "checking field `{}`".format(key), sl, [v]
                                )
                            )
                        else:
                            errors.append(ValidationException("checking item", sl, [v]))
        if bool(errors):
            if len(errors) > 1:
                raise ValidationException("", None, errors)
            else:
                raise errors[0]
        return


D = TypeVar("D", CommentedMap, ContextType)


def _copy_dict_without_key(from_dict, filtered_key):
    # type: (D, Any) -> D
    new_dict = CommentedMap(from_dict.items())
    if filtered_key in new_dict:
        del new_dict[filtered_key]
    if isinstance(from_dict, CommentedMap):
        new_dict.lc.data = copy.copy(from_dict.lc.data)
        new_dict.lc.filename = from_dict.lc.filename
    return new_dict