Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/rdflib/plugins/stores/concurrent.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 from threading import Lock | |
2 | |
3 | |
4 class ResponsibleGenerator(object): | |
5 """A generator that will help clean up when it is done being used.""" | |
6 | |
7 __slots__ = ['cleanup', 'gen'] | |
8 | |
9 def __init__(self, gen, cleanup): | |
10 self.cleanup = cleanup | |
11 self.gen = gen | |
12 | |
13 def __del__(self): | |
14 self.cleanup() | |
15 | |
16 def __iter__(self): | |
17 return self | |
18 | |
19 def __next__(self): | |
20 return next(self.gen) | |
21 | |
22 | |
23 class ConcurrentStore(object): | |
24 | |
25 def __init__(self, store): | |
26 self.store = store | |
27 | |
28 # number of calls to visit still in progress | |
29 self.__visit_count = 0 | |
30 | |
31 # lock for locking down the indices | |
32 self.__lock = Lock() | |
33 | |
34 # lists for keeping track of added and removed triples while | |
35 # we wait for the lock | |
36 self.__pending_removes = [] | |
37 self.__pending_adds = [] | |
38 | |
39 def add(self, triple): | |
40 (s, p, o) = triple | |
41 if self.__visit_count == 0: | |
42 self.store.add((s, p, o)) | |
43 else: | |
44 self.__pending_adds.append((s, p, o)) | |
45 | |
46 def remove(self, triple): | |
47 (s, p, o) = triple | |
48 if self.__visit_count == 0: | |
49 self.store.remove((s, p, o)) | |
50 else: | |
51 self.__pending_removes.append((s, p, o)) | |
52 | |
53 def triples(self, triple): | |
54 (su, pr, ob) = triple | |
55 g = self.store.triples((su, pr, ob)) | |
56 pending_removes = self.__pending_removes | |
57 self.__begin_read() | |
58 for s, p, o in ResponsibleGenerator(g, self.__end_read): | |
59 if not (s, p, o) in pending_removes: | |
60 yield s, p, o | |
61 | |
62 for (s, p, o) in self.__pending_adds: | |
63 if (su is None or su == s) \ | |
64 and (pr is None or pr == p) \ | |
65 and (ob is None or ob == o): | |
66 yield s, p, o | |
67 | |
68 def __len__(self): | |
69 return self.store.__len__() | |
70 | |
71 def __begin_read(self): | |
72 lock = self.__lock | |
73 lock.acquire() | |
74 self.__visit_count = self.__visit_count + 1 | |
75 lock.release() | |
76 | |
77 def __end_read(self): | |
78 lock = self.__lock | |
79 lock.acquire() | |
80 self.__visit_count = self.__visit_count - 1 | |
81 if self.__visit_count == 0: | |
82 pending_removes = self.__pending_removes | |
83 while pending_removes: | |
84 (s, p, o) = pending_removes.pop() | |
85 try: | |
86 self.store.remove((s, p, o)) | |
87 except: | |
88 # TODO: change to try finally? | |
89 print(s, p, o, "Not in store to remove") | |
90 pending_adds = self.__pending_adds | |
91 while pending_adds: | |
92 (s, p, o) = pending_adds.pop() | |
93 self.store.add((s, p, o)) | |
94 lock.release() |