Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/rdflib/plugins/stores/auditable.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 """ | |
2 | |
3 This wrapper intercepts calls through the store interface and implements | |
4 thread-safe logging of destructive operations (adds / removes) in reverse. | |
5 This log is persisted on the store instance, and the reverse operations are | |
6 executed in order to return the store to the state it was in when the | |
7 transaction began. Since the reverse operations are persisted on the store, | |
8 the store itself acts as a transaction. | |
9 | |
10 Calls to commit or rollback flush the list of reverse operations. This | |
11 provides thread-safe atomicity and isolation (assuming concurrent operations | |
12 occur with different store instances), but no durability (transactions are | |
13 persisted in memory and won't be available to reverse operations after the | |
14 system fails): A and I out of ACID. | |
15 | |
16 """ | |
17 | |
18 from rdflib.store import Store | |
19 from rdflib import Graph, ConjunctiveGraph | |
20 import threading | |
21 | |
# Module-wide lock slots for the two destructive store operations, keyed by
# operation name.  Both slots start out empty (None); AuditableStore looks
# the matching slot up on every add/remove and falls back to a throwaway
# RLock while the slot is empty.
destructiveOpLocks = dict.fromkeys(('add', 'remove'))
26 | |
27 | |
class AuditableStore(Store):
    """Store wrapper that keeps an in-memory undo log of adds and removes.

    Every effective ``add`` / ``remove`` is forwarded to the wrapped store
    and recorded in ``self.reverseOps`` as the operation that would undo it,
    stored as a ``(subject, predicate, object, context_identifier, op)``
    tuple where ``op`` is ``'add'`` or ``'remove'``.  ``rollback`` replays
    that log against the wrapped store; ``commit`` simply discards it.

    All other calls (queries, namespace handling, ...) are delegated to the
    wrapped store unchanged.
    """

    def __init__(self, store):
        """Wrap *store* and start with an empty undo log."""
        self.store = store
        self.context_aware = store.context_aware
        # NOTE: this store can't be formula_aware as it doesn't have enough
        # info to reverse the removal of a quoted statement
        self.formula_aware = False  # store.formula_aware
        self.transaction_aware = True  # This is only half true
        self.reverseOps = []
        self.rollbackLock = threading.RLock()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _op_lock(name):
        """Return the lock guarding the destructive operation *name*.

        Uses the lock installed in ``destructiveOpLocks`` when there is one.
        Otherwise a brand-new ``RLock`` is returned; being private to the
        call, that fallback does not serialise anything.
        """
        lock = destructiveOpLocks[name]
        return lock if lock else threading.RLock()

    def _rebind(self, context):
        """Re-create *context* on top of the wrapped store.

        Callers hand in graphs bound to this wrapper; the wrapped store must
        see a graph of the same class and identifier bound to itself.
        ``None`` is passed through untouched.
        """
        if context is None:
            return None
        return context.__class__(self.store, context.identifier)

    def _record_undo(self, s, p, o, ctx_id, undo_op):
        """Log *undo_op* for a quad, cancelling a pending opposite entry.

        If the log already holds the opposite operation for the same quad,
        the two changes annul each other, so that entry is dropped and
        nothing new is recorded.  Otherwise *undo_op* is appended.
        """
        opposite = 'remove' if undo_op == 'add' else 'add'
        try:
            self.reverseOps.remove((s, p, o, ctx_id, opposite))
        except ValueError:
            self.reverseOps.append((s, p, o, ctx_id, undo_op))

    # ------------------------------------------------------------------
    # plain delegation
    # ------------------------------------------------------------------

    def open(self, configuration, create=True):
        """Open the wrapped store and return whatever it returns."""
        return self.store.open(configuration, create)

    def close(self, commit_pending_transaction=False):
        """Close the wrapped store, forwarding *commit_pending_transaction*."""
        # The flag used to be dropped silently; pass it on so the wrapped
        # store can honour the caller's request.
        self.store.close(
            commit_pending_transaction=commit_pending_transaction)

    def destroy(self, configuration):
        """Destroy the wrapped store."""
        self.store.destroy(configuration)

    def query(self, *args, **kw):
        """Run a query on the wrapped store and return its result."""
        return self.store.query(*args, **kw)

    # ------------------------------------------------------------------
    # audited operations
    # ------------------------------------------------------------------

    def add(self, triple, context, quoted=False):
        """Add *triple* to the wrapped store and log how to undo it.

        Does nothing when the triple is already present in *context*.
        """
        (s, p, o) = triple
        with self._op_lock('add'):
            context = self._rebind(context)
            ctx_id = context.identifier if context is not None else None
            if list(self.store.triples(triple, context)):
                return  # triple already in store, do nothing
            # Cancel a pending 'add' (the triple was removed earlier in this
            # transaction) instead of also logging a 'remove'.  Logging both
            # left a stray 'remove' behind, so a rollback deleted a triple
            # that existed before the transaction started.
            self._record_undo(s, p, o, ctx_id, 'remove')
            self.store.add((s, p, o), context, quoted)

    def remove(self, xxx_todo_changeme, context=None):
        """Remove matching triples from the wrapped store and log the undo.

        *xxx_todo_changeme* is the ``(subject, predicate, object)`` pattern;
        the odd name is kept so the signature stays as it was.  ``None`` in
        any position (or a ``None`` context) acts as a wildcard, in which
        case every matching quad is looked up first and logged individually.
        """
        (subject, predicate, object_) = xxx_todo_changeme
        pattern = (subject, predicate, object_)
        with self._op_lock('remove'):
            context = self._rebind(context)
            ctx_id = context.identifier if context is not None else None
            if None in [subject, predicate, object_, context]:
                # Need to determine which quads will be removed if any term
                # is a wildcard
                if ctx_id:
                    for s, p, o in context.triples(pattern):
                        self._record_undo(s, p, o, ctx_id, 'add')
                else:
                    every_graph = ConjunctiveGraph(self.store)
                    for s, p, o, ctx in every_graph.quads(pattern):
                        self._record_undo(s, p, o, ctx.identifier, 'add')
            else:
                if not list(self.store.triples(pattern, context)):
                    return  # triple not present in store, do nothing
                self._record_undo(subject, predicate, object_, ctx_id, 'add')
            self.store.remove(pattern, context)

    # ------------------------------------------------------------------
    # read access
    # ------------------------------------------------------------------

    def triples(self, triple, context=None):
        """Yield ``((s, p, o), contexts)`` pairs from the wrapped store."""
        (su, pr, ob) = triple
        context = self._rebind(context)
        yield from self.store.triples((su, pr, ob), context)

    def __len__(self, context=None):
        """Return the wrapped store's length for *context*."""
        return self.store.__len__(self._rebind(context))

    def contexts(self, triple=None):
        """Yield the contexts the wrapped store reports for *triple*."""
        yield from self.store.contexts(triple)

    def bind(self, prefix, namespace):
        """Bind *prefix* to *namespace* in the wrapped store."""
        self.store.bind(prefix, namespace)

    def prefix(self, namespace):
        """Return the wrapped store's prefix for *namespace*."""
        return self.store.prefix(namespace)

    def namespace(self, prefix):
        """Return the wrapped store's namespace for *prefix*."""
        return self.store.namespace(prefix)

    def namespaces(self):
        """Return the wrapped store's namespace bindings."""
        return self.store.namespaces()

    # ------------------------------------------------------------------
    # transaction control
    # ------------------------------------------------------------------

    def commit(self):
        """Accept all changes made so far by discarding the undo log."""
        self.reverseOps = []

    def rollback(self):
        """Undo all logged changes on the wrapped store and clear the log."""
        # Acquire the rollback lock and apply the reverse operations in the
        # order in which they were logged.
        with self.rollbackLock:
            for subject, predicate, obj, context, op in self.reverseOps:
                target = Graph(self.store, context)
                if op == 'add':
                    self.store.add((subject, predicate, obj), target)
                else:
                    self.store.remove((subject, predicate, obj), target)

            self.reverseOps = []