Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/rdflib/plugins/stores/auditable.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/rdflib/plugins/stores/auditable.py Fri Jul 31 00:32:28 2020 -0400 @@ -0,0 +1,139 @@ +""" + +This wrapper intercepts calls through the store interface and implements +thread-safe logging of destructive operations (adds / removes) in reverse. +This is persisted on the store instance and the reverse operations are +executed In order to return the store to the state it was when the transaction +began Since the reverse operations are persisted on the store, the store +itself acts as a transaction. + +Calls to commit or rollback, flush the list of reverse operations This +provides thread-safe atomicity and isolation (assuming concurrent operations +occur with different store instances), but no durability (transactions are +persisted in memory and wont be available to reverse operations after the +system fails): A and I out of ACID. + +""" + +from rdflib.store import Store +from rdflib import Graph, ConjunctiveGraph +import threading + +destructiveOpLocks = { + 'add': None, + 'remove': None, +} + + +class AuditableStore(Store): + def __init__(self, store): + self.store = store + self.context_aware = store.context_aware + # NOTE: this store can't be formula_aware as it doesn't have enough + # info to reverse the removal of a quoted statement + self.formula_aware = False # store.formula_aware + self.transaction_aware = True # This is only half true + self.reverseOps = [] + self.rollbackLock = threading.RLock() + + def open(self, configuration, create=True): + return self.store.open(configuration, create) + + def close(self, commit_pending_transaction=False): + self.store.close() + + def destroy(self, configuration): + self.store.destroy(configuration) + + def query(self, *args, **kw): + return self.store.query(*args, **kw) + + def add(self, triple, context, quoted=False): + (s, p, o) = triple + lock = destructiveOpLocks['add'] + lock = lock if lock else threading.RLock() + with lock: + context = context.__class__(self.store, context.identifier) if context is not None else None + ctxId = context.identifier if context is not None else None + if list(self.store.triples(triple, context)): + return # triple already in store, do nothing + self.reverseOps.append((s, p, o, ctxId, 'remove')) + try: + self.reverseOps.remove((s, p, o, ctxId, 'add')) + except ValueError: + pass + self.store.add((s, p, o), context, quoted) + + def remove(self, xxx_todo_changeme, context=None): + (subject, predicate, object_) = xxx_todo_changeme + lock = destructiveOpLocks['remove'] + lock = lock if lock else threading.RLock() + with lock: + # Need to determine which quads will be removed if any term is a + # wildcard + context = context.__class__(self.store, context.identifier) if context is not None else None + ctxId = context.identifier if context is not None else None + if None in [subject, predicate, object_, context]: + if ctxId: + for s, p, o in context.triples((subject, predicate, object_)): + try: + self.reverseOps.remove((s, p, o, ctxId, 'remove')) + except ValueError: + self.reverseOps.append((s, p, o, ctxId, 'add')) + else: + for s, p, o, ctx in ConjunctiveGraph(self.store).quads((subject, predicate, object_)): + try: + self.reverseOps.remove((s, p, o, ctx.identifier, 'remove')) + except ValueError: + self.reverseOps.append((s, p, o, ctx.identifier, 'add')) + else: + if not list(self.triples((subject, predicate, object_), context)): + return # triple not present in store, do nothing + try: + self.reverseOps.remove((subject, predicate, object_, ctxId, 'remove')) + except ValueError: + self.reverseOps.append((subject, predicate, object_, ctxId, 'add')) + self.store.remove((subject, predicate, object_), context) + + def triples(self, triple, context=None): + (su, pr, ob) = triple + context = context.__class__(self.store, context.identifier) if context is not None else None + for (s, p, o), cg in self.store.triples((su, pr, ob), context): + yield (s, p, o), cg + + def __len__(self, context=None): + context = context.__class__(self.store, context.identifier) if context is not None else None + return self.store.__len__(context) + + def contexts(self, triple=None): + for ctx in self.store.contexts(triple): + yield ctx + + def bind(self, prefix, namespace): + self.store.bind(prefix, namespace) + + def prefix(self, namespace): + return self.store.prefix(namespace) + + def namespace(self, prefix): + return self.store.namespace(prefix) + + def namespaces(self): + return self.store.namespaces() + + def commit(self): + self.reverseOps = [] + + def rollback(self): + # Aquire Rollback lock and apply reverse operations in the forward + # order + with self.rollbackLock: + for subject, predicate, obj, context, op in self.reverseOps: + if op == 'add': + self.store.add( + (subject, predicate, obj), Graph(self.store, context)) + else: + self.store.remove( + (subject, predicate, obj), Graph(self.store, context)) + + self.reverseOps = []