Mercurial > repos > guerler > springsuite
view planemo/lib/python3.7/site-packages/rdflib/plugins/stores/concurrent.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
line wrap: on
line source
from threading import Lock


class ResponsibleGenerator(object):
    """A generator that will help clean up when it is done being used.

    Wraps the iterator ``gen`` and calls ``cleanup`` (a zero-argument
    callable) from ``__del__``, i.e. when this wrapper object is finalized.
    """

    __slots__ = ['cleanup', 'gen']

    def __init__(self, gen, cleanup):
        self.cleanup = cleanup
        self.gen = gen

    def __del__(self):
        # Invoked on finalization whether or not ``gen`` was exhausted.
        self.cleanup()

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.gen)


class ConcurrentStore(object):
    """Wrap a store so that writes issued during a read are deferred.

    While at least one ``triples()`` iteration is in progress, ``add`` and
    ``remove`` queue their triple instead of touching the wrapped store.
    The queues are flushed to the wrapped store when the last in-progress
    read ends.
    """

    def __init__(self, store):
        self.store = store

        # number of calls to visit still in progress
        self.__visit_count = 0

        # lock guarding the visit counter and the flush of pending changes
        self.__lock = Lock()

        # lists for keeping track of added and removed triples while
        # a read is in progress
        self.__pending_removes = []
        self.__pending_adds = []

    def add(self, triple):
        """Add ``triple`` to the store, or queue it if a read is active."""
        (s, p, o) = triple
        if self.__visit_count == 0:
            self.store.add((s, p, o))
        else:
            self.__pending_adds.append((s, p, o))

    def remove(self, triple):
        """Remove ``triple`` from the store, or queue it if a read is active."""
        (s, p, o) = triple
        if self.__visit_count == 0:
            self.store.remove((s, p, o))
        else:
            self.__pending_removes.append((s, p, o))

    def triples(self, triple):
        """Yield ``(s, p, o)`` triples matching the pattern ``triple``.

        The pattern is passed unchanged to the wrapped store's ``triples``.
        Results that are currently queued for removal are skipped.  After
        the store's results, queued additions matching the pattern are
        yielded; for those, a ``None`` component matches any value.

        This is a generator function, so the read is only registered once
        iteration actually starts.
        """
        (su, pr, ob) = triple
        g = self.store.triples((su, pr, ob))
        # Same list object that remove() appends to, so removals queued
        # during iteration are seen by the membership test below.
        pending_removes = self.__pending_removes
        self.__begin_read()
        # The wrapper calls __end_read when it is finalized, which ends
        # this read even if the consumer abandons the iteration early.
        for s, p, o in ResponsibleGenerator(g, self.__end_read):
            if (s, p, o) not in pending_removes:
                yield s, p, o

        for (s, p, o) in self.__pending_adds:
            if (su is None or su == s) \
                    and (pr is None or pr == p) \
                    and (ob is None or ob == o):
                yield s, p, o

    def __len__(self):
        return self.store.__len__()

    def __begin_read(self):
        """Register one more read in progress."""
        with self.__lock:
            self.__visit_count += 1

    def __end_read(self):
        """Unregister a read; flush queued changes when it was the last one.

        The lock is held via ``with`` so that it is released even if the
        wrapped store raises while the queues are being flushed; otherwise
        every later read would block forever in ``__begin_read``.
        """
        with self.__lock:
            self.__visit_count -= 1
            if self.__visit_count == 0:
                pending_removes = self.__pending_removes
                while pending_removes:
                    (s, p, o) = pending_removes.pop()
                    try:
                        self.store.remove((s, p, o))
                    except Exception:
                        # Best effort: a failed removal is reported and
                        # the flush continues with the remaining triples.
                        print(s, p, o, "Not in store to remove")
                pending_adds = self.__pending_adds
                while pending_adds:
                    (s, p, o) = pending_adds.pop()
                    self.store.add((s, p, o))