Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/cwltool/subgraph.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac | 
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 | 
| parents | |
| children | 
   comparison
  equal
  deleted
  inserted
  replaced
| -1:000000000000 | 0:26e78fe6e8c4 | 
|---|---|
| 1 import copy | |
| 2 from .utils import aslist, json_dumps | |
| 3 from collections import namedtuple | |
| 4 from typing import (Dict, MutableMapping, MutableSequence, Set, Any, Text, Optional, Tuple) | |
| 5 from .process import shortname | |
| 6 from six import itervalues | |
| 7 from six.moves import urllib | |
| 8 from .workflow import Workflow | |
| 9 from ruamel.yaml.comments import CommentedMap | |
| 10 | |
| 11 Node = namedtuple('Node', ('up', 'down', 'type')) | |
| 12 UP = "up" | |
| 13 DOWN = "down" | |
| 14 INPUT = "input" | |
| 15 OUTPUT = "output" | |
| 16 STEP = "step" | |
| 17 | |
| 18 def subgraph_visit(current, # type: Text | |
| 19 nodes, # type: MutableMapping[Text, Node] | |
| 20 visited, # type: Set[Text] | |
| 21 direction # type: Text | |
| 22 ): # type: (...) -> None | |
| 23 | |
| 24 if current in visited: | |
| 25 return | |
| 26 visited.add(current) | |
| 27 | |
| 28 if direction == DOWN: | |
| 29 d = nodes[current].down | |
| 30 if direction == UP: | |
| 31 d = nodes[current].up | |
| 32 for c in d: | |
| 33 subgraph_visit(c, nodes, visited, direction) | |
| 34 | |
| 35 def declare_node(nodes, nodeid, tp): | |
| 36 # type: (Dict[Text, Node], Text, Optional[Text]) -> Node | |
| 37 if nodeid in nodes: | |
| 38 n = nodes[nodeid] | |
| 39 if n.type is None: | |
| 40 nodes[nodeid] = Node(n.up, n.down, tp) | |
| 41 else: | |
| 42 nodes[nodeid] = Node([], [], tp) | |
| 43 return nodes[nodeid] | |
| 44 | |
| 45 def get_subgraph(roots, # type: MutableSequence[Text] | |
| 46 tool # type: Workflow | |
| 47 ): # type: (...) -> Optional[CommentedMap] | |
| 48 if tool.tool["class"] != "Workflow": | |
| 49 raise Exception("Can only extract subgraph from workflow") | |
| 50 | |
| 51 nodes = {} # type: Dict[Text, Node] | |
| 52 | |
| 53 for inp in tool.tool["inputs"]: | |
| 54 declare_node(nodes, inp["id"], INPUT) | |
| 55 | |
| 56 for out in tool.tool["outputs"]: | |
| 57 declare_node(nodes, out["id"], OUTPUT) | |
| 58 for i in aslist(out.get("outputSource", [])): | |
| 59 # source is upstream from output (dependency) | |
| 60 nodes[out["id"]].up.append(i) | |
| 61 # output is downstream from source | |
| 62 declare_node(nodes, i, None) | |
| 63 nodes[i].down.append(out["id"]) | |
| 64 | |
| 65 for st in tool.tool["steps"]: | |
| 66 step = declare_node(nodes, st["id"], STEP) | |
| 67 for i in st["in"]: | |
| 68 if "source" not in i: | |
| 69 continue | |
| 70 for src in aslist(i["source"]): | |
| 71 # source is upstream from step (dependency) | |
| 72 step.up.append(src) | |
| 73 # step is downstream from source | |
| 74 declare_node(nodes, src, None) | |
| 75 nodes[src].down.append(st["id"]) | |
| 76 for out in st["out"]: | |
| 77 # output is downstream from step | |
| 78 step.down.append(out) | |
| 79 # step is upstream from output | |
| 80 declare_node(nodes, out, None) | |
| 81 nodes[out].up.append(st["id"]) | |
| 82 | |
| 83 | |
| 84 # Find all the downstream nodes from the starting points | |
| 85 visited_down = set() # type: Set[Text] | |
| 86 for r in roots: | |
| 87 if nodes[r].type == OUTPUT: | |
| 88 subgraph_visit(r, nodes, visited_down, UP) | |
| 89 else: | |
| 90 subgraph_visit(r, nodes, visited_down, DOWN) | |
| 91 | |
| 92 def find_step(stepid): # type: (Text) -> Optional[MutableMapping[Text, Any]] | |
| 93 for st in tool.steps: | |
| 94 if st.tool["id"] == stepid: | |
| 95 return st.tool | |
| 96 return None | |
| 97 | |
| 98 # Now make sure all the nodes are connected to upstream inputs | |
| 99 visited = set() # type: Set[Text] | |
| 100 rewire = {} # type: Dict[Text, Tuple[Text, Text]] | |
| 101 for v in visited_down: | |
| 102 visited.add(v) | |
| 103 if nodes[v].type in (STEP, OUTPUT): | |
| 104 for u in nodes[v].up: | |
| 105 if u in visited_down: | |
| 106 continue | |
| 107 if nodes[u].type == INPUT: | |
| 108 visited.add(u) | |
| 109 else: | |
| 110 # rewire | |
| 111 df = urllib.parse.urldefrag(u) | |
| 112 rn = df[0] + "#" + df[1].replace("/", "_") | |
| 113 if nodes[v].type == STEP: | |
| 114 wfstep = find_step(v) | |
| 115 if wfstep is not None: | |
| 116 for inp in wfstep["inputs"]: | |
| 117 if u in inp["source"]: | |
| 118 rewire[u] = (rn, inp["type"]) | |
| 119 break | |
| 120 else: | |
| 121 raise Exception("Could not find step %s" % v) | |
| 122 | |
| 123 | |
| 124 extracted = CommentedMap() | |
| 125 for f in tool.tool: | |
| 126 if f in ("steps", "inputs", "outputs"): | |
| 127 extracted[f] = [] | |
| 128 for i in tool.tool[f]: | |
| 129 if i["id"] in visited: | |
| 130 if f == "steps": | |
| 131 for inport in i["in"]: | |
| 132 if "source" not in inport: | |
| 133 continue | |
| 134 if isinstance(inport["source"], MutableSequence): | |
| 135 inport["source"] = [rewire[s][0] for s in inport["source"] if s in rewire] | |
| 136 elif inport["source"] in rewire: | |
| 137 inport["source"] = rewire[inport["source"]][0] | |
| 138 extracted[f].append(i) | |
| 139 else: | |
| 140 extracted[f] = tool.tool[f] | |
| 141 | |
| 142 for rv in itervalues(rewire): | |
| 143 extracted["inputs"].append({ | |
| 144 "id": rv[0], | |
| 145 "type": rv[1] | |
| 146 }) | |
| 147 | |
| 148 return extracted | 
