diff planemo/lib/python3.7/site-packages/rdflib/plugins/stores/concurrent.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/rdflib/plugins/stores/concurrent.py	Fri Jul 31 00:32:28 2020 -0400
@@ -0,0 +1,94 @@
+from threading import Lock
+
+
+class ResponsibleGenerator(object):
+    """A generator that will help clean up when it is done being used."""
+
+    __slots__ = ['cleanup', 'gen']
+
+    def __init__(self, gen, cleanup):
+        self.cleanup = cleanup
+        self.gen = gen
+
+    def __del__(self):
+        self.cleanup()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return next(self.gen)
+
+
+class ConcurrentStore(object):
+
+    def __init__(self, store):
+        self.store = store
+
+        # number of calls to visit still in progress
+        self.__visit_count = 0
+
+        # lock for locking down the indices
+        self.__lock = Lock()
+
+        # lists for keeping track of added and removed triples while
+        # we wait for the lock
+        self.__pending_removes = []
+        self.__pending_adds = []
+
+    def add(self, triple):
+        (s, p, o) = triple
+        if self.__visit_count == 0:
+            self.store.add((s, p, o))
+        else:
+            self.__pending_adds.append((s, p, o))
+
+    def remove(self, triple):
+        (s, p, o) = triple
+        if self.__visit_count == 0:
+            self.store.remove((s, p, o))
+        else:
+            self.__pending_removes.append((s, p, o))
+
+    def triples(self, triple):
+        (su, pr, ob) = triple
+        g = self.store.triples((su, pr, ob))
+        pending_removes = self.__pending_removes
+        self.__begin_read()
+        for s, p, o in ResponsibleGenerator(g, self.__end_read):
+            if not (s, p, o) in pending_removes:
+                yield s, p, o
+
+        for (s, p, o) in self.__pending_adds:
+            if (su is None or su == s) \
+                    and (pr is None or pr == p) \
+                    and (ob is None or ob == o):
+                yield s, p, o
+
+    def __len__(self):
+        return self.store.__len__()
+
+    def __begin_read(self):
+        lock = self.__lock
+        lock.acquire()
+        self.__visit_count = self.__visit_count + 1
+        lock.release()
+
+    def __end_read(self):
+        lock = self.__lock
+        lock.acquire()
+        self.__visit_count = self.__visit_count - 1
+        if self.__visit_count == 0:
+            pending_removes = self.__pending_removes
+            while pending_removes:
+                (s, p, o) = pending_removes.pop()
+                try:
+                    self.store.remove((s, p, o))
+                except:
+                    # TODO: change to try finally?
+                    print(s, p, o, "Not in store to remove")
+            pending_adds = self.__pending_adds
+            while pending_adds:
+                (s, p, o) = pending_adds.pop()
+                self.store.add((s, p, o))
+        lock.release()