diff planemo/lib/python3.7/site-packages/rdflib/plugins/parsers/ntriples.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/rdflib/plugins/parsers/ntriples.py	Fri Jul 31 00:32:28 2020 -0400
@@ -0,0 +1,283 @@
+#!/usr/bin/env python
+__doc__ = """
+N-Triples Parser
+License: GPL 2, W3C, BSD, or MIT
+Author: Sean B. Palmer, inamidst.com
+"""
+
+import re
+import codecs
+
+from rdflib.term import URIRef as URI
+from rdflib.term import BNode as bNode
+from rdflib.term import Literal
+
+from rdflib.py3compat import cast_bytes, decodeUnicodeEscape
+
+__all__ = ['unquote', 'uriquote', 'Sink', 'NTriplesParser']
+
+uriref = r'<([^:]+:[^\s"<>]+)>'
+literal = r'"([^"\\]*(?:\\.[^"\\]*)*)"'
+litinfo = r'(?:@([a-z]+(?:-[a-zA-Z0-9]+)*)|\^\^' + uriref + r')?'
+
+r_line = re.compile(r'([^\r\n]*)(?:\r\n|\r|\n)')
+r_wspace = re.compile(r'[ \t]*')
+r_wspaces = re.compile(r'[ \t]+')
+r_tail = re.compile(r'[ \t]*\.[ \t]*(#.*)?')
+r_uriref = re.compile(uriref)
+r_nodeid = re.compile(r'_:([A-Za-z0-9]*)')
+r_literal = re.compile(literal + litinfo)
+
+bufsiz = 2048
+validate = False
+
+
+class Node(str):
+    pass
+
+
+class ParseError(Exception):
+    pass
+
+
+class Sink(object):
+    def __init__(self):
+        self.length = 0
+
+    def triple(self, s, p, o):
+        self.length += 1
+        print((s, p, o))
+
+quot = {'t': '\t', 'n': '\n', 'r': '\r', '"': '"', '\\':
+        '\\'}
+r_safe = re.compile(r'([\x20\x21\x23-\x5B\x5D-\x7E]+)')
+r_quot = re.compile(r'\\(t|n|r|"|\\)')
+r_uniquot = re.compile(r'\\u([0-9A-F]{4})|\\U([0-9A-F]{8})')
+
+
+def unquote(s):
+    """Unquote an N-Triples string."""
+    if not validate:
+
+        if isinstance(s, str): # nquads
+            s = decodeUnicodeEscape(s)
+        else:
+            s = s.decode('unicode-escape')
+
+        return s
+    else:
+        result = []
+        while s:
+            m = r_safe.match(s)
+            if m:
+                s = s[m.end():]
+                result.append(m.group(1))
+                continue
+
+            m = r_quot.match(s)
+            if m:
+                s = s[2:]
+                result.append(quot[m.group(1)])
+                continue
+
+            m = r_uniquot.match(s)
+            if m:
+                s = s[m.end():]
+                u, U = m.groups()
+                codepoint = int(u or U, 16)
+                if codepoint > 0x10FFFF:
+                    raise ParseError("Disallowed codepoint: %08X" % codepoint)
+                result.append(chr(codepoint))
+            elif s.startswith('\\'):
+                raise ParseError("Illegal escape at: %s..." % s[:10])
+            else:
+                raise ParseError("Illegal literal character: %r" % s[0])
+        return ''.join(result)
+
+r_hibyte = re.compile(r'([\x80-\xFF])')
+
+
+def uriquote(uri):
+    if not validate:
+        return uri
+    else:
+        return r_hibyte.sub(
+            lambda m: '%%%02X' % ord(m.group(1)), uri)
+
+
+class NTriplesParser(object):
+    """An N-Triples Parser.
+
+    Usage::
+
+          p = NTriplesParser(sink=MySink())
+          sink = p.parse(f) # file; use parsestring for a string
+    """
+
+    _bnode_ids = {}
+
+    def __init__(self, sink=None):
+        if sink is not None:
+            self.sink = sink
+        else:
+            self.sink = Sink()
+
+    def parse(self, f):
+        """Parse f as an N-Triples file."""
+        if not hasattr(f, 'read'):
+            raise ParseError("Item to parse must be a file-like object.")
+
+        # since N-Triples 1.1 files can and should be utf-8 encoded
+        f = codecs.getreader('utf-8')(f)
+
+        self.file = f
+        self.buffer = ''
+        while True:
+            self.line = self.readline()
+            if self.line is None:
+                break
+            try:
+                self.parseline()
+            except ParseError:
+                raise ParseError("Invalid line: %r" % self.line)
+        return self.sink
+
+    def parsestring(self, s):
+        """Parse s as an N-Triples string."""
+        if not isinstance(s, str):
+            raise ParseError("Item to parse must be a string instance.")
+        try:
+            from io import BytesIO
+            assert BytesIO
+        except ImportError:
+            from io import StringIO as BytesIO
+            assert BytesIO
+        f = BytesIO()
+        f.write(cast_bytes(s))
+        f.seek(0)
+        self.parse(f)
+
+    def readline(self):
+        """Read an N-Triples line from buffered input."""
+        # N-Triples lines end in either CRLF, CR, or LF
+        # Therefore, we can't just use f.readline()
+        if not self.buffer:
+            buffer = self.file.read(bufsiz)
+            if not buffer:
+                return None
+            self.buffer = buffer
+
+        while True:
+            m = r_line.match(self.buffer)
+            if m:  # the more likely prospect
+                self.buffer = self.buffer[m.end():]
+                return m.group(1)
+            else:
+                buffer = self.file.read(bufsiz)
+                if not buffer and not self.buffer.isspace():
+                    # Last line does not need to be terminated with a newline
+                    buffer += "\n"
+                elif not buffer:
+                    return None
+                self.buffer += buffer
+
+    def parseline(self):
+        self.eat(r_wspace)
+        if (not self.line) or self.line.startswith('#'):
+            return  # The line is empty or a comment
+
+        subject = self.subject()
+        self.eat(r_wspaces)
+
+        predicate = self.predicate()
+        self.eat(r_wspaces)
+
+        object = self.object()
+        self.eat(r_tail)
+
+        if self.line:
+            raise ParseError("Trailing garbage")
+        self.sink.triple(subject, predicate, object)
+
+    def peek(self, token):
+        return self.line.startswith(token)
+
+    def eat(self, pattern):
+        m = pattern.match(self.line)
+        if not m:  # @@ Why can't we get the original pattern?
+            # print(dir(pattern))
+            # print repr(self.line), type(self.line)
+            raise ParseError("Failed to eat %s at %s" % (pattern.pattern, self.line))
+        self.line = self.line[m.end():]
+        return m
+
+    def subject(self):
+        # @@ Consider using dictionary cases
+        subj = self.uriref() or self.nodeid()
+        if not subj:
+            raise ParseError("Subject must be uriref or nodeID")
+        return subj
+
+    def predicate(self):
+        pred = self.uriref()
+        if not pred:
+            raise ParseError("Predicate must be uriref")
+        return pred
+
+    def object(self):
+        objt = self.uriref() or self.nodeid() or self.literal()
+        if objt is False:
+            raise ParseError("Unrecognised object type")
+        return objt
+
+    def uriref(self):
+        if self.peek('<'):
+            uri = self.eat(r_uriref).group(1)
+            uri = unquote(uri)
+            uri = uriquote(uri)
+            return URI(uri)
+        return False
+
+    def nodeid(self):
+        if self.peek('_'):
+            # Fix for https://github.com/RDFLib/rdflib/issues/204
+            bnode_id = self.eat(r_nodeid).group(1)
+            new_id = self._bnode_ids.get(bnode_id, None)
+            if new_id is not None:
+                # Re-map to id specfic to this doc
+                return bNode(new_id)
+            else:
+                # Replace with freshly-generated document-specific BNode id
+                bnode = bNode()
+                # Store the mapping
+                self._bnode_ids[bnode_id] = bnode
+                return bnode
+        return False
+
+    def literal(self):
+        if self.peek('"'):
+            lit, lang, dtype = self.eat(r_literal).groups()
+            if lang:
+                lang = lang
+            else:
+                lang = None
+            if dtype:
+                dtype = dtype
+            else:
+                dtype = None
+            if lang and dtype:
+                raise ParseError("Can't have both a language and a datatype")
+            lit = unquote(lit)
+            return Literal(lit, lang, dtype)
+        return False
+
+# # Obsolete, unused
+# def parseURI(uri):
+#     import urllib
+#     parser = NTriplesParser()
+#     u = urllib.urlopen(uri)
+#     sink = parser.parse(u)
+#     u.close()
+#     # for triple in sink:
+#     #     print triple
+#     print 'Length of input:', sink.length