Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/rdflib/plugins/serializers/rdfxml.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 (2020-06-01) |
parents | 79f47841a781 |
children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/rdflib/plugins/serializers/rdfxml.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,340 +0,0 @@ -from rdflib.plugins.serializers.xmlwriter import XMLWriter - -from rdflib.namespace import Namespace, RDF, RDFS # , split_uri - -from rdflib.term import URIRef, Literal, BNode -from rdflib.util import first, more_than -from rdflib.collection import Collection -from rdflib.serializer import Serializer - -# from rdflib.exceptions import Error - -from rdflib.py3compat import b - -from xml.sax.saxutils import quoteattr, escape -import xml.dom.minidom - -from .xmlwriter import ESCAPE_ENTITIES - -__all__ = ['fix', 'XMLSerializer', 'PrettyXMLSerializer'] - - -class XMLSerializer(Serializer): - - def __init__(self, store): - super(XMLSerializer, self).__init__(store) - - def __bindings(self): - store = self.store - nm = store.namespace_manager - bindings = {} - - for predicate in set(store.predicates()): - prefix, namespace, name = nm.compute_qname(predicate) - bindings[prefix] = URIRef(namespace) - - RDFNS = URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#") - - if "rdf" in bindings: - assert bindings["rdf"] == RDFNS - else: - bindings["rdf"] = RDFNS - - for prefix, namespace in bindings.items(): - yield prefix, namespace - - def serialize(self, stream, base=None, encoding=None, **args): - self.base = base - self.__stream = stream - self.__serialized = {} - encoding = self.encoding - self.write = write = lambda uni: stream.write( - uni.encode(encoding, 'replace')) - - # startDocument - write('<?xml version="1.0" encoding="%s"?>\n' % self.encoding) - - # startRDF - write('<rdf:RDF\n') - - # If provided, write xml:base attribute for the RDF - if "xml_base" in args: - write(' xml:base="%s"\n' % args['xml_base']) - # TODO: - # assert( - # namespaces["http://www.w3.org/1999/02/22-rdf-syntax-ns#"]=='rdf') - bindings = list(self.__bindings()) - bindings.sort() - - for prefix, namespace in bindings: - if prefix: - write(' xmlns:%s="%s"\n' % (prefix, namespace)) - else: - write(' xmlns="%s"\n' % namespace) - write('>\n') - - # write out triples by subject - for subject in self.store.subjects(): - self.subject(subject, 1) - - # endRDF - write("</rdf:RDF>\n") - - # Set to None so that the memory can get garbage collected. - # self.__serialized = None - del self.__serialized - - def subject(self, subject, depth=1): - if not subject in self.__serialized: - self.__serialized[subject] = 1 - - if isinstance(subject, (BNode, URIRef)): - write = self.write - indent = " " * depth - element_name = "rdf:Description" - - if isinstance(subject, BNode): - write('%s<%s rdf:nodeID="%s"' % ( - indent, element_name, subject)) - else: - uri = quoteattr(self.relativize(subject)) - write("%s<%s rdf:about=%s" % (indent, element_name, uri)) - - if (subject, None, None) in self.store: - write(">\n") - - for predicate, object in self.store.predicate_objects( - subject): - self.predicate(predicate, object, depth + 1) - write("%s</%s>\n" % (indent, element_name)) - - else: - write("/>\n") - - def predicate(self, predicate, object, depth=1): - write = self.write - indent = " " * depth - qname = self.store.namespace_manager.qname(predicate) - - if isinstance(object, Literal): - attributes = "" - - if object.language: - attributes += ' xml:lang="%s"' % object.language - - if object.datatype: - attributes += ' rdf:datatype="%s"' % object.datatype - - write("%s<%s%s>%s</%s>\n" % - (indent, qname, attributes, - escape(object, ESCAPE_ENTITIES), qname)) - else: - - if isinstance(object, BNode): - write('%s<%s rdf:nodeID="%s"/>\n' % - (indent, qname, object)) - else: - write("%s<%s rdf:resource=%s/>\n" % - (indent, qname, quoteattr(self.relativize(object)))) - -XMLLANG = "http://www.w3.org/XML/1998/namespacelang" -XMLBASE = "http://www.w3.org/XML/1998/namespacebase" -OWL_NS = Namespace('http://www.w3.org/2002/07/owl#') - - -# TODO: -def fix(val): - "strip off _: from nodeIDs... as they are not valid NCNames" - if val.startswith("_:"): - return val[2:] - else: - return val - - -class PrettyXMLSerializer(Serializer): - - def __init__(self, store, max_depth=3): - super(PrettyXMLSerializer, self).__init__(store) - self.forceRDFAbout = set() - - def serialize(self, stream, base=None, encoding=None, **args): - self.__serialized = {} - store = self.store - self.base = base - self.max_depth = args.get("max_depth", 3) - assert self.max_depth > 0, "max_depth must be greater than 0" - - self.nm = nm = store.namespace_manager - self.writer = writer = XMLWriter(stream, nm, encoding) - namespaces = {} - - possible = set(store.predicates()).union( - store.objects(None, RDF.type)) - - for predicate in possible: - prefix, namespace, local = nm.compute_qname(predicate) - namespaces[prefix] = namespace - - namespaces["rdf"] = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" - - writer.push(RDF.RDF) - - if "xml_base" in args: - writer.attribute(XMLBASE, args["xml_base"]) - - writer.namespaces(iter(namespaces.items())) - - # Write out subjects that can not be inline - for subject in store.subjects(): - if (None, None, subject) in store: - if (subject, None, subject) in store: - self.subject(subject, 1) - else: - self.subject(subject, 1) - - # write out anything that has not yet been reached - # write out BNodes last (to ensure they can be inlined where possible) - bnodes = set() - - for subject in store.subjects(): - if isinstance(subject, BNode): - bnodes.add(subject) - continue - self.subject(subject, 1) - - # now serialize only those BNodes that have not been serialized yet - for bnode in bnodes: - if bnode not in self.__serialized: - self.subject(subject, 1) - - writer.pop(RDF.RDF) - stream.write(b("\n")) - - # Set to None so that the memory can get garbage collected. - self.__serialized = None - - def subject(self, subject, depth=1): - store = self.store - writer = self.writer - - if subject in self.forceRDFAbout: - writer.push(RDF.Description) - writer.attribute(RDF.about, self.relativize(subject)) - writer.pop(RDF.Description) - self.forceRDFAbout.remove(subject) - - elif not subject in self.__serialized: - self.__serialized[subject] = 1 - type = first(store.objects(subject, RDF.type)) - - try: - self.nm.qname(type) - except: - type = None - - element = type or RDF.Description - writer.push(element) - - if isinstance(subject, BNode): - def subj_as_obj_more_than(ceil): - return True - # more_than(store.triples((None, None, subject)), ceil) - - # here we only include BNode labels if they are referenced - # more than once (this reduces the use of redundant BNode - # identifiers) - if subj_as_obj_more_than(1): - writer.attribute(RDF.nodeID, fix(subject)) - - else: - writer.attribute(RDF.about, self.relativize(subject)) - - if (subject, None, None) in store: - for predicate, object in store.predicate_objects(subject): - if not (predicate == RDF.type and object == type): - self.predicate(predicate, object, depth + 1) - - writer.pop(element) - - elif subject in self.forceRDFAbout: - writer.push(RDF.Description) - writer.attribute(RDF.about, self.relativize(subject)) - writer.pop(RDF.Description) - self.forceRDFAbout.remove(subject) - - def predicate(self, predicate, object, depth=1): - writer = self.writer - store = self.store - writer.push(predicate) - - if isinstance(object, Literal): - if object.language: - writer.attribute(XMLLANG, object.language) - - if (object.datatype == RDF.XMLLiteral and - isinstance(object.value, xml.dom.minidom.Document)): - writer.attribute(RDF.parseType, "Literal") - writer.text("") - writer.stream.write(object) - else: - if object.datatype: - writer.attribute(RDF.datatype, object.datatype) - writer.text(object) - - elif object in self.__serialized or not (object, None, None) in store: - - if isinstance(object, BNode): - if more_than(store.triples((None, None, object)), 0): - writer.attribute(RDF.nodeID, fix(object)) - else: - writer.attribute(RDF.resource, self.relativize(object)) - - else: - if first(store.objects(object, RDF.first)): # may not have type - # RDF.List - - self.__serialized[object] = 1 - - # Warn that any assertions on object other than - # RDF.first and RDF.rest are ignored... including RDF.List - import warnings - warnings.warn( - "Assertions on %s other than RDF.first " % repr(object) + - "and RDF.rest are ignored ... including RDF.List", - UserWarning, stacklevel=2) - writer.attribute(RDF.parseType, "Collection") - - col = Collection(store, object) - - for item in col: - - if isinstance(item, URIRef): - self.forceRDFAbout.add(item) - self.subject(item) - - if not isinstance(item, URIRef): - self.__serialized[item] = 1 - else: - if first(store.triples_choices( - (object, RDF.type, [OWL_NS.Class, RDFS.Class]))) \ - and isinstance(object, URIRef): - writer.attribute(RDF.resource, self.relativize(object)) - - elif depth <= self.max_depth: - self.subject(object, depth + 1) - - elif isinstance(object, BNode): - - if not object in self.__serialized \ - and (object, None, None) in store \ - and len(list(store.subjects(object=object))) == 1: - # inline blank nodes if they haven't been serialized yet - # and are only referenced once (regardless of depth) - self.subject(object, depth + 1) - else: - writer.attribute(RDF.nodeID, fix(object)) - - else: - writer.attribute(RDF.resource, self.relativize(object)) - - writer.pop(predicate)