view env/lib/python3.7/site-packages/schema_salad/jsonld_context.py @ 4:79f47841a781 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:47:39 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

from __future__ import absolute_import

import logging
from typing import (
    Any,
    Dict,
    Iterable,
    List,
    MutableMapping,
    MutableSequence,
    Optional,
    Tuple,
    Union,
    cast,
)

import rdflib
import rdflib.namespace
import six
from rdflib import Graph, URIRef
from rdflib.namespace import RDF, RDFS
from six.moves import urllib
from typing_extensions import Text  # pylint: disable=unused-import

from .exceptions import SchemaException
from .ref_resolver import ContextType  # pylint: disable=unused-import
from .utils import aslist, json_dumps

# move to a regular typing import when Python 3.3-3.6 is no longer supported


_logger = logging.getLogger("salad")


def pred(
    datatype,  # type: MutableMapping[Text, Union[Dict[Text, Text], Text]]
    field,  # type: Optional[Dict[Text, Any]]
    name,  # type: str
    context,  # type: ContextType
    defaultBase,  # type: str
    namespaces,  # type: Dict[Text, rdflib.namespace.Namespace]
):  # type: (...) -> Union[Dict[Text, Text], Text]
    split = urllib.parse.urlsplit(name)

    vee = None  # type: Optional[Text]

    if split.scheme != "":
        vee = name
        (ns, ln) = rdflib.namespace.split_uri(six.text_type(vee))
        name = ln
        if ns[0:-1] in namespaces:
            vee = six.text_type(namespaces[ns[0:-1]][ln])
        _logger.debug("name, v %s %s", name, vee)

    v = None  # type: Optional[Dict[Text, Any]]

    if field is not None and "jsonldPredicate" in field:
        if isinstance(field["jsonldPredicate"], MutableMapping):
            v = {}
            for k, val in field["jsonldPredicate"].items():
                v[("@" + k[1:] if k.startswith("_") else k)] = val
            if "@id" not in v:
                v["@id"] = vee
        else:
            v = field["jsonldPredicate"]
    elif "jsonldPredicate" in datatype:
        if isinstance(datatype["jsonldPredicate"], Iterable):
            for d in datatype["jsonldPredicate"]:
                if isinstance(d, MutableMapping):
                    if d["symbol"] == name:
                        v = d["predicate"]
                else:
                    raise SchemaException(
                        "entries in the jsonldPredicate List must be " "Dictionaries"
                    )
        else:
            raise SchemaException("jsonldPredicate must be a List of Dictionaries.")

    ret = v or vee

    if not ret:
        ret = defaultBase + name

    if name in context:
        if context[name] != ret:
            raise SchemaException(
                "Predicate collision on {}, '{}' != '{}'".format(
                    name, context[name], ret
                )
            )
    else:
        _logger.debug("Adding to context '%s' %s (%s)", name, ret, type(ret))
        context[name] = ret

    return ret


def process_type(
    t,  # type: MutableMapping[Text, Any]
    g,  # type: Graph
    context,  # type: ContextType
    defaultBase,  # type: str
    namespaces,  # type: Dict[Text, rdflib.namespace.Namespace]
    defaultPrefix,  # type: str
):  # type: (...) -> None
    if t["type"] not in ("record", "enum"):
        return

    if "name" in t:
        recordname = t["name"]

        _logger.debug("Processing %s %s\n", t.get("type"), t)

        classnode = URIRef(recordname)
        g.add((classnode, RDF.type, RDFS.Class))

        split = urllib.parse.urlsplit(recordname)
        predicate = recordname
        if t.get("inVocab", True):
            if split.scheme:
                (ns, ln) = rdflib.namespace.split_uri(six.text_type(recordname))
                predicate = recordname
                recordname = ln
            else:
                predicate = "{}:{}".format(defaultPrefix, recordname)

        if context.get(recordname, predicate) != predicate:
            raise SchemaException(
                "Predicate collision on '{}', '{}' != '{}'".format(
                    recordname, context[recordname], predicate
                )
            )

        if not recordname:
            raise SchemaException("Unable to find/derive recordname for {}".format(t))

        _logger.debug(
            "Adding to context '%s' %s (%s)", recordname, predicate, type(predicate)
        )
        context[recordname] = predicate

    if t["type"] == "record":
        for i in t.get("fields", []):
            fieldname = i["name"]

            _logger.debug("Processing field %s", i)

            v = pred(
                t, i, fieldname, context, defaultPrefix, namespaces
            )  # type: Union[Dict[Any, Any], Text, None]

            if isinstance(v, six.string_types):
                v = v if v[0] != "@" else None
            elif v is not None:
                v = v["_@id"] if v.get("_@id", "@")[0] != "@" else None

            if bool(v):
                (ns, ln) = rdflib.namespace.split_uri(six.text_type(v))
                if ns[0:-1] in namespaces:
                    propnode = namespaces[ns[0:-1]][ln]
                else:
                    propnode = URIRef(v)

                g.add((propnode, RDF.type, RDF.Property))
                g.add((propnode, RDFS.domain, classnode))

                # TODO generate range from datatype.

            if isinstance(i["type"], MutableMapping):
                process_type(
                    i["type"], g, context, defaultBase, namespaces, defaultPrefix
                )

        if "extends" in t:
            for e in aslist(t["extends"]):
                g.add((classnode, RDFS.subClassOf, URIRef(e)))
    elif t["type"] == "enum":
        _logger.debug("Processing enum %s", t.get("name"))

        for i in t["symbols"]:
            pred(t, None, i, context, defaultBase, namespaces)


def salad_to_jsonld_context(
    j,  # type: Iterable[MutableMapping[Text, Any]]
    schema_ctx,  # type: MutableMapping[Text, Any]
):  # type: (...) -> Tuple[ContextType, Graph]
    context = {}  # type: ContextType
    namespaces = {}
    g = Graph()
    defaultPrefix = ""

    for k, v in schema_ctx.items():
        context[k] = v
        namespaces[k] = rdflib.namespace.Namespace(v)

    if "@base" in context:
        defaultBase = cast(str, context["@base"])
        del context["@base"]
    else:
        defaultBase = ""

    for k, v in namespaces.items():
        g.bind(str(k), v)

    for t in j:
        process_type(t, g, context, defaultBase, namespaces, defaultPrefix)

    return (context, g)


def fix_jsonld_ids(
    obj,  # type: Union[List[Dict[Text, Any]], MutableMapping[Text, Any]]
    ids,  # type: List[Text]
):  # type: (...) -> None
    if isinstance(obj, MutableMapping):
        for i in ids:
            if i in obj:
                obj["@id"] = obj[i]
        for v in obj.values():
            fix_jsonld_ids(v, ids)
    if isinstance(obj, MutableSequence):
        for entry in obj:
            fix_jsonld_ids(entry, ids)


def makerdf(
    workflow,  # type: Text
    wf,  # type: Union[List[Dict[Text, Any]], MutableMapping[Text, Any]]
    ctx,  # type: ContextType
    graph=None,  # type: Optional[Graph]
):  # type: (...) -> Graph
    prefixes = {}
    idfields = []
    for k, v in six.iteritems(ctx):
        if isinstance(v, MutableMapping):
            url = v["@id"]
        else:
            url = v
        if url == "@id":
            idfields.append(k)
        doc_url, frg = urllib.parse.urldefrag(url)
        if "/" in frg:
            p = frg.split("/")[0]
            prefixes[p] = u"{}#{}/".format(doc_url, p)

    fix_jsonld_ids(wf, idfields)

    if graph is None:
        g = Graph()
    else:
        g = graph

    if isinstance(wf, MutableSequence):
        for w in wf:
            w["@context"] = ctx
            g.parse(data=json_dumps(w), format="json-ld", publicID=str(workflow))
    else:
        wf["@context"] = ctx
        g.parse(data=json_dumps(wf), format="json-ld", publicID=str(workflow))

    # Bug in json-ld loader causes @id fields to be added to the graph
    for sub, pred, obj in g.triples((None, URIRef("@id"), None)):
        g.remove((sub, pred, obj))

    for k2, v2 in six.iteritems(prefixes):
        g.namespace_manager.bind(k2, v2)

    return g