Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/cwltool/subgraph.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/cwltool/subgraph.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,148 +0,0 @@ -import copy -from .utils import aslist, json_dumps -from collections import namedtuple -from typing import (Dict, MutableMapping, MutableSequence, Set, Any, Text, Optional, Tuple) -from .process import shortname -from six import itervalues -from six.moves import urllib -from .workflow import Workflow -from ruamel.yaml.comments import CommentedMap - -Node = namedtuple('Node', ('up', 'down', 'type')) -UP = "up" -DOWN = "down" -INPUT = "input" -OUTPUT = "output" -STEP = "step" - -def subgraph_visit(current, # type: Text - nodes, # type: MutableMapping[Text, Node] - visited, # type: Set[Text] - direction # type: Text -): # type: (...) -> None - - if current in visited: - return - visited.add(current) - - if direction == DOWN: - d = nodes[current].down - if direction == UP: - d = nodes[current].up - for c in d: - subgraph_visit(c, nodes, visited, direction) - -def declare_node(nodes, nodeid, tp): - # type: (Dict[Text, Node], Text, Optional[Text]) -> Node - if nodeid in nodes: - n = nodes[nodeid] - if n.type is None: - nodes[nodeid] = Node(n.up, n.down, tp) - else: - nodes[nodeid] = Node([], [], tp) - return nodes[nodeid] - -def get_subgraph(roots, # type: MutableSequence[Text] - tool # type: Workflow - ): # type: (...) -> Optional[CommentedMap] - if tool.tool["class"] != "Workflow": - raise Exception("Can only extract subgraph from workflow") - - nodes = {} # type: Dict[Text, Node] - - for inp in tool.tool["inputs"]: - declare_node(nodes, inp["id"], INPUT) - - for out in tool.tool["outputs"]: - declare_node(nodes, out["id"], OUTPUT) - for i in aslist(out.get("outputSource", [])): - # source is upstream from output (dependency) - nodes[out["id"]].up.append(i) - # output is downstream from source - declare_node(nodes, i, None) - nodes[i].down.append(out["id"]) - - for st in tool.tool["steps"]: - step = declare_node(nodes, st["id"], STEP) - for i in st["in"]: - if "source" not in i: - continue - for src in aslist(i["source"]): - # source is upstream from step (dependency) - step.up.append(src) - # step is downstream from source - declare_node(nodes, src, None) - nodes[src].down.append(st["id"]) - for out in st["out"]: - # output is downstream from step - step.down.append(out) - # step is upstream from output - declare_node(nodes, out, None) - nodes[out].up.append(st["id"]) - - - # Find all the downstream nodes from the starting points - visited_down = set() # type: Set[Text] - for r in roots: - if nodes[r].type == OUTPUT: - subgraph_visit(r, nodes, visited_down, UP) - else: - subgraph_visit(r, nodes, visited_down, DOWN) - - def find_step(stepid): # type: (Text) -> Optional[MutableMapping[Text, Any]] - for st in tool.steps: - if st.tool["id"] == stepid: - return st.tool - return None - - # Now make sure all the nodes are connected to upstream inputs - visited = set() # type: Set[Text] - rewire = {} # type: Dict[Text, Tuple[Text, Text]] - for v in visited_down: - visited.add(v) - if nodes[v].type in (STEP, OUTPUT): - for u in nodes[v].up: - if u in visited_down: - continue - if nodes[u].type == INPUT: - visited.add(u) - else: - # rewire - df = urllib.parse.urldefrag(u) - rn = df[0] + "#" + df[1].replace("/", "_") - if nodes[v].type == STEP: - wfstep = find_step(v) - if wfstep is not None: - for inp in wfstep["inputs"]: - if u in inp["source"]: - rewire[u] = (rn, inp["type"]) - break - else: - raise Exception("Could not find step %s" % v) - - - extracted = CommentedMap() - for f in tool.tool: - if f in ("steps", "inputs", "outputs"): - extracted[f] = [] - for i in tool.tool[f]: - if i["id"] in visited: - if f == "steps": - for inport in i["in"]: - if "source" not in inport: - continue - if isinstance(inport["source"], MutableSequence): - inport["source"] = [rewire[s][0] for s in inport["source"] if s in rewire] - elif inport["source"] in rewire: - inport["source"] = rewire[inport["source"]][0] - extracted[f].append(i) - else: - extracted[f] = tool.tool[f] - - for rv in itervalues(rewire): - extracted["inputs"].append({ - "id": rv[0], - "type": rv[1] - }) - - return extracted