Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/ephemeris/shed_tools.py @ 3:758bc20232e8 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:20:52 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
"""
A tool to automate installation of tool repositories from a Galaxy Tool Shed
into an instance of Galaxy.

Shed-tools has three commands: update, test and install.

Update simply updates all the tools in a Galaxy given connection details on
the command line.

Test tests the specified tools in the Galaxy Instance.

Install allows installation of tools in multiple ways.
Galaxy instance details and the installed tools can be provided in one of three
ways:

1. In the YAML format via dedicated files (a sample can be found
   `here <https://github.com/galaxyproject/ansible-galaxy-tools/blob/master/files/tool_list.yaml.sample>`_).
2. On the command line as dedicated script options (see the usage help).
3. As a single composite parameter to the script. The parameter must be a
   single, YAML-formatted string with the keys corresponding to the keys
   available for use in the YAML formatted file (for example:
   `--yaml_tool "{'owner': 'kellrott', 'tool_shed_url':
   'https://testtoolshed.g2.bx.psu.edu', 'tool_panel_section_id':
   'peak_calling', 'name': 'synapse_interface'}"`).

Only one of the methods can be used with each invocation of the script, but if
more than one is provided, precedence will correspond to the order of the
items in the list above.

When installing tools, Galaxy expects any `tool_panel_section_id` provided when
installing a tool to already exist in the configuration. If the section
does not exist, the tool will be installed outside any section. See
`shed_tool_conf.xml.sample` in this directory for a sample of such a file.
Before running this script to install the tools, make sure to place such a
file into Galaxy's configuration directory and set the Galaxy configuration
option `tool_config_file` to include it.
"""
import datetime as dt
import json
import os
import re
import time
from collections import namedtuple
from concurrent.futures import thread, ThreadPoolExecutor

import requests
import yaml
from bioblend.galaxy.client import ConnectionError
from bioblend.galaxy.toolshed import ToolShedClient
from galaxy.tool_util.verify.interactor import (
    GalaxyInteractorApi,
    verify_tool,
)
from galaxy.util import unicodify

from . import get_galaxy_connection, load_yaml_file
from .ephemeris_log import disable_external_library_logging, setup_global_logger
from .get_tool_list_from_galaxy import GiToToolYaml, the_same_repository, tools_for_repository
from .shed_tools_args import parser
from .shed_tools_methods import complete_repo_information, flatten_repo_info, VALID_KEYS

# Result containers, defined once at module level instead of being re-created
# as new classes on every method call.
FilterResults = namedtuple("FilterResults", ["not_installed_repos", "already_installed_repos"])
InstallResults = namedtuple("InstallResults",
                            ["installed_repositories", "errored_repositories", "skipped_repositories"])
_ToolTestResults = namedtuple("Results", ["tool_test_results", "tests_passed", "test_exceptions"])


class InstallRepositoryManager(object):
    """Manages the installation of new repositories on a galaxy instance"""

    def __init__(self, galaxy_instance):
        """Initialize a new tool manager.

        :param galaxy_instance: connected bioblend Galaxy instance; it is also
            used to build the ``ToolShedClient`` for install calls.
        """
        self.gi = galaxy_instance
        self.tool_shed_client = ToolShedClient(self.gi)

    def installed_repositories(self):
        """Get currently installed tools.

        :returns: the ``tools`` entry of the tool list built by ``GiToToolYaml``
            (data managers and all tools included), or ``None`` if absent.
        """
        return GiToToolYaml(
            gi=self.gi,
            skip_tool_panel_section_name=False,
            get_data_managers=True,
            get_all_tools=True
        ).tool_list.get("tools")

    def filter_installed_repos(self, repos, check_revision=True):
        """Split ``repos`` into repositories that are already installed and those that are not.

        :param repos: list of repository dicts to check.
        :param check_revision: when True, a repository only counts as installed
            if the same revision is installed; otherwise any revision matches.
        :returns: ``FilterResults(not_installed_repos, already_installed_repos)``.
        """
        # TODO: Find a speedier algorithm.
        # ``installed_repositories`` may return None; treat that as "nothing installed".
        installed = self.installed_repositories() or []
        if check_revision:
            # If we want to check if revisions are equal, flatten the list,
            # so each repository - revision combination has its own entry
            installed_repos = flatten_repo_info(installed)
        else:
            # If we do not care about revision equality, do not do the flatten
            # action to limit the number of comparisons.
            installed_repos = installed

        not_installed_repos = []
        already_installed_repos = []
        for repo in repos:
            for installed_repo in installed_repos:
                if the_same_repository(installed_repo, repo, check_revision):
                    already_installed_repos.append(repo)
                    break
            else:  # This executes when the for loop completes and no match has been found.
                not_installed_repos.append(repo)
        return FilterResults(already_installed_repos=already_installed_repos,
                             not_installed_repos=not_installed_repos)

    def install_repositories(self,
                             repositories,
                             log=None,
                             force_latest_revision=False,
                             default_toolshed='https://toolshed.g2.bx.psu.edu/',
                             default_install_tool_dependencies=False,
                             default_install_resolver_dependencies=True,
                             default_install_repository_dependencies=True):
        """Install a list of tools on the current galaxy.

        Repositories are flattened per revision and completed with defaults.
        Those already installed are skipped; the rest are installed one by one.

        :param repositories: list of repository dicts (tool-list entries).
        :param log: optional logger; nothing is logged when None.
        :param force_latest_revision: passed to ``complete_repo_information``.
        :param default_toolshed: Tool Shed URL used when a repo has none.
        :returns: ``InstallResults(installed_repositories, errored_repositories,
            skipped_repositories)``.
        :raises ValueError: if ``repositories`` is empty.
        """
        if not repositories:
            raise ValueError("Empty list of tools was given")
        installation_start = dt.datetime.now()
        installed_repositories = []
        skipped_repositories = []
        errored_repositories = []
        counter = 0

        # Check repos for invalid keys
        for repo in repositories:
            for key in repo:
                if key not in VALID_KEYS and key != 'revisions':
                    if log:
                        log.warning("'{0}' not a valid key. Will be skipped during parsing".format(key))

        # Start by flattening the repo list per revision
        flattened_repos = flatten_repo_info(repositories)
        total_num_repositories = len(flattened_repos)

        # Complete the repo information, and make sure each repository has a revision
        repository_list = []
        for repository in flattened_repos:
            start = dt.datetime.now()
            try:
                complete_repo = complete_repo_information(
                    repository,
                    default_toolshed_url=default_toolshed,
                    require_tool_panel_info=True,
                    default_install_tool_dependencies=default_install_tool_dependencies,
                    default_install_resolver_dependencies=default_install_resolver_dependencies,
                    default_install_repository_dependencies=default_install_repository_dependencies,
                    force_latest_revision=force_latest_revision)
                repository_list.append(complete_repo)
            except Exception as e:
                # We'll run through the loop come whatever may, we log the errored repositories at the end anyway.
                if log:
                    log_repository_install_error(repository, start, unicodify(e), log)
                errored_repositories.append(repository)

        # Filter out already installed repos
        filtered_repos = self.filter_installed_repos(repository_list)

        for skipped_repo in filtered_repos.already_installed_repos:
            counter += 1
            if log:
                log_repository_install_skip(skipped_repo, counter, total_num_repositories, log)
            skipped_repositories.append(skipped_repo)

        # Install repos
        for repository in filtered_repos.not_installed_repos:
            counter += 1
            if log:
                log_repository_install_start(repository, counter=counter, installation_start=installation_start,
                                             log=log, total_num_repositories=total_num_repositories)
            result = self.install_repository_revision(repository, log)
            if result == "error":
                errored_repositories.append(repository)
            elif result == "skipped":
                skipped_repositories.append(repository)
            elif result == "installed":
                installed_repositories.append(repository)

        # Log results
        if log:
            log.info("Installed repositories ({0}): {1}".format(
                len(installed_repositories),
                [(t['name'], t.get('changeset_revision')) for t in installed_repositories])
            )
            log.info("Skipped repositories ({0}): {1}".format(
                len(skipped_repositories),
                [(t['name'], t.get('changeset_revision')) for t in skipped_repositories])
            )
            log.info("Errored repositories ({0}): {1}".format(
                len(errored_repositories),
                [(t['name'], t.get('changeset_revision', "")) for t in errored_repositories])
            )
            log.info("All repositories have been installed.")
            log.info("Total run time: {0}".format(dt.datetime.now() - installation_start))
        return InstallResults(installed_repositories=installed_repositories,
                              skipped_repositories=skipped_repositories,
                              errored_repositories=errored_repositories)

    def update_repositories(self, repositories=None, log=None, **kwargs):
        """Install the latest revision of the given (or all installed) repositories.

        Repositories in ``repositories`` that are not installed at all are
        dropped with a warning. Remaining keyword arguments are forwarded to
        ``install_repositories``.
        """
        if not repositories:  # Repositories None or empty list
            repositories = self.installed_repositories()
        else:
            filtered_repos = self.filter_installed_repos(repositories, check_revision=False)
            if filtered_repos.not_installed_repos:
                if log:
                    log.warning("The following tools are not installed and will not be upgraded: {0}".format(
                        filtered_repos.not_installed_repos))
            repositories = filtered_repos.already_installed_repos
        return self.install_repositories(repositories, force_latest_revision=True, log=log, **kwargs)

    def test_tools(self,
                   test_json,
                   repositories=None,
                   log=None,
                   test_user_api_key=None,
                   test_user="ephemeris@galaxyproject.org",
                   parallel_tests=1,
                   ):
        """Run tool tests for all tools in each repository in supplied tool list or ``self.installed_repositories()``.

        Tests run on a thread pool of ``parallel_tests`` workers. A JSON report
        is written to ``test_json`` even if the run is interrupted.
        """
        tool_test_start = dt.datetime.now()
        tests_passed = []
        test_exceptions = []

        if not repositories:  # If repositories is None or empty list
            # Consider a variant of this that doesn't even consume a tool list YAML? target
            # something like installed_repository_revisions(self.gi)
            repositories = self.installed_repositories()

        target_repositories = flatten_repo_info(repositories)

        installed_tools = []
        for target_repository in target_repositories:
            installed_tools.extend(tools_for_repository(self.gi, target_repository))

        all_test_results = []
        galaxy_interactor = self._get_interactor(test_user, test_user_api_key)
        test_history = galaxy_interactor.new_history()

        with ThreadPoolExecutor(max_workers=parallel_tests) as executor:
            try:
                for tool in installed_tools:
                    self._test_tool(executor=executor,
                                    tool=tool,
                                    galaxy_interactor=galaxy_interactor,
                                    test_history=test_history,
                                    log=log,
                                    tool_test_results=all_test_results,
                                    tests_passed=tests_passed,
                                    test_exceptions=test_exceptions,
                                    )
            finally:
                # Always write report, even if test was cancelled.
                try:
                    executor.shutdown(wait=True)
                except KeyboardInterrupt:
                    # Drop pending worker threads so interpreter exit does not
                    # block on them (relies on private concurrent.futures state).
                    executor._threads.clear()
                    thread._threads_queues.clear()
                n_passed = len(tests_passed)
                n_failed = len(test_exceptions)
                report_obj = {
                    'version': '0.1',
                    'suitename': 'Ephemeris tool tests targeting %s' % self.gi.base_url,
                    'results': {
                        'total': n_passed + n_failed,
                        'errors': n_failed,
                        'failures': 0,
                        'skips': 0,
                    },
                    'tests': sorted(all_test_results, key=lambda el: el['id']),
                }
                with open(test_json, "w") as f:
                    json.dump(report_obj, f)
                if log:
                    log.info("Report written to '%s'", os.path.abspath(test_json))
                    log.info("Passed tool tests ({0}): {1}".format(
                        n_passed,
                        list(tests_passed))
                    )
                    log.info("Failed tool tests ({0}): {1}".format(
                        n_failed,
                        [t[0] for t in test_exceptions])
                    )
                    log.info("Total tool test time: {0}".format(dt.datetime.now() - tool_test_start))

    def _get_interactor(self, test_user, test_user_api_key):
        """Build a ``GalaxyInteractorApi`` for running tool tests.

        If no test user API key is given but ``/whoami`` returns a user, the
        connection's own key is used; otherwise ``test_user`` is passed so the
        interactor can act as that user.
        """
        if test_user_api_key is None:
            whoami = self.gi.make_get_request(self.gi.url + "/whoami").json()
            if whoami is not None:
                test_user_api_key = self.gi.key
        galaxy_interactor_kwds = {
            # Strip only the trailing API suffix; a plain replace of '/api'
            # would also mangle hosts such as 'https://api.example.org/api'.
            "galaxy_url": re.sub(r'/api/?$', '', self.gi.url),
            "master_api_key": self.gi.key,
            "api_key": test_user_api_key,  # TODO
            "keep_outputs_dir": '',
        }
        if test_user_api_key is None:
            galaxy_interactor_kwds["test_user"] = test_user
        return GalaxyInteractorApi(**galaxy_interactor_kwds)

    @staticmethod
    def _test_tool(executor,
                   tool,
                   galaxy_interactor,
                   tool_test_results,
                   tests_passed,
                   test_exceptions,
                   log,
                   test_history=None,
                   ):
        """Submit every test of ``tool`` to ``executor``.

        Results are recorded by appending to ``tool_test_results``,
        ``tests_passed`` and ``test_exceptions``. If the test definitions
        cannot be fetched, the failure is recorded and the three collections
        are returned as a ``Results`` namedtuple.
        """
        if test_history is None:
            test_history = galaxy_interactor.new_history()
        tool_id = tool["id"]
        tool_version = tool["version"]
        try:
            tool_test_dicts = galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version)
        except Exception as e:
            if log:
                log.warning("Fetching test definition for tool '%s' failed", tool_id, exc_info=True)
            test_exceptions.append((tool_id, e))
            return _ToolTestResults(tool_test_results=tool_test_results,
                                    tests_passed=tests_passed,
                                    test_exceptions=test_exceptions)

        def run_test(index, test_id):
            """Run a single tool test and record its outcome (runs on a worker thread)."""

            def register(job_data):
                tool_test_results.append({
                    'id': test_id,
                    'has_data': True,
                    'data': job_data,
                })

            try:
                if log:
                    log.info("Executing test '%s'", test_id)
                verify_tool(
                    tool_id, galaxy_interactor, test_index=index, tool_version=tool_version,
                    register_job_data=register, quiet=True, test_history=test_history,
                )
                tests_passed.append(test_id)
                if log:
                    log.info("Test '%s' passed", test_id)
            except Exception as e:
                if log:
                    log.warning("Test '%s' failed", test_id, exc_info=True)
                test_exceptions.append((test_id, e))

        for test_index in range(len(tool_test_dicts)):
            executor.submit(run_test, test_index, tool_id + "-" + str(test_index))

    def install_repository_revision(self, repository, log):
        """Install a single, fully completed repository revision.

        :param repository: repository dict as produced by
            ``complete_repo_information``; must contain ``tool_panel_section_label``.
            The dict itself is not modified.
        :param log: optional logger.
        :returns: ``"installed"``, ``"skipped"`` or ``"error"``.
        """
        default_err_msg = ('All repositories that you are attempting to install '
                           'have been previously installed.')
        start = dt.datetime.now()
        # Build the install arguments on a copy so the caller's dict, which is
        # returned in the install results, keeps its original keys.
        install_kwargs = dict(repository)
        try:
            install_kwargs['new_tool_panel_section_label'] = install_kwargs.pop('tool_panel_section_label')
            response = self.tool_shed_client.install_repository_revision(**install_kwargs)
            if isinstance(response, dict) and response.get('status', None) == 'ok':
                # This rare case happens if a repository is already installed but
                # was not recognised as such in the above check. In such a
                # case the return value looks like this:
                # {u'status': u'ok', u'message': u'No repositories were
                #  installed, possibly because the selected repository has
                #  already been installed.'}
                if log:
                    log.debug("\tRepository {0} is already installed.".format(repository['name']))
            if log:
                log_repository_install_success(
                    repository=repository,
                    start=start,
                    log=log)
            return "installed"
        except (ConnectionError, requests.exceptions.ConnectionError) as e:
            error_text = unicodify(e)
            if default_err_msg in error_text:
                # THIS SHOULD NOT HAPPEN DUE TO THE CHECKS EARLIER
                if log:
                    log.debug("\tRepository %s already installed (at revision %s)" %
                              (repository['name'], repository['changeset_revision']))
                return "skipped"
            elif "504" in error_text or 'Connection aborted' in error_text:
                if log:
                    log.debug("Timeout during install of %s, extending wait to 1h", repository['name'])
                success = self.wait_for_install(repository=repository, log=log, timeout=3600)
                if success:
                    if log:
                        log_repository_install_success(
                            repository=repository,
                            start=start,
                            log=log)
                    return "installed"
            # Either an unrecognised connection error or the extended wait failed.
            if log:
                log_repository_install_error(
                    repository=repository,
                    start=start,
                    msg=_exception_message(e),
                    log=log)
            return "error"

    def wait_for_install(self, repository, log=None, timeout=3600):
        """
        If nginx times out, we look into the list of installed repositories
        and try to determine if a repository of the same name/owner/revision
        is still installing.

        Returns True if install finished successfully,
        returns False when timeout is exceeded or installation has failed.
        """
        start = dt.datetime.now()
        while (dt.datetime.now() - start) < dt.timedelta(seconds=timeout):
            try:
                installed_repo_list = self.tool_shed_client.get_repositories()
            except (ConnectionError, requests.exceptions.ConnectionError) as e:
                if log:
                    log.warning('Failed to get repositories list: %s', unicodify(e))
            else:
                for installing_repo in installed_repo_list:
                    if (installing_repo['name'] == repository['name']) and (
                            installing_repo['owner'] == repository['owner']) and (
                            installing_repo['changeset_revision'] == repository['changeset_revision']):
                        if installing_repo['status'] == 'Installed':
                            return True
                        elif installing_repo['status'] == 'Error':
                            return False
            # Pause between every poll, including when the repository is not
            # listed yet; otherwise this loop would hammer the Galaxy API.
            time.sleep(10)
        return False


def _exception_message(exception):
    """Return the most informative message for a failed install request.

    bioblend's ``ConnectionError`` carries the server response in ``body``;
    ``requests.exceptions.ConnectionError`` has no such attribute, so fall
    back to the exception text.
    """
    body = getattr(exception, 'body', None)
    return body if body else unicodify(exception)


def log_repository_install_error(repository, start, msg, log):
    """
    Log a failed repository installation at error level.

    :param repository: repository dict; missing name/owner/revision are logged as "".
    :param start: ``datetime`` at which the installation attempt started.
    :param msg: error message to include.
    :param log: logger to write to.
    """
    end = dt.datetime.now()
    log.error(
        "\t* Error installing a repository (after %s seconds)! Name: %s, "
        "owner: %s, revision: %s, error: %s",
        (end - start).total_seconds(),
        repository.get('name', ""),
        repository.get('owner', ""),
        repository.get('changeset_revision', ""),
        msg)


def log_repository_install_success(repository, start, log):
    """
    Log successful repository installation.
    Repositories that finish in error still count as successful installs currently.
    """
    end = dt.datetime.now()
    log.debug(
        "\trepository %s installed successfully (in %s) at revision %s" % (
            repository['name'],
            str(end - start),
            repository['changeset_revision']
        )
    )


def log_repository_install_skip(repository, counter, total_num_repositories, log):
    """Log that ``repository`` is skipped because it is already installed."""
    log.debug(
        "({0}/{1}) repository {2} already installed at revision {3}. Skipping."
        .format(
            counter,
            total_num_repositories,
            repository['name'],
            repository['changeset_revision']
        )
    )


def log_repository_install_start(repository, counter, total_num_repositories, installation_start, log):
    """Log the start of an install, with progress counter and total run time so far."""
    log.debug(
        '(%s/%s) Installing repository %s from %s to section "%s" at revision %s (TRT: %s)' % (
            counter, total_num_repositories,
            repository['name'],
            repository['owner'],
            # Either key may be absent; fall back from the id to the label.
            repository.get('tool_panel_section_id') or repository.get('tool_panel_section_label'),
            repository['changeset_revision'],
            dt.datetime.now() - installation_start
        )
    )


def args_to_repos(args):
    """Build the list of repository dicts from parsed command line arguments.

    Precedence: tool list file, then ``tool_yaml``, then ``name``/``owner``
    options. Returns an empty list when none is given.
    """
    if args.tool_list_file:
        tool_list = load_yaml_file(args.tool_list_file)
        repos = tool_list['tools']
    elif args.tool_yaml:
        repos = [yaml.safe_load(args.tool_yaml)]
    elif args.name and args.owner:
        repo = dict(
            owner=args.owner,
            name=args.name,
            tool_panel_section_id=args.tool_panel_section_id,
            tool_panel_section_label=args.tool_panel_section_label,
            revisions=args.revisions
        )
        if args.tool_shed_url:
            repo["tool_shed_url"] = args.tool_shed_url
        repos = [repo]
    else:
        repos = []
    return repos


def main():
    """Command line entry point: install, update or test repositories."""
    disable_external_library_logging()
    args = parser().parse_args()
    log = setup_global_logger(name=__name__, log_file=args.log_file)
    gi = get_galaxy_connection(args, file=args.tool_list_file, log=log, login_required=True)
    install_repository_manager = InstallRepositoryManager(gi)

    repos = args_to_repos(args)

    if args.tool_list_file:
        tool_list = load_yaml_file(args.tool_list_file)
    else:
        tool_list = dict()

    # Get some of the other installation arguments
    kwargs = dict(
        default_install_tool_dependencies=tool_list.get("install_tool_dependencies") or getattr(
            args, "install_tool_dependencies", False),
        default_install_repository_dependencies=tool_list.get("install_repository_dependencies") or getattr(
            args, "install_repository_dependencies", False),
        default_install_resolver_dependencies=tool_list.get("install_resolver_dependencies") or getattr(
            args, "install_resolver_dependencies", False))

    # Start installing/updating and store the results in install_results.
    # Or do testing if the action is `test`
    install_results = None
    if args.action == "update":
        install_results = install_repository_manager.update_repositories(
            repositories=repos,
            log=log,
            **kwargs)
    elif args.action == "install":
        install_results = install_repository_manager.install_repositories(
            repos,
            log=log,
            force_latest_revision=args.force_latest_revision,
            **kwargs)
    elif args.action == "test":
        install_repository_manager.test_tools(
            test_json=args.test_json,
            repositories=repos,
            log=log,
            test_user_api_key=args.test_user_api_key,
            test_user=args.test_user,
            parallel_tests=args.parallel_tests,
        )
    else:
        raise NotImplementedError("This point in the code should not be reached. Please contact the developers.")

    # Run tests on the install results if required. The parentheses matter:
    # without them `--test_existing` would dereference a None install_results
    # when the action is `test`.
    if install_results and (args.test or args.test_existing):
        # Copy so the install results themselves are not mutated.
        to_be_tested_repositories = list(install_results.installed_repositories)
        if args.test_existing:
            to_be_tested_repositories.extend(install_results.skipped_repositories)
        if to_be_tested_repositories:
            install_repository_manager.test_tools(
                test_json=args.test_json,
                repositories=to_be_tested_repositories,
                log=log,
                test_user_api_key=args.test_user_api_key,
                test_user=args.test_user,
                parallel_tests=args.parallel_tests,
            )


if __name__ == "__main__":
    main()