diff planemo/lib/python3.7/site-packages/rdflib/plugins/sparql/evaluate.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:32:28 -0400 |
| parents | |
| children | |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/rdflib/plugins/sparql/evaluate.py	Fri Jul 31 00:32:28 2020 -0400
@@ -0,0 +1,470 @@
+"""
+These methods recursively evaluate the SPARQL Algebra.
+
+evalQuery is the entry point; it sets up the context and
+returns the SPARQLResult object.
+
+evalPart is called on each level and delegates to the right method.
+
+An rdflib.plugins.sparql.sparql.QueryContext is passed along, keeping
+the information needed for evaluation.
+
+A list of dicts (solution mappings) is returned, apart from GroupBy, which may
+also return a dict of lists of dicts.
+
+"""
+
+import collections
+
+from rdflib import Variable, Graph, BNode, URIRef, Literal
+
+from rdflib.plugins.sparql import CUSTOM_EVALS
+from rdflib.plugins.sparql.parserutils import value
+from rdflib.plugins.sparql.sparql import (
+    QueryContext, AlreadyBound, FrozenBindings, SPARQLError)
+from rdflib.plugins.sparql.evalutils import (
+    _filter, _eval, _join, _diff, _minus, _fillTemplate, _ebv, _val)
+
+from rdflib.plugins.sparql.aggregates import Aggregator
+from rdflib.plugins.sparql.algebra import Join, ToMultiSet, Values
+
+
+def evalBGP(ctx, bgp):
+    """
+    A basic graph pattern
+    """
+
+    if not bgp:
+        yield ctx.solution()
+        return
+
+    s, p, o = bgp[0]
+
+    _s = ctx[s]
+    _p = ctx[p]
+    _o = ctx[o]
+
+    for ss, sp, so in ctx.graph.triples((_s, _p, _o)):
+        if None in (_s, _p, _o):
+            c = ctx.push()
+        else:
+            c = ctx
+
+        if _s is None:
+            c[s] = ss
+
+        try:
+            if _p is None:
+                c[p] = sp
+        except AlreadyBound:
+            continue
+
+        try:
+            if _o is None:
+                c[o] = so
+        except AlreadyBound:
+            continue
+
+        for x in evalBGP(c, bgp[1:]):
+            yield x
+
+
+def evalExtend(ctx, extend):
+    # TODO: Deal with dict returned from evalPart from GROUP BY
+
+    for c in evalPart(ctx, extend.p):
+        try:
+            e = _eval(extend.expr, c.forget(ctx, _except=extend._vars))
+            if isinstance(e, SPARQLError):
+                raise e
+
+            yield c.merge({extend.var: e})
+
+        except SPARQLError:
+            yield c
+
+
+def evalLazyJoin(ctx, join):
+    """
+    A lazy join pushes the variables bound in the first part
+    to the second part, doing the join implicitly and,
+    hopefully, evaluating far fewer triples.
+    """
+    for a in evalPart(ctx, join.p1):
+        c = ctx.thaw(a)
+        for b in evalPart(c, join.p2):
+            yield b.merge(a)  # merge, as some bindings may have been forgotten
+
+
+def evalJoin(ctx, join):
+    # TODO: Deal with dict returned from evalPart from GROUP BY
+    # only ever for join.p1
+
+    if join.lazy:
+        return evalLazyJoin(ctx, join)
+    else:
+        a = evalPart(ctx, join.p1)
+        b = set(evalPart(ctx, join.p2))
+        return _join(a, b)
+
+
+def evalUnion(ctx, union):
+    res = set()
+    for x in evalPart(ctx, union.p1):
+        res.add(x)
+        yield x
+    for x in evalPart(ctx, union.p2):
+        if x not in res:
+            yield x
+
+
+def evalMinus(ctx, minus):
+    a = evalPart(ctx, minus.p1)
+    b = set(evalPart(ctx, minus.p2))
+    return _minus(a, b)
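An editorial aside for readers of this diff: evalBGP above can be driven directly with a QueryContext, which makes the "list of solution mappings" contract from the module docstring concrete. A minimal sketch, assuming the rdflib 4.x/5.x API this file ships with; the graph contents and variable names are invented for illustration:

    from rdflib import Graph, URIRef, Variable
    from rdflib.namespace import FOAF
    from rdflib.plugins.sparql.sparql import QueryContext
    from rdflib.plugins.sparql.evaluate import evalBGP

    g = Graph()
    g.add((URIRef("http://example.org/alice"), FOAF.knows,
           URIRef("http://example.org/bob")))

    # one triple pattern: ?who foaf:knows ?friend
    ctx = QueryContext(g)
    bgp = [(Variable("who"), FOAF.knows, Variable("friend"))]

    # evalBGP yields one solution mapping (FrozenBindings) per match
    for solution in evalBGP(ctx, bgp):
        print(solution[Variable("who")], solution[Variable("friend")])

In normal use this is reached through evalPart, which also reorders the patterns before calling evalBGP.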
+def evalLeftJoin(ctx, join):
+    for a in evalPart(ctx, join.p1):
+        ok = False
+        c = ctx.thaw(a)
+        for b in evalPart(c, join.p2):
+            if _ebv(join.expr, b.forget(ctx)):
+                ok = True
+                yield b
+        if not ok:
+            # We have cheated: the ctx above may contain vars bound
+            # outside our scope. Before yielding a solution without the
+            # OPTIONAL part, check that we would have had no OPTIONAL
+            # matches even without those prior bindings...
+            p1_vars = join.p1._vars
+            if p1_vars is None \
+                    or not any(_ebv(join.expr, b) for b in
+                               evalPart(ctx.thaw(a.remember(p1_vars)), join.p2)):
+
+                yield a
+
+
+def evalFilter(ctx, part):
+    # TODO: Deal with dict returned from evalPart!
+    for c in evalPart(ctx, part.p):
+        if _ebv(part.expr, c.forget(ctx, _except=part._vars) if not part.no_isolated_scope else c):
+            yield c
+
+
+def evalGraph(ctx, part):
+    if ctx.dataset is None:
+        raise Exception(
+            "Non-conjunctive-graph doesn't know about " +
+            "graphs. Try a query without GRAPH.")
+
+    ctx = ctx.clone()
+    graph = ctx[part.term]
+    if graph is None:
+        for graph in ctx.dataset.contexts():
+            # in SPARQL the default graph is NOT a named graph
+            if graph == ctx.dataset.default_context:
+                continue
+
+            c = ctx.pushGraph(graph)
+            c = c.push()
+            graphSolution = [{part.term: graph.identifier}]
+            for x in _join(evalPart(c, part.p), graphSolution):
+                yield x
+    else:
+        c = ctx.pushGraph(ctx.dataset.get_context(graph))
+        for x in evalPart(c, part.p):
+            yield x
+
+
+def evalValues(ctx, part):
+    for r in part.p.res:
+        c = ctx.push()
+        try:
+            for k, v in r.items():
+                if v != 'UNDEF':
+                    c[k] = v
+        except AlreadyBound:
+            continue
+
+        yield c.solution()
+
+
+def evalMultiset(ctx, part):
+    if part.p.name == 'values':
+        return evalValues(ctx, part)
+
+    return evalPart(ctx, part.p)
+
+
+def evalPart(ctx, part):
+    # try custom evaluation functions
+    for name, c in list(CUSTOM_EVALS.items()):
+        try:
+            return c(ctx, part)
+        except NotImplementedError:
+            pass  # the given custom function did not handle this part
+
+    if part.name == 'BGP':
+        # Reorder triple patterns by the number of bound nodes in the
+        # current ctx. Do patterns with more bound nodes first.
+        triples = sorted(
+            part.triples, key=lambda t: len([n for n in t if ctx[n] is None]))
+
+        return evalBGP(ctx, triples)
+    elif part.name == 'Filter':
+        return evalFilter(ctx, part)
+    elif part.name == 'Join':
+        return evalJoin(ctx, part)
+    elif part.name == 'LeftJoin':
+        return evalLeftJoin(ctx, part)
+    elif part.name == 'Graph':
+        return evalGraph(ctx, part)
+    elif part.name == 'Union':
+        return evalUnion(ctx, part)
+    elif part.name == 'ToMultiSet':
+        return evalMultiset(ctx, part)
+    elif part.name == 'Extend':
+        return evalExtend(ctx, part)
+    elif part.name == 'Minus':
+        return evalMinus(ctx, part)
+
+    elif part.name == 'Project':
+        return evalProject(ctx, part)
+    elif part.name == 'Slice':
+        return evalSlice(ctx, part)
+    elif part.name == 'Distinct':
+        return evalDistinct(ctx, part)
+    elif part.name == 'Reduced':
+        return evalReduced(ctx, part)
+
+    elif part.name == 'OrderBy':
+        return evalOrderBy(ctx, part)
+    elif part.name == 'Group':
+        return evalGroup(ctx, part)
+    elif part.name == 'AggregateJoin':
+        return evalAggregateJoin(ctx, part)
+
+    elif part.name == 'SelectQuery':
+        return evalSelectQuery(ctx, part)
+    elif part.name == 'AskQuery':
+        return evalAskQuery(ctx, part)
+    elif part.name == 'ConstructQuery':
+        return evalConstructQuery(ctx, part)
+
+    elif part.name == 'ServiceGraphPattern':
+        raise Exception('ServiceGraphPattern not implemented')
+
+    elif part.name == 'DescribeQuery':
+        raise Exception('DESCRIBE not implemented')
+
+    else:
+        raise Exception("I don't know: %s" % part.name)
+
+
+def evalGroup(ctx, group):
+    """
+    http://www.w3.org/TR/sparql11-query/#defn_algGroup
+    """
+    # grouping should be implemented by evalAggregateJoin
+    return evalPart(ctx, group.p)
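Note how evalPart tries CUSTOM_EVALS before its own dispatch table; this is rdflib's documented extension hook: a custom function raises NotImplementedError for algebra parts it does not want and handles the rest itself. A minimal sketch of registering such a hook (the hook name and graph data are invented):

    from rdflib import Graph, Namespace
    from rdflib.plugins.sparql import CUSTOM_EVALS
    from rdflib.plugins.sparql.evaluate import evalBGP

    EX = Namespace("http://example.org/")

    def myEval(ctx, part):
        # claim only BGPs; raising NotImplementedError defers every
        # other algebra part to the default evaluators in this module
        if part.name != "BGP":
            raise NotImplementedError()
        # hand the pattern back to the stock evaluator unchanged;
        # this is the place to rewrite or augment part.triples
        return evalBGP(ctx, part.triples)

    CUSTOM_EVALS["myEval"] = myEval
    try:
        g = Graph()
        g.add((EX.alice, EX.knows, EX.bob))
        for row in g.query("SELECT ?s ?o WHERE { ?s ?p ?o }"):
            print(row)
    finally:
        del CUSTOM_EVALS["myEval"]  # unregister the hook again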
+def evalAggregateJoin(ctx, agg):
+    # p is always a Group; we always get a dict back
+    p = evalPart(ctx, agg.p)
+
+    group_expr = agg.p.expr
+    res = collections.defaultdict(lambda: Aggregator(aggregations=agg.A))
+
+    if group_expr is None:
+        # no grouping, just COUNT in SELECT clause
+        # get one aggregator for counting
+        aggregator = res[True]
+        for row in p:
+            aggregator.update(row)
+    else:
+        for row in p:
+            # determine the right group aggregator for the row
+            k = tuple(_eval(e, row, False) for e in group_expr)
+            res[k].update(row)
+
+    # all rows are done; yield aggregated values
+    for aggregator in res.values():
+        yield FrozenBindings(ctx, aggregator.get_bindings())
+
+    # there were no matches
+    if len(res) == 0:
+        yield FrozenBindings(ctx)
+
+
+def evalOrderBy(ctx, part):
+    res = evalPart(ctx, part.p)
+
+    for e in reversed(part.expr):
+        reverse = bool(e.order and e.order == 'DESC')
+        res = sorted(res, key=lambda x: _val(value(x, e.expr, variables=True)),
+                     reverse=reverse)
+
+    return res
+
+
+def evalSlice(ctx, slice):
+    res = evalPart(ctx, slice.p)
+
+    i = 0
+    while i < slice.start:
+        next(res)
+        i += 1
+    i = 0
+    for x in res:
+        i += 1
+        if slice.length is None:
+            yield x
+        else:
+            if i <= slice.length:
+                yield x
+            else:
+                break
+
+
+def evalReduced(ctx, part):
+    """apply REDUCED to result
+
+    REDUCED is not as strict as DISTINCT, but if the incoming rows were sorted
+    it should produce the same result with limited extra memory and time per
+    incoming row.
+    """
+
+    # This implementation uses a most-recently-used strategy with a limited
+    # buffer size. It relates to an LRU caching algorithm:
+    # https://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used_.28LRU.29
+    MAX = 1
+    # TODO: add configuration or determine the "best" size for most use cases
+    # 0: no reduction
+    # 1: compare only with the last row; almost no reduction with
+    #    unordered incoming rows
+    # N: the greater the buffer size, the greater the reduction, but more
+    #    memory and time are needed
+
+    # mixed data structure: set for lookup, deque for append/pop/remove
+    mru_set = set()
+    mru_queue = collections.deque()
+
+    for row in evalPart(ctx, part.p):
+        if row in mru_set:
+            # forget the last position of the row
+            mru_queue.remove(row)
+        else:
+            # row seems to be new
+            yield row
+            mru_set.add(row)
+            if len(mru_set) > MAX:
+                # drop the least recently used row from the buffer
+                mru_set.remove(mru_queue.pop())
+        # put the row at the front
+        mru_queue.appendleft(row)
+
+
+def evalDistinct(ctx, part):
+    res = evalPart(ctx, part.p)
+
+    done = set()
+    for x in res:
+        if x not in done:
+            yield x
+            done.add(x)
+
+
+def evalProject(ctx, project):
+    res = evalPart(ctx, project.p)
+
+    return (row.project(project.PV) for row in res)
+
+
+def evalSelectQuery(ctx, query):
+    res = {}
+    res["type_"] = "SELECT"
+    res["bindings"] = evalPart(ctx, query.p)
+    res["vars_"] = query.PV
+    return res
+
+
+def evalAskQuery(ctx, query):
+    res = {}
+    res["type_"] = "ASK"
+    res["askAnswer"] = False
+    for x in evalPart(ctx, query.p):
+        res["askAnswer"] = True
+        break
+
+    return res
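The grouping, ordering and slicing evaluators above compose when a single query uses GROUP BY, ORDER BY and LIMIT together: the algebra nests these parts (with Project and Extend layers in between) and evalPart dispatches each layer in turn. A small end-to-end sketch through the public query API; the graph contents are invented:

    from rdflib import Graph, Namespace

    EX = Namespace("http://example.org/")
    g = Graph()
    g.add((EX.alice, EX.knows, EX.bob))
    g.add((EX.alice, EX.knows, EX.carol))
    g.add((EX.bob, EX.knows, EX.carol))

    # exercises Group -> AggregateJoin -> OrderBy -> Slice
    q = """
        SELECT ?s (COUNT(?o) AS ?n)
        WHERE { ?s <http://example.org/knows> ?o }
        GROUP BY ?s
        ORDER BY DESC(?n)
        LIMIT 1
    """
    for row in g.query(q):
        print(row.s, row.n)  # expected: http://example.org/alice 2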
+def evalConstructQuery(ctx, query):
+    template = query.template
+
+    if not template:
+        # a CONSTRUCT WHERE query
+        template = query.p.p.triples  # query -> project -> bgp ...
+
+    graph = Graph()
+
+    for c in evalPart(ctx, query.p):
+        graph += _fillTemplate(template, c)
+
+    res = {}
+    res["type_"] = "CONSTRUCT"
+    res["graph"] = graph
+
+    return res
+
+
+def evalQuery(graph, query, initBindings, base=None):
+    initBindings = dict((Variable(k), v) for k, v in initBindings.items())
+
+    ctx = QueryContext(graph, initBindings=initBindings)
+
+    ctx.prologue = query.prologue
+    main = query.algebra
+
+    if main.datasetClause:
+        if ctx.dataset is None:
+            raise Exception(
+                "Non-conjunctive-graph doesn't know about " +
+                "graphs! Try a query without FROM (NAMED).")
+
+        ctx = ctx.clone()  # or push/pop?
+
+        firstDefault = False
+        for d in main.datasetClause:
+            if d.default:
+                if firstDefault:
+                    # replace the current default graph
+                    dg = ctx.dataset.get_context(BNode())
+                    ctx = ctx.pushGraph(dg)
+                    firstDefault = True
+
+                ctx.load(d.default, default=True)
+
+            elif d.named:
+                g = d.named
+                ctx.load(g, default=False)
+
+    return evalPart(ctx, main)
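A closing usage note: evalQuery is what Graph.query ultimately calls, and since it wraps every initBindings key in Variable(), plain string keys work when pre-binding query variables. A small sketch (names invented):

    from rdflib import Graph, Namespace

    EX = Namespace("http://example.org/")
    g = Graph()
    g.add((EX.alice, EX.knows, EX.bob))
    g.add((EX.carol, EX.knows, EX.dave))

    # initBindings pre-binds ?s before evaluation starts;
    # evalQuery turns the key "s" into Variable("s")
    rows = g.query(
        "SELECT ?o WHERE { ?s <http://example.org/knows> ?o }",
        initBindings={"s": EX.alice},
    )
    for row in rows:
        print(row.o)  # only Alice's edge: http://example.org/bob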