comparison env/lib/python3.7/site-packages/cwltool/subgraph.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 import copy
2 from .utils import aslist, json_dumps
3 from collections import namedtuple
4 from typing import (Dict, MutableMapping, MutableSequence, Set, Any, Text, Optional, Tuple)
5 from .process import shortname
6 from six import itervalues
7 from six.moves import urllib
8 from .workflow import Workflow
9 from ruamel.yaml.comments import CommentedMap
10
11 Node = namedtuple('Node', ('up', 'down', 'type'))
12 UP = "up"
13 DOWN = "down"
14 INPUT = "input"
15 OUTPUT = "output"
16 STEP = "step"
17
18 def subgraph_visit(current, # type: Text
19 nodes, # type: MutableMapping[Text, Node]
20 visited, # type: Set[Text]
21 direction # type: Text
22 ): # type: (...) -> None
23
24 if current in visited:
25 return
26 visited.add(current)
27
28 if direction == DOWN:
29 d = nodes[current].down
30 if direction == UP:
31 d = nodes[current].up
32 for c in d:
33 subgraph_visit(c, nodes, visited, direction)
34
35 def declare_node(nodes, nodeid, tp):
36 # type: (Dict[Text, Node], Text, Optional[Text]) -> Node
37 if nodeid in nodes:
38 n = nodes[nodeid]
39 if n.type is None:
40 nodes[nodeid] = Node(n.up, n.down, tp)
41 else:
42 nodes[nodeid] = Node([], [], tp)
43 return nodes[nodeid]
44
45 def get_subgraph(roots, # type: MutableSequence[Text]
46 tool # type: Workflow
47 ): # type: (...) -> Optional[CommentedMap]
48 if tool.tool["class"] != "Workflow":
49 raise Exception("Can only extract subgraph from workflow")
50
51 nodes = {} # type: Dict[Text, Node]
52
53 for inp in tool.tool["inputs"]:
54 declare_node(nodes, inp["id"], INPUT)
55
56 for out in tool.tool["outputs"]:
57 declare_node(nodes, out["id"], OUTPUT)
58 for i in aslist(out.get("outputSource", [])):
59 # source is upstream from output (dependency)
60 nodes[out["id"]].up.append(i)
61 # output is downstream from source
62 declare_node(nodes, i, None)
63 nodes[i].down.append(out["id"])
64
65 for st in tool.tool["steps"]:
66 step = declare_node(nodes, st["id"], STEP)
67 for i in st["in"]:
68 if "source" not in i:
69 continue
70 for src in aslist(i["source"]):
71 # source is upstream from step (dependency)
72 step.up.append(src)
73 # step is downstream from source
74 declare_node(nodes, src, None)
75 nodes[src].down.append(st["id"])
76 for out in st["out"]:
77 # output is downstream from step
78 step.down.append(out)
79 # step is upstream from output
80 declare_node(nodes, out, None)
81 nodes[out].up.append(st["id"])
82
83
84 # Find all the downstream nodes from the starting points
85 visited_down = set() # type: Set[Text]
86 for r in roots:
87 if nodes[r].type == OUTPUT:
88 subgraph_visit(r, nodes, visited_down, UP)
89 else:
90 subgraph_visit(r, nodes, visited_down, DOWN)
91
92 def find_step(stepid): # type: (Text) -> Optional[MutableMapping[Text, Any]]
93 for st in tool.steps:
94 if st.tool["id"] == stepid:
95 return st.tool
96 return None
97
98 # Now make sure all the nodes are connected to upstream inputs
99 visited = set() # type: Set[Text]
100 rewire = {} # type: Dict[Text, Tuple[Text, Text]]
101 for v in visited_down:
102 visited.add(v)
103 if nodes[v].type in (STEP, OUTPUT):
104 for u in nodes[v].up:
105 if u in visited_down:
106 continue
107 if nodes[u].type == INPUT:
108 visited.add(u)
109 else:
110 # rewire
111 df = urllib.parse.urldefrag(u)
112 rn = df[0] + "#" + df[1].replace("/", "_")
113 if nodes[v].type == STEP:
114 wfstep = find_step(v)
115 if wfstep is not None:
116 for inp in wfstep["inputs"]:
117 if u in inp["source"]:
118 rewire[u] = (rn, inp["type"])
119 break
120 else:
121 raise Exception("Could not find step %s" % v)
122
123
124 extracted = CommentedMap()
125 for f in tool.tool:
126 if f in ("steps", "inputs", "outputs"):
127 extracted[f] = []
128 for i in tool.tool[f]:
129 if i["id"] in visited:
130 if f == "steps":
131 for inport in i["in"]:
132 if "source" not in inport:
133 continue
134 if isinstance(inport["source"], MutableSequence):
135 inport["source"] = [rewire[s][0] for s in inport["source"] if s in rewire]
136 elif inport["source"] in rewire:
137 inport["source"] = rewire[inport["source"]][0]
138 extracted[f].append(i)
139 else:
140 extracted[f] = tool.tool[f]
141
142 for rv in itervalues(rewire):
143 extracted["inputs"].append({
144 "id": rv[0],
145 "type": rv[1]
146 })
147
148 return extracted