planemo/lib/python3.7/site-packages/rdflib/plugins/sparql/evaluate.py @ 1:56ad4e20f292 (draft)

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
1 """
2 These method recursively evaluate the SPARQL Algebra
3
4 evalQuery is the entry-point, it will setup context and
5 return the SPARQLResult object
6
7 evalPart is called on each level and will delegate to the right method
8
9 A rdflib.plugins.sparql.sparql.QueryContext is passed along, keeping
10 information needed for evaluation
11
12 A list of dicts (solution mappings) is returned, apart from GroupBy which may
13 also return a dict of list of dicts
14
15 """

import collections

from rdflib import Variable, Graph, BNode, URIRef, Literal

from rdflib.plugins.sparql import CUSTOM_EVALS
from rdflib.plugins.sparql.parserutils import value
from rdflib.plugins.sparql.sparql import (
    QueryContext, AlreadyBound, FrozenBindings, SPARQLError)
from rdflib.plugins.sparql.evalutils import (
    _filter, _eval, _join, _diff, _minus, _fillTemplate, _ebv, _val)

from rdflib.plugins.sparql.aggregates import Aggregator
from rdflib.plugins.sparql.algebra import Join, ToMultiSet, Values


def evalBGP(ctx, bgp):

    """
    A basic graph pattern
    """

    if not bgp:
        yield ctx.solution()
        return

    s, p, o = bgp[0]

    _s = ctx[s]
    _p = ctx[p]
    _o = ctx[o]

    for ss, sp, so in ctx.graph.triples((_s, _p, _o)):
        if None in (_s, _p, _o):
            c = ctx.push()
        else:
            c = ctx

        if _s is None:
            c[s] = ss

        try:
            if _p is None:
                c[p] = sp
        except AlreadyBound:
            continue

        try:
            if _o is None:
                c[o] = so
        except AlreadyBound:
            continue

        for x in evalBGP(c, bgp[1:]):
            yield x
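
# Example (editor's note): for the pattern { ?s :p ?o . ?o :q ?v } the first
# triple is matched against the graph, bindings for ?s and ?o are pushed onto
# the context, and evalBGP recurses on the remaining pattern with them bound.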


def evalExtend(ctx, extend):
    # TODO: Deal with dict returned from evalPart from GROUP BY

    for c in evalPart(ctx, extend.p):
        try:
            e = _eval(extend.expr, c.forget(ctx, _except=extend._vars))
            if isinstance(e, SPARQLError):
                raise e

            yield c.merge({extend.var: e})

        except SPARQLError:
            yield c


def evalLazyJoin(ctx, join):
    """
    A lazy join pushes the variables bound in the first part
    into the second part, doing the join implicitly and
    hopefully evaluating far fewer triples
    """
    for a in evalPart(ctx, join.p1):
        c = ctx.thaw(a)
        for b in evalPart(c, join.p2):
            yield b.merge(a)  # merge, as some bindings may have been forgotten


def evalJoin(ctx, join):

    # TODO: Deal with dict returned from evalPart from GROUP BY
    # only ever for join.p1

    if join.lazy:
        return evalLazyJoin(ctx, join)
    else:
        a = evalPart(ctx, join.p1)
        b = set(evalPart(ctx, join.p2))
        return _join(a, b)


def evalUnion(ctx, union):
    res = set()
    for x in evalPart(ctx, union.p1):
        res.add(x)
        yield x
    for x in evalPart(ctx, union.p2):
        if x not in res:
            yield x


def evalMinus(ctx, minus):
    a = evalPart(ctx, minus.p1)
    b = set(evalPart(ctx, minus.p2))
    return _minus(a, b)


def evalLeftJoin(ctx, join):
    for a in evalPart(ctx, join.p1):
        ok = False
        c = ctx.thaw(a)
        for b in evalPart(c, join.p2):
            if _ebv(join.expr, b.forget(ctx)):
                ok = True
                yield b
        if not ok:
            # we've cheated: the ctx above may contain
            # vars bound outside our scope.
            # before we yield a solution without the OPTIONAL part,
            # check that we would have had no OPTIONAL matches
            # even without prior bindings...
            p1_vars = join.p1._vars
            if p1_vars is None \
                    or not any(_ebv(join.expr, b) for b in
                               evalPart(ctx.thaw(a.remember(p1_vars)), join.p2)):

                yield a


def evalFilter(ctx, part):
    # TODO: Deal with dict returned from evalPart!
    for c in evalPart(ctx, part.p):
        if _ebv(part.expr, c.forget(ctx, _except=part._vars)
                if not part.no_isolated_scope else c):
            yield c


def evalGraph(ctx, part):

    if ctx.dataset is None:
        raise Exception(
            "Non-conjunctive-graph doesn't know about "
            "graphs. Try a query without GRAPH.")

    ctx = ctx.clone()
    graph = ctx[part.term]
    if graph is None:

        for graph in ctx.dataset.contexts():

            # in SPARQL the default graph is NOT a named graph
            if graph == ctx.dataset.default_context:
                continue

            c = ctx.pushGraph(graph)
            c = c.push()
            graphSolution = [{part.term: graph.identifier}]
            for x in _join(evalPart(c, part.p), graphSolution):
                yield x

    else:
        c = ctx.pushGraph(ctx.dataset.get_context(graph))
        for x in evalPart(c, part.p):
            yield x


def evalValues(ctx, part):
    for r in part.p.res:
        c = ctx.push()
        try:
            for k, v in r.items():
                if v != 'UNDEF':
                    c[k] = v
        except AlreadyBound:
            continue

        yield c.solution()


def evalMultiset(ctx, part):

    if part.p.name == 'values':
        return evalValues(ctx, part)

    return evalPart(ctx, part.p)


def evalPart(ctx, part):

    # try custom evaluation functions
    # (see the registration sketch after this function)
    for name, c in list(CUSTOM_EVALS.items()):
        try:
            return c(ctx, part)
        except NotImplementedError:
            pass  # the given custom function did not handle this part

    if part.name == 'BGP':
        # Reorder triple patterns by the number of bound nodes in the current
        # ctx, evaluating patterns with more bound nodes first
        triples = sorted(
            part.triples, key=lambda t: len([n for n in t if ctx[n] is None]))

        return evalBGP(ctx, triples)
    elif part.name == 'Filter':
        return evalFilter(ctx, part)
    elif part.name == 'Join':
        return evalJoin(ctx, part)
    elif part.name == 'LeftJoin':
        return evalLeftJoin(ctx, part)
    elif part.name == 'Graph':
        return evalGraph(ctx, part)
    elif part.name == 'Union':
        return evalUnion(ctx, part)
    elif part.name == 'ToMultiSet':
        return evalMultiset(ctx, part)
    elif part.name == 'Extend':
        return evalExtend(ctx, part)
    elif part.name == 'Minus':
        return evalMinus(ctx, part)

    elif part.name == 'Project':
        return evalProject(ctx, part)
    elif part.name == 'Slice':
        return evalSlice(ctx, part)
    elif part.name == 'Distinct':
        return evalDistinct(ctx, part)
    elif part.name == 'Reduced':
        return evalReduced(ctx, part)

    elif part.name == 'OrderBy':
        return evalOrderBy(ctx, part)
    elif part.name == 'Group':
        return evalGroup(ctx, part)
    elif part.name == 'AggregateJoin':
        return evalAggregateJoin(ctx, part)

    elif part.name == 'SelectQuery':
        return evalSelectQuery(ctx, part)
    elif part.name == 'AskQuery':
        return evalAskQuery(ctx, part)
    elif part.name == 'ConstructQuery':
        return evalConstructQuery(ctx, part)

    elif part.name == 'ServiceGraphPattern':
        raise Exception('ServiceGraphPattern not implemented')

    elif part.name == 'DescribeQuery':
        raise Exception('DESCRIBE not implemented')

    else:
        raise Exception("I don't know: %s" % part.name)

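# A registration sketch (editor's addition, not part of the original module):
# custom hooks are added to CUSTOM_EVALS; a hook must raise NotImplementedError
# for any part it does not handle, so evaluation falls through to the dispatch
# in evalPart. The name "myCustomEval" is illustrative only.
#
#     from rdflib.plugins.sparql import CUSTOM_EVALS
#
#     def myCustomEval(ctx, part):
#         if part.name != 'BGP':
#             raise NotImplementedError()
#         # ... e.g. reorder part.triples with domain knowledge ...
#         return evalBGP(ctx, part.triples)
#
#     CUSTOM_EVALS['myCustomEval'] = myCustomEval
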

def evalGroup(ctx, group):

    """
    http://www.w3.org/TR/sparql11-query/#defn_algGroup
    """
    # grouping should be implemented by evalAggregateJoin
    return evalPart(ctx, group.p)


def evalAggregateJoin(ctx, agg):
    p = evalPart(ctx, agg.p)
    # p is always a Group, we always get a dict back

    group_expr = agg.p.expr
    res = collections.defaultdict(lambda: Aggregator(aggregations=agg.A))

    if group_expr is None:
        # no grouping, just COUNT in SELECT clause
        # get 1 aggregator for counting
        aggregator = res[True]
        for row in p:
            aggregator.update(row)
    else:
        for row in p:
            # determine the right group aggregator for the row
            k = tuple(_eval(e, row, False) for e in group_expr)
            res[k].update(row)

    # all rows are done; yield aggregated values
    for aggregator in res.values():
        yield FrozenBindings(ctx, aggregator.get_bindings())

    # there were no matches
    if len(res) == 0:
        yield FrozenBindings(ctx)


def evalOrderBy(ctx, part):

    res = evalPart(ctx, part.p)

    # sort by the last expression first; since Python's sort is stable,
    # sorting repeatedly from the least to the most significant key
    # yields the multi-key ORDER BY semantics
    for e in reversed(part.expr):

        reverse = bool(e.order and e.order == 'DESC')
        res = sorted(res, key=lambda x: _val(value(x, e.expr, variables=True)),
                     reverse=reverse)

    return res


def evalSlice(ctx, slice):
    res = evalPart(ctx, slice.p)
    i = 0
    while i < slice.start:
        next(res)
        i += 1
    i = 0
    for x in res:
        i += 1
        if slice.length is None:
            yield x
        else:
            if i <= slice.length:
                yield x
            else:
                break


def evalReduced(ctx, part):
    """apply REDUCED to result

    REDUCED is not as strict as DISTINCT, but if the incoming rows were sorted
    it should produce the same result with limited extra memory and time per
    incoming row.
    """

    # This implementation uses a most-recently-used strategy with a limited
    # buffer size. It relates to the LRU caching algorithm:
    # https://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used_.28LRU.29
    MAX = 1
    # TODO: add configuration or determine "best" size for most use cases
    # 0: No reduction
    # 1: compare only with the last row, almost no reduction with
    #    unordered incoming rows
    # N: The greater the buffer size the greater the reduction, but more
    #    memory and time are needed

    # mixed data structure: set for lookup, deque for append/pop/remove
    mru_set = set()
    mru_queue = collections.deque()

    for row in evalPart(ctx, part.p):
        if row in mru_set:
            # forget the last position of the row
            mru_queue.remove(row)
        else:
            # row seems to be new
            yield row
            mru_set.add(row)
            if len(mru_set) > MAX:
                # drop the least recently used row from the buffer
                mru_set.remove(mru_queue.pop())
        # put the row at the front
        mru_queue.appendleft(row)


def evalDistinct(ctx, part):
    res = evalPart(ctx, part.p)

    done = set()
    for x in res:
        if x not in done:
            yield x
            done.add(x)


def evalProject(ctx, project):
    res = evalPart(ctx, project.p)

    return (row.project(project.PV) for row in res)


def evalSelectQuery(ctx, query):

    res = {}
    res["type_"] = "SELECT"
    res["bindings"] = evalPart(ctx, query.p)
    res["vars_"] = query.PV
    return res


def evalAskQuery(ctx, query):
    res = {}
    res["type_"] = "ASK"
    res["askAnswer"] = False
    for x in evalPart(ctx, query.p):
        res["askAnswer"] = True
        break

    return res


def evalConstructQuery(ctx, query):
    template = query.template

    if not template:
        # a CONSTRUCT WHERE query
        template = query.p.p.triples  # query -> project -> bgp ...

    graph = Graph()

    for c in evalPart(ctx, query.p):
        graph += _fillTemplate(template, c)

    res = {}
    res["type_"] = "CONSTRUCT"
    res["graph"] = graph

    return res


def evalQuery(graph, query, initBindings, base=None):

    initBindings = dict((Variable(k), v) for k, v in initBindings.items())

    ctx = QueryContext(graph, initBindings=initBindings)

    ctx.prologue = query.prologue
    main = query.algebra

    if main.datasetClause:
        if ctx.dataset is None:
            raise Exception(
                "Non-conjunctive-graph doesn't know about "
                "graphs! Try a query without FROM (NAMED).")

        ctx = ctx.clone()  # or push/pop?

        firstDefault = False
        for d in main.datasetClause:
            if d.default:

                if not firstDefault:
                    # replace the current default graph before the first load
                    dg = ctx.dataset.get_context(BNode())
                    ctx = ctx.pushGraph(dg)
                    firstDefault = True

                ctx.load(d.default, default=True)

            elif d.named:
                g = d.named
                ctx.load(g, default=False)

    return evalPart(ctx, main)
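

if __name__ == "__main__":
    # A minimal demo sketch (editor's addition, not part of the original
    # module): Graph.query drives evalQuery above through the registered
    # SPARQL processor. The example URIs are illustrative only.
    g = Graph()
    g.add((URIRef("http://example.org/a"),
           URIRef("http://example.org/p"),
           Literal("hello")))
    for row in g.query("SELECT ?s ?o WHERE { ?s ?p ?o }"):
        print(row.s, row.o)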