diff env/lib/python3.7/site-packages/ephemeris/shed_tools.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/ephemeris/shed_tools.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,592 @@
+"""
+A tool to automate installation of tool repositories from a Galaxy Tool Shed
+into an instance of Galaxy.
+
+Shed-tools has three commands: update, test and install.
+
+Update simply updates all the tools in a Galaxy given connection details on the command line.
+
+Test tests the specified tools in the Galaxy Instance.
+
+Install allows installation of tools in multiple ways.
+Galaxy instance details and the installed tools can be provided in one of three
+ways:
+
+1. In the YAML format via dedicated files (a sample can be found
+   `here <https://github.com/galaxyproject/ansible-galaxy-tools/blob/master/files/tool_list.yaml.sample>`_).
+2. On the command line as dedicated script options (see the usage help).
+3. As a single composite parameter to the script. The parameter must be a
+   single, YAML-formatted string with the keys corresponding to the keys
+   available for use in the YAML formatted file (for example:
+   `--yaml_tool "{'owner': 'kellrott', 'tool_shed_url':
+   'https://testtoolshed.g2.bx.psu.edu', 'tool_panel_section_id':
+   'peak_calling', 'name': 'synapse_interface'}"`).
+
+Only one of the methods can be used with each invocation of the script but if
+more than one is provided, precedence will correspond to the order of the
+items in the list above.
+When installing tools, Galaxy expects any `tool_panel_section_id` provided when
+installing a tool to already exist in the configuration. If the section
+does not exist, the tool will be installed outside any section. See
+`shed_tool_conf.xml.sample` in this directory for a sample of such a file. Before
+running this script to install the tools, make sure to place such a file into
+Galaxy's configuration directory and set Galaxy configuration option
+`tool_config_file` to include it.
+"""
+import datetime as dt
+import json
+import os
+import re
+import time
+from collections import namedtuple
+from concurrent.futures import thread, ThreadPoolExecutor
+
+import requests
+import yaml
+from bioblend.galaxy.client import ConnectionError
+from bioblend.galaxy.toolshed import ToolShedClient
+from galaxy.tool_util.verify.interactor import (
+    GalaxyInteractorApi,
+    verify_tool,
+)
+from galaxy.util import unicodify
+
+from . import get_galaxy_connection, load_yaml_file
+from .ephemeris_log import disable_external_library_logging, setup_global_logger
+from .get_tool_list_from_galaxy import GiToToolYaml, the_same_repository, tools_for_repository
+from .shed_tools_args import parser
+from .shed_tools_methods import complete_repo_information, flatten_repo_info, VALID_KEYS
+
+
class InstallRepositoryManager(object):
    """Manages installation, updating and tool-testing of tool shed
    repositories on a Galaxy instance."""

    def __init__(self,
                 galaxy_instance):
        """Initialize a new tool manager.

        :param galaxy_instance: a bioblend GalaxyInstance connected to the
            target Galaxy server.
        """
        self.gi = galaxy_instance
        self.tool_shed_client = ToolShedClient(self.gi)

    def installed_repositories(self):
        """Return the repositories (including data managers) currently
        installed on the Galaxy instance as a list of tool dictionaries."""
        return GiToToolYaml(
            gi=self.gi,
            skip_tool_panel_section_name=False,
            get_data_managers=True,
            get_all_tools=True
        ).tool_list.get("tools")

    def filter_installed_repos(self, repos, check_revision=True):
        """Split ``repos`` into already-installed and not-installed lists.

        :param repos: list of repository dictionaries to check.
        :param check_revision: when True, a repository only counts as
            installed if the exact changeset revision matches as well.
        :returns: a ``FilterResults`` namedtuple with the fields
            ``not_installed_repos`` and ``already_installed_repos``.
        """
        # TODO: Find a speedier algorithm. This compares every requested
        # repository against every installed one.
        not_installed_repos = []
        already_installed_repos = []
        if check_revision:
            # If we want to check if revisions are equal, flatten the list,
            # so each repository - revision combination has its own entry
            installed_repos = flatten_repo_info(self.installed_repositories())
        else:
            # If we do not care about revision equality, do not do the flatten
            # action to limit the number of comparisons.
            installed_repos = self.installed_repositories()

        for repo in repos:
            for installed_repo in installed_repos:
                if the_same_repository(installed_repo, repo, check_revision):
                    already_installed_repos.append(repo)
                    break
            else:  # This executes when the for loop completes and no match has been found.
                not_installed_repos.append(repo)
        FilterResults = namedtuple("FilterResults", ["not_installed_repos", "already_installed_repos"])
        return FilterResults(already_installed_repos=already_installed_repos, not_installed_repos=not_installed_repos)

    def install_repositories(self,
                             repositories,
                             log=None,
                             force_latest_revision=False,
                             default_toolshed='https://toolshed.g2.bx.psu.edu/',
                             default_install_tool_dependencies=False,
                             default_install_resolver_dependencies=True,
                             default_install_repository_dependencies=True):
        """Install a list of tools on the current galaxy.

        :param repositories: list of repository dictionaries to install.
        :param log: optional logger used for progress and result reporting.
        :param force_latest_revision: ignore any revision in the repository
            dictionaries and install the latest changeset revision instead.
        :param default_toolshed: tool shed URL used when a repository does
            not specify one.
        :param default_install_tool_dependencies: default for installing
            tool-shed-managed tool dependencies.
        :param default_install_resolver_dependencies: default for installing
            dependencies through a dependency resolver (e.g. conda).
        :param default_install_repository_dependencies: default for
            installing repository dependencies.
        :returns: an ``InstallResults`` namedtuple with the fields
            ``installed_repositories``, ``errored_repositories`` and
            ``skipped_repositories``.
        :raises ValueError: when ``repositories`` is empty or None.
        """
        if not repositories:
            raise ValueError("Empty list of tools was given")
        installation_start = dt.datetime.now()
        installed_repositories = []
        skipped_repositories = []
        errored_repositories = []
        counter = 0

        # Check repos for invalid keys
        for repo in repositories:
            for key in repo.keys():
                if key not in VALID_KEYS and key != 'revisions':
                    if log:
                        log.warning("'{0}' not a valid key. Will be skipped during parsing".format(key))

        # Start by flattening the repo list per revision
        flattened_repos = flatten_repo_info(repositories)
        total_num_repositories = len(flattened_repos)

        # Complete the repo information, and make sure each repository has a revision
        repository_list = []
        for repository in flattened_repos:
            start = dt.datetime.now()
            try:
                complete_repo = complete_repo_information(
                    repository,
                    default_toolshed_url=default_toolshed,
                    require_tool_panel_info=True,
                    default_install_tool_dependencies=default_install_tool_dependencies,
                    default_install_resolver_dependencies=default_install_resolver_dependencies,
                    default_install_repository_dependencies=default_install_repository_dependencies,
                    force_latest_revision=force_latest_revision)
                repository_list.append(complete_repo)
            except Exception as e:
                # We'll run through the loop come whatever may, we log the errored repositories at the end anyway.
                if log:
                    log_repository_install_error(repository, start, unicodify(e), log)
                errored_repositories.append(repository)

        # Filter out already installed repos
        filtered_repos = self.filter_installed_repos(repository_list)

        for skipped_repo in filtered_repos.already_installed_repos:
            counter += 1
            if log:
                log_repository_install_skip(skipped_repo, counter, total_num_repositories, log)
            skipped_repositories.append(skipped_repo)

        # Install repos
        for repository in filtered_repos.not_installed_repos:
            counter += 1
            if log:
                log_repository_install_start(repository, counter=counter, installation_start=installation_start, log=log,
                                             total_num_repositories=total_num_repositories)
            result = self.install_repository_revision(repository, log)
            if result == "error":
                errored_repositories.append(repository)
            elif result == "skipped":
                skipped_repositories.append(repository)
            elif result == "installed":
                installed_repositories.append(repository)

        # Log results
        if log:
            log.info("Installed repositories ({0}): {1}".format(
                len(installed_repositories),
                [(
                    t['name'],
                    t.get('changeset_revision')
                ) for t in installed_repositories])
            )
            log.info("Skipped repositories ({0}): {1}".format(
                len(skipped_repositories),
                [(
                    t['name'],
                    t.get('changeset_revision')
                ) for t in skipped_repositories])
            )
            log.info("Errored repositories ({0}): {1}".format(
                len(errored_repositories),
                [(
                    t['name'],
                    t.get('changeset_revision', "")
                ) for t in errored_repositories])
            )
            log.info("All repositories have been installed.")
            log.info("Total run time: {0}".format(dt.datetime.now() - installation_start))
        InstallResults = namedtuple("InstallResults",
                                    ["installed_repositories", "errored_repositories", "skipped_repositories"])
        return InstallResults(installed_repositories=installed_repositories,
                              skipped_repositories=skipped_repositories,
                              errored_repositories=errored_repositories)

    def update_repositories(self, repositories=None, log=None, **kwargs):
        """Update the given repositories - or all installed ones when
        ``repositories`` is None/empty - to their latest revision.
        Remaining keyword arguments are passed to :meth:`install_repositories`.
        """
        if not repositories:  # Repositories None or empty list
            repositories = self.installed_repositories()
        else:
            filtered_repos = self.filter_installed_repos(repositories, check_revision=False)
            if filtered_repos.not_installed_repos:
                if log:
                    log.warning("The following tools are not installed and will not be upgraded: {0}".format(
                        filtered_repos.not_installed_repos))
            repositories = filtered_repos.already_installed_repos
        return self.install_repositories(repositories, force_latest_revision=True, log=log, **kwargs)

    def test_tools(self,
                   test_json,
                   repositories=None,
                   log=None,
                   test_user_api_key=None,
                   test_user="ephemeris@galaxyproject.org",
                   parallel_tests=1,
                   ):
        """Run tool tests for all tools in each repository in supplied tool list or ``self.installed_repositories()``.

        :param test_json: path the JSON test report is written to.
        :param repositories: repositories whose tools should be tested;
            defaults to everything currently installed.
        :param log: optional logger.
        :param test_user_api_key: API key of the user running the tests;
            when None the configured key (or a bootstrapped test user) is used.
        :param test_user: e-mail of the test user to create/use when no
            API key is given.
        :param parallel_tests: number of tests to run concurrently.
        """
        tool_test_start = dt.datetime.now()
        tests_passed = []
        test_exceptions = []

        if not repositories:  # If repositories is None or empty list
            # Consider a variant of this that doesn't even consume a tool list YAML? target
            # something like installed_repository_revisions(self.gi)
            repositories = self.installed_repositories()

        target_repositories = flatten_repo_info(repositories)

        installed_tools = []
        for target_repository in target_repositories:
            repo_tools = tools_for_repository(self.gi, target_repository)
            installed_tools.extend(repo_tools)

        all_test_results = []
        galaxy_interactor = self._get_interactor(test_user, test_user_api_key)
        test_history = galaxy_interactor.new_history()

        with ThreadPoolExecutor(max_workers=parallel_tests) as executor:
            try:
                for tool in installed_tools:
                    self._test_tool(executor=executor,
                                    tool=tool,
                                    galaxy_interactor=galaxy_interactor,
                                    test_history=test_history,
                                    log=log,
                                    tool_test_results=all_test_results,
                                    tests_passed=tests_passed,
                                    test_exceptions=test_exceptions,
                                    )
            finally:
                # Always write report, even if test was cancelled.
                try:
                    executor.shutdown(wait=True)
                except KeyboardInterrupt:
                    # HACK: drain the executor's internal queues so a second
                    # Ctrl-C exits promptly instead of waiting for all
                    # submitted tests to run.
                    executor._threads.clear()
                    thread._threads_queues.clear()
                n_passed = len(tests_passed)
                n_failed = len(test_exceptions)
                report_obj = {
                    'version': '0.1',
                    'suitename': 'Ephemeris tool tests targeting %s' % self.gi.base_url,
                    'results': {
                        'total': n_passed + n_failed,
                        'errors': n_failed,
                        'failures': 0,
                        'skips': 0,
                    },
                    'tests': sorted(all_test_results, key=lambda el: el['id']),
                }
                with open(test_json, "w") as f:
                    json.dump(report_obj, f)
                if log:
                    log.info("Report written to '%s'", os.path.abspath(test_json))
                    log.info("Passed tool tests ({0}): {1}".format(
                        n_passed,
                        tests_passed)  # was a pointless `[t for t in tests_passed]` copy
                    )
                    log.info("Failed tool tests ({0}): {1}".format(
                        n_failed,
                        [t[0] for t in test_exceptions])
                    )
                    log.info("Total tool test time: {0}".format(dt.datetime.now() - tool_test_start))

    def _get_interactor(self, test_user, test_user_api_key):
        """Build the GalaxyInteractorApi used to run tool tests.

        When no explicit test user API key is given and ``/whoami`` shows the
        configured key belongs to a real user, reuse that key; otherwise let
        the interactor bootstrap ``test_user``.
        """
        if test_user_api_key is None:
            whoami = self.gi.make_get_request(self.gi.url + "/whoami").json()
            if whoami is not None:
                test_user_api_key = self.gi.key
        galaxy_interactor_kwds = {
            # Strip only the trailing "/api"; the previous unanchored
            # substitution would also mangle a URL containing "/api"
            # elsewhere in its path.
            "galaxy_url": re.sub('/api$', '', self.gi.url),
            "master_api_key": self.gi.key,
            "api_key": test_user_api_key,  # TODO
            "keep_outputs_dir": '',
        }
        if test_user_api_key is None:
            galaxy_interactor_kwds["test_user"] = test_user
        galaxy_interactor = GalaxyInteractorApi(**galaxy_interactor_kwds)
        return galaxy_interactor

    @staticmethod
    def _test_tool(executor,
                   tool,
                   galaxy_interactor,
                   tool_test_results,
                   tests_passed,
                   test_exceptions,
                   log,
                   test_history=None,
                   ):
        """Submit all test cases of one tool to ``executor``.

        Results are accumulated into the shared ``tool_test_results``,
        ``tests_passed`` and ``test_exceptions`` lists (list.append is
        thread-safe in CPython).
        """
        if test_history is None:
            test_history = galaxy_interactor.new_history()
        tool_id = tool["id"]
        tool_version = tool["version"]
        try:
            tool_test_dicts = galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version)
        except Exception as e:
            if log:
                log.warning("Fetching test definition for tool '%s' failed", tool_id, exc_info=True)
            test_exceptions.append((tool_id, e))
            Results = namedtuple("Results", ["tool_test_results", "tests_passed", "test_exceptions"])
            return Results(tool_test_results=tool_test_results,
                           tests_passed=tests_passed,
                           test_exceptions=test_exceptions)
        test_indices = list(range(len(tool_test_dicts)))

        for test_index in test_indices:
            test_id = tool_id + "-" + str(test_index)

            # Default arguments bind the current loop values; a plain closure
            # would late-bind and run every job with the last index.
            def run_test(index, test_id):

                def register(job_data):
                    tool_test_results.append({
                        'id': test_id,
                        'has_data': True,
                        'data': job_data,
                    })

                try:
                    if log:
                        log.info("Executing test '%s'", test_id)
                    verify_tool(
                        tool_id, galaxy_interactor, test_index=index, tool_version=tool_version,
                        register_job_data=register, quiet=True, test_history=test_history,
                    )
                    tests_passed.append(test_id)
                    if log:
                        log.info("Test '%s' passed", test_id)
                except Exception as e:
                    if log:
                        log.warning("Test '%s' failed", test_id, exc_info=True)
                    test_exceptions.append((test_id, e))

            executor.submit(run_test, test_index, test_id)

    def install_repository_revision(self, repository, log):
        """Install a single repository revision and report the outcome.

        :param repository: completed repository dictionary (must contain a
            ``tool_panel_section_label`` key, which is renamed for bioblend).
        :param log: optional logger.
        :returns: one of the strings "installed", "skipped" or "error".
        """
        default_err_msg = ('All repositories that you are attempting to install '
                           'have been previously installed.')
        start = dt.datetime.now()
        try:
            repository['new_tool_panel_section_label'] = repository.pop('tool_panel_section_label')
            response = self.tool_shed_client.install_repository_revision(**repository)
            if isinstance(response, dict) and response.get('status', None) == 'ok':
                # This rare case happens if a repository is already installed but
                # was not recognised as such in the above check. In such a
                # case the return value looks like this:
                # {u'status': u'ok', u'message': u'No repositories were
                #  installed, possibly because the selected repository has
                #  already been installed.'}
                if log:
                    log.debug("\tRepository {0} is already installed.".format(repository['name']))
            if log:
                log_repository_install_success(
                    repository=repository,
                    start=start,
                    log=log)
            return "installed"
        except (ConnectionError, requests.exceptions.ConnectionError) as e:
            # requests' ConnectionError has no ``body`` attribute (only
            # bioblend's does); accessing ``e.body`` unconditionally raised
            # AttributeError and masked the real failure.
            error_body = getattr(e, 'body', unicodify(e))
            if default_err_msg in unicodify(e):
                # THIS SHOULD NOT HAPPEN DUE TO THE CHECKS EARLIER
                if log:
                    log.debug("\tRepository %s already installed (at revision %s)" %
                              (repository['name'], repository['changeset_revision']))
                return "skipped"
            elif "504" in unicodify(e) or 'Connection aborted' in unicodify(e):
                if log:
                    log.debug("Timeout during install of %s, extending wait to 1h", repository['name'])
                success = self.wait_for_install(repository=repository, log=log, timeout=3600)
                if success:
                    if log:
                        log_repository_install_success(
                            repository=repository,
                            start=start,
                            log=log)
                    return "installed"
                else:
                    if log:
                        log_repository_install_error(
                            repository=repository,
                            start=start, msg=error_body,
                            log=log)
                    return "error"
            else:
                if log:
                    log_repository_install_error(
                        repository=repository,
                        start=start, msg=error_body,
                        log=log)
                return "error"

    def wait_for_install(self, repository, log=None, timeout=3600):
        """
        If nginx times out, we look into the list of installed repositories
        and try to determine if a repository of the same name/owner is still installing.
        Returns True if install finished successfully,
        returns False when timeout is exceeded or installation has failed.
        """
        start = dt.datetime.now()
        while (dt.datetime.now() - start) < dt.timedelta(seconds=timeout):
            try:
                installed_repo_list = self.tool_shed_client.get_repositories()
                for installing_repo in installed_repo_list:
                    if (installing_repo['name'] == repository['name']) and (
                            installing_repo['owner'] == repository['owner']) and (
                            installing_repo['changeset_revision'] == repository['changeset_revision']):
                        if installing_repo['status'] == 'Installed':
                            return True
                        elif installing_repo['status'] == 'Error':
                            return False
            except ConnectionError as e:
                if log:
                    log.warning('Failed to get repositories list: %s', unicodify(e))
            # Sleep once per poll iteration. The original only slept when a
            # matching, still-installing repository was found, so it
            # busy-looped (hammering the API) whenever the repository was
            # not yet present in the listing.
            time.sleep(10)
        return False
+
+
def log_repository_install_error(repository, start, msg, log):
    """
    Log a failed repository installation at error level.

    :param repository: repository dictionary (may be only partially filled in).
    :param start: ``datetime`` at which the installation attempt started.
    :param msg: error message describing the failure.
    :param log: logger to write to.
    """
    end = dt.datetime.now()
    # The original built the template from adjacent string literals and the
    # concatenation produced "Name: %s,owner: %s" (missing space); use a
    # single, correctly spaced literal instead.
    log.error(
        "\t* Error installing a repository (after %s seconds)! Name: %s, owner: %s, revision: %s, error: %s",
        str(end - start),
        repository.get('name', ""),
        repository.get('owner', ""),
        repository.get('changeset_revision', ""),
        msg)
+
+
def log_repository_install_success(repository, start, log):
    """Log a successful repository installation at debug level.

    Repositories that finish in error still count as successful installs currently.
    """
    elapsed = dt.datetime.now() - start
    message = "\trepository %s installed successfully (in %s) at revision %s" % (
        repository['name'],
        str(elapsed),
        repository['changeset_revision'],
    )
    log.debug(message)
+
+
def log_repository_install_skip(repository, counter, total_num_repositories, log):
    """Log, at debug level, that an already-installed repository is being skipped."""
    message = "({0}/{1}) repository {2} already installed at revision {3}. Skipping.".format(
        counter,
        total_num_repositories,
        repository['name'],
        repository['changeset_revision'],
    )
    log.debug(message)
+
+
def log_repository_install_start(repository, counter, total_num_repositories, installation_start, log):
    """Log, at debug level, that installation of a repository is starting.

    "TRT" in the message is the total running time since ``installation_start``.
    """
    # Fall back to the section label when no section id was supplied.
    section = repository['tool_panel_section_id'] or repository['tool_panel_section_label']
    running_time = dt.datetime.now() - installation_start
    log.debug(
        '(%s/%s) Installing repository %s from %s to section "%s" at revision %s (TRT: %s)' % (
            counter,
            total_num_repositories,
            repository['name'],
            repository['owner'],
            section,
            repository['changeset_revision'],
            running_time,
        )
    )
+
+
def args_to_repos(args):
    """Build the list of repository dictionaries described by the parsed
    command line arguments.

    Precedence order: a tool list file, then a YAML string, then the
    individual ``--name``/``--owner`` options. Returns an empty list when
    none of these are supplied.
    """
    if args.tool_list_file:
        return load_yaml_file(args.tool_list_file)['tools']
    if args.tool_yaml:
        return [yaml.safe_load(args.tool_yaml)]
    if args.name and args.owner:
        repo = {
            'owner': args.owner,
            'name': args.name,
            'tool_panel_section_id': args.tool_panel_section_id,
            'tool_panel_section_label': args.tool_panel_section_label,
            'revisions': args.revisions,
        }
        if args.tool_shed_url:
            repo['tool_shed_url'] = args.tool_shed_url
        return [repo]
    return []
+
+
def main():
    """Command line entry point: install, update or test tool repositories."""
    disable_external_library_logging()
    args = parser().parse_args()
    log = setup_global_logger(name=__name__, log_file=args.log_file)
    gi = get_galaxy_connection(args, file=args.tool_list_file, log=log, login_required=True)
    install_repository_manager = InstallRepositoryManager(gi)

    repos = args_to_repos(args)

    if args.tool_list_file:
        tool_list = load_yaml_file(args.tool_list_file)
    else:
        tool_list = dict()

    # Get some of the other installation arguments. Values from the tool
    # list file take precedence over the command line flags.
    kwargs = dict(
        default_install_tool_dependencies=tool_list.get("install_tool_dependencies") or getattr(
            args, "install_tool_dependencies", False),
        default_install_repository_dependencies=tool_list.get("install_repository_dependencies") or getattr(
            args, "install_repository_dependencies", False),
        default_install_resolver_dependencies=tool_list.get("install_resolver_dependencies") or getattr(
            args, "install_resolver_dependencies", False))

    # Start installing/updating and store the results in install_results.
    # Or do testing if the action is `test`
    install_results = None
    if args.action == "update":
        install_results = install_repository_manager.update_repositories(
            repositories=repos,
            log=log,
            **kwargs)
    elif args.action == "install":
        install_results = install_repository_manager.install_repositories(
            repos,
            log=log,
            force_latest_revision=args.force_latest_revision,
            **kwargs)
    elif args.action == "test":
        install_repository_manager.test_tools(
            test_json=args.test_json,
            repositories=repos,
            log=log,
            test_user_api_key=args.test_user_api_key,
            test_user=args.test_user,
            parallel_tests=args.parallel_tests,
        )
    else:
        raise NotImplementedError("This point in the code should not be reached. Please contact the developers.")

    # Run tests on the install results if required.
    # NOTE: parenthesized explicitly. The original condition read
    # `install_results and args.test or args.test_existing`; because `and`
    # binds tighter than `or`, passing --test_existing with action "test"
    # (where install_results is None) crashed on the attribute access below.
    if install_results and (args.test or args.test_existing):
        # Copy so that extending does not mutate the list stored inside
        # the InstallResults namedtuple.
        to_be_tested_repositories = list(install_results.installed_repositories)
        if args.test_existing:
            to_be_tested_repositories.extend(install_results.skipped_repositories)
        if to_be_tested_repositories:
            install_repository_manager.test_tools(
                test_json=args.test_json,
                repositories=to_be_tested_repositories,
                log=log,
                test_user_api_key=args.test_user_api_key,
                test_user=args.test_user,
                parallel_tests=args.parallel_tests,
            )
+
+
+if __name__ == "__main__":
+    main()