Mercurial > repos > guerler > springsuite
view planemo/lib/python3.7/site-packages/prov/serializers/provjson.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
line wrap: on
line source
"""PROV-JSON serializer and deserializer for ``prov`` documents.

The module provides :class:`ProvJSONSerializer` together with the
``json`` encoder/decoder subclasses and the helper functions that convert
between :class:`~prov.model.ProvDocument` instances and the plain
dict/list structure used by PROV-JSON.
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

from collections import defaultdict
import datetime
import io
import json

# ``six`` is used throughout this module.  It is imported explicitly rather
# than relying on it leaking in through ``from prov.constants import *``.
import six

from prov.serializers import Serializer, Error
from prov.constants import *
from prov.model import (Literal, Identifier, QualifiedName,
                        Namespace, ProvDocument, ProvBundle, first,
                        parse_xsd_datetime)

import logging
logger = logging.getLogger(__name__)

__author__ = 'Trung Dong Huynh'
__email__ = 'trungdong@donggiang.com'


class ProvJSONException(Error):
    """Raised when PROV-JSON content cannot be handled by this module."""
    pass


class AnonymousIDGenerator(object):
    """Hand out stable ``_:<prefix><n>`` identifiers for unnamed objects.

    The same object always receives the same identifier from a given
    generator instance; the counter starts at 1.
    """

    def __init__(self):
        self._cache = {}
        self._count = 0

    def get_anon_id(self, obj, local_prefix='id'):
        """Return the anonymous identifier for *obj*, creating it if needed.

        :param obj: Hashable object that needs an identifier.
        :param local_prefix: Text placed between ``_:`` and the counter.
        :return: The cached or newly created :class:`Identifier`.
        """
        if obj not in self._cache:
            self._count += 1
            self._cache[obj] = Identifier(
                '_:%s%d' % (local_prefix, self._count)
            )
        return self._cache[obj]


# Reverse map for prov.model.XSD_DATATYPE_PARSERS
LITERAL_XSDTYPE_MAP = {
    float: 'xsd:double',
    int: 'xsd:int'
    # boolean, string values are supported natively by PROV-JSON
    # datetime values are converted separately
}

# Add long on Python 2
if six.integer_types[-1] not in LITERAL_XSDTYPE_MAP:
    LITERAL_XSDTYPE_MAP[six.integer_types[-1]] = 'xsd:long'


class ProvJSONSerializer(Serializer):
    """
    PROV-JSON serializer for :class:`~prov.model.ProvDocument`
    """

    def serialize(self, stream, **kwargs):
        """
        Serializes a :class:`~prov.model.ProvDocument` instance to
        `PROV-JSON <https://provenance.ecs.soton.ac.uk/prov-json/>`_.

        The document is first rendered into an in-memory buffer and only
        then written to *stream*, so nothing is written if encoding fails.

        :param stream: Where to save the output.
        :param kwargs: Extra keyword arguments passed on to ``json.dump``.
        """
        if six.PY2:
            with io.BytesIO() as buf:
                json.dump(self.document, buf, cls=ProvJSONEncoder, **kwargs)
                data = buf.getvalue()
            # On Python 2 the buffer holds bytes.  If the target is a text
            # stream the bytes must be decoded; UTF-8 is assumed, which
            # should be fine for almost every case.
            if isinstance(stream, io.TextIOBase):
                stream.write(data.decode('utf-8'))
            else:
                stream.write(data)
        else:
            with io.StringIO() as buf:
                json.dump(self.document, buf, cls=ProvJSONEncoder, **kwargs)
                text = buf.getvalue()
            # On Python 3 the buffer holds text.  If the target is not a
            # text stream the text is encoded as UTF-8 before writing.
            if isinstance(stream, io.TextIOBase):
                stream.write(text)
            else:
                stream.write(text.encode('utf-8'))

    def deserialize(self, stream, **kwargs):
        """
        Deserialize from the `PROV JSON
        <https://provenance.ecs.soton.ac.uk/prov-json/>`_ representation to a
        :class:`~prov.model.ProvDocument` instance.

        :param stream: Input data. Anything that is not an
            ``io.TextIOBase`` is read and decoded as UTF-8 first.
        :param kwargs: Extra keyword arguments passed on to ``json.load``.
        """
        if not isinstance(stream, io.TextIOBase):
            buf = io.StringIO(stream.read().decode('utf-8'))
            stream = buf
        return json.load(stream, cls=ProvJSONDecoder, **kwargs)


class ProvJSONEncoder(json.JSONEncoder):
    """JSON encoder that knows how to encode a ``ProvDocument``."""

    def default(self, o):
        """Return a JSON-serializable structure for *o*.

        :param o: Object the standard encoder could not serialize.
        :return: The PROV-JSON container for a ``ProvDocument``.
        :raises TypeError: For any other unsupported object (raised by the
            base class).
        """
        if isinstance(o, ProvDocument):
            return encode_json_document(o)
        # Delegate to the base ``default`` so unsupported objects raise
        # TypeError.  Calling ``encode`` here would re-enter this method
        # for the same object and recurse without end.
        return super(ProvJSONEncoder, self).default(o)


class ProvJSONDecoder(json.JSONDecoder):
    """JSON decoder that produces a ``ProvDocument``."""

    def decode(self, s, *args, **kwargs):
        """Decode the JSON text *s* and load it into a new ``ProvDocument``.

        :param s: JSON text.
        :return: The populated :class:`ProvDocument`.
        """
        container = super(ProvJSONDecoder, self).decode(s, *args, **kwargs)
        document = ProvDocument()
        decode_json_document(container, document)
        return document


# Encoding/decoding functions
def valid_qualified_name(bundle, value):
    """Return ``bundle.valid_qualified_name(value)``, or None for None.

    :param bundle: Bundle (or document) used to resolve the name.
    :param value: Name to resolve; ``None`` is passed through unchanged.
    """
    if value is None:
        return None
    qualified_name = bundle.valid_qualified_name(value)
    return qualified_name


def encode_json_document(document):
    """Encode *document* and all of its bundles as a PROV-JSON container.

    :param document: The document to encode.
    :return: The container produced by :func:`encode_json_container`, with
        each sub-bundle stored under ``container['bundle'][<identifier>]``.
    """
    container = encode_json_container(document)
    for bundle in document.bundles:
        #  encoding the sub-bundle
        bundle_json = encode_json_container(bundle)
        container['bundle'][six.text_type(bundle.identifier)] = bundle_json
    return container


def encode_json_container(bundle):
    """Encode the namespaces and records of *bundle* as a PROV-JSON container.

    :param bundle: Bundle or document whose ``_namespaces`` and ``_records``
        are encoded.
    :return: A ``defaultdict(dict)`` keyed by ``'prefix'`` and by record
        label; each record label maps identifiers to one record dict, or to
        a list of record dicts when several records share an identifier.
    """
    container = defaultdict(dict)
    prefixes = {}
    for namespace in bundle._namespaces.get_registered_namespaces():
        prefixes[namespace.prefix] = namespace.uri
    if bundle._namespaces._default:
        prefixes['default'] = bundle._namespaces._default.uri
    if prefixes:
        container['prefix'] = prefixes

    id_generator = AnonymousIDGenerator()

    def real_or_anon_id(r):
        # Records without an identifier get a generated ``_:idN`` one.
        return r._identifier if r._identifier else id_generator.get_anon_id(r)

    for record in bundle._records:
        rec_type = record.get_type()
        rec_label = PROV_N_MAP[rec_type]
        identifier = six.text_type(real_or_anon_id(record))

        record_json = {}
        if record._attributes:
            for (attr, values) in record._attributes.items():
                if not values:
                    continue
                attr_name = six.text_type(attr)
                if attr in PROV_ATTRIBUTE_QNAMES:
                    # TODO: QName export
                    record_json[attr_name] = six.text_type(first(values))
                elif attr in PROV_ATTRIBUTE_LITERALS:
                    record_json[attr_name] = first(values).isoformat()
                else:
                    if len(values) == 1:
                        # single value
                        record_json[attr_name] = encode_json_representation(
                            first(values)
                        )
                    else:
                        # multiple values
                        record_json[attr_name] = [
                            encode_json_representation(value)
                            for value in values
                        ]
        # Check if the container already has the id of the record
        if identifier not in container[rec_label]:
            # this is the first instance, just put in the new record
            container[rec_label][identifier] = record_json
        else:
            # the container already has some record(s) of the same identifier
            # check if this is the second instance
            current_content = container[rec_label][identifier]
            if hasattr(current_content, 'items'):
                # this is a dict, make it a singleton list
                container[rec_label][identifier] = [current_content]
            # now append the new record to the list
            container[rec_label][identifier].append(record_json)

    return container


def decode_json_document(content, document):
    """Load the decoded PROV-JSON structure *content* into *document*.

    Note that *content* is modified: its ``'bundle'`` entry (and, through
    :func:`decode_json_container`, its ``'prefix'`` entry) is removed.

    :param content: Dict obtained from parsing PROV-JSON text.
    :param document: The document that receives the records and bundles.
    """
    # Take the bundles out first so that the remaining keys are only the
    # prefix table and record types of the top-level document.
    bundles = content.pop('bundle', {})

    decode_json_container(content, document)

    for bundle_id, bundle_content in bundles.items():
        bundle = ProvBundle(document=document)
        decode_json_container(bundle_content, bundle)
        document.add_bundle(bundle, bundle.valid_qualified_name(bundle_id))


def _decode_json_record(rec_type, rec_id, element, bundle):
    """Create one record in *bundle* from a single PROV-JSON record dict.

    :param rec_type: Record type looked up from ``PROV_RECORD_IDS_MAP``.
    :param rec_id: Identifier of the record as found in the JSON.
    :param element: Dict mapping attribute names to their JSON value(s).
    :param bundle: Bundle that receives the new record.
    :raises ProvJSONException: If a PROV attribute has more than one value
        (other than the entity attribute of a membership relation).
    """
    attributes = dict()
    other_attributes = []
    # this is for the multiple-entity membership hack to come
    membership_extra_members = None

    for attr_name, values in element.items():
        attr = (
            PROV_ATTRIBUTES_ID_MAP[attr_name]
            if attr_name in PROV_ATTRIBUTES_ID_MAP
            else valid_qualified_name(bundle, attr_name)
        )

        if attr not in PROV_ATTRIBUTES:
            # Not a PROV attribute: keep every value as an (attr, value) pair
            if isinstance(values, list):
                other_attributes.extend(
                    (attr, decode_json_representation(value, bundle))
                    for value in values
                )
            else:
                # single value
                other_attributes.append(
                    (attr, decode_json_representation(values, bundle))
                )
            continue

        if isinstance(values, list):
            # only one value is allowed
            if len(values) > 1:
                # unless it is the membership hack
                if not (rec_type == PROV_MEMBERSHIP and
                        attr == PROV_ATTR_ENTITY):
                    error_msg = (
                        'The prov package does not support PROV'
                        ' attributes having multiple values.'
                    )
                    logger.error(error_msg)
                    raise ProvJSONException(error_msg)
                # This is a membership relation with multiple entities
                # HACK: create multiple membership relations, one for
                # each entity. Store all the extra entities here; the
                # first membership relation is created as normal for the
                # first entity.
                membership_extra_members = values[1:]
            value = values[0]
        else:
            value = values

        attributes[attr] = (
            valid_qualified_name(bundle, value)
            if attr in PROV_ATTRIBUTE_QNAMES
            else parse_xsd_datetime(value)
        )

    bundle.new_record(rec_type, rec_id, attributes, other_attributes)

    # HACK: creating extra (unidentified) membership relations
    if membership_extra_members:
        collection = attributes[PROV_ATTR_COLLECTION]
        for member in membership_extra_members:
            bundle.membership(
                collection, valid_qualified_name(bundle, member)
            )


def decode_json_container(jc, bundle):
    """Load the namespaces and records of the PROV-JSON container *jc*.

    Note that the ``'prefix'`` entry is removed from *jc* once processed.

    :param jc: Dict holding an optional ``'prefix'`` table and, for every
        other key, a mapping of record identifiers to record content.
    :param bundle: Bundle or document that receives the content.
    :raises ProvJSONException: See :func:`_decode_json_record`.
    """
    if 'prefix' in jc:
        prefixes = jc['prefix']
        for prefix, uri in prefixes.items():
            if prefix != 'default':
                bundle.add_namespace(Namespace(prefix, uri))
            else:
                bundle.set_default_namespace(uri)
        # Removed so that the loop below only sees record types
        del jc['prefix']

    for rec_type_str in jc:
        rec_type = PROV_RECORD_IDS_MAP[rec_type_str]
        for rec_id, content in jc[rec_type_str].items():
            if hasattr(content, 'items'):  # it is a dict
                #  There is only one element, create a singleton list
                elements = [content]
            else:
                # expect it to be a list of dictionaries
                elements = content

            for element in elements:
                _decode_json_record(rec_type, rec_id, element, bundle)


def encode_json_representation(value):
    """Return the PROV-JSON representation of an attribute *value*.

    :param value: A ``Literal``, ``datetime``, ``QualifiedName``,
        ``Identifier``, a number whose type is in ``LITERAL_XSDTYPE_MAP``,
        or any other value.
    :return: A ``{'$': ..., 'type': ...}`` (or ``'lang'``) dict for the
        recognised types; any other value is returned unchanged.
    """
    if isinstance(value, Literal):
        return literal_json_representation(value)
    elif isinstance(value, datetime.datetime):
        return {'$': value.isoformat(), 'type': 'xsd:dateTime'}
    elif isinstance(value, QualifiedName):
        # TODO Manage prefix in the whole structure consistently
        # TODO QName export
        return {'$': six.text_type(value), 'type': PROV_QUALIFIEDNAME._str}
    elif isinstance(value, Identifier):
        return {'$': value.uri, 'type': 'xsd:anyURI'}
    elif type(value) in LITERAL_XSDTYPE_MAP:
        # Exact type lookup (not isinstance) so that subclasses such as
        # bool are not given the xsd type of their parent class.
        return {'$': value, 'type': LITERAL_XSDTYPE_MAP[type(value)]}
    else:
        return value


def decode_json_representation(literal, bundle):
    """Convert a PROV-JSON attribute value back to a Python/prov value.

    :param literal: Either a simple JSON value or a dict with a ``'$'``
        entry and optional ``'type'`` and ``'lang'`` entries.
    :param bundle: Bundle used to resolve qualified names.
    :return: An ``Identifier``, a qualified name, a ``Literal``, or the
        simple value itself.
    """
    if not isinstance(literal, dict):
        # simple type, just return it
        return literal

    # complex type
    value = literal['$']
    datatype = valid_qualified_name(bundle, literal.get('type'))
    langtag = literal.get('lang')
    if datatype == XSD_ANYURI:
        return Identifier(value)
    elif datatype == PROV_QUALIFIEDNAME:
        return valid_qualified_name(bundle, value)
    else:
        # The literal of standard Python types is not converted here
        # It will be automatically converted when added to a record by
        # _auto_literal_conversion()
        return Literal(value, datatype, langtag)


def literal_json_representation(literal):
    """Return the PROV-JSON dict for a ``Literal``.

    :param literal: Object exposing ``value``, ``datatype`` and ``langtag``.
    :return: ``{'$': value, 'lang': langtag}`` when a language tag is set,
        otherwise ``{'$': value, 'type': <datatype as text>}``.
    """
    # TODO: QName export
    value, datatype, langtag = literal.value, literal.datatype, literal.langtag
    if langtag:
        return {'$': value, 'lang': langtag}
    else:
        return {'$': value, 'type': six.text_type(datatype)}