Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/ephemeris/shed_tools.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/ephemeris/shed_tools.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,592 +0,0 @@ -""" -A tool to automate installation of tool repositories from a Galaxy Tool Shed -into an instance of Galaxy. - -Shed-tools has three commands: update, test and install. - -Update simply updates all the tools in a Galaxy given connection details on the command line. - -Test tests the specified tools in the Galaxy Instance. - -Install allows installation of tools in multiple ways. -Galaxy instance details and the installed tools can be provided in one of three -ways: - -1. In the YAML format via dedicated files (a sample can be found - `here <https://github.com/galaxyproject/ansible-galaxy-tools/blob/master/files/tool_list.yaml.sample>`_). -2. On the command line as dedicated script options (see the usage help). -3. As a single composite parameter to the script. The parameter must be a - single, YAML-formatted string with the keys corresponding to the keys - available for use in the YAML formatted file (for example: - `--yaml_tool "{'owner': 'kellrott', 'tool_shed_url': - 'https://testtoolshed.g2.bx.psu.edu', 'tool_panel_section_id': - 'peak_calling', 'name': 'synapse_interface'}"`). - -Only one of the methods can be used with each invocation of the script but if -more than one are provided are provided, precedence will correspond to order -of the items in the list above. -When installing tools, Galaxy expects any `tool_panel_section_id` provided when -installing a tool to already exist in the configuration. If the section -does not exist, the tool will be installed outside any section. See -`shed_tool_conf.xml.sample` in this directory for a sample of such file. Before -running this script to install the tools, make sure to place such file into -Galaxy's configuration directory and set Galaxy configuration option -`tool_config_file` to include it. -""" -import datetime as dt -import json -import os -import re -import time -from collections import namedtuple -from concurrent.futures import thread, ThreadPoolExecutor - -import requests -import yaml -from bioblend.galaxy.client import ConnectionError -from bioblend.galaxy.toolshed import ToolShedClient -from galaxy.tool_util.verify.interactor import ( - GalaxyInteractorApi, - verify_tool, -) -from galaxy.util import unicodify - -from . import get_galaxy_connection, load_yaml_file -from .ephemeris_log import disable_external_library_logging, setup_global_logger -from .get_tool_list_from_galaxy import GiToToolYaml, the_same_repository, tools_for_repository -from .shed_tools_args import parser -from .shed_tools_methods import complete_repo_information, flatten_repo_info, VALID_KEYS - - -class InstallRepositoryManager(object): - """Manages the installation of new repositories on a galaxy instance""" - - def __init__(self, - galaxy_instance): - """Initialize a new tool manager""" - self.gi = galaxy_instance - self.tool_shed_client = ToolShedClient(self.gi) - - def installed_repositories(self): - """Get currently installed tools""" - return GiToToolYaml( - gi=self.gi, - skip_tool_panel_section_name=False, - get_data_managers=True, - get_all_tools=True - ).tool_list.get("tools") - - def filter_installed_repos(self, repos, check_revision=True): - # TODO: Find a speedier algorithm. - """This filters a list of repositories""" - not_installed_repos = [] - already_installed_repos = [] - if check_revision: - # If we want to check if revisions are equal, flatten the list, - # so each repository - revision combination has its own entry - installed_repos = flatten_repo_info(self.installed_repositories()) - else: - # If we do not care about revision equality, do not do the flatten - # action to limit the number of comparisons. - installed_repos = self.installed_repositories() - - for repo in repos: - for installed_repo in installed_repos: - if the_same_repository(installed_repo, repo, check_revision): - already_installed_repos.append(repo) - break - else: # This executes when the for loop completes and no match has been found. - not_installed_repos.append(repo) - FilterResults = namedtuple("FilterResults", ["not_installed_repos", "already_installed_repos"]) - return FilterResults(already_installed_repos=already_installed_repos, not_installed_repos=not_installed_repos) - - def install_repositories(self, - repositories, - log=None, - force_latest_revision=False, - default_toolshed='https://toolshed.g2.bx.psu.edu/', - default_install_tool_dependencies=False, - default_install_resolver_dependencies=True, - default_install_repository_dependencies=True): - """Install a list of tools on the current galaxy""" - if not repositories: - raise ValueError("Empty list of tools was given") - installation_start = dt.datetime.now() - installed_repositories = [] - skipped_repositories = [] - errored_repositories = [] - counter = 0 - - # Check repos for invalid keys - for repo in repositories: - for key in repo.keys(): - if key not in VALID_KEYS and key != 'revisions': - if log: - log.warning("'{0}' not a valid key. Will be skipped during parsing".format(key)) - - # Start by flattening the repo list per revision - flattened_repos = flatten_repo_info(repositories) - total_num_repositories = len(flattened_repos) - - # Complete the repo information, and make sure each repository has a revision - repository_list = [] - for repository in flattened_repos: - start = dt.datetime.now() - try: - complete_repo = complete_repo_information( - repository, - default_toolshed_url=default_toolshed, - require_tool_panel_info=True, - default_install_tool_dependencies=default_install_tool_dependencies, - default_install_resolver_dependencies=default_install_resolver_dependencies, - default_install_repository_dependencies=default_install_repository_dependencies, - force_latest_revision=force_latest_revision) - repository_list.append(complete_repo) - except Exception as e: - # We'll run through the loop come whatever may, we log the errored repositories at the end anyway. - if log: - log_repository_install_error(repository, start, unicodify(e), log) - errored_repositories.append(repository) - - # Filter out already installed repos - filtered_repos = self.filter_installed_repos(repository_list) - - for skipped_repo in filtered_repos.already_installed_repos: - counter += 1 - if log: - log_repository_install_skip(skipped_repo, counter, total_num_repositories, log) - skipped_repositories.append(skipped_repo) - - # Install repos - for repository in filtered_repos.not_installed_repos: - counter += 1 - if log: - log_repository_install_start(repository, counter=counter, installation_start=installation_start, log=log, - total_num_repositories=total_num_repositories) - result = self.install_repository_revision(repository, log) - if result == "error": - errored_repositories.append(repository) - elif result == "skipped": - skipped_repositories.append(repository) - elif result == "installed": - installed_repositories.append(repository) - - # Log results - if log: - log.info("Installed repositories ({0}): {1}".format( - len(installed_repositories), - [( - t['name'], - t.get('changeset_revision') - ) for t in installed_repositories]) - ) - log.info("Skipped repositories ({0}): {1}".format( - len(skipped_repositories), - [( - t['name'], - t.get('changeset_revision') - ) for t in skipped_repositories]) - ) - log.info("Errored repositories ({0}): {1}".format( - len(errored_repositories), - [( - t['name'], - t.get('changeset_revision', "") - ) for t in errored_repositories]) - ) - log.info("All repositories have been installed.") - log.info("Total run time: {0}".format(dt.datetime.now() - installation_start)) - InstallResults = namedtuple("InstallResults", - ["installed_repositories", "errored_repositories", "skipped_repositories"]) - return InstallResults(installed_repositories=installed_repositories, - skipped_repositories=skipped_repositories, - errored_repositories=errored_repositories) - - def update_repositories(self, repositories=None, log=None, **kwargs): - if not repositories: # Repositories None or empty list - repositories = self.installed_repositories() - else: - filtered_repos = self.filter_installed_repos(repositories, check_revision=False) - if filtered_repos.not_installed_repos: - if log: - log.warning("The following tools are not installed and will not be upgraded: {0}".format( - filtered_repos.not_installed_repos)) - repositories = filtered_repos.already_installed_repos - return self.install_repositories(repositories, force_latest_revision=True, log=log, **kwargs) - - def test_tools(self, - test_json, - repositories=None, - log=None, - test_user_api_key=None, - test_user="ephemeris@galaxyproject.org", - parallel_tests=1, - ): - """Run tool tests for all tools in each repository in supplied tool list or ``self.installed_repositories()``. - """ - tool_test_start = dt.datetime.now() - tests_passed = [] - test_exceptions = [] - - if not repositories: # If repositories is None or empty list - # Consider a variant of this that doesn't even consume a tool list YAML? target - # something like installed_repository_revisions(self.gi) - repositories = self.installed_repositories() - - target_repositories = flatten_repo_info(repositories) - - installed_tools = [] - for target_repository in target_repositories: - repo_tools = tools_for_repository(self.gi, target_repository) - installed_tools.extend(repo_tools) - - all_test_results = [] - galaxy_interactor = self._get_interactor(test_user, test_user_api_key) - test_history = galaxy_interactor.new_history() - - with ThreadPoolExecutor(max_workers=parallel_tests) as executor: - try: - for tool in installed_tools: - self._test_tool(executor=executor, - tool=tool, - galaxy_interactor=galaxy_interactor, - test_history=test_history, - log=log, - tool_test_results=all_test_results, - tests_passed=tests_passed, - test_exceptions=test_exceptions, - ) - finally: - # Always write report, even if test was cancelled. - try: - executor.shutdown(wait=True) - except KeyboardInterrupt: - executor._threads.clear() - thread._threads_queues.clear() - n_passed = len(tests_passed) - n_failed = len(test_exceptions) - report_obj = { - 'version': '0.1', - 'suitename': 'Ephemeris tool tests targeting %s' % self.gi.base_url, - 'results': { - 'total': n_passed + n_failed, - 'errors': n_failed, - 'failures': 0, - 'skips': 0, - }, - 'tests': sorted(all_test_results, key=lambda el: el['id']), - } - with open(test_json, "w") as f: - json.dump(report_obj, f) - if log: - log.info("Report written to '%s'", os.path.abspath(test_json)) - log.info("Passed tool tests ({0}): {1}".format( - n_passed, - [t for t in tests_passed]) - ) - log.info("Failed tool tests ({0}): {1}".format( - n_failed, - [t[0] for t in test_exceptions]) - ) - log.info("Total tool test time: {0}".format(dt.datetime.now() - tool_test_start)) - - def _get_interactor(self, test_user, test_user_api_key): - if test_user_api_key is None: - whoami = self.gi.make_get_request(self.gi.url + "/whoami").json() - if whoami is not None: - test_user_api_key = self.gi.key - galaxy_interactor_kwds = { - "galaxy_url": re.sub('/api', '', self.gi.url), - "master_api_key": self.gi.key, - "api_key": test_user_api_key, # TODO - "keep_outputs_dir": '', - } - if test_user_api_key is None: - galaxy_interactor_kwds["test_user"] = test_user - galaxy_interactor = GalaxyInteractorApi(**galaxy_interactor_kwds) - return galaxy_interactor - - @staticmethod - def _test_tool(executor, - tool, - galaxy_interactor, - tool_test_results, - tests_passed, - test_exceptions, - log, - test_history=None, - ): - if test_history is None: - test_history = galaxy_interactor.new_history() - tool_id = tool["id"] - tool_version = tool["version"] - try: - tool_test_dicts = galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version) - except Exception as e: - if log: - log.warning("Fetching test definition for tool '%s' failed", tool_id, exc_info=True) - test_exceptions.append((tool_id, e)) - Results = namedtuple("Results", ["tool_test_results", "tests_passed", "test_exceptions"]) - return Results(tool_test_results=tool_test_results, - tests_passed=tests_passed, - test_exceptions=test_exceptions) - test_indices = list(range(len(tool_test_dicts))) - - for test_index in test_indices: - test_id = tool_id + "-" + str(test_index) - - def run_test(index, test_id): - - def register(job_data): - tool_test_results.append({ - 'id': test_id, - 'has_data': True, - 'data': job_data, - }) - - try: - if log: - log.info("Executing test '%s'", test_id) - verify_tool( - tool_id, galaxy_interactor, test_index=index, tool_version=tool_version, - register_job_data=register, quiet=True, test_history=test_history, - ) - tests_passed.append(test_id) - if log: - log.info("Test '%s' passed", test_id) - except Exception as e: - if log: - log.warning("Test '%s' failed", test_id, exc_info=True) - test_exceptions.append((test_id, e)) - - executor.submit(run_test, test_index, test_id) - - def install_repository_revision(self, repository, log): - default_err_msg = ('All repositories that you are attempting to install ' - 'have been previously installed.') - start = dt.datetime.now() - try: - repository['new_tool_panel_section_label'] = repository.pop('tool_panel_section_label') - response = self.tool_shed_client.install_repository_revision(**repository) - if isinstance(response, dict) and response.get('status', None) == 'ok': - # This rare case happens if a repository is already installed but - # was not recognised as such in the above check. In such a - # case the return value looks like this: - # {u'status': u'ok', u'message': u'No repositories were - # installed, possibly because the selected repository has - # already been installed.'} - if log: - log.debug("\tRepository {0} is already installed.".format(repository['name'])) - if log: - log_repository_install_success( - repository=repository, - start=start, - log=log) - return "installed" - except (ConnectionError, requests.exceptions.ConnectionError) as e: - if default_err_msg in unicodify(e): - # THIS SHOULD NOT HAPPEN DUE TO THE CHECKS EARLIER - if log: - log.debug("\tRepository %s already installed (at revision %s)" % - (repository['name'], repository['changeset_revision'])) - return "skipped" - elif "504" in unicodify(e) or 'Connection aborted' in unicodify(e): - if log: - log.debug("Timeout during install of %s, extending wait to 1h", repository['name']) - success = self.wait_for_install(repository=repository, log=log, timeout=3600) - if success: - if log: - log_repository_install_success( - repository=repository, - start=start, - log=log) - return "installed" - else: - if log: - log_repository_install_error( - repository=repository, - start=start, msg=e.body, - log=log) - return "error" - else: - if log: - log_repository_install_error( - repository=repository, - start=start, msg=e.body, - log=log) - return "error" - - def wait_for_install(self, repository, log=None, timeout=3600): - """ - If nginx times out, we look into the list of installed repositories - and try to determine if a repository of the same namer/owner is still installing. - Returns True if install finished successfully, - returns False when timeout is exceeded or installation has failed. - """ - start = dt.datetime.now() - while (dt.datetime.now() - start) < dt.timedelta(seconds=timeout): - try: - installed_repo_list = self.tool_shed_client.get_repositories() - for installing_repo in installed_repo_list: - if (installing_repo['name'] == repository['name']) and ( - installing_repo['owner'] == repository['owner']) and ( - installing_repo['changeset_revision'] == repository['changeset_revision']): - if installing_repo['status'] == 'Installed': - return True - elif installing_repo['status'] == 'Error': - return False - else: - time.sleep(10) - except ConnectionError as e: - if log: - log.warning('Failed to get repositories list: %s', unicodify(e)) - time.sleep(10) - return False - - -def log_repository_install_error(repository, start, msg, log): - """ - Log failed repository installations. Return a dictionary with information - """ - end = dt.datetime.now() - log.error( - "\t* Error installing a repository (after %s seconds)! Name: %s," "owner: %s, ""revision: %s, error: %s", - str(end - start), - repository.get('name', ""), - repository.get('owner', ""), - repository.get('changeset_revision', ""), - msg) - - -def log_repository_install_success(repository, start, log): - """ - Log successful repository installation. - Repositories that finish in error still count as successful installs currently. - """ - end = dt.datetime.now() - log.debug( - "\trepository %s installed successfully (in %s) at revision %s" % ( - repository['name'], - str(end - start), - repository['changeset_revision'] - ) - ) - - -def log_repository_install_skip(repository, counter, total_num_repositories, log): - log.debug( - "({0}/{1}) repository {2} already installed at revision {3}. Skipping." - .format( - counter, - total_num_repositories, - repository['name'], - repository['changeset_revision'] - ) - ) - - -def log_repository_install_start(repository, counter, total_num_repositories, installation_start, log): - log.debug( - '(%s/%s) Installing repository %s from %s to section "%s" at revision %s (TRT: %s)' % ( - counter, total_num_repositories, - repository['name'], - repository['owner'], - repository['tool_panel_section_id'] or repository['tool_panel_section_label'], - repository['changeset_revision'], - dt.datetime.now() - installation_start - ) - ) - - -def args_to_repos(args): - if args.tool_list_file: - tool_list = load_yaml_file(args.tool_list_file) - repos = tool_list['tools'] - elif args.tool_yaml: - repos = [yaml.safe_load(args.tool_yaml)] - elif args.name and args.owner: - repo = dict( - owner=args.owner, - name=args.name, - tool_panel_section_id=args.tool_panel_section_id, - tool_panel_section_label=args.tool_panel_section_label, - revisions=args.revisions - ) - if args.tool_shed_url: - repo["tool_shed_url"] = args.tool_shed_url - repos = [repo] - else: - repos = [] - return repos - - -def main(): - disable_external_library_logging() - args = parser().parse_args() - log = setup_global_logger(name=__name__, log_file=args.log_file) - gi = get_galaxy_connection(args, file=args.tool_list_file, log=log, login_required=True) - install_repository_manager = InstallRepositoryManager(gi) - - repos = args_to_repos(args) - - if args.tool_list_file: - tool_list = load_yaml_file(args.tool_list_file) - else: - tool_list = dict() - - # Get some of the other installation arguments - kwargs = dict( - default_install_tool_dependencies=tool_list.get("install_tool_dependencies") or getattr(args, - "install_tool_dependencies", - False), - default_install_repository_dependencies=tool_list.get("install_repository_dependencies") or getattr(args, - "install_repository_dependencies", - False), - default_install_resolver_dependencies=tool_list.get("install_resolver_dependencies") or getattr(args, - "install_resolver_dependencies", - False)) - - # Start installing/updating and store the results in install_results. - # Or do testing if the action is `test` - install_results = None - if args.action == "update": - install_results = install_repository_manager.update_repositories( - repositories=repos, - log=log, - **kwargs) - elif args.action == "install": - install_results = install_repository_manager.install_repositories( - repos, - log=log, - force_latest_revision=args.force_latest_revision, - **kwargs) - elif args.action == "test": - install_repository_manager.test_tools( - test_json=args.test_json, - repositories=repos, - log=log, - test_user_api_key=args.test_user_api_key, - test_user=args.test_user, - parallel_tests=args.parallel_tests, - ) - else: - raise NotImplementedError("This point in the code should not be reached. Please contact the developers.") - - # Run tests on the install results if required. - if install_results and args.test or args.test_existing: - to_be_tested_repositories = install_results.installed_repositories - if args.test_existing: - to_be_tested_repositories.extend(install_results.skipped_repositories) - if to_be_tested_repositories: - install_repository_manager.test_tools( - test_json=args.test_json, - repositories=to_be_tested_repositories, - log=log, - test_user_api_key=args.test_user_api_key, - test_user=args.test_user, - parallel_tests=args.parallel_tests, - ) - - -if __name__ == "__main__": - main()