Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/cwltool/subgraph.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/cwltool/subgraph.py Sat May 02 07:14:21 2020 -0400 @@ -0,0 +1,148 @@ +import copy +from .utils import aslist, json_dumps +from collections import namedtuple +from typing import (Dict, MutableMapping, MutableSequence, Set, Any, Text, Optional, Tuple) +from .process import shortname +from six import itervalues +from six.moves import urllib +from .workflow import Workflow +from ruamel.yaml.comments import CommentedMap + +Node = namedtuple('Node', ('up', 'down', 'type')) +UP = "up" +DOWN = "down" +INPUT = "input" +OUTPUT = "output" +STEP = "step" + +def subgraph_visit(current, # type: Text + nodes, # type: MutableMapping[Text, Node] + visited, # type: Set[Text] + direction # type: Text +): # type: (...) -> None + + if current in visited: + return + visited.add(current) + + if direction == DOWN: + d = nodes[current].down + if direction == UP: + d = nodes[current].up + for c in d: + subgraph_visit(c, nodes, visited, direction) + +def declare_node(nodes, nodeid, tp): + # type: (Dict[Text, Node], Text, Optional[Text]) -> Node + if nodeid in nodes: + n = nodes[nodeid] + if n.type is None: + nodes[nodeid] = Node(n.up, n.down, tp) + else: + nodes[nodeid] = Node([], [], tp) + return nodes[nodeid] + +def get_subgraph(roots, # type: MutableSequence[Text] + tool # type: Workflow + ): # type: (...) -> Optional[CommentedMap] + if tool.tool["class"] != "Workflow": + raise Exception("Can only extract subgraph from workflow") + + nodes = {} # type: Dict[Text, Node] + + for inp in tool.tool["inputs"]: + declare_node(nodes, inp["id"], INPUT) + + for out in tool.tool["outputs"]: + declare_node(nodes, out["id"], OUTPUT) + for i in aslist(out.get("outputSource", [])): + # source is upstream from output (dependency) + nodes[out["id"]].up.append(i) + # output is downstream from source + declare_node(nodes, i, None) + nodes[i].down.append(out["id"]) + + for st in tool.tool["steps"]: + step = declare_node(nodes, st["id"], STEP) + for i in st["in"]: + if "source" not in i: + continue + for src in aslist(i["source"]): + # source is upstream from step (dependency) + step.up.append(src) + # step is downstream from source + declare_node(nodes, src, None) + nodes[src].down.append(st["id"]) + for out in st["out"]: + # output is downstream from step + step.down.append(out) + # step is upstream from output + declare_node(nodes, out, None) + nodes[out].up.append(st["id"]) + + + # Find all the downstream nodes from the starting points + visited_down = set() # type: Set[Text] + for r in roots: + if nodes[r].type == OUTPUT: + subgraph_visit(r, nodes, visited_down, UP) + else: + subgraph_visit(r, nodes, visited_down, DOWN) + + def find_step(stepid): # type: (Text) -> Optional[MutableMapping[Text, Any]] + for st in tool.steps: + if st.tool["id"] == stepid: + return st.tool + return None + + # Now make sure all the nodes are connected to upstream inputs + visited = set() # type: Set[Text] + rewire = {} # type: Dict[Text, Tuple[Text, Text]] + for v in visited_down: + visited.add(v) + if nodes[v].type in (STEP, OUTPUT): + for u in nodes[v].up: + if u in visited_down: + continue + if nodes[u].type == INPUT: + visited.add(u) + else: + # rewire + df = urllib.parse.urldefrag(u) + rn = df[0] + "#" + df[1].replace("/", "_") + if nodes[v].type == STEP: + wfstep = find_step(v) + if wfstep is not None: + for inp in wfstep["inputs"]: + if u in inp["source"]: + rewire[u] = (rn, inp["type"]) + break + else: + raise Exception("Could not find step %s" % v) + + + extracted = CommentedMap() + for f in tool.tool: + if f in ("steps", "inputs", "outputs"): + extracted[f] = [] + for i in tool.tool[f]: + if i["id"] in visited: + if f == "steps": + for inport in i["in"]: + if "source" not in inport: + continue + if isinstance(inport["source"], MutableSequence): + inport["source"] = [rewire[s][0] for s in inport["source"] if s in rewire] + elif inport["source"] in rewire: + inport["source"] = rewire[inport["source"]][0] + extracted[f].append(i) + else: + extracted[f] = tool.tool[f] + + for rv in itervalues(rewire): + extracted["inputs"].append({ + "id": rv[0], + "type": rv[1] + }) + + return extracted