Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/planemo/galaxy/workflows.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author | shellac |
---|---|
date | Thu, 14 May 2020 14:56:58 -0400 |
parents | 26e78fe6e8c4 |
children |
comparison
equal
deleted
inserted
replaced
1:75ca89e9b81c | 2:6af9afd405e9 |
---|---|
1 """Utilities for Galaxy workflows.""" | |
2 import json | |
3 import os | |
4 from collections import namedtuple | |
5 | |
6 import yaml | |
7 from bioblend.galaxy.client import Client | |
8 from ephemeris import generate_tool_list_from_ga_workflow_files | |
9 from ephemeris import shed_tools | |
10 from gxformat2.converter import python_to_workflow | |
11 from gxformat2.interface import BioBlendImporterGalaxyInterface | |
12 from gxformat2.interface import ImporterGalaxyInterface | |
13 | |
14 from planemo.io import warn | |
15 | |
# Shared text for the warning and the exception that install_shed_repos emits
# when the repository installation reports errored repositories.
FAILED_REPOSITORIES_MESSAGE = "Failed to install one or more repositories."
17 | |
18 | |
def load_shed_repos(runnable):
    """Return the tool shed repository entries needed by a workflow runnable.

    Runnables whose ``type.name`` is not ``"galaxy_workflow"`` yield an empty
    list. For a ``.ga`` workflow the list is generated with ephemeris and read
    back from the ``tools`` key of the YAML it writes. For any other workflow
    file the YAML document is loaded and its optional top-level ``tools``
    entry is returned (``[]`` when absent).

    The generated tool list is written to a private temporary file that is
    removed afterwards, so nothing is created or overwritten in the current
    working directory.
    """
    if runnable.type.name != "galaxy_workflow":
        return []

    path = runnable.path
    if path.endswith(".ga"):
        # Function-scope import: only this branch needs the module.
        import tempfile

        handle, tool_list_path = tempfile.mkstemp(prefix="planemo_tools_", suffix=".yml")
        # ephemeris reopens the file by name, so the raw descriptor is not needed.
        os.close(handle)
        try:
            generate_tool_list_from_ga_workflow_files.generate_tool_list_from_workflow(
                [path], "Tools from workflows", tool_list_path
            )
            with open(tool_list_path, "r") as f:
                tools = yaml.safe_load(f)["tools"]
        finally:
            os.remove(tool_list_path)
    else:
        # It'd be better to just infer this from the tool shed ID somehow than
        # require explicit annotation like this... I think?
        with open(path, "r") as f:
            workflow = yaml.safe_load(f)

        tools = workflow.get("tools", [])

    return tools
38 | |
39 | |
def install_shed_repos(runnable, admin_gi, ignore_dependency_problems):
    """Install the tool shed repositories that ``runnable`` requires.

    The repository list comes from ``load_shed_repos``; when it is empty
    nothing is installed. If the install result reports errored repositories,
    a warning is emitted when ``ignore_dependency_problems`` is truthy and an
    ``Exception`` is raised otherwise.
    """
    repositories = load_shed_repos(runnable)
    if not repositories:
        return

    manager = shed_tools.InstallRepositoryManager(admin_gi)
    outcome = manager.install_repositories(repositories)
    if not outcome.errored_repositories:
        return

    if not ignore_dependency_problems:
        raise Exception(FAILED_REPOSITORIES_MESSAGE)
    warn(FAILED_REPOSITORIES_MESSAGE)
50 | |
51 | |
def import_workflow(path, admin_gi, user_gi, from_path=False):
    """Import the workflow at ``path`` into the given Galaxy instance.

    With ``from_path`` set, the absolute path is posted to the workflows
    endpoint of ``user_gi`` as a ``from_path`` payload and the response is
    returned. Otherwise the file is loaded locally via ``_raw_dict`` and
    imported through a ``BioBlendImporterGalaxyInterface`` built from
    ``admin_gi`` and ``user_gi``.
    """
    if from_path:
        # TODO: Update bioblend to allow from_path.
        request_body = {"from_path": os.path.abspath(path)}
        endpoint = user_gi._make_url(user_gi.workflows)
        return Client._post(user_gi.workflows, request_body, url=endpoint)

    galaxy_interface = BioBlendImporterGalaxyInterface(
        admin_gi=admin_gi,
        user_gi=user_gi,
    )
    return galaxy_interface.import_workflow(_raw_dict(path, galaxy_interface))
70 | |
71 | |
72 def _raw_dict(path, importer=None): | |
73 if path.endswith(".ga"): | |
74 with open(path, "r") as f: | |
75 workflow = json.load(f) | |
76 else: | |
77 if importer is None: | |
78 importer = DummyImporterGalaxyInterface() | |
79 | |
80 workflow_directory = os.path.dirname(path) | |
81 workflow_directory = os.path.abspath(workflow_directory) | |
82 with open(path, "r") as f: | |
83 workflow = yaml.safe_load(f) | |
84 workflow = python_to_workflow(workflow, importer, workflow_directory) | |
85 | |
86 return workflow | |
87 | |
88 | |
def find_tool_ids(path):
    """Return the tool IDs used by the steps of the workflow at ``path``.

    The workflow is loaded with ``_raw_dict``. Steps with a missing or empty
    ``tool_id`` are skipped, so the result never contains ``None``
    placeholders. IDs keep the iteration order of the workflow's ``steps``
    mapping, and duplicates are preserved.
    """
    workflow = _raw_dict(path)
    return [
        step["tool_id"]
        for step in workflow["steps"].values()
        if step.get("tool_id")
    ]
97 | |
98 | |
WorkflowOutput = namedtuple("WorkflowOutput", ["order_index", "output_name", "label"])


def describe_outputs(path):
    """Return a list of :class:`WorkflowOutput` objects for target workflow.

    Every entry of each step's optional ``workflow_outputs`` list becomes one
    record holding the step key converted to ``int``, plus the entry's
    ``output_name`` and ``label`` values.
    """
    steps = _raw_dict(path)["steps"]
    return [
        WorkflowOutput(int(step_key), declared["output_name"], declared["label"])
        for step_key, step in steps.items()
        for declared in step.get("workflow_outputs", [])
    ]
116 | |
117 | |
class DummyImporterGalaxyInterface(ImporterGalaxyInterface):
    """Importer interface whose ``import_workflow`` performs no import.

    ``_raw_dict`` uses an instance of this class when no importer is supplied.
    """

    def import_workflow(self, workflow, **kwds):
        """Ignore ``workflow`` and any keyword arguments; return ``None``."""
        return None
122 | |
123 | |
# Names exported by a star-import of this module.
__all__ = (
    "import_workflow",
    "describe_outputs",
)