Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/cwltool/cwlrdf.py @ 3:758bc20232e8 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:20:52 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
from __future__ import absolute_import from typing import cast, IO, Any, Dict, MutableMapping from rdflib import Graph from schema_salad.jsonld_context import makerdf from schema_salad.ref_resolver import ContextType from six.moves import urllib from typing_extensions import Text # pylint: disable=unused-import # move to a regular typing import when Python 3.3-3.6 is no longer supported from .process import Process def gather(tool, ctx): # type: (Process, ContextType) -> Graph g = Graph() def visitor(t): # type: (MutableMapping[Text, Any]) -> None makerdf(t["id"], t, ctx, graph=g) tool.visit(visitor) return g def printrdf(wflow, ctx, style): # type: (Process, ContextType, str) -> Text """Serialize the CWL document into a string, ready for printing.""" rdf = gather(wflow, ctx).serialize(format=style, encoding='utf-8') if not rdf: return u"" return cast(Text, rdf.decode('utf-8')) def lastpart(uri): # type: (Any) -> Text uri2 = Text(uri) if "/" in uri2: return uri2[uri2.rindex("/") + 1:] return uri2 def dot_with_parameters(g, stdout): # type: (Graph, IO[Any]) -> None qres = g.query( """SELECT ?step ?run ?runtype WHERE { ?step cwl:run ?run . ?run rdf:type ?runtype . }""") for step, run, _ in qres: stdout.write(u'"%s" [label="%s"]\n' % (lastpart(step), "%s (%s)" % (lastpart(step), lastpart(run)))) qres = g.query( """SELECT ?step ?inp ?source WHERE { ?wf Workflow:steps ?step . ?step cwl:inputs ?inp . ?inp cwl:source ?source . }""") for step, inp, source in qres: stdout.write(u'"%s" [shape=box]\n' % (lastpart(inp))) stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(source), lastpart(inp), "")) stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(inp), lastpart(step), "")) qres = g.query( """SELECT ?step ?out WHERE { ?wf Workflow:steps ?step . ?step cwl:outputs ?out . }""") for step, out in qres: stdout.write(u'"%s" [shape=box]\n' % (lastpart(out))) stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(step), lastpart(out), "")) qres = g.query( """SELECT ?out ?source WHERE { ?wf cwl:outputs ?out . ?out cwl:source ?source . }""") for out, source in qres: stdout.write(u'"%s" [shape=octagon]\n' % (lastpart(out))) stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(source), lastpart(out), "")) qres = g.query( """SELECT ?inp WHERE { ?wf rdf:type cwl:Workflow . ?wf cwl:inputs ?inp . }""") for (inp,) in qres: stdout.write(u'"%s" [shape=octagon]\n' % (lastpart(inp))) def dot_without_parameters(g, stdout): # type: (Graph, IO[Any]) -> None dotname = {} # type: Dict[Text,Text] clusternode = {} stdout.write("compound=true\n") subworkflows = set() qres = g.query( """SELECT ?run WHERE { ?wf rdf:type cwl:Workflow . ?wf Workflow:steps ?step . ?step cwl:run ?run . ?run rdf:type cwl:Workflow . } ORDER BY ?wf""") for (run,) in qres: subworkflows.add(run) qres = g.query( """SELECT ?wf ?step ?run ?runtype WHERE { ?wf rdf:type cwl:Workflow . ?wf Workflow:steps ?step . ?step cwl:run ?run . ?run rdf:type ?runtype . } ORDER BY ?wf""") currentwf = None for wf, step, run, runtype in qres: if step not in dotname: dotname[step] = lastpart(step) if wf != currentwf: if currentwf is not None: stdout.write("}\n") if wf in subworkflows: if wf not in dotname: dotname[wf] = "cluster_" + lastpart(wf) stdout.write(u'subgraph "%s" { label="%s"\n' % (dotname[wf], lastpart(wf))) currentwf = wf clusternode[wf] = step else: currentwf = None if Text(runtype) != "https://w3id.org/cwl/cwl#Workflow": stdout.write(u'"%s" [label="%s"]\n' % (dotname[step], urllib.parse.urldefrag(Text(step))[1])) if currentwf is not None: stdout.write("}\n") qres = g.query( """SELECT DISTINCT ?src ?sink ?srcrun ?sinkrun WHERE { ?wf1 Workflow:steps ?src . ?wf2 Workflow:steps ?sink . ?src cwl:out ?out . ?inp cwl:source ?out . ?sink cwl:in ?inp . ?src cwl:run ?srcrun . ?sink cwl:run ?sinkrun . }""") for src, sink, srcrun, sinkrun in qres: attr = u"" if srcrun in clusternode: attr += u'ltail="%s"' % dotname[srcrun] src = clusternode[srcrun] if sinkrun in clusternode: attr += u' lhead="%s"' % dotname[sinkrun] sink = clusternode[sinkrun] stdout.write(u'"%s" -> "%s" [%s]\n' % (dotname[src], dotname[sink], attr)) def printdot(wf, ctx, stdout, include_parameters=False): # type: (Process, ContextType, Any, bool) -> None g = gather(wf, ctx) stdout.write("digraph {") # g.namespace_manager.qname(predicate) if include_parameters: dot_with_parameters(g, stdout) else: dot_without_parameters(g, stdout) stdout.write("}")