Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/cwltool/subgraph.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line source
import copy from .utils import aslist, json_dumps from collections import namedtuple from typing import (Dict, MutableMapping, MutableSequence, Set, Any, Text, Optional, Tuple) from .process import shortname from six import itervalues from six.moves import urllib from .workflow import Workflow from ruamel.yaml.comments import CommentedMap Node = namedtuple('Node', ('up', 'down', 'type')) UP = "up" DOWN = "down" INPUT = "input" OUTPUT = "output" STEP = "step" def subgraph_visit(current, # type: Text nodes, # type: MutableMapping[Text, Node] visited, # type: Set[Text] direction # type: Text ): # type: (...) -> None if current in visited: return visited.add(current) if direction == DOWN: d = nodes[current].down if direction == UP: d = nodes[current].up for c in d: subgraph_visit(c, nodes, visited, direction) def declare_node(nodes, nodeid, tp): # type: (Dict[Text, Node], Text, Optional[Text]) -> Node if nodeid in nodes: n = nodes[nodeid] if n.type is None: nodes[nodeid] = Node(n.up, n.down, tp) else: nodes[nodeid] = Node([], [], tp) return nodes[nodeid] def get_subgraph(roots, # type: MutableSequence[Text] tool # type: Workflow ): # type: (...) -> Optional[CommentedMap] if tool.tool["class"] != "Workflow": raise Exception("Can only extract subgraph from workflow") nodes = {} # type: Dict[Text, Node] for inp in tool.tool["inputs"]: declare_node(nodes, inp["id"], INPUT) for out in tool.tool["outputs"]: declare_node(nodes, out["id"], OUTPUT) for i in aslist(out.get("outputSource", [])): # source is upstream from output (dependency) nodes[out["id"]].up.append(i) # output is downstream from source declare_node(nodes, i, None) nodes[i].down.append(out["id"]) for st in tool.tool["steps"]: step = declare_node(nodes, st["id"], STEP) for i in st["in"]: if "source" not in i: continue for src in aslist(i["source"]): # source is upstream from step (dependency) step.up.append(src) # step is downstream from source declare_node(nodes, src, None) nodes[src].down.append(st["id"]) for out in st["out"]: # output is downstream from step step.down.append(out) # step is upstream from output declare_node(nodes, out, None) nodes[out].up.append(st["id"]) # Find all the downstream nodes from the starting points visited_down = set() # type: Set[Text] for r in roots: if nodes[r].type == OUTPUT: subgraph_visit(r, nodes, visited_down, UP) else: subgraph_visit(r, nodes, visited_down, DOWN) def find_step(stepid): # type: (Text) -> Optional[MutableMapping[Text, Any]] for st in tool.steps: if st.tool["id"] == stepid: return st.tool return None # Now make sure all the nodes are connected to upstream inputs visited = set() # type: Set[Text] rewire = {} # type: Dict[Text, Tuple[Text, Text]] for v in visited_down: visited.add(v) if nodes[v].type in (STEP, OUTPUT): for u in nodes[v].up: if u in visited_down: continue if nodes[u].type == INPUT: visited.add(u) else: # rewire df = urllib.parse.urldefrag(u) rn = df[0] + "#" + df[1].replace("/", "_") if nodes[v].type == STEP: wfstep = find_step(v) if wfstep is not None: for inp in wfstep["inputs"]: if u in inp["source"]: rewire[u] = (rn, inp["type"]) break else: raise Exception("Could not find step %s" % v) extracted = CommentedMap() for f in tool.tool: if f in ("steps", "inputs", "outputs"): extracted[f] = [] for i in tool.tool[f]: if i["id"] in visited: if f == "steps": for inport in i["in"]: if "source" not in inport: continue if isinstance(inport["source"], MutableSequence): inport["source"] = [rewire[s][0] for s in inport["source"] if s in rewire] elif inport["source"] in rewire: inport["source"] = rewire[inport["source"]][0] extracted[f].append(i) else: extracted[f] = tool.tool[f] for rv in itervalues(rewire): extracted["inputs"].append({ "id": rv[0], "type": rv[1] }) return extracted