Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/prov/serializers/provjson.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/prov/serializers/provjson.py Fri Jul 31 00:32:28 2020 -0400 @@ -0,0 +1,354 @@ +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +from collections import defaultdict +import datetime +import io +import json + +from prov.serializers import Serializer, Error +from prov.constants import * +from prov.model import (Literal, Identifier, QualifiedName, + Namespace, ProvDocument, ProvBundle, first, + parse_xsd_datetime) + +import logging +logger = logging.getLogger(__name__) + +__author__ = 'Trung Dong Huynh' +__email__ = 'trungdong@donggiang.com' + + +class ProvJSONException(Error): + pass + + +class AnonymousIDGenerator: + def __init__(self): + self._cache = {} + self._count = 0 + + def get_anon_id(self, obj, local_prefix='id'): + if obj not in self._cache: + self._count += 1 + self._cache[obj] = Identifier( + '_:%s%d' % (local_prefix, self._count) + ) + return self._cache[obj] + + +# Reverse map for prov.model.XSD_DATATYPE_PARSERS +LITERAL_XSDTYPE_MAP = { + float: 'xsd:double', + int: 'xsd:int' + # boolean, string values are supported natively by PROV-JSON + # datetime values are converted separately +} + +# Add long on Python 2 +if six.integer_types[-1] not in LITERAL_XSDTYPE_MAP: + LITERAL_XSDTYPE_MAP[six.integer_types[-1]] = 'xsd:long' + + +class ProvJSONSerializer(Serializer): + """ + PROV-JSON serializer for :class:`~prov.model.ProvDocument` + """ + def serialize(self, stream, **kwargs): + """ + Serializes a :class:`~prov.model.ProvDocument` instance to + `PROV-JSON <https://provenance.ecs.soton.ac.uk/prov-json/>`_. + + :param stream: Where to save the output. + """ + if six.PY2: + buf = io.BytesIO() + try: + json.dump(self.document, buf, cls=ProvJSONEncoder, + **kwargs) + buf.seek(0, 0) + # Right now this is a bytestream. If the object to stream to is + # a text object is must be decoded. We assume utf-8 here which + # should be fine for almost every case. + if isinstance(stream, io.TextIOBase): + stream.write(buf.read().decode('utf-8')) + else: + stream.write(buf.read()) + finally: + buf.close() + else: + buf = io.StringIO() + try: + json.dump(self.document, buf, cls=ProvJSONEncoder, + **kwargs) + buf.seek(0, 0) + # Right now this is a bytestream. If the object to stream to is + # a text object is must be decoded. We assume utf-8 here which + # should be fine for almost every case. + if isinstance(stream, io.TextIOBase): + stream.write(buf.read()) + else: + stream.write(buf.read().encode('utf-8')) + finally: + buf.close() + + def deserialize(self, stream, **kwargs): + """ + Deserialize from the `PROV JSON + <https://provenance.ecs.soton.ac.uk/prov-json/>`_ representation to a + :class:`~prov.model.ProvDocument` instance. + + :param stream: Input data. + """ + if not isinstance(stream, io.TextIOBase): + buf = io.StringIO(stream.read().decode('utf-8')) + stream = buf + return json.load(stream, cls=ProvJSONDecoder, **kwargs) + + +class ProvJSONEncoder(json.JSONEncoder): + def default(self, o): + if isinstance(o, ProvDocument): + return encode_json_document(o) + else: + return super(ProvJSONEncoder, self).encode(o) + + +class ProvJSONDecoder(json.JSONDecoder): + def decode(self, s, *args, **kwargs): + container = super(ProvJSONDecoder, self).decode(s, *args, **kwargs) + document = ProvDocument() + decode_json_document(container, document) + return document + + +# Encoding/decoding functions +def valid_qualified_name(bundle, value): + if value is None: + return None + qualified_name = bundle.valid_qualified_name(value) + return qualified_name + + +def encode_json_document(document): + container = encode_json_container(document) + for bundle in document.bundles: + # encoding the sub-bundle + bundle_json = encode_json_container(bundle) + container['bundle'][six.text_type(bundle.identifier)] = bundle_json + return container + + +def encode_json_container(bundle): + container = defaultdict(dict) + prefixes = {} + for namespace in bundle._namespaces.get_registered_namespaces(): + prefixes[namespace.prefix] = namespace.uri + if bundle._namespaces._default: + prefixes['default'] = bundle._namespaces._default.uri + if prefixes: + container['prefix'] = prefixes + + id_generator = AnonymousIDGenerator() + + def real_or_anon_id(r): + return r._identifier if r._identifier else id_generator.get_anon_id(r) + + for record in bundle._records: + rec_type = record.get_type() + rec_label = PROV_N_MAP[rec_type] + identifier = six.text_type(real_or_anon_id(record)) + + record_json = {} + if record._attributes: + for (attr, values) in record._attributes.items(): + if not values: + continue + attr_name = six.text_type(attr) + if attr in PROV_ATTRIBUTE_QNAMES: + # TODO: QName export + record_json[attr_name] = six.text_type(first(values)) + elif attr in PROV_ATTRIBUTE_LITERALS: + record_json[attr_name] = first(values).isoformat() + else: + if len(values) == 1: + # single value + record_json[attr_name] = encode_json_representation( + first(values) + ) + else: + # multiple values + record_json[attr_name] = list( + encode_json_representation(value) + for value in values + ) + # Check if the container already has the id of the record + if identifier not in container[rec_label]: + # this is the first instance, just put in the new record + container[rec_label][identifier] = record_json + else: + # the container already has some record(s) of the same identifier + # check if this is the second instance + current_content = container[rec_label][identifier] + if hasattr(current_content, 'items'): + # this is a dict, make it a singleton list + container[rec_label][identifier] = [current_content] + # now append the new record to the list + container[rec_label][identifier].append(record_json) + + return container + + +def decode_json_document(content, document): + bundles = dict() + if 'bundle' in content: + bundles = content['bundle'] + del content['bundle'] + + decode_json_container(content, document) + + for bundle_id, bundle_content in bundles.items(): + bundle = ProvBundle(document=document) + decode_json_container(bundle_content, bundle) + document.add_bundle(bundle, bundle.valid_qualified_name(bundle_id)) + + +def decode_json_container(jc, bundle): + if 'prefix' in jc: + prefixes = jc['prefix'] + for prefix, uri in prefixes.items(): + if prefix != 'default': + bundle.add_namespace(Namespace(prefix, uri)) + else: + bundle.set_default_namespace(uri) + del jc['prefix'] + + for rec_type_str in jc: + rec_type = PROV_RECORD_IDS_MAP[rec_type_str] + for rec_id, content in jc[rec_type_str].items(): + if hasattr(content, 'items'): # it is a dict + # There is only one element, create a singleton list + elements = [content] + else: + # expect it to be a list of dictionaries + elements = content + + for element in elements: + attributes = dict() + other_attributes = [] + # this is for the multiple-entity membership hack to come + membership_extra_members = None + for attr_name, values in element.items(): + attr = ( + PROV_ATTRIBUTES_ID_MAP[attr_name] + if attr_name in PROV_ATTRIBUTES_ID_MAP + else valid_qualified_name(bundle, attr_name) + ) + if attr in PROV_ATTRIBUTES: + if isinstance(values, list): + # only one value is allowed + if len(values) > 1: + # unless it is the membership hack + if rec_type == PROV_MEMBERSHIP and \ + attr == PROV_ATTR_ENTITY: + # This is a membership relation with + # multiple entities + # HACK: create multiple membership + # relations, one for each entity + + # Store all the extra entities + membership_extra_members = values[1:] + # Create the first membership relation as + # normal for the first entity + value = values[0] + else: + error_msg = ( + 'The prov package does not support PROV' + ' attributes having multiple values.' + ) + logger.error(error_msg) + raise ProvJSONException(error_msg) + else: + value = values[0] + else: + value = values + value = ( + valid_qualified_name(bundle, value) + if attr in PROV_ATTRIBUTE_QNAMES + else parse_xsd_datetime(value) + ) + attributes[attr] = value + else: + if isinstance(values, list): + other_attributes.extend( + ( + attr, + decode_json_representation(value, bundle) + ) + for value in values + ) + else: + # single value + other_attributes.append( + ( + attr, + decode_json_representation(values, bundle) + ) + ) + bundle.new_record( + rec_type, rec_id, attributes, other_attributes + ) + # HACK: creating extra (unidentified) membership relations + if membership_extra_members: + collection = attributes[PROV_ATTR_COLLECTION] + for member in membership_extra_members: + bundle.membership( + collection, valid_qualified_name(bundle, member) + ) + + +def encode_json_representation(value): + if isinstance(value, Literal): + return literal_json_representation(value) + elif isinstance(value, datetime.datetime): + return {'$': value.isoformat(), 'type': 'xsd:dateTime'} + elif isinstance(value, QualifiedName): + # TODO Manage prefix in the whole structure consistently + # TODO QName export + return {'$': str(value), 'type': PROV_QUALIFIEDNAME._str} + elif isinstance(value, Identifier): + return {'$': value.uri, 'type': 'xsd:anyURI'} + elif type(value) in LITERAL_XSDTYPE_MAP: + return {'$': value, 'type': LITERAL_XSDTYPE_MAP[type(value)]} + else: + return value + + +def decode_json_representation(literal, bundle): + if isinstance(literal, dict): + # complex type + value = literal['$'] + datatype = literal['type'] if 'type' in literal else None + datatype = valid_qualified_name(bundle, datatype) + langtag = literal['lang'] if 'lang' in literal else None + if datatype == XSD_ANYURI: + return Identifier(value) + elif datatype == PROV_QUALIFIEDNAME: + return valid_qualified_name(bundle, value) + else: + # The literal of standard Python types is not converted here + # It will be automatically converted when added to a record by + # _auto_literal_conversion() + return Literal(value, datatype, langtag) + else: + # simple type, just return it + return literal + + +def literal_json_representation(literal): + # TODO: QName export + value, datatype, langtag = literal.value, literal.datatype, literal.langtag + if langtag: + return {'$': value, 'lang': langtag} + else: + return {'$': value, 'type': six.text_type(datatype)}