diff env/lib/python3.7/site-packages/cwltool/cwlrdf.py @ 2:6af9afd405e9 draft

"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author shellac
date Thu, 14 May 2020 14:56:58 -0400
parents 26e78fe6e8c4
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/cwltool/cwlrdf.py	Thu May 14 14:56:58 2020 -0400
@@ -0,0 +1,183 @@
+from __future__ import absolute_import
+
+from typing import cast, IO, Any, Dict, MutableMapping
+
+from rdflib import Graph
+from schema_salad.jsonld_context import makerdf
+from schema_salad.ref_resolver import ContextType
+from six.moves import urllib
+from typing_extensions import Text  # pylint: disable=unused-import
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+from .process import Process
+
+
+def gather(tool, ctx):  # type: (Process, ContextType) -> Graph
+    g = Graph()
+
+    def visitor(t):  # type: (MutableMapping[Text, Any]) -> None
+        makerdf(t["id"], t, ctx, graph=g)
+
+    tool.visit(visitor)
+    return g
+
+
+def printrdf(wflow, ctx, style):  # type: (Process, ContextType, str) -> Text
+    """Serialize the CWL document into a string, ready for printing."""
+    rdf = gather(wflow, ctx).serialize(format=style, encoding='utf-8')
+    if not rdf:
+        return u""
+    return cast(Text, rdf.decode('utf-8'))
+
+
+def lastpart(uri):  # type: (Any) -> Text
+    uri2 = Text(uri)
+    if "/" in uri2:
+        return uri2[uri2.rindex("/") + 1:]
+    return uri2
+
+
+def dot_with_parameters(g, stdout):  # type: (Graph, IO[Any]) -> None
+    qres = g.query(
+        """SELECT ?step ?run ?runtype
+           WHERE {
+              ?step cwl:run ?run .
+              ?run rdf:type ?runtype .
+           }""")
+
+    for step, run, _ in qres:
+        stdout.write(u'"%s" [label="%s"]\n' % (lastpart(step), "%s (%s)" % (lastpart(step), lastpart(run))))
+
+    qres = g.query(
+        """SELECT ?step ?inp ?source
+           WHERE {
+              ?wf Workflow:steps ?step .
+              ?step cwl:inputs ?inp .
+              ?inp cwl:source ?source .
+           }""")
+
+    for step, inp, source in qres:
+        stdout.write(u'"%s" [shape=box]\n' % (lastpart(inp)))
+        stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(source), lastpart(inp), ""))
+        stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(inp), lastpart(step), ""))
+
+    qres = g.query(
+        """SELECT ?step ?out
+           WHERE {
+              ?wf Workflow:steps ?step .
+              ?step cwl:outputs ?out .
+           }""")
+
+    for step, out in qres:
+        stdout.write(u'"%s" [shape=box]\n' % (lastpart(out)))
+        stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(step), lastpart(out), ""))
+
+    qres = g.query(
+        """SELECT ?out ?source
+           WHERE {
+              ?wf cwl:outputs ?out .
+              ?out cwl:source ?source .
+           }""")
+
+    for out, source in qres:
+        stdout.write(u'"%s" [shape=octagon]\n' % (lastpart(out)))
+        stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(source), lastpart(out), ""))
+
+    qres = g.query(
+        """SELECT ?inp
+           WHERE {
+              ?wf rdf:type cwl:Workflow .
+              ?wf cwl:inputs ?inp .
+           }""")
+
+    for (inp,) in qres:
+        stdout.write(u'"%s" [shape=octagon]\n' % (lastpart(inp)))
+
+
+def dot_without_parameters(g, stdout):  # type: (Graph, IO[Any]) -> None
+    dotname = {}  # type: Dict[Text,Text]
+    clusternode = {}
+
+    stdout.write("compound=true\n")
+
+    subworkflows = set()
+    qres = g.query(
+        """SELECT ?run
+           WHERE {
+              ?wf rdf:type cwl:Workflow .
+              ?wf Workflow:steps ?step .
+              ?step cwl:run ?run .
+              ?run rdf:type cwl:Workflow .
+           } ORDER BY ?wf""")
+    for (run,) in qres:
+        subworkflows.add(run)
+
+    qres = g.query(
+        """SELECT ?wf ?step ?run ?runtype
+           WHERE {
+              ?wf rdf:type cwl:Workflow .
+              ?wf Workflow:steps ?step .
+              ?step cwl:run ?run .
+              ?run rdf:type ?runtype .
+           } ORDER BY ?wf""")
+
+    currentwf = None
+    for wf, step, run, runtype in qres:
+        if step not in dotname:
+            dotname[step] = lastpart(step)
+
+        if wf != currentwf:
+            if currentwf is not None:
+                stdout.write("}\n")
+            if wf in subworkflows:
+                if wf not in dotname:
+                    dotname[wf] = "cluster_" + lastpart(wf)
+                stdout.write(u'subgraph "%s" { label="%s"\n' % (dotname[wf], lastpart(wf)))
+                currentwf = wf
+                clusternode[wf] = step
+            else:
+                currentwf = None
+
+        if Text(runtype) != "https://w3id.org/cwl/cwl#Workflow":
+            stdout.write(u'"%s" [label="%s"]\n' % (dotname[step], urllib.parse.urldefrag(Text(step))[1]))
+
+    if currentwf is not None:
+        stdout.write("}\n")
+
+    qres = g.query(
+        """SELECT DISTINCT ?src ?sink ?srcrun ?sinkrun
+           WHERE {
+              ?wf1 Workflow:steps ?src .
+              ?wf2 Workflow:steps ?sink .
+              ?src cwl:out ?out .
+              ?inp cwl:source ?out .
+              ?sink cwl:in ?inp .
+              ?src cwl:run ?srcrun .
+              ?sink cwl:run ?sinkrun .
+           }""")
+
+    for src, sink, srcrun, sinkrun in qres:
+        attr = u""
+        if srcrun in clusternode:
+            attr += u'ltail="%s"' % dotname[srcrun]
+            src = clusternode[srcrun]
+        if sinkrun in clusternode:
+            attr += u' lhead="%s"' % dotname[sinkrun]
+            sink = clusternode[sinkrun]
+        stdout.write(u'"%s" -> "%s" [%s]\n' % (dotname[src], dotname[sink], attr))
+
+
+def printdot(wf, ctx, stdout, include_parameters=False):
+    # type: (Process, ContextType, Any, bool) -> None
+    g = gather(wf, ctx)
+
+    stdout.write("digraph {")
+
+    # g.namespace_manager.qname(predicate)
+
+    if include_parameters:
+        dot_with_parameters(g, stdout)
+    else:
+        dot_without_parameters(g, stdout)
+
+    stdout.write("}")