Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/cwltool/cwlrdf.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/cwltool/cwlrdf.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,183 +0,0 @@ -from __future__ import absolute_import - -from typing import cast, IO, Any, Dict, MutableMapping - -from rdflib import Graph -from schema_salad.jsonld_context import makerdf -from schema_salad.ref_resolver import ContextType -from six.moves import urllib -from typing_extensions import Text # pylint: disable=unused-import -# move to a regular typing import when Python 3.3-3.6 is no longer supported - -from .process import Process - - -def gather(tool, ctx): # type: (Process, ContextType) -> Graph - g = Graph() - - def visitor(t): # type: (MutableMapping[Text, Any]) -> None - makerdf(t["id"], t, ctx, graph=g) - - tool.visit(visitor) - return g - - -def printrdf(wflow, ctx, style): # type: (Process, ContextType, str) -> Text - """Serialize the CWL document into a string, ready for printing.""" - rdf = gather(wflow, ctx).serialize(format=style, encoding='utf-8') - if not rdf: - return u"" - return cast(Text, rdf.decode('utf-8')) - - -def lastpart(uri): # type: (Any) -> Text - uri2 = Text(uri) - if "/" in uri2: - return uri2[uri2.rindex("/") + 1:] - return uri2 - - -def dot_with_parameters(g, stdout): # type: (Graph, IO[Any]) -> None - qres = g.query( - """SELECT ?step ?run ?runtype - WHERE { - ?step cwl:run ?run . - ?run rdf:type ?runtype . - }""") - - for step, run, _ in qres: - stdout.write(u'"%s" [label="%s"]\n' % (lastpart(step), "%s (%s)" % (lastpart(step), lastpart(run)))) - - qres = g.query( - """SELECT ?step ?inp ?source - WHERE { - ?wf Workflow:steps ?step . - ?step cwl:inputs ?inp . - ?inp cwl:source ?source . - }""") - - for step, inp, source in qres: - stdout.write(u'"%s" [shape=box]\n' % (lastpart(inp))) - stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(source), lastpart(inp), "")) - stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(inp), lastpart(step), "")) - - qres = g.query( - """SELECT ?step ?out - WHERE { - ?wf Workflow:steps ?step . - ?step cwl:outputs ?out . - }""") - - for step, out in qres: - stdout.write(u'"%s" [shape=box]\n' % (lastpart(out))) - stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(step), lastpart(out), "")) - - qres = g.query( - """SELECT ?out ?source - WHERE { - ?wf cwl:outputs ?out . - ?out cwl:source ?source . - }""") - - for out, source in qres: - stdout.write(u'"%s" [shape=octagon]\n' % (lastpart(out))) - stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(source), lastpart(out), "")) - - qres = g.query( - """SELECT ?inp - WHERE { - ?wf rdf:type cwl:Workflow . - ?wf cwl:inputs ?inp . - }""") - - for (inp,) in qres: - stdout.write(u'"%s" [shape=octagon]\n' % (lastpart(inp))) - - -def dot_without_parameters(g, stdout): # type: (Graph, IO[Any]) -> None - dotname = {} # type: Dict[Text,Text] - clusternode = {} - - stdout.write("compound=true\n") - - subworkflows = set() - qres = g.query( - """SELECT ?run - WHERE { - ?wf rdf:type cwl:Workflow . - ?wf Workflow:steps ?step . - ?step cwl:run ?run . - ?run rdf:type cwl:Workflow . - } ORDER BY ?wf""") - for (run,) in qres: - subworkflows.add(run) - - qres = g.query( - """SELECT ?wf ?step ?run ?runtype - WHERE { - ?wf rdf:type cwl:Workflow . - ?wf Workflow:steps ?step . - ?step cwl:run ?run . - ?run rdf:type ?runtype . - } ORDER BY ?wf""") - - currentwf = None - for wf, step, run, runtype in qres: - if step not in dotname: - dotname[step] = lastpart(step) - - if wf != currentwf: - if currentwf is not None: - stdout.write("}\n") - if wf in subworkflows: - if wf not in dotname: - dotname[wf] = "cluster_" + lastpart(wf) - stdout.write(u'subgraph "%s" { label="%s"\n' % (dotname[wf], lastpart(wf))) - currentwf = wf - clusternode[wf] = step - else: - currentwf = None - - if Text(runtype) != "https://w3id.org/cwl/cwl#Workflow": - stdout.write(u'"%s" [label="%s"]\n' % (dotname[step], urllib.parse.urldefrag(Text(step))[1])) - - if currentwf is not None: - stdout.write("}\n") - - qres = g.query( - """SELECT DISTINCT ?src ?sink ?srcrun ?sinkrun - WHERE { - ?wf1 Workflow:steps ?src . - ?wf2 Workflow:steps ?sink . - ?src cwl:out ?out . - ?inp cwl:source ?out . - ?sink cwl:in ?inp . - ?src cwl:run ?srcrun . - ?sink cwl:run ?sinkrun . - }""") - - for src, sink, srcrun, sinkrun in qres: - attr = u"" - if srcrun in clusternode: - attr += u'ltail="%s"' % dotname[srcrun] - src = clusternode[srcrun] - if sinkrun in clusternode: - attr += u' lhead="%s"' % dotname[sinkrun] - sink = clusternode[sinkrun] - stdout.write(u'"%s" -> "%s" [%s]\n' % (dotname[src], dotname[sink], attr)) - - -def printdot(wf, ctx, stdout, include_parameters=False): - # type: (Process, ContextType, Any, bool) -> None - g = gather(wf, ctx) - - stdout.write("digraph {") - - # g.namespace_manager.qname(predicate) - - if include_parameters: - dot_with_parameters(g, stdout) - else: - dot_without_parameters(g, stdout) - - stdout.write("}")