comparison env/lib/python3.7/site-packages/cwltool/cwlrdf.py @ 2:6af9afd405e9 draft

"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author shellac
date Thu, 14 May 2020 14:56:58 -0400 (2020-05-14)
parents 26e78fe6e8c4
children
comparison
equal deleted inserted replaced
1:75ca89e9b81c 2:6af9afd405e9
1 from __future__ import absolute_import
2
3 from typing import cast, IO, Any, Dict, MutableMapping
4
5 from rdflib import Graph
6 from schema_salad.jsonld_context import makerdf
7 from schema_salad.ref_resolver import ContextType
8 from six.moves import urllib
9 from typing_extensions import Text # pylint: disable=unused-import
10 # move to a regular typing import when Python 3.3-3.6 is no longer supported
11
12 from .process import Process
13
14
def gather(tool, ctx):  # type: (Process, ContextType) -> Graph
    """Collect every node visited under ``tool`` into a single RDF graph.

    A fresh ``Graph`` is created and ``tool.visit`` is given a callback that
    forwards each visited node (keyed by its ``"id"`` entry) together with
    ``ctx`` to ``makerdf``, always targeting that same graph. The populated
    graph is returned.
    """
    graph = Graph()

    def add_node(node):  # type: (MutableMapping[Text, Any]) -> None
        # All nodes are accumulated into the one shared graph.
        makerdf(node["id"], node, ctx, graph=graph)

    tool.visit(add_node)
    return graph
23
24
def printrdf(wflow, ctx, style):  # type: (Process, ContextType, str) -> Text
    """Serialize the CWL document into a string, ready for printing.

    The graph gathered from ``wflow`` is serialized in the format named by
    ``style`` as UTF-8 bytes and decoded back to text. A falsy serialization
    result yields an empty unicode string.
    """
    graph = gather(wflow, ctx)
    serialized = graph.serialize(format=style, encoding='utf-8')
    return cast(Text, serialized.decode('utf-8')) if serialized else u""
31
32
def lastpart(uri):  # type: (Any) -> Text
    """Return the text of ``uri`` that follows its final ``/``.

    ``uri`` is converted to text first. When it contains no ``/`` the whole
    text is returned; when it ends with ``/`` the result is empty.
    """
    text = Text(uri)
    # rpartition puts the entire string in the last slot when the separator
    # is absent, so no separate membership check is needed.
    return text.rpartition("/")[2]
38
39
def dot_with_parameters(g, stdout):  # type: (Graph, IO[Any]) -> None
    """Write DOT statements for steps, their parameters and the links between them.

    Five SPARQL queries are run against ``g``; for every result row one or
    more DOT statements are written to ``stdout``:

    * a labelled node per step (``"<step> (<run>)"``),
    * a box node per step input, with edges source -> input -> step,
    * a box node per step output, with an edge step -> output,
    * an octagon node per workflow output, with an edge source -> output,
    * an octagon node per workflow input.

    Node names are the last path component of each URI (see ``lastpart``).
    No enclosing ``digraph { ... }`` wrapper is written here.
    """
    emit = stdout.write
    edge_fmt = u'"%s" -> "%s" [label="%s"]\n'
    box_fmt = u'"%s" [shape=box]\n'
    octagon_fmt = u'"%s" [shape=octagon]\n'

    # Steps, labelled with the name of what they run.
    step_rows = g.query(
        """SELECT ?step ?run ?runtype
           WHERE {
              ?step cwl:run ?run .
              ?run rdf:type ?runtype .
           }""")
    for step, run, _ in step_rows:
        step_name = lastpart(step)
        label = "%s (%s)" % (step_name, lastpart(run))
        emit(u'"%s" [label="%s"]\n' % (step_name, label))

    # Step inputs: source -> input -> step.
    input_rows = g.query(
        """SELECT ?step ?inp ?source
           WHERE {
              ?wf Workflow:steps ?step .
              ?step cwl:inputs ?inp .
              ?inp cwl:source ?source .
           }""")
    for step, inp, source in input_rows:
        inp_name = lastpart(inp)
        emit(box_fmt % inp_name)
        emit(edge_fmt % (lastpart(source), inp_name, ""))
        emit(edge_fmt % (inp_name, lastpart(step), ""))

    # Step outputs: step -> output.
    output_rows = g.query(
        """SELECT ?step ?out
           WHERE {
              ?wf Workflow:steps ?step .
              ?step cwl:outputs ?out .
           }""")
    for step, out in output_rows:
        out_name = lastpart(out)
        emit(box_fmt % out_name)
        emit(edge_fmt % (lastpart(step), out_name, ""))

    # Workflow-level outputs: source -> output.
    wf_output_rows = g.query(
        """SELECT ?out ?source
           WHERE {
              ?wf cwl:outputs ?out .
              ?out cwl:source ?source .
           }""")
    for out, source in wf_output_rows:
        out_name = lastpart(out)
        emit(octagon_fmt % out_name)
        emit(edge_fmt % (lastpart(source), out_name, ""))

    # Workflow-level inputs.
    wf_input_rows = g.query(
        """SELECT ?inp
           WHERE {
              ?wf rdf:type cwl:Workflow .
              ?wf cwl:inputs ?inp .
           }""")
    for (inp,) in wf_input_rows:
        emit(octagon_fmt % lastpart(inp))
95
96
def dot_without_parameters(g, stdout):  # type: (Graph, IO[Any]) -> None
    """Write DOT statements for steps and step-to-step links, omitting parameters.

    Output written to ``stdout``, in order:

    1. ``compound=true`` so that edges may be clipped to clusters.
    2. One node per step whose run target is not itself a workflow, labelled
       with the fragment of the step URI. Steps that belong to a workflow
       which is the run target of some step are wrapped in a
       ``subgraph "cluster_<name>"`` block.
    3. One edge per distinct (source step, sink step) pair connected through
       ``cwl:out`` / ``cwl:source`` / ``cwl:in``. When an endpoint's run
       target is a cluster, the edge is attached to the step recorded for
       that cluster and tagged with ``ltail`` / ``lhead``.

    No enclosing ``digraph { ... }`` wrapper is written here.
    """
    node_names = {}  # type: Dict[Text, Text]
    # Maps a clustered workflow to the step seen when its cluster was opened;
    # that step stands in for the cluster as an edge endpoint.
    cluster_entry = {}

    stdout.write("compound=true\n")

    nested_rows = g.query(
        """SELECT ?run
           WHERE {
              ?wf rdf:type cwl:Workflow .
              ?wf Workflow:steps ?step .
              ?step cwl:run ?run .
              ?run rdf:type cwl:Workflow .
           } ORDER BY ?wf""")
    subworkflows = {run for (run,) in nested_rows}

    step_rows = g.query(
        """SELECT ?wf ?step ?run ?runtype
           WHERE {
              ?wf rdf:type cwl:Workflow .
              ?wf Workflow:steps ?step .
              ?step cwl:run ?run .
              ?run rdf:type ?runtype .
           } ORDER BY ?wf""")

    open_cluster = None
    for wf, step, _run, runtype in step_rows:
        node_names.setdefault(step, lastpart(step))

        if wf != open_cluster:
            # Rows are ordered by workflow, so a change of workflow ends the
            # cluster that is currently open (if any).
            if open_cluster is not None:
                stdout.write("}\n")
            if wf not in subworkflows:
                open_cluster = None
            else:
                cluster_name = node_names.setdefault(wf, "cluster_" + lastpart(wf))
                stdout.write(u'subgraph "%s" { label="%s"\n' % (cluster_name, lastpart(wf)))
                open_cluster = wf
                cluster_entry[wf] = step

        # Steps that run a workflow get no node of their own.
        if Text(runtype) == "https://w3id.org/cwl/cwl#Workflow":
            continue
        fragment = urllib.parse.urldefrag(Text(step))[1]
        stdout.write(u'"%s" [label="%s"]\n' % (node_names[step], fragment))

    if open_cluster is not None:
        stdout.write("}\n")

    link_rows = g.query(
        """SELECT DISTINCT ?src ?sink ?srcrun ?sinkrun
           WHERE {
              ?wf1 Workflow:steps ?src .
              ?wf2 Workflow:steps ?sink .
              ?src cwl:out ?out .
              ?inp cwl:source ?out .
              ?sink cwl:in ?inp .
              ?src cwl:run ?srcrun .
              ?sink cwl:run ?sinkrun .
           }""")

    for src, sink, srcrun, sinkrun in link_rows:
        attr_parts = []
        if srcrun in cluster_entry:
            attr_parts.append(u'ltail="%s"' % node_names[srcrun])
            src = cluster_entry[srcrun]
        if sinkrun in cluster_entry:
            attr_parts.append(u' lhead="%s"' % node_names[sinkrun])
            sink = cluster_entry[sinkrun]
        stdout.write(u'"%s" -> "%s" [%s]\n' % (node_names[src], node_names[sink], u"".join(attr_parts)))
168
169
def printdot(wf, ctx, stdout, include_parameters=False):
    # type: (Process, ContextType, Any, bool) -> None
    """Write ``wf`` to ``stdout`` as a DOT ``digraph``.

    The RDF graph is gathered from ``wf`` and ``ctx``, then rendered between
    an opening ``digraph {`` and a closing ``}``. ``include_parameters``
    selects the renderer: ``dot_with_parameters`` when true, otherwise
    ``dot_without_parameters``.
    """
    graph = gather(wf, ctx)

    stdout.write("digraph {")
    render = dot_with_parameters if include_parameters else dot_without_parameters
    render(graph, stdout)
    stdout.write("}")