Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/ephemeris/shed_tools.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26e78fe6e8c4 |
---|---|
1 """ | |
2 A tool to automate installation of tool repositories from a Galaxy Tool Shed | |
3 into an instance of Galaxy. | |
4 | |
5 Shed-tools has three commands: update, test and install. | |
6 | |
7 Update simply updates all the tools in a Galaxy given connection details on the command line. | |
8 | |
9 Test tests the specified tools in the Galaxy Instance. | |
10 | |
11 Install allows installation of tools in multiple ways. | |
12 Galaxy instance details and the installed tools can be provided in one of three | |
13 ways: | |
14 | |
15 1. In the YAML format via dedicated files (a sample can be found | |
16 `here <https://github.com/galaxyproject/ansible-galaxy-tools/blob/master/files/tool_list.yaml.sample>`_). | |
17 2. On the command line as dedicated script options (see the usage help). | |
18 3. As a single composite parameter to the script. The parameter must be a | |
19 single, YAML-formatted string with the keys corresponding to the keys | |
20 available for use in the YAML formatted file (for example: | |
21 `--yaml_tool "{'owner': 'kellrott', 'tool_shed_url': | |
22 'https://testtoolshed.g2.bx.psu.edu', 'tool_panel_section_id': | |
23 'peak_calling', 'name': 'synapse_interface'}"`). | |
24 | |
25 Only one of the methods can be used with each invocation of the script but if | |
26 more than one is provided, precedence will correspond to the order | |
27 of the items in the list above. | |
28 When installing tools, Galaxy expects any `tool_panel_section_id` provided when | |
29 installing a tool to already exist in the configuration. If the section | |
30 does not exist, the tool will be installed outside any section. See | |
31 `shed_tool_conf.xml.sample` in this directory for a sample of such file. Before | |
32 running this script to install the tools, make sure to place such file into | |
33 Galaxy's configuration directory and set Galaxy configuration option | |
34 `tool_config_file` to include it. | |
35 """ | |
36 import datetime as dt | |
37 import json | |
38 import os | |
39 import re | |
40 import time | |
41 from collections import namedtuple | |
42 from concurrent.futures import thread, ThreadPoolExecutor | |
43 | |
44 import requests | |
45 import yaml | |
46 from bioblend.galaxy.client import ConnectionError | |
47 from bioblend.galaxy.toolshed import ToolShedClient | |
48 from galaxy.tool_util.verify.interactor import ( | |
49 GalaxyInteractorApi, | |
50 verify_tool, | |
51 ) | |
52 from galaxy.util import unicodify | |
53 | |
54 from . import get_galaxy_connection, load_yaml_file | |
55 from .ephemeris_log import disable_external_library_logging, setup_global_logger | |
56 from .get_tool_list_from_galaxy import GiToToolYaml, the_same_repository, tools_for_repository | |
57 from .shed_tools_args import parser | |
58 from .shed_tools_methods import complete_repo_information, flatten_repo_info, VALID_KEYS | |
59 | |
60 | |
class InstallRepositoryManager(object):
    """Manage installing, updating and testing Tool Shed repositories on a Galaxy instance."""

    # Result containers. Built once here instead of on every method call.
    _FilterResults = namedtuple("FilterResults", ["not_installed_repos", "already_installed_repos"])
    _InstallResults = namedtuple("InstallResults",
                                 ["installed_repositories", "errored_repositories", "skipped_repositories"])
    _TestToolResults = namedtuple("Results", ["tool_test_results", "tests_passed", "test_exceptions"])

    def __init__(self,
                 galaxy_instance):
        """Store the Galaxy connection and create a ``ToolShedClient`` bound to it."""
        self.gi = galaxy_instance
        self.tool_shed_client = ToolShedClient(self.gi)

    def installed_repositories(self):
        """Return the ``"tools"`` entry of the tool list that ``GiToToolYaml`` builds for this Galaxy."""
        return GiToToolYaml(
            gi=self.gi,
            skip_tool_panel_section_name=False,
            get_data_managers=True,
            get_all_tools=True
        ).tool_list.get("tools")

    def filter_installed_repos(self, repos, check_revision=True):
        """Split ``repos`` into repositories that are already installed and those that are not.

        :param repos: iterable of repository dictionaries to classify.
        :param check_revision: forwarded to ``the_same_repository``. When true the
            installed list is flattened first, so that each repository/revision
            combination has its own entry; when false the flattening is skipped to
            limit the number of comparisons.
        :return: a ``FilterResults`` namedtuple with the fields
            ``not_installed_repos`` and ``already_installed_repos``.
        """
        # TODO: Find a speedier algorithm (this is a nested scan over both lists).
        installed_repos = self.installed_repositories()
        if check_revision:
            installed_repos = flatten_repo_info(installed_repos)

        not_installed_repos = []
        already_installed_repos = []
        for repo in repos:
            is_installed = any(
                the_same_repository(installed_repo, repo, check_revision)
                for installed_repo in installed_repos
            )
            if is_installed:
                already_installed_repos.append(repo)
            else:
                not_installed_repos.append(repo)
        return self._FilterResults(already_installed_repos=already_installed_repos,
                                   not_installed_repos=not_installed_repos)

    def install_repositories(self,
                             repositories,
                             log=None,
                             force_latest_revision=False,
                             default_toolshed='https://toolshed.g2.bx.psu.edu/',
                             default_install_tool_dependencies=False,
                             default_install_resolver_dependencies=True,
                             default_install_repository_dependencies=True):
        """Install a list of repositories on the current Galaxy.

        :param repositories: non-empty list of repository dictionaries.
        :param log: optional logger; when ``None`` nothing is logged.
        :param force_latest_revision: forwarded to ``complete_repo_information``.
        :param default_toolshed: forwarded as ``default_toolshed_url``.
        :param default_install_tool_dependencies: forwarded to ``complete_repo_information``.
        :param default_install_resolver_dependencies: forwarded to ``complete_repo_information``.
        :param default_install_repository_dependencies: forwarded to ``complete_repo_information``.
        :return: an ``InstallResults`` namedtuple with the fields
            ``installed_repositories``, ``errored_repositories`` and ``skipped_repositories``.
        :raises ValueError: if ``repositories`` is empty or ``None``.
        """
        if not repositories:
            raise ValueError("Empty list of tools was given")
        installation_start = dt.datetime.now()
        installed_repositories = []
        skipped_repositories = []
        errored_repositories = []
        counter = 0

        # Warn about keys that are not recognised; nothing is removed here.
        if log:
            for repo in repositories:
                for key in repo:
                    if key not in VALID_KEYS and key != 'revisions':
                        log.warning("'{0}' not a valid key. Will be skipped during parsing".format(key))

        # Start by flattening the repo list per revision
        flattened_repos = flatten_repo_info(repositories)
        total_num_repositories = len(flattened_repos)

        # Complete the repo information, and make sure each repository has a revision
        repository_list = []
        for repository in flattened_repos:
            start = dt.datetime.now()
            try:
                complete_repo = complete_repo_information(
                    repository,
                    default_toolshed_url=default_toolshed,
                    require_tool_panel_info=True,
                    default_install_tool_dependencies=default_install_tool_dependencies,
                    default_install_resolver_dependencies=default_install_resolver_dependencies,
                    default_install_repository_dependencies=default_install_repository_dependencies,
                    force_latest_revision=force_latest_revision)
                repository_list.append(complete_repo)
            except Exception as e:
                # Deliberately broad: one bad entry must not abort the whole run.
                # The errored repositories are reported at the end anyway.
                if log:
                    log_repository_install_error(repository, start, unicodify(e), log)
                errored_repositories.append(repository)

        # Filter out already installed repos
        filtered_repos = self.filter_installed_repos(repository_list)

        for skipped_repo in filtered_repos.already_installed_repos:
            counter += 1
            if log:
                log_repository_install_skip(skipped_repo, counter, total_num_repositories, log)
            skipped_repositories.append(skipped_repo)

        # Install repos
        outcome_buckets = {
            "error": errored_repositories,
            "skipped": skipped_repositories,
            "installed": installed_repositories,
        }
        for repository in filtered_repos.not_installed_repos:
            counter += 1
            if log:
                log_repository_install_start(repository, counter=counter, installation_start=installation_start,
                                             log=log, total_num_repositories=total_num_repositories)
            result = self.install_repository_revision(repository, log)
            bucket = outcome_buckets.get(result)
            if bucket is not None:
                bucket.append(repository)

        # Log results
        if log:
            log.info("Installed repositories ({0}): {1}".format(
                len(installed_repositories),
                [(t['name'], t.get('changeset_revision')) for t in installed_repositories]))
            log.info("Skipped repositories ({0}): {1}".format(
                len(skipped_repositories),
                [(t['name'], t.get('changeset_revision')) for t in skipped_repositories]))
            # Errored entries may be incomplete, so read them defensively
            # (same approach as log_repository_install_error).
            log.info("Errored repositories ({0}): {1}".format(
                len(errored_repositories),
                [(t.get('name', ""), t.get('changeset_revision', "")) for t in errored_repositories]))
            log.info("All repositories have been installed.")
            log.info("Total run time: {0}".format(dt.datetime.now() - installation_start))
        return self._InstallResults(installed_repositories=installed_repositories,
                                    skipped_repositories=skipped_repositories,
                                    errored_repositories=errored_repositories)

    def update_repositories(self, repositories=None, log=None, **kwargs):
        """Re-install repositories with ``force_latest_revision=True``.

        With no ``repositories`` given, everything reported by
        ``installed_repositories()`` is updated. Otherwise only the given
        repositories that are already installed (ignoring revision) are updated,
        and the others are reported with a warning.

        :param kwargs: forwarded to :meth:`install_repositories`.
        :return: the ``InstallResults`` of :meth:`install_repositories`.
        """
        if not repositories:  # Repositories None or empty list
            repositories = self.installed_repositories()
        else:
            filtered_repos = self.filter_installed_repos(repositories, check_revision=False)
            if filtered_repos.not_installed_repos and log:
                log.warning("The following tools are not installed and will not be upgraded: {0}".format(
                    filtered_repos.not_installed_repos))
            repositories = filtered_repos.already_installed_repos
        return self.install_repositories(repositories, force_latest_revision=True, log=log, **kwargs)

    def test_tools(self,
                   test_json,
                   repositories=None,
                   log=None,
                   test_user_api_key=None,
                   test_user="ephemeris@galaxyproject.org",
                   parallel_tests=1,
                   ):
        """Run tool tests for all tools in each repository in supplied tool list or ``self.installed_repositories()``.

        :param test_json: path of the JSON report file to write.
        :param repositories: repositories whose tools are tested; all installed
            repositories when ``None`` or empty.
        :param log: optional logger.
        :param test_user_api_key: API key handed to the Galaxy interactor.
        :param test_user: test user name, used only when no API key is available.
        :param parallel_tests: number of worker threads used to run the tests.
        """
        tool_test_start = dt.datetime.now()
        tests_passed = []
        test_exceptions = []

        if not repositories:  # If repositories is None or empty list
            # Consider a variant of this that doesn't even consume a tool list YAML? target
            # something like installed_repository_revisions(self.gi)
            repositories = self.installed_repositories()

        target_repositories = flatten_repo_info(repositories)

        installed_tools = []
        for target_repository in target_repositories:
            installed_tools.extend(tools_for_repository(self.gi, target_repository))

        all_test_results = []
        galaxy_interactor = self._get_interactor(test_user, test_user_api_key)
        test_history = galaxy_interactor.new_history()

        with ThreadPoolExecutor(max_workers=parallel_tests) as executor:
            try:
                for tool in installed_tools:
                    self._test_tool(executor=executor,
                                    tool=tool,
                                    galaxy_interactor=galaxy_interactor,
                                    test_history=test_history,
                                    log=log,
                                    tool_test_results=all_test_results,
                                    tests_passed=tests_passed,
                                    test_exceptions=test_exceptions,
                                    )
            finally:
                # Always write report, even if test was cancelled.
                try:
                    executor.shutdown(wait=True)
                except KeyboardInterrupt:
                    # Relies on concurrent.futures internals: forget the worker
                    # threads so the report below can still be written.
                    executor._threads.clear()
                    thread._threads_queues.clear()
                n_passed = len(tests_passed)
                n_failed = len(test_exceptions)
                report_obj = {
                    'version': '0.1',
                    'suitename': 'Ephemeris tool tests targeting %s' % self.gi.base_url,
                    'results': {
                        'total': n_passed + n_failed,
                        'errors': n_failed,
                        'failures': 0,
                        'skips': 0,
                    },
                    'tests': sorted(all_test_results, key=lambda el: el['id']),
                }
                with open(test_json, "w") as f:
                    json.dump(report_obj, f)
                if log:
                    log.info("Report written to '%s'", os.path.abspath(test_json))
                    log.info("Passed tool tests ({0}): {1}".format(
                        n_passed,
                        list(tests_passed)))
                    log.info("Failed tool tests ({0}): {1}".format(
                        n_failed,
                        [t[0] for t in test_exceptions]))
                    log.info("Total tool test time: {0}".format(dt.datetime.now() - tool_test_start))

    @staticmethod
    def _galaxy_base_url(api_url):
        """Return ``api_url`` with only its trailing ``/api`` component removed.

        A plain substring replacement would also hit ``/api`` elsewhere in the URL,
        e.g. the host part of ``http://api.example.org/api``.
        """
        return re.sub(r'/api/?$', '', api_url)

    def _get_interactor(self, test_user, test_user_api_key):
        """Build the ``GalaxyInteractorApi`` used to run tool tests.

        When no ``test_user_api_key`` is given and the ``whoami`` endpoint returns a
        non-null payload, the key of the current connection is used. ``test_user``
        is only passed on when no API key could be determined.
        """
        if test_user_api_key is None:
            whoami = self.gi.make_get_request(self.gi.url + "/whoami").json()
            if whoami is not None:
                test_user_api_key = self.gi.key
        galaxy_interactor_kwds = {
            "galaxy_url": self._galaxy_base_url(self.gi.url),
            "master_api_key": self.gi.key,
            "api_key": test_user_api_key,  # TODO
            "keep_outputs_dir": '',
        }
        if test_user_api_key is None:
            galaxy_interactor_kwds["test_user"] = test_user
        return GalaxyInteractorApi(**galaxy_interactor_kwds)

    @staticmethod
    def _test_tool(executor,
                   tool,
                   galaxy_interactor,
                   tool_test_results,
                   tests_passed,
                   test_exceptions,
                   log,
                   test_history=None,
                   ):
        """Submit every test of ``tool`` to ``executor``.

        Outcomes are appended to the caller-supplied lists: ``tool_test_results``
        (job data registered by ``verify_tool``), ``tests_passed`` (test ids) and
        ``test_exceptions`` (``(id, exception)`` tuples).

        :return: a ``Results`` namedtuple of the three lists when the test
            definitions cannot be fetched, otherwise ``None``.
        """
        if test_history is None:
            test_history = galaxy_interactor.new_history()
        tool_id = tool["id"]
        tool_version = tool["version"]
        try:
            tool_test_dicts = galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version)
        except Exception as e:
            if log:
                log.warning("Fetching test definition for tool '%s' failed", tool_id, exc_info=True)
            test_exceptions.append((tool_id, e))
            return InstallRepositoryManager._TestToolResults(tool_test_results=tool_test_results,
                                                             tests_passed=tests_passed,
                                                             test_exceptions=test_exceptions)

        def run_test(index, test_id):
            """Run one test; index and id are arguments so each submission keeps its own values."""

            def register(job_data):
                tool_test_results.append({
                    'id': test_id,
                    'has_data': True,
                    'data': job_data,
                })

            try:
                if log:
                    log.info("Executing test '%s'", test_id)
                verify_tool(
                    tool_id, galaxy_interactor, test_index=index, tool_version=tool_version,
                    register_job_data=register, quiet=True, test_history=test_history,
                )
                tests_passed.append(test_id)
                if log:
                    log.info("Test '%s' passed", test_id)
            except Exception as e:
                # Any failure counts as a failed test; worker threads must not die silently.
                if log:
                    log.warning("Test '%s' failed", test_id, exc_info=True)
                test_exceptions.append((test_id, e))

        for test_index in range(len(tool_test_dicts)):
            executor.submit(run_test, test_index, tool_id + "-" + str(test_index))

    def install_repository_revision(self, repository, log):
        """Install a single repository revision through the Tool Shed client.

        ``repository`` is modified in place: its ``tool_panel_section_label`` key,
        if present, is renamed to ``new_tool_panel_section_label`` before the
        dictionary is passed as keyword arguments to the client.

        :return: ``"installed"``, ``"skipped"`` or ``"error"``.
        """
        default_err_msg = ('All repositories that you are attempting to install '
                           'have been previously installed.')
        start = dt.datetime.now()
        # Guarded so that a missing key, or a second call with the same dictionary,
        # neither raises KeyError nor overwrites an already renamed value.
        if 'tool_panel_section_label' in repository:
            repository['new_tool_panel_section_label'] = repository.pop('tool_panel_section_label')
        try:
            response = self.tool_shed_client.install_repository_revision(**repository)
            if isinstance(response, dict) and response.get('status', None) == 'ok':
                # This rare case happens if a repository is already installed but
                # was not recognised as such in the above check. In such a
                # case the return value looks like this:
                # {u'status': u'ok', u'message': u'No repositories were
                # installed, possibly because the selected repository has
                # already been installed.'}
                if log:
                    log.debug("\tRepository {0} is already installed.".format(repository['name']))
            if log:
                log_repository_install_success(
                    repository=repository,
                    start=start,
                    log=log)
            return "installed"
        except (ConnectionError, requests.exceptions.ConnectionError) as e:
            error_text = unicodify(e)
            # requests' ConnectionError has no ``body`` attribute, so fall back
            # to the exception text instead of raising AttributeError here.
            error_body = getattr(e, "body", error_text)
            if default_err_msg in error_text:
                # THIS SHOULD NOT HAPPEN DUE TO THE CHECKS EARLIER
                if log:
                    log.debug("\tRepository %s already installed (at revision %s)" %
                              (repository['name'], repository['changeset_revision']))
                return "skipped"
            if "504" in error_text or 'Connection aborted' in error_text:
                # The request timed out, but the installation may still be running.
                if log:
                    log.debug("Timeout during install of %s, extending wait to 1h", repository['name'])
                if self.wait_for_install(repository=repository, log=log, timeout=3600):
                    if log:
                        log_repository_install_success(
                            repository=repository,
                            start=start,
                            log=log)
                    return "installed"
            if log:
                log_repository_install_error(
                    repository=repository,
                    start=start, msg=error_body,
                    log=log)
            return "error"

    def wait_for_install(self, repository, log=None, timeout=3600, poll_interval=10):
        """
        If nginx times out, we look into the list of installed repositories
        and try to determine if a repository of the same name/owner/revision is still installing.

        The repository list is polled once every ``poll_interval`` seconds, also
        while the repository is not listed yet or the listing request fails.

        Returns True if install finished successfully,
        returns False when timeout is exceeded or installation has failed.
        """
        deadline = dt.datetime.now() + dt.timedelta(seconds=timeout)
        while dt.datetime.now() < deadline:
            try:
                installed_repo_list = self.tool_shed_client.get_repositories()
            except ConnectionError as e:
                if log:
                    log.warning('Failed to get repositories list: %s', unicodify(e))
            else:
                for installing_repo in installed_repo_list:
                    if (installing_repo['name'] == repository['name']) and (
                            installing_repo['owner'] == repository['owner']) and (
                            installing_repo['changeset_revision'] == repository['changeset_revision']):
                        if installing_repo['status'] == 'Installed':
                            return True
                        if installing_repo['status'] == 'Error':
                            return False
            # Exactly one pause per poll, whatever the outcome above was.
            time.sleep(poll_interval)
        return False
445 | |
446 | |
def log_repository_install_error(repository, start, msg, log):
    """
    Log a failed repository installation at ERROR level.

    :param repository: repository dictionary; missing ``name``, ``owner`` or
        ``changeset_revision`` keys are logged as empty strings.
    :param start: ``datetime`` at which the installation attempt started.
    :param msg: error description to include in the log line.
    :param log: logger providing an ``error`` method.

    Nothing is returned.
    """
    end = dt.datetime.now()
    log.error(
        # One literal instead of implicit concatenation, which dropped the
        # space between "Name: %s," and "owner:".
        "\t* Error installing a repository (after %s seconds)! Name: %s, owner: %s, revision: %s, error: %s",
        str(end - start),
        repository.get('name', ""),
        repository.get('owner', ""),
        repository.get('changeset_revision', ""),
        msg)
459 | |
460 | |
def log_repository_install_success(repository, start, log):
    """
    Write a DEBUG line reporting that ``repository`` was installed.

    The line contains the repository name, the time elapsed since ``start`` and
    the changeset revision.
    Repositories that finish in error still count as successful installs currently.
    """
    elapsed = dt.datetime.now() - start
    template = "\trepository %s installed successfully (in %s) at revision %s"
    name = repository['name']
    duration = str(elapsed)
    revision = repository['changeset_revision']
    log.debug(template % (name, duration, revision))
474 | |
475 | |
def log_repository_install_skip(repository, counter, total_num_repositories, log):
    """Write a DEBUG line saying that ``repository`` is already installed and is skipped.

    ``counter`` and ``total_num_repositories`` form the ``(n/total)`` progress prefix.
    """
    template = "({0}/{1}) repository {2} already installed at revision {3}. Skipping."
    name = repository['name']
    revision = repository['changeset_revision']
    message = template.format(counter, total_num_repositories, name, revision)
    log.debug(message)
486 | |
487 | |
def log_repository_install_start(repository, counter, total_num_repositories, installation_start, log):
    """Write a DEBUG line announcing that installation of ``repository`` begins.

    The target section shown is ``tool_panel_section_id`` or, when that is falsy,
    ``tool_panel_section_label``. The trailing "TRT" value is the time elapsed
    since ``installation_start``.
    """
    name = repository['name']
    owner = repository['owner']
    section = repository['tool_panel_section_id']
    if not section:
        # The label is looked up only when no section id is set.
        section = repository['tool_panel_section_label']
    revision = repository['changeset_revision']
    running_time = dt.datetime.now() - installation_start
    template = '(%s/%s) Installing repository %s from %s to section "%s" at revision %s (TRT: %s)'
    log.debug(template % (counter, total_num_repositories, name, owner, section, revision, running_time))
499 | |
500 | |
def args_to_repos(args):
    """Build the list of repository dictionaries described by the parsed arguments.

    Sources are tried in this order and the first one present wins:

    1. ``args.tool_list_file``: the ``tools`` entry of that YAML file;
    2. ``args.tool_yaml``: a single repository parsed from the YAML string;
    3. ``args.name`` together with ``args.owner``: a single repository assembled
       from the individual command line options.

    An empty list is returned when none of them is given.
    """
    if args.tool_list_file:
        return load_yaml_file(args.tool_list_file)['tools']

    if args.tool_yaml:
        return [yaml.safe_load(args.tool_yaml)]

    if not (args.name and args.owner):
        return []

    single_repo = {
        'owner': args.owner,
        'name': args.name,
        'tool_panel_section_id': args.tool_panel_section_id,
        'tool_panel_section_label': args.tool_panel_section_label,
        'revisions': args.revisions,
    }
    # The tool shed URL is included only when explicitly provided.
    if args.tool_shed_url:
        single_repo["tool_shed_url"] = args.tool_shed_url
    return [single_repo]
521 | |
522 | |
def main():
    """Command line entry point: run the ``update``, ``install`` or ``test`` action.

    After an ``update`` or ``install`` action, the installed repositories are
    tested when ``args.test`` or ``args.test_existing`` is set. With
    ``args.test_existing`` the skipped (already installed) repositories are
    tested as well.

    :raises NotImplementedError: if ``args.action`` is not one of the three actions.
    """
    disable_external_library_logging()
    args = parser().parse_args()
    log = setup_global_logger(name=__name__, log_file=args.log_file)
    gi = get_galaxy_connection(args, file=args.tool_list_file, log=log, login_required=True)
    install_repository_manager = InstallRepositoryManager(gi)

    repos = args_to_repos(args)

    if args.tool_list_file:
        tool_list = load_yaml_file(args.tool_list_file)
    else:
        tool_list = dict()

    # Get some of the other installation arguments. A truthy value in the tool
    # list file wins; otherwise the command line value (default False) is used.
    kwargs = dict(
        default_install_tool_dependencies=(
            tool_list.get("install_tool_dependencies")
            or getattr(args, "install_tool_dependencies", False)),
        default_install_repository_dependencies=(
            tool_list.get("install_repository_dependencies")
            or getattr(args, "install_repository_dependencies", False)),
        default_install_resolver_dependencies=(
            tool_list.get("install_resolver_dependencies")
            or getattr(args, "install_resolver_dependencies", False)))

    # Start installing/updating and store the results in install_results.
    # Or do testing if the action is `test`
    install_results = None
    if args.action == "update":
        install_results = install_repository_manager.update_repositories(
            repositories=repos,
            log=log,
            **kwargs)
    elif args.action == "install":
        install_results = install_repository_manager.install_repositories(
            repos,
            log=log,
            force_latest_revision=args.force_latest_revision,
            **kwargs)
    elif args.action == "test":
        install_repository_manager.test_tools(
            test_json=args.test_json,
            repositories=repos,
            log=log,
            test_user_api_key=args.test_user_api_key,
            test_user=args.test_user,
            parallel_tests=args.parallel_tests,
        )
    else:
        raise NotImplementedError("This point in the code should not be reached. Please contact the developers.")

    # Run tests on the install results if required.
    # The parentheses matter: without them `test_existing` alone satisfied the
    # condition even when there were no install results (the `test` action),
    # and the attribute access below then failed on None.
    if install_results is not None and (args.test or args.test_existing):
        # Copy, so extending the list does not alter install_results itself.
        to_be_tested_repositories = list(install_results.installed_repositories)
        if args.test_existing:
            to_be_tested_repositories.extend(install_results.skipped_repositories)
        if to_be_tested_repositories:
            install_repository_manager.test_tools(
                test_json=args.test_json,
                repositories=to_be_tested_repositories,
                log=log,
                test_user_api_key=args.test_user_api_key,
                test_user=args.test_user,
                parallel_tests=args.parallel_tests,
            )


if __name__ == "__main__":
    main()