diff env/lib/python3.7/site-packages/rdflib_jsonld/context.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/rdflib_jsonld/context.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,297 @@
+# -*- coding: utf-8 -*-
+"""
+Implementation of the JSON-LD Context structure. See:
+
+    http://json-ld.org/
+
+"""
+from collections import namedtuple
+from rdflib.namespace import RDF
+
+from .keys import (BASE, CONTAINER, CONTEXT, GRAPH, ID, INDEX, LANG, LIST,
+        REV, SET, TYPE, VALUE, VOCAB)
+from . import errors
+from .util import source_to_json, urljoin, urlsplit, split_iri, norm_url
+
+
+NODE_KEYS = set([LANG, ID, TYPE, VALUE, LIST, SET, REV, GRAPH])
+
+class Defined(int): pass
+UNDEF = Defined(0)
+
+
+class Context(object):
+
+    def __init__(self, source=None, base=None):
+        self.language = None
+        self.vocab = None
+        self.base = base
+        self.doc_base = base
+        self.terms = {}
+        self._alias = {}
+        self._lookup = {}
+        self._prefixes = {}
+        self.active = False
+        if source:
+            self.load(source)
+
+    @property
+    def base(self):
+        return self._base
+
+    @base.setter
+    def base(self, base):
+        if base:
+            hash_index = base.find('#')
+            if hash_index > -1:
+                base = base[0:hash_index]
+        self._base = self.resolve_iri(base) if (
+                hasattr(self, '_base') and base is not None) else base
+        self._basedomain = '%s://%s' % urlsplit(base)[0:2] if base else None
+
+    def subcontext(self, source):
+        # IMPROVE: to optimize, implement SubContext with parent fallback support
+        ctx = Context()
+        ctx.language = self.language
+        ctx.vocab = self.vocab
+        ctx.base = self.base
+        ctx.doc_base = self.doc_base
+        ctx._alias = self._alias.copy()
+        ctx.terms = self.terms.copy()
+        ctx._lookup = self._lookup.copy()
+        ctx._prefixes = self._prefixes.copy()
+        ctx.load(source)
+        return ctx
+
+    def get_id(self, obj):
+        return self._get(obj, ID)
+
+    def get_type(self, obj):
+        return self._get(obj, TYPE)
+
+    def get_language(self, obj):
+        return self._get(obj, LANG)
+
+    def get_value(self, obj):
+        return self._get(obj, VALUE)
+
+    def get_graph(self, obj):
+        return self._get(obj, GRAPH)
+
+    def get_list(self, obj):
+        return self._get(obj, LIST)
+
+    def get_set(self, obj):
+        return self._get(obj, SET)
+
+    def get_rev(self, obj):
+        return self._get(obj, REV)
+
+    def _get(self, obj, key):
+        return obj.get(self._alias.get(key)) or obj.get(key)
+
+    def get_key(self, key):
+        return self._alias.get(key, key)
+
+    lang_key = property(lambda self: self.get_key(LANG))
+    id_key = property(lambda self: self.get_key(ID))
+    type_key = property(lambda self: self.get_key(TYPE))
+    value_key = property(lambda self: self.get_key(VALUE))
+    list_key = property(lambda self: self.get_key(LIST))
+    rev_key = property(lambda self: self.get_key(REV))
+    graph_key = property(lambda self: self.get_key(GRAPH))
+
+    def add_term(self, name, idref, coercion=UNDEF, container=UNDEF,
+            language=UNDEF, reverse=False):
+        term = Term(idref, name, coercion, container, language, reverse)
+        self.terms[name] = term
+        self._lookup[(idref, coercion or language, container, reverse)] = term
+        self._prefixes[idref] = name
+
+    def find_term(self, idref, coercion=None, container=UNDEF,
+            language=None, reverse=False):
+        lu = self._lookup
+        if coercion is None:
+            coercion = language
+        if coercion is not UNDEF and container:
+            found = lu.get((idref, coercion, container, reverse))
+            if found: return found
+        if coercion is not UNDEF:
+            found = lu.get((idref, coercion, UNDEF, reverse))
+            if found: return found
+        if container:
+            found = lu.get((idref, coercion, container, reverse))
+            if found: return found
+        elif language:
+            found = lu.get((idref, UNDEF, LANG, reverse))
+            if found: return found
+        else:
+            found = lu.get((idref, coercion or UNDEF, SET, reverse))
+            if found: return found
+        return lu.get((idref, UNDEF, UNDEF, reverse))
+
+    def resolve(self, curie_or_iri):
+        iri = self.expand(curie_or_iri, False)
+        if self.isblank(iri):
+            return iri
+        return self.resolve_iri(iri)
+
+    def resolve_iri(self, iri):
+        return norm_url(self._base, iri)
+
+    def isblank(self, ref):
+        return ref.startswith('_:')
+
+    def expand(self, term_curie_or_iri, use_vocab=True):
+        if use_vocab:
+            term = self.terms.get(term_curie_or_iri)
+            if term:
+                return term.id
+        is_term, pfx, local = self._prep_expand(term_curie_or_iri)
+        if pfx == '_':
+            return term_curie_or_iri
+        if pfx is not None:
+            ns = self.terms.get(pfx)
+            if ns and ns.id:
+                return ns.id + local
+        elif is_term and use_vocab:
+            if self.vocab:
+                return self.vocab + term_curie_or_iri
+            return None
+        return self.resolve_iri(term_curie_or_iri)
+
+    def shrink_iri(self, iri):
+        ns, name = split_iri(str(iri))
+        pfx = self._prefixes.get(ns)
+        if pfx:
+            return ":".join((pfx, name))
+        elif self._base:
+            if str(iri) == self._base:
+                return ""
+            elif iri.startswith(self._basedomain):
+                    return iri[len(self._basedomain):]
+        return iri
+
+    def to_symbol(self, iri):
+        iri = str(iri)
+        term = self.find_term(iri)
+        if term:
+            return term.name
+        ns, name = split_iri(iri)
+        if ns == self.vocab:
+            return name
+        pfx = self._prefixes.get(ns)
+        if pfx:
+            return ":".join((pfx, name))
+        return iri
+
+    def load(self, source, base=None):
+        self.active = True
+        sources = []
+        source = source if isinstance(source, list) else [source]
+        self._prep_sources(base, source, sources)
+        for source_url, source in sources:
+            self._read_source(source, source_url)
+
+    def _prep_sources(self, base, inputs, sources, referenced_contexts=None,
+            in_source_url=None):
+        referenced_contexts = referenced_contexts or set()
+        for source in inputs:
+            if isinstance(source, str):
+                source_url = urljoin(base, source)
+                if source_url in referenced_contexts:
+                    raise errors.RECURSIVE_CONTEXT_INCLUSION
+                referenced_contexts.add(source_url)
+                source = source_to_json(source_url)
+                if CONTEXT not in source:
+                    raise errors.INVALID_REMOTE_CONTEXT
+            else:
+                source_url = in_source_url
+
+            if isinstance(source, dict):
+                if CONTEXT in source:
+                    source = source[CONTEXT]
+                    source = source if isinstance(source, list) else [source]
+            if isinstance(source, list):
+                self._prep_sources(base, source, sources, referenced_contexts, source_url)
+            else:
+                sources.append((source_url, source))
+
+    def _read_source(self, source, source_url=None):
+        self.vocab = source.get(VOCAB, self.vocab)
+        for key, value in list(source.items()):
+            if key == LANG:
+                self.language = value
+            elif key == VOCAB:
+                continue
+            elif key == BASE:
+                if source_url:
+                    continue
+                self.base = value
+            else:
+                self._read_term(source, key, value)
+
+    def _read_term(self, source, name, dfn):
+        if isinstance(dfn, dict):
+            #term = self._create_term(source, key, value)
+            rev = dfn.get(REV)
+            idref = rev or dfn.get(ID, UNDEF)
+            if idref == TYPE:
+                idref = str(RDF.type)
+            elif idref is not UNDEF:
+                idref = self._rec_expand(source, idref)
+            elif ':' in name:
+                idref = self._rec_expand(source, name)
+            elif self.vocab:
+                idref = self.vocab + name
+            coercion = dfn.get(TYPE, UNDEF)
+            if coercion and coercion not in (ID, TYPE, VOCAB):
+                coercion = self._rec_expand(source, coercion)
+            self.add_term(name, idref, coercion,
+                    dfn.get(CONTAINER, UNDEF), dfn.get(LANG, UNDEF), bool(rev))
+        else:
+            idref = self._rec_expand(source, dfn)
+            self.add_term(name, idref)
+        if idref in NODE_KEYS:
+            self._alias[idref] = name
+
+    def _rec_expand(self, source, expr, prev=None):
+        if expr == prev or expr in NODE_KEYS:
+            return expr
+        is_term, pfx, nxt = self._prep_expand(expr)
+        if pfx:
+            iri = self._get_source_id(source, pfx) or self.expand(pfx)
+            if iri is None:
+                nxt = expr
+            else:
+                nxt = iri + nxt
+        else:
+            nxt = self._get_source_id(source, nxt) or nxt
+            if ':' not in nxt and self.vocab:
+                return self.vocab + nxt
+        return self._rec_expand(source, nxt, expr)
+
+    def _prep_expand(self, expr):
+        if ':' not in expr:
+            return True, None, expr
+        pfx, local = expr.split(':', 1)
+        if not local.startswith('//'):
+            return False, pfx, local
+        else:
+            return False, None, expr
+
+    def _get_source_id(self, source, key):
+        # .. from source dict or if already defined
+        term = source.get(key)
+        if term is None:
+            dfn = self.terms.get(key)
+            if dfn:
+                term = dfn.id
+        elif isinstance(term, dict):
+            term = term.get(ID)
+        return term
+
+
+Term = namedtuple('Term',
+        'id, name, type, container, language, reverse')
+Term.__new__.__defaults__ = (UNDEF, UNDEF, UNDEF, False)