view planemo/lib/python3.7/site-packages/rdflib/plugins/stores/concurrent.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
line wrap: on
line source

from threading import Lock


class ResponsibleGenerator(object):
    """Iterator wrapper that runs a cleanup callback when it is finalised.

    Iteration is delegated to the wrapped iterator ``gen``; the ``cleanup``
    callable is invoked (with no arguments) from ``__del__``, i.e. when this
    wrapper object is destroyed.
    """

    # No per-instance __dict__: only these two attributes exist.
    __slots__ = ['cleanup', 'gen']

    def __init__(self, gen, cleanup):
        # gen: the iterator to draw items from.
        # cleanup: zero-argument callable run when the wrapper is destroyed.
        self.gen = gen
        self.cleanup = cleanup

    def __iter__(self):
        # The wrapper is its own iterator.
        return self

    def __next__(self):
        # Forward to the wrapped iterator; its StopIteration propagates
        # unchanged to signal exhaustion.
        wrapped = self.gen
        return next(wrapped)

    def __del__(self):
        # Finalisation hook: this is the only place the callback is run.
        on_done = self.cleanup
        on_done()


class ConcurrentStore(object):
    """Store wrapper that defers writes while ``triples`` reads are running.

    While at least one ``triples`` iteration is in progress, calls to
    ``add`` and ``remove`` are queued instead of being applied to the
    wrapped store.  When the last read finishes, the queued removes and
    then the queued adds are applied to the wrapped store.
    """

    def __init__(self, store):
        """Wrap ``store``, which must provide add/remove/triples/__len__."""
        self.store = store

        # number of calls to visit still in progress
        self.__visit_count = 0

        # lock for locking down the indices
        self.__lock = Lock()

        # lists for keeping track of added and removed triples while
        # we wait for the lock
        self.__pending_removes = []
        self.__pending_adds = []

    def add(self, triple):
        """Add ``triple`` now, or queue it if a read is in progress."""
        (s, p, o) = triple
        if self.__visit_count == 0:
            self.store.add((s, p, o))
        else:
            self.__pending_adds.append((s, p, o))

    def remove(self, triple):
        """Remove ``triple`` now, or queue the removal if a read is in progress."""
        (s, p, o) = triple
        if self.__visit_count == 0:
            self.store.remove((s, p, o))
        else:
            self.__pending_removes.append((s, p, o))

    def triples(self, triple):
        """Yield the triples matching the ``(s, p, o)`` pattern.

        A ``None`` component in the pattern matches anything.  Triples from
        the wrapped store that are queued for removal are skipped, and
        queued adds that match the pattern are yielded afterwards.
        """
        (su, pr, ob) = triple
        g = self.store.triples((su, pr, ob))
        pending_removes = self.__pending_removes
        self.__begin_read()
        # ResponsibleGenerator calls self.__end_read when it is finalised,
        # which is what marks this read as finished.
        for s, p, o in ResponsibleGenerator(g, self.__end_read):
            if (s, p, o) not in pending_removes:
                yield s, p, o

        for (s, p, o) in self.__pending_adds:
            if (su is None or su == s) \
                    and (pr is None or pr == p) \
                    and (ob is None or ob == o):
                yield s, p, o

    def __len__(self):
        """Return the length reported by the wrapped store."""
        return self.store.__len__()

    def __begin_read(self):
        """Register one more read in progress."""
        # ``with`` guarantees the lock is released on every exit path.
        with self.__lock:
            self.__visit_count = self.__visit_count + 1

    def __end_read(self):
        """Unregister a read; flush queued writes once no read remains."""
        # ``with`` releases the lock even if the wrapped store raises while
        # the queues are being flushed; otherwise later readers would block
        # forever on a lock that is never released.
        with self.__lock:
            self.__visit_count = self.__visit_count - 1
            if self.__visit_count == 0:
                pending_removes = self.__pending_removes
                while pending_removes:
                    (s, p, o) = pending_removes.pop()
                    try:
                        self.store.remove((s, p, o))
                    except Exception:
                        # Best effort: a failed removal is reported and the
                        # remaining queue is still processed.
                        print(s, p, o, "Not in store to remove")
                pending_adds = self.__pending_adds
                while pending_adds:
                    (s, p, o) = pending_adds.pop()
                    self.store.add((s, p, o))