diff planemo/lib/python3.7/site-packages/rdflib/plugins/stores/auditable.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/rdflib/plugins/stores/auditable.py	Fri Jul 31 00:32:28 2020 -0400
@@ -0,0 +1,139 @@
+"""
+
+This wrapper intercepts calls through the store interface and implements
+thread-safe logging of destructive operations (adds / removes) in reverse.
+This is persisted on the store instance and the reverse operations are
+executed In order to return the store to the state it was when the transaction
+began Since the reverse operations are persisted on the store, the store
+itself acts as a transaction.
+
+Calls to commit or rollback, flush the list of reverse operations This
+provides thread-safe atomicity and isolation (assuming concurrent operations
+occur with different store instances), but no durability (transactions are
+persisted in memory and wont  be available to reverse operations after the
+system fails): A and I out of ACID.
+
+"""
+
+from rdflib.store import Store
+from rdflib import Graph, ConjunctiveGraph
+import threading
+
+destructiveOpLocks = {
+    'add': None,
+    'remove': None,
+}
+
+
+class AuditableStore(Store):
+    def __init__(self, store):
+        self.store = store
+        self.context_aware = store.context_aware
+        # NOTE: this store can't be formula_aware as it doesn't have enough
+        # info to reverse the removal of a quoted statement
+        self.formula_aware = False  # store.formula_aware
+        self.transaction_aware = True  # This is only half true
+        self.reverseOps = []
+        self.rollbackLock = threading.RLock()
+
+    def open(self, configuration, create=True):
+        return self.store.open(configuration, create)
+
+    def close(self, commit_pending_transaction=False):
+        self.store.close()
+
+    def destroy(self, configuration):
+        self.store.destroy(configuration)
+
+    def query(self, *args, **kw):
+        return self.store.query(*args, **kw)
+
+    def add(self, triple, context, quoted=False):
+        (s, p, o) = triple
+        lock = destructiveOpLocks['add']
+        lock = lock if lock else threading.RLock()
+        with lock:
+            context = context.__class__(self.store, context.identifier) if context is not None else None
+            ctxId = context.identifier if context is not None else None
+            if list(self.store.triples(triple, context)):
+                return # triple already in store, do nothing
+            self.reverseOps.append((s, p, o, ctxId, 'remove'))
+            try:
+                self.reverseOps.remove((s, p, o, ctxId, 'add'))
+            except ValueError:
+                pass
+            self.store.add((s, p, o), context, quoted)
+
+    def remove(self, xxx_todo_changeme, context=None):
+        (subject, predicate, object_) = xxx_todo_changeme
+        lock = destructiveOpLocks['remove']
+        lock = lock if lock else threading.RLock()
+        with lock:
+            # Need to determine which quads will be removed if any term is a
+            # wildcard
+            context = context.__class__(self.store, context.identifier) if context is not None else None
+            ctxId = context.identifier if context is not None else None
+            if None in [subject, predicate, object_, context]:
+                if ctxId:
+                    for s, p, o in context.triples((subject, predicate, object_)):
+                        try:
+                            self.reverseOps.remove((s, p, o, ctxId, 'remove'))
+                        except ValueError:
+                            self.reverseOps.append((s, p, o, ctxId, 'add'))
+                else:
+                    for s, p, o, ctx in ConjunctiveGraph(self.store).quads((subject, predicate, object_)):
+                        try:
+                            self.reverseOps.remove((s, p, o, ctx.identifier, 'remove'))
+                        except ValueError:
+                            self.reverseOps.append((s, p, o, ctx.identifier, 'add'))
+            else:
+                if not list(self.triples((subject, predicate, object_), context)):
+                    return # triple not present in store, do nothing
+                try:
+                    self.reverseOps.remove((subject, predicate, object_, ctxId, 'remove'))
+                except ValueError:
+                    self.reverseOps.append((subject, predicate, object_, ctxId, 'add'))
+            self.store.remove((subject, predicate, object_), context)
+
+    def triples(self, triple, context=None):
+        (su, pr, ob) = triple
+        context = context.__class__(self.store, context.identifier) if context is not None else None
+        for (s, p, o), cg in self.store.triples((su, pr, ob), context):
+            yield (s, p, o), cg
+
+    def __len__(self, context=None):
+        context = context.__class__(self.store, context.identifier) if context is not None else None
+        return self.store.__len__(context)
+
+    def contexts(self, triple=None):
+        for ctx in self.store.contexts(triple):
+            yield ctx
+
+    def bind(self, prefix, namespace):
+        self.store.bind(prefix, namespace)
+
+    def prefix(self, namespace):
+        return self.store.prefix(namespace)
+
+    def namespace(self, prefix):
+        return self.store.namespace(prefix)
+
+    def namespaces(self):
+        return self.store.namespaces()
+
+    def commit(self):
+        self.reverseOps = []
+
+    def rollback(self):
+        # Aquire Rollback lock and apply reverse operations in the forward
+        # order
+        with self.rollbackLock:
+            for subject, predicate, obj, context, op in self.reverseOps:
+                if op == 'add':
+                    self.store.add(
+                        (subject, predicate, obj), Graph(self.store, context))
+                else:
+                    self.store.remove(
+                        (subject, predicate, obj), Graph(self.store, context))
+
+            self.reverseOps = []