diff planemo/lib/python3.7/site-packages/rdflib/plugins/parsers/trix.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/rdflib/plugins/parsers/trix.py	Fri Jul 31 00:32:28 2020 -0400
@@ -0,0 +1,268 @@
+"""
+A TriX parser for RDFLib
+"""
+from rdflib.namespace import Namespace
+from rdflib.term import URIRef
+from rdflib.term import BNode
+from rdflib.term import Literal
+from rdflib.graph import Graph, ConjunctiveGraph
+from rdflib.exceptions import ParserError
+from rdflib.parser import Parser
+
+from xml.sax.saxutils import handler
+from xml.sax import make_parser
+from xml.sax.handler import ErrorHandler
+
+# Names exported by ``from ... import *``.
+__all__ = ['create_parser', 'TriXHandler', 'TriXParser']
+
+
+# Namespace every TriX element must belong to; TriXHandler rejects
+# elements whose namespace URI differs.
+TRIXNS = Namespace("http://www.w3.org/2004/03/trix/trix-1/")
+# The XML namespace, used to look up the ``xml:lang`` attribute on literals.
+XMLNS = Namespace("http://www.w3.org/XML/1998/namespace")
+
+
+class TriXHandler(handler.ContentHandler):
+    """A SAX content handler for TriX. See http://sw.nokia.com/trix/
+
+    The handler is a small state machine driven by namespace-aware SAX
+    events. ``self.state`` takes these values:
+
+    * 0 -- outside the document element
+    * 1 -- inside ``<TriX>``
+    * 2 -- inside ``<graph>``
+    * 3 -- inside the ``<uri>``/``<id>`` element that names the graph
+    * 4 -- inside ``<triple>``
+
+    Every completed triple is added to a ``Graph`` backed by ``store``.
+    Any structural problem is reported through :meth:`error`, which raises
+    ``ParserError``.
+    """
+
+    def __init__(self, store):
+        """Create a handler that writes parsed triples into *store*."""
+        handler.ContentHandler.__init__(self)
+        self.store = store
+        self.preserve_bnode_ids = False
+        # Filled in by the SAX parser through setDocumentLocator(); kept as
+        # None until then so that error() still works without a locator.
+        self.locator = None
+        self.reset()
+
+    def reset(self):
+        """Return all per-document parse state to its initial values."""
+        self.bnode = {}
+        self.graph = None
+        self.triple = None
+        self.state = 0
+        self.lang = None
+        self.datatype = None
+        # Text collected since the most recent start tag.
+        self.chars = ""
+
+    # ContentHandler methods
+
+    def setDocumentLocator(self, locator):
+        """Remember *locator* so errors can report a document position."""
+        self.locator = locator
+
+    def startDocument(self):
+        pass
+
+    def startPrefixMapping(self, prefix, namespace):
+        pass
+
+    def endPrefixMapping(self, prefix):
+        pass
+
+    def _require_trix_namespace(self, name):
+        """Raise ParserError unless *name* lies in the TriX namespace."""
+        if name[0] != str(TRIXNS):
+            self.error(
+                "Only elements in the TriX namespace are allowed. %s!=%s"
+                % (name[0], TRIXNS))
+
+    @staticmethod
+    def _xml_lang(attrs):
+        """Return the ``xml:lang`` value in *attrs*, or None if absent."""
+        try:
+            return attrs.getValue((str(XMLNS), "lang"))
+        except KeyError:
+            # language not required - ignore
+            return None
+
+    def startElementNS(self, name, qname, attrs):
+        """Check an opening tag against the current state and advance it.
+
+        *name* is the ``(namespace URI, local name)`` pair of the element
+        and *attrs* its attributes. Raises ``ParserError`` (via
+        :meth:`error`) for elements outside the TriX namespace, unknown
+        elements, and elements that are not allowed in the current state.
+        """
+        self._require_trix_namespace(name)
+        local = name[1]
+
+        if local == "TriX":
+            if self.state == 0:
+                self.state = 1
+            else:
+                self.error("Unexpected TriX element")
+
+        elif local == "graph":
+            if self.state == 1:
+                self.state = 2
+            else:
+                self.error("Unexpected graph element")
+
+        elif local == "uri":
+            if self.state == 2:
+                # the context uri
+                self.state = 3
+            elif self.state == 4:
+                # part of a triple
+                pass
+            else:
+                self.error("Unexpected uri element")
+
+        elif local == "triple":
+            if self.state == 2:
+                if self.graph is None:
+                    # anonymous graph, create one with random bnode id
+                    self.graph = Graph(store=self.store)
+                # start of a triple
+                self.triple = []
+                self.state = 4
+            else:
+                self.error("Unexpected triple element")
+
+        elif local == "typedLiteral":
+            if self.state == 4:
+                # part of triple
+                self.datatype = None
+                self.lang = self._xml_lang(attrs)
+                # NOTE(review): a typedLiteral that also carries xml:lang
+                # ends up passing both lang and datatype to Literal --
+                # confirm Literal accepts that combination.
+                try:
+                    self.datatype = attrs.getValueByQName("datatype")
+                except KeyError:
+                    self.error("No required attribute 'datatype'")
+            else:
+                self.error("Unexpected typedLiteral element")
+
+        elif local == "plainLiteral":
+            if self.state == 4:
+                # part of triple
+                self.datatype = None
+                self.lang = self._xml_lang(attrs)
+            else:
+                self.error("Unexpected plainLiteral element")
+
+        elif local == "id":
+            if self.state == 2:
+                # the context uri
+                self.state = 3
+            elif self.state == 4:
+                # part of triple
+                pass
+            else:
+                self.error("Unexpected id element")
+
+        else:
+            self.error("Unknown element %s in TriX namespace" % local)
+
+        # Start collecting the text content of the element just opened.
+        self.chars = ""
+
+    def endElementNS(self, name, qname):
+        """Turn a closed element into a graph name, a term or a triple.
+
+        ``<uri>`` and ``<id>`` either name the current graph (state 3) or
+        contribute a term to the current triple (state 4); literals always
+        contribute a term; ``</triple>`` adds the collected terms to the
+        current graph. Raises ``ParserError`` (via :meth:`error`) on any
+        inconsistency.
+        """
+        self._require_trix_namespace(name)
+        local = name[1]
+
+        if local == "uri":
+            if self.state == 3:
+                self.graph = Graph(store=self.store,
+                                   identifier=URIRef(self.chars.strip()))
+                self.state = 2
+            elif self.state == 4:
+                self.triple.append(URIRef(self.chars.strip()))
+            else:
+                self.error(
+                    "Illegal internal self.state - This should never " +
+                    "happen if the SAX parser ensures XML syntax correctness")
+
+        elif local == "id":
+            if self.state == 3:
+                self.graph = Graph(self.store, identifier=self.get_bnode(
+                    self.chars.strip()))
+                self.state = 2
+            elif self.state == 4:
+                self.triple.append(self.get_bnode(self.chars.strip()))
+            else:
+                self.error(
+                    "Illegal internal self.state - This should never " +
+                    "happen if the SAX parser ensures XML syntax correctness")
+
+        elif local == "plainLiteral" or local == "typedLiteral":
+            if self.state == 4:
+                # Literal text is deliberately not stripped.
+                self.triple.append(Literal(
+                    self.chars, lang=self.lang, datatype=self.datatype))
+            else:
+                self.error(
+                    "This should never happen if the SAX parser " +
+                    "ensures XML syntax correctness")
+
+        elif local == "triple":
+            if self.state == 4:
+                if len(self.triple) != 3:
+                    self.error("Triple has wrong length, got %d elements: %s" %
+                               (len(self.triple), self.triple))
+
+                self.graph.add(self.triple)
+                self.state = 2
+            else:
+                self.error(
+                    "This should never happen if the SAX parser " +
+                    "ensures XML syntax correctness")
+
+        elif local == "graph":
+            self.graph = None
+            self.state = 1
+
+        elif local == "TriX":
+            self.state = 0
+
+        else:
+            self.error("Unexpected close element")
+
+    def get_bnode(self, label):
+        """Return the blank node for *label*.
+
+        With ``preserve_bnode_ids`` set, a ``BNode`` is built from *label*
+        on every call. Otherwise the node is created once per label and
+        the same object is returned for later occurrences.
+        """
+        if self.preserve_bnode_ids:
+            bn = BNode(label)
+        else:
+            if label in self.bnode:
+                bn = self.bnode[label]
+            else:
+                bn = BNode(label)
+                self.bnode[label] = bn
+        return bn
+
+    def characters(self, content):
+        """Append *content* to the text of the element being read."""
+        self.chars += content
+
+    def ignorableWhitespace(self, content):
+        pass
+
+    def processingInstruction(self, target, data):
+        pass
+
+    def error(self, message):
+        """Raise ``ParserError`` for *message*.
+
+        The message is prefixed with ``system id:line:column: `` when a
+        document locator is available; without one it is raised as is.
+        """
+        locator = self.locator
+        if locator is None:
+            raise ParserError(message)
+        info = "%s:%s:%s: " % (
+            locator.getSystemId(),
+            locator.getLineNumber(),
+            locator.getColumnNumber())
+        raise ParserError(info + message)
+
+
+def create_parser(store):
+    """Build a namespace-aware SAX parser that feeds a TriXHandler.
+
+    *store* is handed to the ``TriXHandler`` installed as the parser's
+    content handler. Returns the configured parser.
+    """
+    sax_parser = make_parser()
+    try:
+        # expatreader.py can fail while guessing a prefix unless the
+        # "xml" namespace has been declared up front.
+        sax_parser.start_namespace_decl(
+            "xml", "http://www.w3.org/XML/1998/namespace")
+    except AttributeError:
+        # Some parser implementations (Jython, at least) lack the method.
+        pass
+    sax_parser.setFeature(handler.feature_namespaces, 1)
+    sax_parser.setErrorHandler(ErrorHandler())
+    sax_parser.setContentHandler(TriXHandler(store))
+    return sax_parser
+
+
+class TriXParser(Parser):
+    """RDFLib parser plugin for TriX. See http://sw.nokia.com/trix/"""
+
+    def __init__(self):
+        pass
+
+    def parse(self, source, sink, **args):
+        """Parse the TriX document *source* into the store behind *sink*.
+
+        *sink* must be backed by a context aware store. The optional
+        keyword argument ``preserve_bnode_ids`` is copied onto the content
+        handler when it is given and is not None.
+        """
+        assert sink.store.context_aware, (
+            "TriXParser must be given a context aware store.")
+
+        # A fresh parser and handler are built for every call, so neither
+        # needs to be reset before use.
+        sax_parser = create_parser(sink.store)
+        self._parser = sax_parser
+        keep_ids = args.get("preserve_bnode_ids")
+        if keep_ids is not None:
+            sax_parser.getContentHandler().preserve_bnode_ids = keep_ids
+        sax_parser.parse(source)