view env/lib/python3.7/site-packages/prov/serializers/provrdf.py @ 4:79f47841a781 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:47:39 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

"""PROV-RDF serializers for ProvDocument
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import base64
from collections import OrderedDict
import datetime
import io

import dateutil.parser
import six

from rdflib.term import URIRef, BNode
from rdflib.term import Literal as RDFLiteral
from rdflib.graph import ConjunctiveGraph
from rdflib.namespace import RDF, RDFS, XSD

import prov.model as pm
from prov.constants import (
    PROV, PROV_ID_ATTRIBUTES_MAP, PROV_N_MAP, PROV_BASE_CLS, XSD_QNAME,
    PROV_END, PROV_START, PROV_USAGE, PROV_GENERATION, PROV_DERIVATION, PROV_INVALIDATION,
    PROV_ALTERNATE, PROV_MENTION, PROV_DELEGATION, PROV_ACTIVITY, PROV_ATTR_STARTTIME,
    PROV_ATTR_ENDTIME, PROV_LOCATION, PROV_ATTR_TIME, PROV_ROLE, PROV_COMMUNICATION,
    PROV_ATTR_INFORMANT, PROV_ATTR_RESPONSIBLE, PROV_ATTR_TRIGGER, PROV_ATTR_ENDER,
    PROV_ATTR_STARTER, PROV_ATTR_USED_ENTITY)
from prov.serializers import Serializer, Error


# Module authorship metadata.
__author__ = 'Satrajit S. Ghosh'
__email__ = 'satra@mit.edu'


class ProvRDFException(Error):
    """Error type specific to the PROV-RDF serializer."""


class AnonymousIDGenerator:
    """Hand out stable ``_:<prefix><n>`` identifiers for arbitrary objects.

    The first request for an object allocates the next counter value; later
    requests for the same object return the identifier cached for it.
    """

    def __init__(self):
        self._cache = {}
        self._count = 0

    def get_anon_id(self, obj, local_prefix="id"):
        """Return the anonymous identifier URI assigned to ``obj``.

        :param obj: Hashable object to identify.
        :param local_prefix: Text placed between ``_:`` and the counter; only
            used the first time ``obj`` is seen.
        :return: The ``uri`` of the :class:`prov.model.Identifier` built for
            ``obj``.
        """
        try:
            return self._cache[obj]
        except KeyError:
            pass
        # Unknown object: allocate the next number and remember the result.
        self._count += 1
        anon_name = '_:%s%d' % (local_prefix, self._count)
        anon_uri = pm.Identifier(anon_name).uri
        self._cache[obj] = anon_uri
        return anon_uri


# Reverse map for prov.model.XSD_DATATYPE_PARSERS: Python type -> XSD datatype
# used when a plain Python value is turned into an RDF literal.
# Boolean values are supported natively by PROV-RDF and datetime values are
# converted separately, so neither is listed here.
LITERAL_XSDTYPE_MAP = {
    float: XSD['double'],
    int: XSD['int'],
    six.text_type: XSD['string'],
}

# On Python 2 the last integer type is ``long``, which is not registered
# above; on Python 3 it is ``int`` and the existing entry is kept.
LITERAL_XSDTYPE_MAP.setdefault(six.integer_types[-1], XSD['long'])


def attr2rdf(attr):
    """Return the PROV-namespace predicate for a formal attribute.

    The attribute is looked up in ``PROV_ID_ATTRIBUTES_MAP``, the text after
    ``prov:`` is taken as the local name, and that name is resolved in the
    ``PROV`` namespace.

    :param attr: A key of ``PROV_ID_ATTRIBUTES_MAP``.
    :return: ``URIRef`` of the corresponding PROV term.
    """
    prefixed_name = PROV_ID_ATTRIBUTES_MAP[attr]
    local_name = prefixed_name.split('prov:')[1]
    return URIRef(PROV[local_name].uri)


def valid_qualified_name(bundle, value, xsd_qname=False):
    """Resolve ``value`` through ``bundle.valid_qualified_name``.

    :param bundle: Object exposing a ``valid_qualified_name(value)`` method.
    :param value: The value to resolve; ``None`` is passed straight through.
    :param xsd_qname: When true, the resolved name is wrapped with
        ``XSD_QNAME`` before being returned.
    :return: ``None`` if ``value`` is ``None``, otherwise the (optionally
        wrapped) result of the bundle lookup.
    """
    if value is None:
        return None
    resolved = bundle.valid_qualified_name(value)
    if xsd_qname:
        return XSD_QNAME(resolved)
    return resolved


class ProvRDFSerializer(Serializer):
    """
    PROV-O serializer for :class:`~prov.model.ProvDocument`
    """

    def serialize(self, stream=None, rdf_format='trig', **kwargs):
        """
        Serializes a :class:`~prov.model.ProvDocument` instance to
        `PROV-O <https://www.w3.org/TR/prov-o/>`_.

        :param stream: Where to save the output. Despite the ``None``
            default, a writable stream is required: the result is always
            handed to ``stream.write()``.
        :param rdf_format: The RDF format of the output, default to TRiG.
        :param kwargs: Extra keyword arguments forwarded to the graph's
            ``serialize()`` call; a ``format`` entry is overridden by
            ``rdf_format``.
        """
        container = self.encode_document(self.document)
        newargs = kwargs.copy()
        newargs['format'] = rdf_format

        # The Python 2 and Python 3 branches used to be exact copies of each
        # other, so a single code path serves both interpreters.
        buf = io.BytesIO()
        try:
            container.serialize(buf, **newargs)
            data = buf.getvalue()
        finally:
            buf.close()

        # Right now this is a bytestream. If the object to stream to is
        # a text object it must be decoded. We assume utf-8 here which
        # should be fine for almost every case.
        if isinstance(stream, io.TextIOBase):
            stream.write(data.decode('utf-8'))
        else:
            stream.write(data)

    def deserialize(self, stream, rdf_format='trig', **kwargs):
        """
        Build a :class:`~prov.model.ProvDocument` from its
        `PROV-O <https://www.w3.org/TR/prov-o/>`_ representation.

        The new document also replaces ``self.document``.

        :param stream: Input data.
        :param rdf_format: The RDF format of the input data, default: TRiG.
        :param kwargs: Extra keyword arguments forwarded to the graph's
            ``parse()`` call; a ``format`` entry is overridden by
            ``rdf_format``.
        :return: The decoded document.
        """
        parse_options = dict(kwargs, format=rdf_format)
        graph = ConjunctiveGraph()
        graph.parse(stream, **parse_options)
        document = self.document = pm.ProvDocument()
        self.decode_document(graph, document)
        return document

    def valid_identifier(self, value):
        """Resolve ``value`` with the current document's
        ``valid_qualified_name`` and return whatever it yields.

        Callers in this class treat a ``None`` result as "not resolvable".
        """
        document = self.document
        return document.valid_qualified_name(value)

    def encode_rdf_representation(self, value):
        """Convert a PROV/Python value into an rdflib term.

        The checks run in a fixed order and the first match wins:

        * ``URIRef`` -- returned unchanged;
        * ``prov.model.Literal`` -- via :func:`literal_rdf_representation`;
        * ``datetime.datetime`` -- ``xsd:dateTime`` literal of the ISO text;
        * ``prov.model.QualifiedName`` -- ``URIRef`` of its URI;
        * ``prov.model.Identifier`` -- ``xsd:anyURI`` literal of its URI;
        * exact types listed in ``LITERAL_XSDTYPE_MAP`` -- typed literal;
        * anything else -- literal without an explicit datatype.
        """
        if isinstance(value, URIRef):
            return value
        if isinstance(value, pm.Literal):
            return literal_rdf_representation(value)
        if isinstance(value, datetime.datetime):
            return RDFLiteral(value.isoformat(), datatype=XSD['dateTime'])
        if isinstance(value, pm.QualifiedName):
            return URIRef(value.uri)
        if isinstance(value, pm.Identifier):
            return RDFLiteral(value.uri, datatype=XSD['anyURI'])
        # Exact type lookup (not isinstance), so subclasses such as bool are
        # not matched by the int entry.
        xsd_type = LITERAL_XSDTYPE_MAP.get(type(value))
        if xsd_type is not None:
            return RDFLiteral(value, datatype=xsd_type)
        return RDFLiteral(value)

    def decode_rdf_representation(self, literal, graph):
        """Convert an rdflib term back into its PROV/Python counterpart.

        :param literal: The term to decode (rdflib literal, ``URIRef`` or any
            other value).
        :param graph: Graph whose namespace manager is used to derive a
            prefix for URIs the document cannot resolve yet.
        :return: A ``prov.model.Literal``, a datetime (for ``xsd:dateTime``),
            a qualified name (for ``URIRef``), or ``literal`` itself for
            anything else.
        """
        if not isinstance(literal, RDFLiteral):
            if not isinstance(literal, URIRef):
                # simple type, just return it
                return literal
            qname = self.valid_identifier(literal)
            if qname is None:
                # Unknown namespace: register one derived from the graph.
                prefix, iri, _ = graph.namespace_manager.compute_qname(literal)
                namespace = self.document.add_namespace(prefix, iri)
                local_part = literal.replace(namespace.uri, '')
                qname = pm.QualifiedName(namespace, local_part)
            return qname

        python_value = literal.value
        value = literal if python_value is None else python_value
        datatype = getattr(literal, 'datatype', None)
        langtag = getattr(literal, 'language', None)

        if datatype:
            if 'XMLLiteral' in datatype:
                # Keep the literal itself rather than its converted value.
                value = literal
            if 'base64Binary' in datatype:
                value = base64.standard_b64encode(value)

        if datatype == XSD['QName']:
            return pm.Literal(literal, datatype=XSD_QNAME)
        if datatype == XSD['dateTime']:
            return dateutil.parser.parse(literal)
        if datatype == XSD['gYear']:
            year = dateutil.parser.parse(literal).year
            return pm.Literal(year, datatype=self.valid_identifier(datatype))
        if datatype == XSD['gYearMonth']:
            moment = dateutil.parser.parse(literal)
            year_month = '{0}-{1:02d}'.format(moment.year, moment.month)
            return pm.Literal(year_month,
                              datatype=self.valid_identifier(datatype))

        # The literal of standard Python types is not converted here
        # It will be automatically converted when added to a record by
        # _auto_literal_conversion()
        return pm.Literal(value, self.valid_identifier(datatype), langtag)

    def encode_document(self, document):
        """Encode ``document`` and each of its bundles into one graph.

        The document's own records are encoded first; every bundle is then
        encoded into a graph identified by the bundle's URI and its quads are
        added to the document graph.

        :return: The combined graph.
        """
        graph = self.encode_container(document)
        for sub_bundle in document.bundles:
            bundle_uri = sub_bundle.identifier.uri
            bundle_graph = self.encode_container(sub_bundle,
                                                 identifier=bundle_uri)
            graph.addN(bundle_graph.quads())
        return graph

    def encode_container(self, bundle, container=None, identifier=None):
        """
        Encode the records of ``bundle`` as RDF statements.

        :param bundle: Object providing ``namespaces`` and ``_records``
            (presumably a ProvDocument or ProvBundle -- confirm with callers).
        :param container: Graph to add statements to. When ``None``, a new
            ``ConjunctiveGraph`` is created and the ``prov`` prefix is bound.
        :param identifier: Identifier given to the newly created graph; only
            used when ``container`` is ``None``. The name is reused below as
            a per-record local variable.
        :return: The graph holding the encoded statements.
        """
        if container is None:
            container = ConjunctiveGraph(identifier=identifier)
            nm = container.namespace_manager
            nm.bind('prov', PROV.uri)

        for namespace in bundle.namespaces:
            container.bind(namespace.prefix, namespace.uri)

        id_generator = AnonymousIDGenerator()
        # Use the record's own identifier URI when it has one, otherwise a
        # generated anonymous id.
        real_or_anon_id = lambda record: record._identifier.uri if \
            record._identifier else id_generator.get_anon_id(record)

        for record in bundle._records:
            rec_type = record.get_type()
            # Identified records get an rdf:type statement up front.
            if hasattr(record, 'identifier') and record.identifier:
                identifier = URIRef(six.text_type(real_or_anon_id(record)))
                container.add((identifier, RDF.type, URIRef(rec_type.uri)))
            else:
                identifier = None
            if record.attributes:
                bnode = None
                formal_objects = []
                used_objects = []
                all_attributes = list(record.formal_attributes) + list(record.attributes)
                # formal_qualifiers: with an identifier, any non-None formal
                # attribute counts; without one, only formal attributes at
                # index > 1 do.
                formal_qualifiers = False
                for attrid, (attr, value) in enumerate(list(record.formal_attributes)):
                    if (identifier is not None and value is not None) or \
                            (identifier is None and value is not None and attrid > 1):
                        formal_qualifiers = True
                has_qualifiers = len(record.extra_attributes) > 0 or formal_qualifiers
                for idx, (attr, value) in enumerate(all_attributes):
                    if record.is_relation():
                        pred = URIRef(PROV[PROV_N_MAP[rec_type]].uri)
                        # create bnode relation
                        # NOTE(review): bnode is only assigned in the
                        # anonymous-qualifier branch further down, so while it
                        # stays None this setup block runs again for every
                        # attribute of the record.
                        if bnode is None:
                            valid_formal_indices = set()
                            # NOTE(review): this loop variable shadows the
                            # outer ``idx``.
                            for idx, (key, val) in enumerate(record.formal_attributes):
                                formal_objects.append(key)
                                if val:
                                    valid_formal_indices.add(idx)
                            used_objects = [record.formal_attributes[0][0]]
                            # The first formal attribute supplies the subject.
                            subj = None
                            if record.formal_attributes[0][1]:
                                subj = URIRef(record.formal_attributes[0][1].uri)
                            if identifier is None and subj is not None:
                                try:
                                    obj_val = record.formal_attributes[1][1]
                                    obj_attr = URIRef(record.formal_attributes[1][0].uri)
                                    # TODO: Why is obj_attr above not used anywhere?
                                except IndexError:
                                    obj_val = None
                                # Emit the direct (unqualified) statement. For
                                # the record types listed in the set this only
                                # happens when exactly the first two formal
                                # attributes are set and there are no extra
                                # attributes.
                                if obj_val and (rec_type not in {PROV_END,
                                                                PROV_START,
                                                                PROV_USAGE,
                                                                PROV_GENERATION,
                                                                PROV_DERIVATION,
                                                                PROV_INVALIDATION} or
                                                (valid_formal_indices == {0, 1} and
                                                 len(record.extra_attributes) == 0)):
                                    used_objects.append(record.formal_attributes[1][0])
                                    obj_val = self.encode_rdf_representation(obj_val)
                                    # Alternate records swap subject and object.
                                    if rec_type == PROV_ALTERNATE:
                                        subj, obj_val = obj_val, subj
                                    container.add((subj, pred, obj_val))
                                    if rec_type == PROV_MENTION:
                                        # The third formal attribute, when
                                        # set, is written as prov:asInBundle.
                                        if record.formal_attributes[2][1]:
                                            used_objects.append(record.formal_attributes[2][0])
                                            obj_val = self.encode_rdf_representation(record.formal_attributes[2][1])
                                            container.add((subj, URIRef(PROV['asInBundle'].uri), obj_val))
                                        has_qualifiers = False
                            # Alternate records get no further statements.
                            if rec_type in [PROV_ALTERNATE]:
                                continue
                            if subj and (has_qualifiers or identifier):
                                qualifier = rec_type._localpart
                                rec_uri = rec_type.uri
                                # A prov:type of Revision, Quotation or
                                # PrimarySource replaces the record type used
                                # for the qualified form.
                                for attr_name, val in record.extra_attributes:
                                    if attr_name == PROV['type']:
                                        if PROV['Revision'] == val or \
                                              PROV['Quotation'] == val or \
                                                PROV['PrimarySource'] == val:
                                            qualifier = val._localpart
                                            rec_uri = val.uri
                                            if identifier is not None:
                                                container.remove((identifier,
                                                                  RDF.type,
                                                                  URIRef(rec_type.uri)))
                                # Link the subject to the qualifying node via
                                # prov:qualified<Type>.
                                QRole = URIRef(PROV['qualified' + qualifier].uri)
                                if identifier is not None:
                                    container.add((subj, QRole, identifier))
                                else:
                                    bnode = identifier = BNode()
                                    container.add((subj, QRole, identifier))
                                    container.add(
                                        (identifier, RDF.type, URIRef(rec_uri))
                                    )  # reset identifier to BNode
                        if value is not None and attr not in used_objects:
                            # Pick the base predicate for this attribute ...
                            if attr in formal_objects:
                                pred = attr2rdf(attr)
                            elif attr == PROV['role']:
                                pred = URIRef(PROV['hadRole'].uri)
                            elif attr == PROV['plan']:
                                pred = URIRef(PROV['hadPlan'].uri)
                            elif attr == PROV['type']:
                                pred = RDF.type
                            elif attr == PROV['label']:
                                pred = RDFS.label
                            elif isinstance(attr, pm.QualifiedName):
                                pred = URIRef(attr.uri)
                            else:
                                pred = self.encode_rdf_representation(attr)
                            # ... then rewrite it to the PROV-O property name.
                            # These are substring tests on the predicate URI
                            # and are applied in sequence, so their order
                            # matters.
                            if PROV['plan'].uri in pred:
                                pred = URIRef(PROV['hadPlan'].uri)
                            if PROV['informant'].uri in pred:
                                pred = URIRef(PROV['activity'].uri)
                            if PROV['responsible'].uri in pred:
                                pred = URIRef(PROV['agent'].uri)
                            if rec_type == PROV_DELEGATION and PROV['activity'].uri in pred:
                                pred = URIRef(PROV['hadActivity'].uri)
                            if (rec_type in [PROV_END, PROV_START] and PROV['trigger'].uri in pred) or\
                                (rec_type in [PROV_USAGE] and PROV['used'].uri in pred):
                                pred = URIRef(PROV['entity'].uri)
                            if rec_type in [PROV_GENERATION, PROV_END,
                                            PROV_START, PROV_USAGE,
                                            PROV_INVALIDATION]:
                                if PROV['time'].uri in pred:
                                    pred = URIRef(PROV['atTime'].uri)
                                if PROV['ender'].uri in pred:
                                    pred = URIRef(PROV['hadActivity'].uri)
                                if PROV['starter'].uri in pred:
                                    pred = URIRef(PROV['hadActivity'].uri)
                                if PROV['location'].uri in pred:
                                    pred = URIRef(PROV['atLocation'].uri)
                            if rec_type in [PROV_ACTIVITY]:
                                if PROV_ATTR_STARTTIME in pred:
                                    pred = URIRef(PROV['startedAtTime'].uri)
                                if PROV_ATTR_ENDTIME in pred:
                                    pred = URIRef(PROV['endedAtTime'].uri)
                            if rec_type == PROV_DERIVATION:
                                if PROV['activity'].uri in pred:
                                    pred = URIRef(PROV['hadActivity'].uri)
                                if PROV['generation'].uri in pred:
                                    pred = URIRef(PROV['hadGeneration'].uri)
                                if PROV['usage'].uri in pred:
                                    pred = URIRef(PROV['hadUsage'].uri)
                                if PROV['usedEntity'].uri in pred:
                                    pred = URIRef(PROV['entity'].uri)
                            container.add((identifier, pred,
                                           self.encode_rdf_representation(value)))
                        # Relations are fully handled above.
                        continue
                    # Non-relation records from here on.
                    if value is None:
                        continue
                    if isinstance(value, pm.ProvRecord):
                        obj = URIRef(six.text_type(real_or_anon_id(value)))
                    else:
                        #  Assuming this is a datetime value
                        obj = self.encode_rdf_representation(value)
                    if attr == PROV['location']:
                        pred = URIRef(PROV['atLocation'].uri)
                        # NOTE(review): the ``False and`` makes this branch
                        # unreachable; only the else branch ever runs.
                        if False and isinstance(value, (URIRef, pm.QualifiedName)):
                            if isinstance(value, pm.QualifiedName):
                                value = URIRef(value.uri)
                            container.add((identifier, pred, value))
                        else:
                            # NOTE(review): ``obj`` was already encoded above
                            # and is passed through the encoder a second time.
                            container.add((identifier, pred,
                                           self.encode_rdf_representation(obj)))
                        continue
                    if attr == PROV['type']:
                        pred = RDF.type
                    elif attr == PROV['label']:
                        pred = RDFS.label
                    elif attr == PROV_ATTR_STARTTIME:
                        pred = URIRef(PROV['startedAtTime'].uri)
                    elif attr == PROV_ATTR_ENDTIME:
                        pred = URIRef(PROV['endedAtTime'].uri)
                    else:
                        pred = self.encode_rdf_representation(attr)
                    container.add((identifier, pred, obj))
        return container

    def decode_document(self, content, document):
        """Populate ``document`` from the RDF graph ``content``.

        All namespaces of ``content`` are registered on the document first.
        If ``content`` exposes ``contexts()``, each context is decoded
        separately: contexts identified by a blank node go into the document
        itself, every other context into a bundle named after the context
        identifier. Otherwise ``content`` is decoded as a single graph.
        """
        for prefix, url in content.namespaces():
            document.add_namespace(prefix, six.text_type(url))

        if not hasattr(content, 'contexts'):
            self.decode_container(content, document)
            return

        for graph in content.contexts():
            if isinstance(graph.identifier, BNode):
                target = document
            else:
                target = document.bundle(six.text_type(graph.identifier))
            self.decode_container(graph, target)

    def decode_container(self, graph, bundle):
        """
        Decode the statements of ``graph`` into PROV records on ``bundle``.

        Works in three passes: (1) scan ``rdf:type`` statements to find the
        subjects that are PROV records, (2) walk every statement to fill in
        formal and other attributes or to create relations directly, and
        (3) create the collected records on ``bundle``.

        :param graph: The RDF graph (or context) to read.
        :param bundle: Target on which records and relations are created.
        """
        # subject id -> PROV record type found for it
        ids = {}
        # type URI -> entry of PROV_BASE_CLS
        PROV_CLS_MAP = {}
        # subject id -> OrderedDict(formal attribute -> value)
        formal_attributes = {}
        # subject id -> OrderedDict(formal attribute -> every value seen)
        unique_sets = {}
        for key, val in PROV_BASE_CLS.items():
            PROV_CLS_MAP[key.uri] = PROV_BASE_CLS[key]
        # Unqualified PROV-O predicate -> name of the bundle method that
        # creates the corresponding relation.
        relation_mapper = {URIRef(PROV['alternateOf'].uri): 'alternate',
                           URIRef(PROV['actedOnBehalfOf'].uri): 'delegation',
                           URIRef(PROV['specializationOf'].uri): 'specialization',
                           URIRef(PROV['mentionOf'].uri): 'mention',
                           URIRef(PROV['wasAssociatedWith'].uri): 'association',
                           URIRef(PROV['wasDerivedFrom'].uri): 'derivation',
                           URIRef(PROV['wasAttributedTo'].uri): 'attribution',
                           URIRef(PROV['wasInformedBy'].uri): 'communication',
                           URIRef(PROV['wasGeneratedBy'].uri): 'generation',
                           URIRef(PROV['wasInfluencedBy'].uri): 'influence',
                           URIRef(PROV['wasInvalidatedBy'].uri): 'invalidation',
                           URIRef(PROV['wasEndedBy'].uri): 'end',
                           URIRef(PROV['wasStartedBy'].uri): 'start',
                           URIRef(PROV['hadMember'].uri): 'membership',
                           URIRef(PROV['used'].uri): 'usage',
                           }
        # PROV-O property -> PROV attribute name used on records.
        predicate_mapper = {RDFS.label: pm.PROV['label'],
                            URIRef(PROV['atLocation'].uri): PROV_LOCATION,
                            URIRef(PROV['startedAtTime'].uri): PROV_ATTR_STARTTIME,
                            URIRef(PROV['endedAtTime'].uri): PROV_ATTR_ENDTIME,
                            URIRef(PROV['atTime'].uri): PROV_ATTR_TIME,
                            URIRef(PROV['hadRole'].uri): PROV_ROLE,
                            URIRef(PROV['hadPlan'].uri): pm.PROV_ATTR_PLAN,
                            URIRef(PROV['hadUsage'].uri): pm.PROV_ATTR_USAGE,
                            URIRef(PROV['hadGeneration'].uri): pm.PROV_ATTR_GENERATION,
                            URIRef(PROV['hadActivity'].uri): pm.PROV_ATTR_ACTIVITY,
                            }
        # subject id -> list of (attribute, value) pairs that are not formal
        other_attributes = {}
        # Pass 1: classify subjects by their rdf:type statements.
        for stmt in graph.triples((None, RDF.type, None)):
            id = six.text_type(stmt[0])
            obj = six.text_type(stmt[2])
            if obj in PROV_CLS_MAP:
                # Make sure a namespace exists for named subjects the
                # document cannot resolve yet.
                if not isinstance(stmt[0], BNode) and self.valid_identifier(id) is None:
                    prefix, iri, _ = graph.namespace_manager.compute_qname(id)
                    self.document.add_namespace(prefix, iri)
                # NOTE(review): the membership test above guarantees the key
                # exists, and a dict lookup would not raise AttributeError,
                # so the except branch looks unreachable.
                try:
                    prov_obj = PROV_CLS_MAP[obj]
                except AttributeError:
                    prov_obj = None
                add_attr = True
                isderivation = pm.PROV['Revision'].uri in stmt[2] or \
                               pm.PROV['Quotation'].uri in stmt[2] or \
                               pm.PROV['PrimarySource'].uri in stmt[2]
                # First PROV type seen for a subject fixes its record type
                # and initialises its formal-attribute slots.
                if id not in ids and prov_obj and (prov_obj.uri == obj or
                                                    isderivation or
                                                       isinstance(stmt[0], BNode)):
                    ids[id] = prov_obj
                    klass = pm.PROV_REC_CLS[prov_obj]
                    formal_attributes[id] = OrderedDict([(key, None) for key in klass.FORMAL_ATTRIBUTES])
                    unique_sets[id] = OrderedDict([(key, []) for key in klass.FORMAL_ATTRIBUTES])
                    # Keep the type as a prov:type attribute only when it
                    # differs from the base record type.
                    add_attr = False or ((isinstance(stmt[0], BNode) or isderivation) and prov_obj.uri != obj)
                if add_attr:
                    if id not in other_attributes:
                        other_attributes[id] = []
                    obj_formatted = self.decode_rdf_representation(stmt[2], graph)
                    other_attributes[id].append((pm.PROV['type'], obj_formatted))
            else:
                # Types outside PROV_CLS_MAP are kept as prov:type attributes.
                if id not in other_attributes:
                    other_attributes[id] = []
                obj = self.decode_rdf_representation(stmt[2], graph)
                other_attributes[id].append((pm.PROV['type'], obj))
        # Pass 2: every statement of the graph.
        for id, pred, obj in graph:
            id = six.text_type(id)
            if id not in other_attributes:
                other_attributes[id] = []
            # rdf:type was handled in pass 1.
            if pred == RDF.type:
                continue
            if pred in relation_mapper:
                # Unqualified relation: call the matching bundle method.
                if 'alternateOf' in pred:
                    # Arguments are passed in (object, subject) order here.
                    getattr(bundle, relation_mapper[pred])(obj, id)
                elif 'mentionOf' in pred:
                    # Pick up the bundle from prov:asInBundle, if present.
                    mentionBundle = None
                    for stmt in graph.triples((URIRef(id), URIRef(pm.PROV['asInBundle'].uri), None)):
                        mentionBundle = stmt[2]
                    getattr(bundle, relation_mapper[pred])(id, six.text_type(obj), mentionBundle)
                elif 'actedOnBehalfOf' in pred or 'wasAssociatedWith' in pred:
                    # If a qualified form exists for this subject, store the
                    # two ends on the qualifier's formal attributes instead of
                    # creating the relation right away.
                    qualifier = 'qualified' + relation_mapper[pred].upper()[0] + relation_mapper[pred][1:]
                    qualifier_bnode = None
                    for stmt in graph.triples((URIRef(id), URIRef(pm.PROV[qualifier].uri), None)):
                        qualifier_bnode = stmt[2]
                    if qualifier_bnode is None:
                        getattr(bundle, relation_mapper[pred])(id, six.text_type(obj))
                    else:
                        fakeys = list(formal_attributes[six.text_type(qualifier_bnode)].keys())
                        formal_attributes[six.text_type(qualifier_bnode)][fakeys[0]] = id
                        formal_attributes[six.text_type(qualifier_bnode)][fakeys[1]] = six.text_type(obj)
                else:
                    getattr(bundle, relation_mapper[pred])(id, six.text_type(obj))
            elif id in ids:
                # Attribute of a known record.
                obj1 = self.decode_rdf_representation(obj, graph)
                if obj is not None and obj1 is None:
                    raise ValueError(('Error transforming', obj))
                pred_new = pred
                if pred in predicate_mapper:
                    pred_new = predicate_mapper[pred]
                # Record-type specific renames; these are substring tests on
                # the predicate text, applied in sequence.
                if ids[id] == PROV_COMMUNICATION and 'activity' in six.text_type(pred_new):
                    pred_new = PROV_ATTR_INFORMANT
                if ids[id] == PROV_DELEGATION and 'agent' in six.text_type(pred_new):
                    pred_new = PROV_ATTR_RESPONSIBLE
                if ids[id] in [PROV_END, PROV_START] and 'entity' in six.text_type(pred_new):
                    pred_new = PROV_ATTR_TRIGGER
                if ids[id] in [PROV_END] and 'activity' in six.text_type(pred_new):
                    pred_new = PROV_ATTR_ENDER
                if ids[id] in [PROV_START] and 'activity' in six.text_type(pred_new):
                    pred_new = PROV_ATTR_STARTER
                if ids[id] == PROV_DERIVATION and 'entity' in six.text_type(pred_new):
                    pred_new = PROV_ATTR_USED_ENTITY
                if six.text_type(pred_new) in [val.uri for val in formal_attributes[id]]:
                    qname_key = self.valid_identifier(pred_new)
                    formal_attributes[id][qname_key] = obj1
                    unique_sets[id][qname_key].append(obj1)
                    # More than one value: clear the slot; the alternatives
                    # are expanded from unique_sets in pass 3.
                    if len(unique_sets[id][qname_key]) > 1:
                        formal_attributes[id][qname_key] = None
                else:
                    # Structural predicates are not kept as attributes.
                    if 'qualified' not in six.text_type(pred_new) and \
                                    'asInBundle' not in six.text_type(pred_new):
                        other_attributes[id].append((six.text_type(pred_new), obj1))
            # A prov:qualified* statement pointing at a known record makes
            # the subject that record's first formal attribute.
            local_key = six.text_type(obj)
            if local_key in ids:
                if 'qualified' in pred:
                    formal_attributes[local_key][list(formal_attributes[local_key].keys())[0]] = id
        # Pass 3: create the records.
        for id in ids:
            attrs = None
            if id in other_attributes:
                attrs = other_attributes[id]
            # Formal attributes that received several values.
            items_to_walk = []
            for qname, values in unique_sets[id].items():
                if values and len(values) > 1:
                    items_to_walk.append((qname, values))
            if items_to_walk:
                # One record per combination of the multi-valued attributes.
                for subset in list(walk(items_to_walk)):
                    for key, value in subset.items():
                        formal_attributes[id][key] = value
                    bundle.new_record(ids[id], id, formal_attributes[id], attrs)
            else:
                bundle.new_record(ids[id], id, formal_attributes[id], attrs)
            ids[id] = None
            if attrs is not None:
                other_attributes[id] = []
        # NOTE(review): every id handled above has its other_attributes entry
        # emptied, so a non-empty ``val`` here appears to belong to a key that
        # is absent from ``ids``, which would raise KeyError -- confirm the
        # intended behaviour.
        for key, val in other_attributes.items():
            if val:
                ids[key].add_attributes(val)


def walk(children, level=0, path=None, usename=True):
    """Generate all the full paths in a tree, as a dict.

    :param children: Sequence of ``(name, values)`` pairs, one per tree
        level. ``values`` is either an iterable of the choices at that level
        or a zero-argument callable returning such an iterable.
    :param level: Current depth; callers should leave this at ``0``.
    :param path: Dict shared between recursive calls; it is replaced by a
        fresh dict whenever ``level`` is ``0``.
    :param usename: Key the yielded dicts by ``name`` when true, by tree
        level otherwise.
    :return: Generator of dicts, one per combination of choices. Each dict
        is an independent copy.

    :Example:

    >>> from prov.serializers.provrdf import walk
    >>> iterables = [('a', [1, 2]), ('b', lambda: [3, 4])]
    >>> [val['a'] for val in walk(iterables)]
    [1, 1, 2, 2]
    >>> [val['b'] for val in walk(iterables)]
    [3, 4, 3, 4]
    """
    # Entry point
    if level == 0:
        path = {}
    # Exit condition
    if not children:
        yield path.copy()
        return
    # Tree recursion
    head, tail = children[0], children[1:]
    name, values = head
    # The documented usage passes callables while in-module callers pass
    # plain iterables; accept both.
    if callable(values):
        values = values()
    # We can use the arg name or the tree level as a key
    key = name if usename else level
    for child in values:
        path[key] = child
        # Recurse into the next level
        for child_paths in walk(tail, level + 1, path, usename):
            yield child_paths


def literal_rdf_representation(literal):
    """Convert a PROV literal into an rdflib literal.

    :param literal: Object exposing ``value``, ``langtag`` and ``datatype``
        (a ``prov.model.Literal`` when called from the serializer).
    :return: A language-tagged rdflib literal when ``langtag`` is set,
        otherwise one typed with the URI of the literal's datatype.
    """
    # Test against None explicitly: a falsy but valid value (for example an
    # empty string) must be serialized as its own text, not replaced by the
    # Literal object itself.
    if literal.value is not None:
        value = six.text_type(literal.value)
    else:
        value = literal
    if literal.langtag:
        #  a language tag can only go with prov:InternationalizedString
        return RDFLiteral(value, lang=str(literal.langtag))
    else:
        datatype = literal.datatype
        if 'base64Binary' in datatype.uri:
            if six.PY2:
                value = base64.standard_b64encode(value)
            else:
                value = literal.value.encode()
        return RDFLiteral(value, datatype=datatype.uri)