comparison planemo/lib/python3.7/site-packages/rdflib/plugins/sleepycat.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 from rdflib.store import Store, VALID_STORE, NO_STORE
2 from rdflib.term import URIRef
3 from rdflib.py3compat import b
4
5
def bb(u):
    """Return the UTF-8 encoded bytes of the text *u*.

    Index keys are formatted as text and converted with this helper before
    being handed to Berkeley DB.
    """
    encoded = u.encode(encoding='utf-8')
    return encoded
8
9
10 try:
11 from bsddb import db
12 has_bsddb = True
13 except ImportError:
14 try:
15 from bsddb3 import db
16 has_bsddb = True
17 except ImportError:
18 has_bsddb = False
19 from os import mkdir
20 from os.path import exists, abspath
21 from urllib.request import pathname2url
22 from threading import Thread
23
if has_bsddb:
    # Settings handed to Berkeley DB when the environment and the individual
    # databases are created (only defined when bsddb/bsddb3 imported).

    # flags for db.DBEnv.set_flags
    ENVSETFLAGS = db.DB_CDB_ALLDB
    # flags for db.DBEnv.open
    ENVFLAGS = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
    # environment cache size in bytes (50 MiB), given to DBEnv.set_cachesize
    CACHESIZE = 50 * 1024 * 1024

    # flags for db.DB.open()
    DBOPENFLAGS = db.DB_THREAD
35
36 import logging
# Public API of this module: only the store class is exported.
__all__ = ['Sleepycat']

# Module logger; used by the background sync thread.
logger = logging.getLogger(__name__)
40
41
class Sleepycat(Store):
    """Persistent, context-aware rdflib Store backed by Berkeley DB.

    Every RDF term is serialised with the store's node pickler and mapped to a
    numeric id through the ``k2i`` (hash) and ``i2k`` (recno) databases; those
    ids are what appear inside index keys.  Triples live in three B-tree
    indices whose keys are ``^``-joined ids laid out as ``c^s^p^o^``,
    ``c^p^o^s^`` and ``c^o^s^p^``.  Keys whose context component is empty form
    the conjunctive (all-context) view; their value is the ``^``-joined list of
    context ids that hold the triple.

    :meth:`open` starts a daemon thread (``__sync_run``) that calls
    :meth:`sync` some time after writes have been flagged.
    """

    context_aware = True
    formula_aware = True
    transaction_aware = False
    graph_aware = True
    db_env = None

    def __init__(self, configuration=None, identifier=None):
        """Create the store; ``configuration`` is forwarded to ``Store``.

        Raises ImportError when neither ``bsddb`` nor ``bsddb3`` is importable.
        """
        if not has_bsddb:
            raise ImportError(
                "Unable to import bsddb/bsddb3, store is unusable.")
        self.__open = False
        # initialised before Store.__init__ runs, in case the base class
        # calls open() itself
        self.__sync_thread = None
        self.__identifier = identifier
        super(Sleepycat, self).__init__(configuration)
        self._loads = self.node_pickler.loads
        self._dumps = self.node_pickler.dumps

    def __get_identifier(self):
        return self.__identifier
    identifier = property(__get_identifier)

    def _init_db_environment(self, homeDir, create=True):
        """Open the Berkeley DB environment living in ``homeDir``.

        A missing ``homeDir`` is created (and ``self.create`` called) when
        ``create`` is True; otherwise ``NO_STORE`` is returned.  Returns the
        opened ``db.DBEnv`` on success.
        """
        if not exists(homeDir):
            if create is not True:
                return NO_STORE
            mkdir(homeDir)
            # TODO: implement create method and refactor this to it
            self.create(homeDir)
        db_env = db.DBEnv()
        db_env.set_cachesize(0, CACHESIZE)  # TODO
        db_env.set_flags(ENVSETFLAGS, 1)
        db_env.open(homeDir, ENVFLAGS | db.DB_CREATE)
        return db_env

    def is_open(self):
        """Return True while the store is open."""
        return self.__open

    def open(self, path, create=True):
        """Open (or create) the store in directory ``path``.

        Returns ``VALID_STORE`` on success, ``NO_STORE`` when bsddb is not
        available or ``path`` is missing and ``create`` is false.  On success
        all databases are opened and the background sync thread is started.
        """
        if not has_bsddb:
            return NO_STORE
        homeDir = path

        if self.__identifier is None:
            self.__identifier = URIRef(pathname2url(abspath(homeDir)))

        db_env = self._init_db_environment(homeDir, create)
        if db_env == NO_STORE:
            return NO_STORE
        self.db_env = db_env
        self.__open = True

        dbname = None
        dbtype = db.DB_BTREE
        # auto-commit ensures that the open-call commits when transactions
        # are enabled
        dbopenflags = DBOPENFLAGS
        if self.transaction_aware is True:
            dbopenflags |= db.DB_AUTO_COMMIT
        if create:
            dbopenflags |= db.DB_CREATE
        dbmode = 0o660
        dbsetflags = 0

        def open_db(filename, kind=dbtype):
            # every database in the environment is opened with the same flags
            handle = db.DB(db_env)
            handle.set_flags(dbsetflags)
            handle.open(filename, dbname, kind, dbopenflags, dbmode)
            return handle

        # Create and open the three triple indices.  Each file is named after
        # its key layout: "c^s^p^o^", "c^p^o^s^" and "c^o^s^p^".
        self.__indicies = [None] * 3
        self.__indicies_info = [None] * 3
        for i in range(3):
            to_key = to_key_func(i)
            index = open_db(to_key((b"s", b"p", b"o"), b"c").decode())
            self.__indicies[i] = index
            self.__indicies_info[i] = (index, to_key, from_key_func(i))

        def get_prefix_func(start, end):
            # Yields the context id ("" when unbound), the ids at triple
            # positions start..end-1 (mod 3), then "" so that joining with
            # "^" produces a trailing separator.
            def get_prefix(triple, context):
                yield "" if context is None else context
                for pos in range(start, end):
                    yield triple[pos % 3]
                yield ""
            return get_prefix

        # For each bound/unbound combination i (bit 1 = subject, 2 =
        # predicate, 4 = object) pick the index whose key starts with the
        # longest run of bound terms; ties go to the lower start position.
        lookup = {}
        for i in range(8):
            results = []
            for start in range(3):
                score = 1
                length = 0
                for j in range(start, start + 3):
                    if not i & (1 << (j % 3)):
                        break
                    score <<= 1
                    length += 1
                tie_break = 2 - start
                results.append(((score, tie_break), start, length))
            results.sort()
            _, start, length = results[-1]
            lookup[i] = (
                self.__indicies[start],
                get_prefix_func(start, start + length),
                from_key_func(start),
                results_from_key_func(start, self._from_string))
        self.__lookup_dict = lookup

        self.__contexts = open_db("contexts")
        self.__namespace = open_db("namespace")
        self.__prefix = open_db("prefix")
        self.__k2i = open_db("k2i", db.DB_HASH)
        self.__i2k = open_db("i2k", db.DB_RECNO)

        self.__needs_sync = False
        t = Thread(target=self.__sync_run)
        t.daemon = True  # Thread.setDaemon() is deprecated
        t.start()
        self.__sync_thread = t
        return VALID_STORE

    def __sync_run(self):
        """Background loop that calls sync() after writes settle.

        Once a write has set ``__needs_sync``, waits until no new write has
        been flagged for ``min_seconds`` or ``max_seconds`` have passed since
        the first one, then syncs.  The loop ends when the store is closed;
        any exception is logged and terminates the thread.
        """
        from time import sleep, time
        try:
            min_seconds, max_seconds = 10, 300
            while self.__open:
                if not self.__needs_sync:
                    sleep(1)
                    continue
                t0 = t1 = time()
                self.__needs_sync = False
                while self.__open:
                    sleep(.1)
                    if self.__needs_sync:
                        t1 = time()
                        self.__needs_sync = False
                    if time() - t1 > min_seconds \
                            or time() - t0 > max_seconds:
                        self.__needs_sync = False
                        logger.debug("sync")
                        self.sync()
                        break
        except Exception as e:
            logger.exception(e)

    def sync(self):
        """Call ``sync()`` on every database handle while the store is open."""
        if self.__open:
            for index in self.__indicies:
                index.sync()
            self.__contexts.sync()
            self.__namespace.sync()
            self.__prefix.sync()
            self.__i2k.sync()
            self.__k2i.sync()

    def close(self, commit_pending_transaction=False):
        """Stop the sync thread, then close every database and the environment.

        A store that is not open (never opened, or already closed) is left
        untouched.
        """
        if not self.__open:
            return
        # clearing the flag makes both loops in __sync_run exit
        self.__open = False
        if self.__sync_thread is not None:
            self.__sync_thread.join()
        for index in self.__indicies:
            index.close()
        self.__contexts.close()
        self.__namespace.close()
        self.__prefix.close()
        self.__i2k.close()
        self.__k2i.close()
        self.db_env.close()

    def add(self, triple, context, quoted=False, txn=None):
        """Add ``triple`` to ``context``.

        The triple is written under the context-specific key of each index
        and, unless ``quoted``, its conjunctive entry is updated to list
        ``context``.  Nothing is written if the triple is already in
        ``context``.
        """
        (subject, predicate, object) = triple
        assert self.__open, "The Store must be open."
        assert context != self, "Can not add triple directly to store"
        Store.add(self, (subject, predicate, object), context, quoted)

        _to_string = self._to_string

        s = _to_string(subject, txn=txn)
        p = _to_string(predicate, txn=txn)
        o = _to_string(object, txn=txn)
        c = _to_string(context, txn=txn)

        cspo, cpos, cosp = self.__indicies

        value = cspo.get(bb("%s^%s^%s^%s^" % (c, s, p, o)), txn=txn)
        if value is None:
            self.__contexts.put(bb(c), b"", txn=txn)

            # merge this context into the conjunctive entry's context list
            contexts_value = cspo.get(
                bb("%s^%s^%s^%s^" % ("", s, p, o)), txn=txn) or b""
            contexts = set(contexts_value.split(b"^"))
            contexts.add(bb(c))
            contexts_value = b"^".join(contexts)

            cspo.put(bb("%s^%s^%s^%s^" % (c, s, p, o)), b"", txn=txn)
            cpos.put(bb("%s^%s^%s^%s^" % (c, p, o, s)), b"", txn=txn)
            cosp.put(bb("%s^%s^%s^%s^" % (c, o, s, p)), b"", txn=txn)
            if not quoted:
                cspo.put(bb(
                    "%s^%s^%s^%s^" % ("", s, p, o)), contexts_value, txn=txn)
                cpos.put(bb(
                    "%s^%s^%s^%s^" % ("", p, o, s)), contexts_value, txn=txn)
                cosp.put(bb(
                    "%s^%s^%s^%s^" % ("", o, s, p)), contexts_value, txn=txn)

            self.__needs_sync = True

    def __remove(self, spo, c, quoted=False, txn=None):
        """Remove the triple ``spo`` from context ``c`` (all ids as bytes).

        The context-specific keys are deleted from every index.  Unless
        ``quoted``, the conjunctive entry is rewritten without ``c``, or
        deleted when no context is left.
        """
        (s, p, o) = spo
        cspo = self.__indicies[0]
        contexts_value = cspo.get(
            b"^".join([b"", s, p, o, b""]), txn=txn) or b""
        contexts = set(contexts_value.split(b"^"))
        contexts.discard(c)
        contexts_value = b"^".join(contexts)
        for index, _to_key, _from_key in self.__indicies_info:
            index.delete(_to_key((s, p, o), c), txn=txn)
        if quoted:
            return
        if contexts_value:
            for index, _to_key, _from_key in self.__indicies_info:
                index.put(_to_key((s, p, o), b""), contexts_value, txn=txn)
        else:
            for index, _to_key, _from_key in self.__indicies_info:
                try:
                    index.delete(_to_key((s, p, o), b""), txn=txn)
                except db.DBNotFoundError:
                    pass  # TODO: is it okay to ignore these?

    def remove(self, xxx_todo_changeme1, context, txn=None):
        """Remove every triple matching the pattern from ``context``.

        ``None`` terms are wildcards; a ``context`` of ``None`` (or the store
        itself) removes matches from all contexts.  Removing the pattern
        ``(None, None, None)`` from a context also drops it from the
        contexts table.
        """
        (subject, predicate, object) = xxx_todo_changeme1
        assert self.__open, "The Store must be open."
        Store.remove(self, (subject, predicate, object), context)
        _to_string = self._to_string

        if context is not None and context == self:
            context = None

        if (subject is not None and predicate is not None
                and object is not None and context is not None):
            # fully bound: look the key up directly
            s = _to_string(subject, txn=txn)
            p = _to_string(predicate, txn=txn)
            o = _to_string(object, txn=txn)
            c = _to_string(context, txn=txn)
            value = self.__indicies[0].get(
                bb("%s^%s^%s^%s^" % (c, s, p, o)), txn=txn)
            if value is not None:
                self.__remove((bb(s), bb(p), bb(o)), bb(c), txn=txn)
                self.__needs_sync = True
            return

        index, prefix, from_key, _ = self.__lookup(
            (subject, predicate, object), context, txn=txn)

        cursor = index.cursor(txn=txn)
        try:
            current = cursor.set_range(prefix)
            needs_sync = True
        except db.DBNotFoundError:
            current = None
            needs_sync = False
        cursor.close()
        while current:
            key = current[0]
            # Advance with a fresh cursor that is closed again before any
            # deletion below (presumably so no cursor is open while keys are
            # removed).
            cursor = index.cursor(txn=txn)
            try:
                cursor.set_range(key)
                # Hack to stop 2to3 converting this to next(cursor)
                current = getattr(cursor, 'next')()
            except db.DBNotFoundError:
                current = None
            cursor.close()
            if not key.startswith(prefix):
                break
            c, s, p, o = from_key(key)
            if context is None:
                contexts_value = index.get(key, txn=txn) or b""
                # remove triple from all non quoted contexts
                contexts = set(contexts_value.split(b"^"))
                # and from the conjunctive index
                contexts.add(b"")
                for c in contexts:
                    for i, _to_key, _ in self.__indicies_info:
                        i.delete(_to_key((s, p, o), c), txn=txn)
            else:
                self.__remove((s, p, o), c, txn=txn)

        if context is not None \
                and subject is None and predicate is None and object is None:
            # TODO: also if context becomes empty and not just on
            # remove((None, None, None), c)
            try:
                self.__contexts.delete(
                    bb(_to_string(context, txn=txn)), txn=txn)
            except db.DBNotFoundError:
                pass

        self.__needs_sync = needs_sync

    def triples(self, xxx_todo_changeme2, context=None, txn=None):
        """A generator over all the triples matching the pattern.

        Yields ``((s, p, o), contexts)`` pairs, where ``contexts`` is a lazy
        generator of the contexts recorded with the matching key.  ``None``
        terms are wildcards; a ``context`` of ``None`` (or the store itself)
        searches the conjunctive view.
        """
        (subject, predicate, object) = xxx_todo_changeme2
        assert self.__open, "The Store must be open."

        if context is not None and context == self:
            context = None

        index, prefix, _, results_from_key = self.__lookup(
            (subject, predicate, object), context, txn=txn)

        cursor = index.cursor(txn=txn)
        try:
            current = cursor.set_range(prefix)
        except db.DBNotFoundError:
            current = None
        cursor.close()
        while current:
            key = current[0]
            # no cursor is kept open across a yield: re-seek on every step
            cursor = index.cursor(txn=txn)
            try:
                cursor.set_range(key)
                # Cheap hack so 2to3 doesn't convert to next(cursor)
                current = getattr(cursor, 'next')()
            except db.DBNotFoundError:
                current = None
            cursor.close()
            if not (key and key.startswith(prefix)):
                break
            contexts_value = index.get(key, txn=txn)
            yield results_from_key(
                key, subject, predicate, object, contexts_value)

    def __len__(self, context=None):
        """Return the number of triples in ``context``.

        With ``context`` ``None`` (or the store itself) the conjunctive
        entries are counted.  An empty store, or an unknown/empty context,
        yields 0.
        """
        assert self.__open, "The Store must be open."
        if context is not None and context == self:
            context = None

        if context is None:
            prefix = b"^"
        else:
            prefix = bb("%s^" % self._to_string(context))

        count = 0
        cursor = self.__indicies[0].cursor()
        try:
            try:
                current = cursor.set_range(prefix)
            except db.DBNotFoundError:
                # no key sorts at or after the prefix: nothing to count
                current = None
            while current:
                key = current[0]
                if not key.startswith(prefix):
                    break
                count += 1
                try:
                    # Hack to stop 2to3 converting this to next(cursor)
                    current = getattr(cursor, 'next')()
                except db.DBNotFoundError:
                    current = None
        finally:
            cursor.close()
        return count

    def bind(self, prefix, namespace):
        """Bind ``prefix`` to ``namespace`` (both text).

        The prefix previously bound to ``namespace``, if any, is removed from
        the prefix -> namespace table.
        """
        prefix = prefix.encode("utf-8")
        namespace = namespace.encode("utf-8")
        bound_prefix = self.__prefix.get(namespace)
        if bound_prefix:
            self.__namespace.delete(bound_prefix)
        self.__prefix[namespace] = prefix
        self.__namespace[prefix] = namespace

    def namespace(self, prefix):
        """Return the namespace bound to ``prefix`` as a URIRef, or None."""
        prefix = prefix.encode("utf-8")
        ns = self.__namespace.get(prefix, None)
        if ns is not None:
            return URIRef(ns.decode('utf-8'))
        return None

    def prefix(self, namespace):
        """Return the prefix bound to ``namespace`` as text, or None."""
        namespace = namespace.encode("utf-8")
        prefix = self.__prefix.get(namespace, None)
        if prefix is not None:
            return prefix.decode('utf-8')
        return None

    def namespaces(self):
        """Yield every ``(prefix, URIRef(namespace))`` binding.

        All bindings are read and the cursor is closed before the first yield.
        """
        cursor = self.__namespace.cursor()
        results = []
        current = cursor.first()
        while current:
            prefix, namespace = current
            results.append((prefix.decode('utf-8'), namespace.decode('utf-8')))
            # Hack to stop 2to3 converting this to next(cursor)
            current = getattr(cursor, 'next')()
        cursor.close()
        for prefix, namespace in results:
            yield prefix, URIRef(namespace)

    def contexts(self, triple=None):
        """Yield contexts: those holding ``triple`` if given, else all known."""
        _from_string = self._from_string
        _to_string = self._to_string

        if triple:
            s, p, o = triple
            s = _to_string(s)
            p = _to_string(p)
            o = _to_string(o)
            contexts = self.__indicies[0].get(bb(
                "%s^%s^%s^%s^" % ("", s, p, o)))
            if contexts:
                for c in contexts.split(b"^"):
                    if c:
                        yield _from_string(c)
        else:
            index = self.__contexts
            cursor = index.cursor()
            current = cursor.first()
            cursor.close()
            while current:
                key = current[0]
                yield _from_string(key)
                cursor = index.cursor()
                try:
                    cursor.set_range(key)
                    # Hack to stop 2to3 converting this to next(cursor)
                    current = getattr(cursor, 'next')()
                except db.DBNotFoundError:
                    current = None
                cursor.close()

    def add_graph(self, graph):
        """Register ``graph`` in the contexts table."""
        self.__contexts.put(bb(self._to_string(graph)), b"")

    def remove_graph(self, graph):
        """Remove all triples of ``graph`` and the graph itself."""
        self.remove((None, None, None), graph)

    def _from_string(self, i):
        """Return the term stored under id ``i`` (str or bytes digits)."""
        k = self.__i2k.get(int(i))
        return self._loads(k)

    def _to_string(self, term, txn=None):
        """Return the id of ``term`` as a str, allocating one if it is new."""
        k = self._dumps(term)
        i = self.__k2i.get(k, txn=txn)
        if i is not None:
            return i.decode()
        # weird behavoir from bsddb not taking a txn as a keyword argument
        # for append
        if self.transaction_aware:
            i = "%s" % self.__i2k.append(k, txn)
        else:
            i = "%s" % self.__i2k.append(k)
        # store the id as bytes, matching how it is decoded on lookup above
        self.__k2i.put(k, i.encode(), txn=txn)
        return i

    def __lookup(self, spo, context, txn=None):
        """Resolve a pattern to ``(index, prefix, from_key, results_from_key)``.

        Bound terms are converted to ids (allocating ids for unseen terms);
        the bound/unbound combination selects an entry of the table built in
        :meth:`open`, and ``prefix`` is the bytes key prefix to scan.
        """
        (subject, predicate, object) = spo
        _to_string = self._to_string
        if context is not None:
            context = _to_string(context, txn=txn)
        i = 0
        if subject is not None:
            i += 1
            subject = _to_string(subject, txn=txn)
        if predicate is not None:
            i += 2
            predicate = _to_string(predicate, txn=txn)
        if object is not None:
            i += 4
            object = _to_string(object, txn=txn)
        index, prefix_func, from_key, results_from_key = self.__lookup_dict[i]
        prefix = bb(
            "^".join(prefix_func((subject, predicate, object), context)))
        return index, prefix, from_key, results_from_key
549
550
def to_key_func(i):
    """Build the key encoder for index *i* (0: spo, 1: pos, 2: osp order).

    The returned ``to_key(triple, context)`` joins ``context`` and the three
    id byte strings of ``triple``, rotated to start at position ``i % 3``,
    with ``^`` and appends a trailing ``^``.
    """
    order = [(i + offset) % 3 for offset in range(3)]

    def to_key(triple, context):
        "Return the index key (bytes) for triple in context."
        pieces = [context]
        pieces.extend(triple[pos] for pos in order)
        pieces.append(b"")  # empty last piece produces the trailing ^
        return b"^".join(pieces)
    return to_key
560
561
def from_key_func(i):
    """Build the inverse of ``to_key_func(i)``.

    The returned ``from_key(key)`` splits an index key on ``^`` and returns
    ``(context, subject, predicate, object)`` as byte strings, undoing the
    rotation used by index *i*.
    """
    s_pos, p_pos, o_pos = [(3 - i + offset) % 3 + 1 for offset in range(3)]

    def from_key(key):
        "Split key into its context, subject, predicate and object ids."
        fields = key.split(b"^")
        return fields[0], fields[s_pos], fields[p_pos], fields[o_pos]
    return from_key
572
573
def results_from_key_func(i, from_string):
    """Build the result decoder used by ``Sleepycat.triples`` for index *i*.

    The returned callable turns an index key back into a ``(s, p, o)`` triple
    with *from_string*, reusing whichever of subject/predicate/object the
    caller already supplied, and pairs it with a lazy generator over the
    non-empty context ids in *contexts_value*.
    """
    s_pos, p_pos, o_pos = [(3 - i + offset) % 3 + 1 for offset in range(3)]

    def from_key(key, subject, predicate, object, contexts_value):
        "Decode key into ((s, p, o), contexts) for yielding."
        fields = key.split(b"^")
        # TODO: compare speed of "is None" checks against bit tests on i
        s = from_string(fields[s_pos]) if subject is None else subject
        p = from_string(fields[p_pos]) if predicate is None else predicate
        o = from_string(fields[o_pos]) if object is None else object
        contexts = (
            from_string(ctx) for ctx in contexts_value.split(b"^") if ctx)
        return (s, p, o), contexts
    return from_key
595
596
def readable_index(i):
    """Render the lookup bitmask *i* as ``"s,p,o"`` with unbound slots as ``?``.

    Bit 1 marks the subject as bound, bit 2 the predicate, bit 4 the object.
    """
    slots = (("s", 1), ("p", 2), ("o", 4))
    return ",".join(letter if i & bit else "?" for letter, bit in slots)