comparison planemo/lib/python3.7/site-packages/rdflib/plugins/stores/auditable.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 """
2
This wrapper intercepts calls through the store interface and implements
thread-safe logging of destructive operations (adds / removes) in reverse.
This log is persisted on the store instance, and the reverse operations are
executed in order to return the store to the state it was in when the
transaction began. Since the reverse operations are persisted on the store,
the store itself acts as a transaction.
9
Calls to commit or rollback flush the list of reverse operations. This
provides thread-safe atomicity and isolation (assuming concurrent operations
occur with different store instances), but no durability (transactions are
persisted in memory and won't be available to reverse operations after the
system fails): A and I out of ACID.
15
16 """
17
18 from rdflib.store import Store
19 from rdflib import Graph, ConjunctiveGraph
20 import threading
21
# Optional shared locks for the destructive store operations, keyed by
# operation name.  Every entry starts out as None, meaning no shared lock has
# been installed for that operation; AuditableStore.add() and
# AuditableStore.remove() look their entry up on each call.
destructiveOpLocks = dict.fromkeys(('add', 'remove'))
26
27
class AuditableStore(Store):
    """Store wrapper that keeps an in-memory undo log of adds and removes.

    Every call is forwarded to the wrapped ``store``.  Before a triple is
    added or removed, the opposite operation is recorded in ``reverseOps`` as
    a ``(subject, predicate, object, context_identifier, op)`` tuple, where
    ``op`` is ``'add'`` or ``'remove'``.  ``rollback()`` replays that log
    against the wrapped store; ``commit()`` simply discards it.
    """

    def __init__(self, store):
        """Wrap *store* and start with an empty undo log."""
        self.store = store
        self.context_aware = store.context_aware
        # NOTE: this store can't be formula_aware as it doesn't have enough
        # info to reverse the removal of a quoted statement
        self.formula_aware = False  # store.formula_aware
        self.transaction_aware = True  # This is only half true
        self.reverseOps = []
        self.rollbackLock = threading.RLock()

    # -- internal helpers --------------------------------------------------

    def _opLock(self, op):
        """Return the lock guarding the destructive operation *op*.

        A lock installed in the module-level ``destructiveOpLocks`` mapping
        takes precedence.  Otherwise the instance's re-entrant
        ``rollbackLock`` is used: a lock created afresh on every call is never
        shared with another thread and therefore excludes nothing.
        """
        return destructiveOpLocks[op] or self.rollbackLock

    def _rebind(self, context):
        """Re-create *context* on the wrapped store; ``None`` stays ``None``."""
        if context is None:
            return None
        return context.__class__(self.store, context.identifier)

    def _logUndo(self, quad, undo, cancels):
        """Record that *quad* needs the *undo* operation on rollback.

        If the log already holds the opposite entry (``quad + (cancels,)``)
        the two operations cancel out and that entry is dropped instead, so
        the log holds at most one pending entry per quad.
        """
        try:
            self.reverseOps.remove(quad + (cancels,))
        except ValueError:
            self.reverseOps.append(quad + (undo,))

    # -- plain delegation --------------------------------------------------

    def open(self, configuration, create=True):
        """Open the wrapped store and return its result."""
        return self.store.open(configuration, create)

    def close(self, commit_pending_transaction=False):
        """Close the wrapped store, forwarding *commit_pending_transaction*."""
        self.store.close(commit_pending_transaction)

    def destroy(self, configuration):
        """Destroy the wrapped store."""
        self.store.destroy(configuration)

    def query(self, *args, **kw):
        """Delegate a query to the wrapped store unchanged."""
        return self.store.query(*args, **kw)

    # -- audited operations ------------------------------------------------

    def add(self, triple, context, quoted=False):
        """Add *triple* to the wrapped store and log how to undo it.

        Nothing is logged or written when the triple is already present in
        *context*.
        """
        (s, p, o) = triple
        with self._opLock('add'):
            context = self._rebind(context)
            ctxId = context.identifier if context is not None else None
            if list(self.store.triples(triple, context)):
                return  # triple already in store, do nothing
            # If this triple was removed earlier in the same transaction, the
            # pending 'add' entry is cancelled rather than paired with a
            # 'remove'; otherwise rollback would delete a triple that existed
            # before the transaction began.
            self._logUndo((s, p, o, ctxId), undo='remove', cancels='add')
            self.store.add((s, p, o), context, quoted)

    def remove(self, xxx_todo_changeme, context=None):
        """Remove the triples matching the pattern and log how to undo it.

        *xxx_todo_changeme* is a ``(subject, predicate, object)`` pattern in
        which ``None`` acts as a wildcard.  The parameter name is kept as-is
        for callers that pass it by keyword.
        """
        (subject, predicate, object_) = xxx_todo_changeme
        pattern = (subject, predicate, object_)
        with self._opLock('remove'):
            context = self._rebind(context)
            ctxId = context.identifier if context is not None else None
            if any(term is None
                   for term in (subject, predicate, object_, context)):
                # Need to determine which quads will be removed if any term
                # is a wildcard
                if ctxId:
                    for s, p, o in context.triples(pattern):
                        self._logUndo((s, p, o, ctxId),
                                      undo='add', cancels='remove')
                else:
                    everything = ConjunctiveGraph(self.store)
                    for s, p, o, ctx in everything.quads(pattern):
                        self._logUndo((s, p, o, ctx.identifier),
                                      undo='add', cancels='remove')
            else:
                if not list(self.triples(pattern, context)):
                    return  # triple not present in store, do nothing
                self._logUndo((subject, predicate, object_, ctxId),
                              undo='add', cancels='remove')
            self.store.remove(pattern, context)

    # -- read access -------------------------------------------------------

    def triples(self, triple, context=None):
        """Yield ``((s, p, o), contexts)`` pairs from the wrapped store."""
        (su, pr, ob) = triple
        context = self._rebind(context)
        for (s, p, o), cg in self.store.triples((su, pr, ob), context):
            yield (s, p, o), cg

    def __len__(self, context=None):
        """Return the wrapped store's length for *context*."""
        return self.store.__len__(self._rebind(context))

    def contexts(self, triple=None):
        """Yield the contexts reported by the wrapped store."""
        for ctx in self.store.contexts(triple):
            yield ctx

    # -- namespace handling ------------------------------------------------

    def bind(self, prefix, namespace):
        """Bind *prefix* to *namespace* in the wrapped store."""
        self.store.bind(prefix, namespace)

    def prefix(self, namespace):
        """Return the wrapped store's prefix for *namespace*."""
        return self.store.prefix(namespace)

    def namespace(self, prefix):
        """Return the wrapped store's namespace for *prefix*."""
        return self.store.namespace(prefix)

    def namespaces(self):
        """Return the wrapped store's namespace bindings."""
        return self.store.namespaces()

    # -- transaction control -----------------------------------------------

    def commit(self):
        """Accept all changes made so far by discarding the undo log."""
        self.reverseOps = []

    def rollback(self):
        """Undo every logged change on the wrapped store, then clear the log."""
        # Acquire the rollback lock and apply the reverse operations in the
        # order they were logged.  The log holds at most one pending entry
        # per quad, so the order does not affect the result.
        with self.rollbackLock:
            for subject, predicate, obj, context, op in self.reverseOps:
                target = Graph(self.store, context)
                if op == 'add':
                    self.store.add((subject, predicate, obj), target)
                else:
                    self.store.remove((subject, predicate, obj), target)

            self.reverseOps = []