env/lib/python3.7/site-packages/ephemeris/run_data_managers.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"

author    shellac
date      Thu, 14 May 2020 14:56:58 -0400
parents   26e78fe6e8c4
children
#!/usr/bin/env python
"""Run-data-managers is a tool for provisioning data on a Galaxy instance.

Run-data-managers has the ability to run multiple data managers that are interdependent.
When a reference genome is needed for bwa-mem, for example, run-data-managers
can first run a data manager to fetch the fasta file and then run
another data manager that indexes the fasta file for bwa-mem.
This functionality depends on the "watch_tool_data_dir" setting in galaxy.ini being True.
Also, if a new data manager is installed, Galaxy needs to be restarted in order for its tool_data_dir to be watched.

Run-data-managers needs a YAML file that specifies which data managers are run and with which settings.
Example files can be found `here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.sample>`_,
`here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.sample.advanced>`_,
and `here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.test>`_.

By default run-data-managers skips entries in the YAML file that have already been run.
It checks this in the following way:

* If the data manager has the input variable "name" or "sequence_name", it checks whether the "name" column
  in the data table already contains this entry. "name" takes precedence over "sequence_name".
* If the data manager has the input variable "value", "sequence_id" or "dbkey", it checks whether the "value"
  column in the data table already contains this entry.
  "value" takes precedence over "sequence_id", which takes precedence over "dbkey".
* If none of the above input variables are specified, the data manager always runs.
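
A minimal illustrative configuration (only the keys this module actually reads
are shown; the tool id is a placeholder)::

    data_managers:
      - id: data_manager_fetch_genome_dbkeys_all_fasta
        params:
          - 'dbkey': '{{ item }}'
        items:
          - mm10
          - hg38
        data_table_reload:
          - all_fasta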
25 """ | |
26 import argparse | |
27 import json | |
28 import logging | |
29 import time | |
30 from collections import namedtuple | |
31 | |
32 from bioblend.galaxy.tool_data import ToolDataClient | |
33 from bioblend.galaxy.tools import ToolClient | |
34 from jinja2 import Template | |
35 | |
36 from . import get_galaxy_connection | |
37 from . import load_yaml_file | |
38 from .common_parser import get_common_args | |
39 from .ephemeris_log import disable_external_library_logging, setup_global_logger | |
40 | |
41 DEFAULT_URL = "http://localhost" | |
42 DEFAULT_SOURCE_TABLES = ["all_fasta"] | |


def wait(gi, job_list, log):
    """
    Waits until all jobs in the list have either finished or failed.
    The state of the created datasets is checked every 30 seconds.
    Returns a tuple: (successful_jobs, failed_jobs).
    """

    failed_jobs = []
    successful_jobs = []

    # An empty list is falsy and stops the loop.
    while bool(job_list):
        finished_jobs = []
        for job in job_list:
            job_hid = job['outputs'][0]['hid']
            # Check whether the output of the running job is in 'ok' or 'error' state.
            state = gi.datasets.show_dataset(job['outputs'][0]['id'])['state']
            if state == 'ok':
                log.info('Job %i finished with state %s.' % (job_hid, state))
                successful_jobs.append(job)
                finished_jobs.append(job)
            elif state == 'error':
                log.error('Job %i finished with state %s.' % (job_hid, state))
                job_id = job['jobs'][0]['id']
                job_details = gi.jobs.show_job(job_id, full_details=True)
                log.error(
                    "Job {job_hid}: Tool '{tool_id}' finished with exit code: {exit_code}. Stderr: {stderr}".format(
                        job_hid=job_hid,
                        **job_details
                    ))
                log.debug("Job {job_hid}: Tool '{tool_id}' stdout: {stdout}".format(
                    job_hid=job_hid,
                    **job_details
                ))
                failed_jobs.append(job)
                finished_jobs.append(job)
            else:
                log.debug('Job %i still running.' % job_hid)
        # Remove finished jobs from job_list.
        for finished_job in finished_jobs:
            job_list.remove(finished_job)
        # Only sleep if job_list is not empty yet.
        if bool(job_list):
            time.sleep(30)
    return successful_jobs, failed_jobs
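# Illustrative shape of the job dicts consumed by wait() (as returned by
# bioblend's ToolClient.run_tool); only the keys wait() actually uses are
# shown, with made-up ids:
#
#     {'outputs': [{'id': 'f2db41e1fa331b3e', 'hid': 42, 'state': 'queued'}],
#      'jobs': [{'id': 'bbd44e69cb8906b5'}]}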


def get_first_valid_entry(input_dict, key_list):
    """Iterates over key_list and returns the value of the first key that exists in input_dict, or None."""
    for key in key_list:
        if key in input_dict:
            return input_dict.get(key)
    return None
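# For example (illustrative), with the value keys used elsewhere in this module:
#     get_first_valid_entry({'sequence_id': 'mm10', 'dbkey': 'mm10'},
#                           ['value', 'sequence_id', 'dbkey'])
# returns 'mm10' (from 'sequence_id', the first key present in list order).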


class DataManagers:
    def __init__(self, galaxy_instance, configuration):
        """
        :param galaxy_instance: A GalaxyInstance object (import from bioblend.galaxy)
        :param configuration: A dictionary. Examples in the ephemeris documentation.
        """
        self.gi = galaxy_instance
        self.config = configuration
        self.tool_data_client = ToolDataClient(self.gi)
        self.tool_client = ToolClient(self.gi)
        self.possible_name_keys = ['name', 'sequence_name']  # In order of importance!
        self.possible_value_keys = ['value', 'sequence_id', 'dbkey']  # In order of importance!
        self.data_managers = self.config.get('data_managers')
        self.genomes = self.config.get('genomes', '')
        self.source_tables = DEFAULT_SOURCE_TABLES
        self.fetch_jobs = []
        self.skipped_fetch_jobs = []
        self.index_jobs = []
        self.skipped_index_jobs = []

    def initiate_job_lists(self):
        """
        Determines which data managers should be run to populate the data tables.
        Distinguishes between fetch jobs (download files) and index jobs.
        Populates self.fetch_jobs, self.skipped_fetch_jobs, self.index_jobs and self.skipped_index_jobs.
        """
        self.fetch_jobs = []
        self.skipped_fetch_jobs = []
        self.index_jobs = []
        self.skipped_index_jobs = []
        for dm in self.data_managers:
            jobs, skipped_jobs = self.get_dm_jobs(dm)
            if self.dm_is_fetcher(dm):
                self.fetch_jobs.extend(jobs)
                self.skipped_fetch_jobs.extend(skipped_jobs)
            else:
                self.index_jobs.extend(jobs)
                self.skipped_index_jobs.extend(skipped_jobs)

    def get_dm_jobs(self, dm):
        """Gets the job entries for a single data manager. Entries that are already present
        are put in skipped_job_list.
        :returns job_list, skipped_job_list"""
        job_list = []
        skipped_job_list = []
        items = self.parse_items(dm.get('items', ['']))
        for item in items:
            dm_id = dm['id']
            params = dm['params']
            inputs = dict()
            # Iterate over all parameters, replace occurrences of {{item}} with the current item
            # and create the tool_inputs dict for running the data manager job.
            for param in params:
                key, value = list(param.items())[0]
                value_template = Template(value)
                value = value_template.render(item=item)
                inputs.update({key: value})

            job = dict(tool_id=dm_id, inputs=inputs)

            data_tables = dm.get('data_table_reload', [])
            if self.input_entries_exist_in_data_tables(data_tables, inputs):
                skipped_job_list.append(job)
            else:
                job_list.append(job)
        return job_list, skipped_job_list
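    # Illustrative dm entry as it arrives from the config (hypothetical tool id
    # and parameter name), showing how {{ item }} is filled in per item:
    #
    #     {'id': 'data_manager_bwa_mem_index_builder',
    #      'params': [{'all_fasta_source': '{{ item }}'}],
    #      'items': ['mm10'],
    #      'data_table_reload': ['bwa_mem_indexes']}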

    def dm_is_fetcher(self, dm):
        """Checks whether the data manager fetches a sequence instead of indexing it.
        This is based on the data manager's source table.
        :returns True if dm is a fetcher, False if it is not."""
        data_tables = dm.get('data_table_reload', [])
        for data_table in data_tables:
            if data_table in self.source_tables:
                return True
        return False

    def data_table_entry_exists(self, data_table_name, entry, column='value'):
        """Checks whether an entry exists in the specified column of the data table."""
        try:
            data_table_content = self.tool_data_client.show_data_table(data_table_name)
        except Exception:
            raise Exception('Table "%s" does not exist' % data_table_name)

        try:
            column_index = data_table_content.get('columns').index(column)
        except ValueError:
            # list.index() raises ValueError, not IndexError, when the column is missing.
            raise ValueError('Column "%s" does not exist in %s' % (column, data_table_name))

        for field in data_table_content.get('fields'):
            if field[column_index] == entry:
                return True
        return False
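    # Illustrative (abridged) show_data_table() response as consumed above,
    # here for the 'all_fasta' table:
    #
    #     {'columns': ['value', 'dbkey', 'name', 'path'],
    #      'fields': [['mm10', 'mm10', 'Mouse Dec. 2011 (GRCm38/mm10)', '/data/mm10.fa']]}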

    def input_entries_exist_in_data_tables(self, data_tables, input_dict):
        """Checks whether the name and value entries from the input are already present in the data tables.
        If an entry is missing in one of the tables, this function returns False."""
        value_entry = get_first_valid_entry(input_dict, self.possible_value_keys)
        name_entry = get_first_valid_entry(input_dict, self.possible_name_keys)

        # Return False if name and value entries are both None.
        if not value_entry and not name_entry:
            return False

        # Check every data table for the existence of name and value.
        # Return False as soon as an entry is not present.
        for data_table in data_tables:
            if value_entry:
                if not self.data_table_entry_exists(data_table, value_entry, column='value'):
                    return False
            if name_entry:
                if not self.data_table_entry_exists(data_table, name_entry, column='name'):
                    return False
        # If all checks passed, the entries are present in the data tables.
        return True
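    # For example (illustrative): with input_dict {'dbkey': 'mm10', 'sequence_name': 'Mouse'}
    # this looks for 'mm10' in the 'value' column and 'Mouse' in the 'name' column
    # of every table in data_tables, and returns True only if both are found everywhere.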

    def parse_items(self, items):
        """
        Parses items with jinja2.
        :param items: the items to be parsed
        :return: the parsed items
        """
        if bool(self.genomes):
            items_template = Template(json.dumps(items))
            rendered_items = items_template.render(genomes=json.dumps(self.genomes))
            # Remove the leading and trailing " left over from templating a plain string.
            rendered_items = rendered_items.strip('"')
            items = json.loads(rendered_items)
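            # Illustrative round trip, assuming items == '{{ genomes }}' and
            # self.genomes == [{'value': 'mm10'}]:
            #   json.dumps(items)  -> '"{{ genomes }}"'
            #   rendered, stripped -> '[{"value": "mm10"}]'
            #   json.loads(...)    -> [{'value': 'mm10'}]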
        return items

    def run(self, log=None, ignore_errors=False, overwrite=False):
        """
        Runs the data managers.
        :param log: The log to be used.
        :param ignore_errors: Ignore failing data managers and continue regardless.
        :param overwrite: Overwrite existing entries in data tables.
        """
        self.initiate_job_lists()
        all_successful_jobs = []
        all_failed_jobs = []
        all_skipped_jobs = []

        if not log:
            log = logging.getLogger()

        def run_jobs(jobs, skipped_jobs):
            job_list = []
            for skipped_job in skipped_jobs:
                if overwrite:
                    log.info('%s already run for %s. Entry will be overwritten.' %
                             (skipped_job["tool_id"], skipped_job["inputs"]))
                    jobs.append(skipped_job)
                else:
                    log.info('%s already run for %s. Skipping.' % (skipped_job["tool_id"], skipped_job["inputs"]))
                    all_skipped_jobs.append(skipped_job)
            for job in jobs:
                started_job = self.tool_client.run_tool(history_id=None, tool_id=job["tool_id"],
                                                        tool_inputs=job["inputs"])
                log.info('Dispatched job %i. Running DM: "%s" with parameters: %s' %
                         (started_job['outputs'][0]['hid'], job["tool_id"], job["inputs"]))
                job_list.append(started_job)

            successful_jobs, failed_jobs = wait(self.gi, job_list, log)
            if failed_jobs:
                if not ignore_errors:
                    log.error('Not all jobs successful! aborting...')
                    raise RuntimeError('Not all jobs successful! aborting...')
                else:
                    log.warning('Not all jobs successful! ignoring...')
            all_successful_jobs.extend(successful_jobs)
            all_failed_jobs.extend(failed_jobs)

        log.info("Running data managers that populate the following source data tables: %s" % self.source_tables)
        run_jobs(self.fetch_jobs, self.skipped_fetch_jobs)
        log.info("Running data managers that index sequences.")
        run_jobs(self.index_jobs, self.skipped_index_jobs)

        log.info('Finished running data managers. Results:')
        log.info('Successful jobs: %i ' % len(all_successful_jobs))
        log.info('Skipped jobs: %i ' % len(all_skipped_jobs))
        log.info('Failed jobs: %i ' % len(all_failed_jobs))
        InstallResults = namedtuple("InstallResults", ["successful_jobs", "failed_jobs", "skipped_jobs"])
        return InstallResults(successful_jobs=all_successful_jobs, failed_jobs=all_failed_jobs,
                              skipped_jobs=all_skipped_jobs)
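# Illustrative programmatic use (hypothetical URL and key; main() below wires
# this up from the command line instead):
#
#     from bioblend.galaxy import GalaxyInstance
#     gi = GalaxyInstance('http://localhost:8080', key='<admin api key>')
#     config = load_yaml_file('run_data_managers.yaml')
#     DataManagers(gi, config).run()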


def _parser():
    """Returns the argument parser object."""
    parent = get_common_args(log_file=True)

    parser = argparse.ArgumentParser(
        parents=[parent],
        description="Running Galaxy data managers in a defined order with defined parameters. "
                    "'watch_tool_data_dir' in the Galaxy config should be set to true.")
    parser.add_argument("--config", required=True,
                        help="Path to the YAML config file with the list of data managers and data to install.")
    parser.add_argument("--overwrite", action="store_true",
                        help="Disables checking whether the item already exists in the tool data table.")
    parser.add_argument("--ignore_errors", action="store_true",
                        help="Do not stop running when jobs have failed.")
    return parser
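# Example invocation (illustrative; the console-script name and the -g/-a
# connection flags are assumed to come from ephemeris' setup and common_parser):
#
#     run-data-managers --config run_data_managers.yaml \
#         -g http://localhost:8080 -a <admin api key>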


def main():
    disable_external_library_logging()
    parser = _parser()
    args = parser.parse_args()
    log = setup_global_logger(name=__name__, log_file=args.log_file)
    if args.verbose:
        log.setLevel(logging.DEBUG)
    else:
        log.setLevel(logging.INFO)
    gi = get_galaxy_connection(args, file=args.config, log=log, login_required=True)
    config = load_yaml_file(args.config)
    data_managers = DataManagers(gi, config)
    data_managers.run(log, args.ignore_errors, args.overwrite)


if __name__ == '__main__':
    main()