Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/rdflib/plugins/stores/concurrent.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/rdflib/plugins/stores/concurrent.py Fri Jul 31 00:32:28 2020 -0400 @@ -0,0 +1,94 @@ +from threading import Lock + + +class ResponsibleGenerator(object): + """A generator that will help clean up when it is done being used.""" + + __slots__ = ['cleanup', 'gen'] + + def __init__(self, gen, cleanup): + self.cleanup = cleanup + self.gen = gen + + def __del__(self): + self.cleanup() + + def __iter__(self): + return self + + def __next__(self): + return next(self.gen) + + +class ConcurrentStore(object): + + def __init__(self, store): + self.store = store + + # number of calls to visit still in progress + self.__visit_count = 0 + + # lock for locking down the indices + self.__lock = Lock() + + # lists for keeping track of added and removed triples while + # we wait for the lock + self.__pending_removes = [] + self.__pending_adds = [] + + def add(self, triple): + (s, p, o) = triple + if self.__visit_count == 0: + self.store.add((s, p, o)) + else: + self.__pending_adds.append((s, p, o)) + + def remove(self, triple): + (s, p, o) = triple + if self.__visit_count == 0: + self.store.remove((s, p, o)) + else: + self.__pending_removes.append((s, p, o)) + + def triples(self, triple): + (su, pr, ob) = triple + g = self.store.triples((su, pr, ob)) + pending_removes = self.__pending_removes + self.__begin_read() + for s, p, o in ResponsibleGenerator(g, self.__end_read): + if not (s, p, o) in pending_removes: + yield s, p, o + + for (s, p, o) in self.__pending_adds: + if (su is None or su == s) \ + and (pr is None or pr == p) \ + and (ob is None or ob == o): + yield s, p, o + + def __len__(self): + return self.store.__len__() + + def __begin_read(self): + lock = self.__lock + lock.acquire() + self.__visit_count = self.__visit_count + 1 + lock.release() + + def __end_read(self): + lock = self.__lock + lock.acquire() + self.__visit_count = self.__visit_count - 1 + if self.__visit_count == 0: + pending_removes = self.__pending_removes + while pending_removes: + (s, p, o) = pending_removes.pop() + try: + self.store.remove((s, p, o)) + except: + # TODO: change to try finally? + print(s, p, o, "Not in store to remove") + pending_adds = self.__pending_adds + while pending_adds: + (s, p, o) = pending_adds.pop() + self.store.add((s, p, o)) + lock.release()