comparison planemo/lib/python3.7/site-packages/rdflib/plugins/stores/concurrent.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 from threading import Lock
2
3
4 class ResponsibleGenerator(object):
5 """A generator that will help clean up when it is done being used."""
6
7 __slots__ = ['cleanup', 'gen']
8
9 def __init__(self, gen, cleanup):
10 self.cleanup = cleanup
11 self.gen = gen
12
13 def __del__(self):
14 self.cleanup()
15
16 def __iter__(self):
17 return self
18
19 def __next__(self):
20 return next(self.gen)
21
22
23 class ConcurrentStore(object):
24
25 def __init__(self, store):
26 self.store = store
27
28 # number of calls to visit still in progress
29 self.__visit_count = 0
30
31 # lock for locking down the indices
32 self.__lock = Lock()
33
34 # lists for keeping track of added and removed triples while
35 # we wait for the lock
36 self.__pending_removes = []
37 self.__pending_adds = []
38
39 def add(self, triple):
40 (s, p, o) = triple
41 if self.__visit_count == 0:
42 self.store.add((s, p, o))
43 else:
44 self.__pending_adds.append((s, p, o))
45
46 def remove(self, triple):
47 (s, p, o) = triple
48 if self.__visit_count == 0:
49 self.store.remove((s, p, o))
50 else:
51 self.__pending_removes.append((s, p, o))
52
53 def triples(self, triple):
54 (su, pr, ob) = triple
55 g = self.store.triples((su, pr, ob))
56 pending_removes = self.__pending_removes
57 self.__begin_read()
58 for s, p, o in ResponsibleGenerator(g, self.__end_read):
59 if not (s, p, o) in pending_removes:
60 yield s, p, o
61
62 for (s, p, o) in self.__pending_adds:
63 if (su is None or su == s) \
64 and (pr is None or pr == p) \
65 and (ob is None or ob == o):
66 yield s, p, o
67
68 def __len__(self):
69 return self.store.__len__()
70
71 def __begin_read(self):
72 lock = self.__lock
73 lock.acquire()
74 self.__visit_count = self.__visit_count + 1
75 lock.release()
76
77 def __end_read(self):
78 lock = self.__lock
79 lock.acquire()
80 self.__visit_count = self.__visit_count - 1
81 if self.__visit_count == 0:
82 pending_removes = self.__pending_removes
83 while pending_removes:
84 (s, p, o) = pending_removes.pop()
85 try:
86 self.store.remove((s, p, o))
87 except:
88 # TODO: change to try finally?
89 print(s, p, o, "Not in store to remove")
90 pending_adds = self.__pending_adds
91 while pending_adds:
92 (s, p, o) = pending_adds.pop()
93 self.store.add((s, p, o))
94 lock.release()