Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/rdflib/plugins/serializers/trix.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author | shellac |
---|---|
date | Thu, 14 May 2020 14:56:58 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/rdflib/plugins/serializers/trix.py Thu May 14 14:56:58 2020 -0400 @@ -0,0 +1,80 @@ +from rdflib.serializer import Serializer +from rdflib.plugins.serializers.xmlwriter import XMLWriter + +from rdflib.term import URIRef, Literal, BNode +from rdflib.namespace import Namespace + +from rdflib.graph import Graph, ConjunctiveGraph + +from rdflib.py3compat import b + +__all__ = ['TriXSerializer'] + +## TODO: MOve this somewhere central +TRIXNS = Namespace("http://www.w3.org/2004/03/trix/trix-1/") +XMLNS = Namespace("http://www.w3.org/XML/1998/namespace") + + +class TriXSerializer(Serializer): + def __init__(self, store): + super(TriXSerializer, self).__init__(store) + if not store.context_aware: + raise Exception( + "TriX serialization only makes sense for context-aware stores") + + def serialize(self, stream, base=None, encoding=None, **args): + + nm = self.store.namespace_manager + + self.writer = XMLWriter(stream, nm, encoding, extra_ns={"": TRIXNS}) + + self.writer.push(TRIXNS["TriX"]) + self.writer.namespaces() + + if isinstance(self.store, ConjunctiveGraph): + for subgraph in self.store.contexts(): + self._writeGraph(subgraph) + elif isinstance(self.store, Graph): + self._writeGraph(self.store) + else: + raise Exception("Unknown graph type: " + type(self.store)) + + self.writer.pop() + stream.write(b("\n")) + + def _writeGraph(self, graph): + self.writer.push(TRIXNS["graph"]) + if isinstance(graph.identifier, URIRef): + self.writer.element( + TRIXNS["uri"], content=str(graph.identifier)) + + for triple in graph.triples((None, None, None)): + self._writeTriple(triple) + self.writer.pop() + + def _writeTriple(self, triple): + self.writer.push(TRIXNS["triple"]) + for component in triple: + if isinstance(component, URIRef): + self.writer.element(TRIXNS["uri"], + content=str(component)) + elif isinstance(component, BNode): + self.writer.element(TRIXNS["id"], + content=str(component)) + elif isinstance(component, Literal): + if component.datatype: + self.writer.element(TRIXNS["typedLiteral"], + content=str(component), + attributes={TRIXNS["datatype"]: + str( + component.datatype)}) + elif component.language: + self.writer.element(TRIXNS["plainLiteral"], + content=str(component), + attributes={XMLNS["lang"]: + str( + component.language)}) + else: + self.writer.element(TRIXNS["plainLiteral"], + content=str(component)) + self.writer.pop()