diff env/lib/python3.7/site-packages/ephemeris/shed_tools.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/ephemeris/shed_tools.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,592 +0,0 @@
-"""
-A tool to automate installation of tool repositories from a Galaxy Tool Shed
-into an instance of Galaxy.
-
-Shed-tools has three commands: update, test and install.
-
-Update simply updates all the tools in a Galaxy given connection details on the command line.
-
-Test tests the specified tools in the Galaxy Instance.
-
-Install allows installation of tools in multiple ways.
-Galaxy instance details and the installed tools can be provided in one of three
-ways:
-
-1. In the YAML format via dedicated files (a sample can be found
-   `here <https://github.com/galaxyproject/ansible-galaxy-tools/blob/master/files/tool_list.yaml.sample>`_).
-2. On the command line as dedicated script options (see the usage help).
-3. As a single composite parameter to the script. The parameter must be a
-   single, YAML-formatted string with the keys corresponding to the keys
-   available for use in the YAML formatted file (for example:
-   `--yaml_tool "{'owner': 'kellrott', 'tool_shed_url':
-   'https://testtoolshed.g2.bx.psu.edu', 'tool_panel_section_id':
-   'peak_calling', 'name': 'synapse_interface'}"`).
-
-Only one of the methods can be used with each invocation of the script but if
-more than one are provided are provided, precedence will correspond to order
-of the items in the list above.
-When installing tools, Galaxy expects any `tool_panel_section_id` provided when
-installing a tool to already exist in the configuration. If the section
-does not exist, the tool will be installed outside any section. See
-`shed_tool_conf.xml.sample` in this directory for a sample of such file. Before
-running this script to install the tools, make sure to place such file into
-Galaxy's configuration directory and set Galaxy configuration option
-`tool_config_file` to include it.
-"""
-import datetime as dt
-import json
-import os
-import re
-import time
-from collections import namedtuple
-from concurrent.futures import thread, ThreadPoolExecutor
-
-import requests
-import yaml
-from bioblend.galaxy.client import ConnectionError
-from bioblend.galaxy.toolshed import ToolShedClient
-from galaxy.tool_util.verify.interactor import (
-    GalaxyInteractorApi,
-    verify_tool,
-)
-from galaxy.util import unicodify
-
-from . import get_galaxy_connection, load_yaml_file
-from .ephemeris_log import disable_external_library_logging, setup_global_logger
-from .get_tool_list_from_galaxy import GiToToolYaml, the_same_repository, tools_for_repository
-from .shed_tools_args import parser
-from .shed_tools_methods import complete_repo_information, flatten_repo_info, VALID_KEYS
-
-
-class InstallRepositoryManager(object):
-    """Manages the installation of new repositories on a galaxy instance"""
-
-    def __init__(self,
-                 galaxy_instance):
-        """Initialize a new tool manager"""
-        self.gi = galaxy_instance
-        self.tool_shed_client = ToolShedClient(self.gi)
-
-    def installed_repositories(self):
-        """Get currently installed tools"""
-        return GiToToolYaml(
-            gi=self.gi,
-            skip_tool_panel_section_name=False,
-            get_data_managers=True,
-            get_all_tools=True
-        ).tool_list.get("tools")
-
-    def filter_installed_repos(self, repos, check_revision=True):
-        # TODO: Find a speedier algorithm.
-        """This filters a list of repositories"""
-        not_installed_repos = []
-        already_installed_repos = []
-        if check_revision:
-            # If we want to check if revisions are equal, flatten the list,
-            # so each repository - revision combination has its own entry
-            installed_repos = flatten_repo_info(self.installed_repositories())
-        else:
-            # If we do not care about revision equality, do not do the flatten
-            # action to limit the number of comparisons.
-            installed_repos = self.installed_repositories()
-
-        for repo in repos:
-            for installed_repo in installed_repos:
-                if the_same_repository(installed_repo, repo, check_revision):
-                    already_installed_repos.append(repo)
-                    break
-            else:  # This executes when the for loop completes and no match has been found.
-                not_installed_repos.append(repo)
-        FilterResults = namedtuple("FilterResults", ["not_installed_repos", "already_installed_repos"])
-        return FilterResults(already_installed_repos=already_installed_repos, not_installed_repos=not_installed_repos)
-
-    def install_repositories(self,
-                             repositories,
-                             log=None,
-                             force_latest_revision=False,
-                             default_toolshed='https://toolshed.g2.bx.psu.edu/',
-                             default_install_tool_dependencies=False,
-                             default_install_resolver_dependencies=True,
-                             default_install_repository_dependencies=True):
-        """Install a list of tools on the current galaxy"""
-        if not repositories:
-            raise ValueError("Empty list of tools was given")
-        installation_start = dt.datetime.now()
-        installed_repositories = []
-        skipped_repositories = []
-        errored_repositories = []
-        counter = 0
-
-        # Check repos for invalid keys
-        for repo in repositories:
-            for key in repo.keys():
-                if key not in VALID_KEYS and key != 'revisions':
-                    if log:
-                        log.warning("'{0}' not a valid key. Will be skipped during parsing".format(key))
-
-        # Start by flattening the repo list per revision
-        flattened_repos = flatten_repo_info(repositories)
-        total_num_repositories = len(flattened_repos)
-
-        # Complete the repo information, and make sure each repository has a revision
-        repository_list = []
-        for repository in flattened_repos:
-            start = dt.datetime.now()
-            try:
-                complete_repo = complete_repo_information(
-                    repository,
-                    default_toolshed_url=default_toolshed,
-                    require_tool_panel_info=True,
-                    default_install_tool_dependencies=default_install_tool_dependencies,
-                    default_install_resolver_dependencies=default_install_resolver_dependencies,
-                    default_install_repository_dependencies=default_install_repository_dependencies,
-                    force_latest_revision=force_latest_revision)
-                repository_list.append(complete_repo)
-            except Exception as e:
-                # We'll run through the loop come whatever may, we log the errored repositories at the end anyway.
-                if log:
-                    log_repository_install_error(repository, start, unicodify(e), log)
-                errored_repositories.append(repository)
-
-        # Filter out already installed repos
-        filtered_repos = self.filter_installed_repos(repository_list)
-
-        for skipped_repo in filtered_repos.already_installed_repos:
-            counter += 1
-            if log:
-                log_repository_install_skip(skipped_repo, counter, total_num_repositories, log)
-            skipped_repositories.append(skipped_repo)
-
-        # Install repos
-        for repository in filtered_repos.not_installed_repos:
-            counter += 1
-            if log:
-                log_repository_install_start(repository, counter=counter, installation_start=installation_start, log=log,
-                                             total_num_repositories=total_num_repositories)
-            result = self.install_repository_revision(repository, log)
-            if result == "error":
-                errored_repositories.append(repository)
-            elif result == "skipped":
-                skipped_repositories.append(repository)
-            elif result == "installed":
-                installed_repositories.append(repository)
-
-        # Log results
-        if log:
-            log.info("Installed repositories ({0}): {1}".format(
-                len(installed_repositories),
-                [(
-                    t['name'],
-                    t.get('changeset_revision')
-                ) for t in installed_repositories])
-            )
-            log.info("Skipped repositories ({0}): {1}".format(
-                len(skipped_repositories),
-                [(
-                    t['name'],
-                    t.get('changeset_revision')
-                ) for t in skipped_repositories])
-            )
-            log.info("Errored repositories ({0}): {1}".format(
-                len(errored_repositories),
-                [(
-                    t['name'],
-                    t.get('changeset_revision', "")
-                ) for t in errored_repositories])
-            )
-            log.info("All repositories have been installed.")
-            log.info("Total run time: {0}".format(dt.datetime.now() - installation_start))
-        InstallResults = namedtuple("InstallResults",
-                                    ["installed_repositories", "errored_repositories", "skipped_repositories"])
-        return InstallResults(installed_repositories=installed_repositories,
-                              skipped_repositories=skipped_repositories,
-                              errored_repositories=errored_repositories)
-
-    def update_repositories(self, repositories=None, log=None, **kwargs):
-        if not repositories:  # Repositories None or empty list
-            repositories = self.installed_repositories()
-        else:
-            filtered_repos = self.filter_installed_repos(repositories, check_revision=False)
-            if filtered_repos.not_installed_repos:
-                if log:
-                    log.warning("The following tools are not installed and will not be upgraded: {0}".format(
-                        filtered_repos.not_installed_repos))
-            repositories = filtered_repos.already_installed_repos
-        return self.install_repositories(repositories, force_latest_revision=True, log=log, **kwargs)
-
-    def test_tools(self,
-                   test_json,
-                   repositories=None,
-                   log=None,
-                   test_user_api_key=None,
-                   test_user="ephemeris@galaxyproject.org",
-                   parallel_tests=1,
-                   ):
-        """Run tool tests for all tools in each repository in supplied tool list or ``self.installed_repositories()``.
-        """
-        tool_test_start = dt.datetime.now()
-        tests_passed = []
-        test_exceptions = []
-
-        if not repositories:  # If repositories is None or empty list
-            # Consider a variant of this that doesn't even consume a tool list YAML? target
-            # something like installed_repository_revisions(self.gi)
-            repositories = self.installed_repositories()
-
-        target_repositories = flatten_repo_info(repositories)
-
-        installed_tools = []
-        for target_repository in target_repositories:
-            repo_tools = tools_for_repository(self.gi, target_repository)
-            installed_tools.extend(repo_tools)
-
-        all_test_results = []
-        galaxy_interactor = self._get_interactor(test_user, test_user_api_key)
-        test_history = galaxy_interactor.new_history()
-
-        with ThreadPoolExecutor(max_workers=parallel_tests) as executor:
-            try:
-                for tool in installed_tools:
-                    self._test_tool(executor=executor,
-                                    tool=tool,
-                                    galaxy_interactor=galaxy_interactor,
-                                    test_history=test_history,
-                                    log=log,
-                                    tool_test_results=all_test_results,
-                                    tests_passed=tests_passed,
-                                    test_exceptions=test_exceptions,
-                                    )
-            finally:
-                # Always write report, even if test was cancelled.
-                try:
-                    executor.shutdown(wait=True)
-                except KeyboardInterrupt:
-                    executor._threads.clear()
-                    thread._threads_queues.clear()
-                n_passed = len(tests_passed)
-                n_failed = len(test_exceptions)
-                report_obj = {
-                    'version': '0.1',
-                    'suitename': 'Ephemeris tool tests targeting %s' % self.gi.base_url,
-                    'results': {
-                        'total': n_passed + n_failed,
-                        'errors': n_failed,
-                        'failures': 0,
-                        'skips': 0,
-                    },
-                    'tests': sorted(all_test_results, key=lambda el: el['id']),
-                }
-                with open(test_json, "w") as f:
-                    json.dump(report_obj, f)
-                if log:
-                    log.info("Report written to '%s'", os.path.abspath(test_json))
-                    log.info("Passed tool tests ({0}): {1}".format(
-                        n_passed,
-                        [t for t in tests_passed])
-                    )
-                    log.info("Failed tool tests ({0}): {1}".format(
-                        n_failed,
-                        [t[0] for t in test_exceptions])
-                    )
-                    log.info("Total tool test time: {0}".format(dt.datetime.now() - tool_test_start))
-
-    def _get_interactor(self, test_user, test_user_api_key):
-        if test_user_api_key is None:
-            whoami = self.gi.make_get_request(self.gi.url + "/whoami").json()
-            if whoami is not None:
-                test_user_api_key = self.gi.key
-        galaxy_interactor_kwds = {
-            "galaxy_url": re.sub('/api', '', self.gi.url),
-            "master_api_key": self.gi.key,
-            "api_key": test_user_api_key,  # TODO
-            "keep_outputs_dir": '',
-        }
-        if test_user_api_key is None:
-            galaxy_interactor_kwds["test_user"] = test_user
-        galaxy_interactor = GalaxyInteractorApi(**galaxy_interactor_kwds)
-        return galaxy_interactor
-
-    @staticmethod
-    def _test_tool(executor,
-                   tool,
-                   galaxy_interactor,
-                   tool_test_results,
-                   tests_passed,
-                   test_exceptions,
-                   log,
-                   test_history=None,
-                   ):
-        if test_history is None:
-            test_history = galaxy_interactor.new_history()
-        tool_id = tool["id"]
-        tool_version = tool["version"]
-        try:
-            tool_test_dicts = galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version)
-        except Exception as e:
-            if log:
-                log.warning("Fetching test definition for tool '%s' failed", tool_id, exc_info=True)
-            test_exceptions.append((tool_id, e))
-            Results = namedtuple("Results", ["tool_test_results", "tests_passed", "test_exceptions"])
-            return Results(tool_test_results=tool_test_results,
-                           tests_passed=tests_passed,
-                           test_exceptions=test_exceptions)
-        test_indices = list(range(len(tool_test_dicts)))
-
-        for test_index in test_indices:
-            test_id = tool_id + "-" + str(test_index)
-
-            def run_test(index, test_id):
-
-                def register(job_data):
-                    tool_test_results.append({
-                        'id': test_id,
-                        'has_data': True,
-                        'data': job_data,
-                    })
-
-                try:
-                    if log:
-                        log.info("Executing test '%s'", test_id)
-                    verify_tool(
-                        tool_id, galaxy_interactor, test_index=index, tool_version=tool_version,
-                        register_job_data=register, quiet=True, test_history=test_history,
-                    )
-                    tests_passed.append(test_id)
-                    if log:
-                        log.info("Test '%s' passed", test_id)
-                except Exception as e:
-                    if log:
-                        log.warning("Test '%s' failed", test_id, exc_info=True)
-                    test_exceptions.append((test_id, e))
-
-            executor.submit(run_test, test_index, test_id)
-
-    def install_repository_revision(self, repository, log):
-        default_err_msg = ('All repositories that you are attempting to install '
-                           'have been previously installed.')
-        start = dt.datetime.now()
-        try:
-            repository['new_tool_panel_section_label'] = repository.pop('tool_panel_section_label')
-            response = self.tool_shed_client.install_repository_revision(**repository)
-            if isinstance(response, dict) and response.get('status', None) == 'ok':
-                # This rare case happens if a repository is already installed but
-                # was not recognised as such in the above check. In such a
-                # case the return value looks like this:
-                # {u'status': u'ok', u'message': u'No repositories were
-                #  installed, possibly because the selected repository has
-                #  already been installed.'}
-                if log:
-                    log.debug("\tRepository {0} is already installed.".format(repository['name']))
-            if log:
-                log_repository_install_success(
-                    repository=repository,
-                    start=start,
-                    log=log)
-            return "installed"
-        except (ConnectionError, requests.exceptions.ConnectionError) as e:
-            if default_err_msg in unicodify(e):
-                # THIS SHOULD NOT HAPPEN DUE TO THE CHECKS EARLIER
-                if log:
-                    log.debug("\tRepository %s already installed (at revision %s)" %
-                              (repository['name'], repository['changeset_revision']))
-                return "skipped"
-            elif "504" in unicodify(e) or 'Connection aborted' in unicodify(e):
-                if log:
-                    log.debug("Timeout during install of %s, extending wait to 1h", repository['name'])
-                success = self.wait_for_install(repository=repository, log=log, timeout=3600)
-                if success:
-                    if log:
-                        log_repository_install_success(
-                            repository=repository,
-                            start=start,
-                            log=log)
-                    return "installed"
-                else:
-                    if log:
-                        log_repository_install_error(
-                            repository=repository,
-                            start=start, msg=e.body,
-                            log=log)
-                    return "error"
-            else:
-                if log:
-                    log_repository_install_error(
-                        repository=repository,
-                        start=start, msg=e.body,
-                        log=log)
-                return "error"
-
-    def wait_for_install(self, repository, log=None, timeout=3600):
-        """
-        If nginx times out, we look into the list of installed repositories
-        and try to determine if a repository of the same namer/owner is still installing.
-        Returns True if install finished successfully,
-        returns False when timeout is exceeded or installation has failed.
-        """
-        start = dt.datetime.now()
-        while (dt.datetime.now() - start) < dt.timedelta(seconds=timeout):
-            try:
-                installed_repo_list = self.tool_shed_client.get_repositories()
-                for installing_repo in installed_repo_list:
-                    if (installing_repo['name'] == repository['name']) and (
-                            installing_repo['owner'] == repository['owner']) and (
-                            installing_repo['changeset_revision'] == repository['changeset_revision']):
-                        if installing_repo['status'] == 'Installed':
-                            return True
-                        elif installing_repo['status'] == 'Error':
-                            return False
-                        else:
-                            time.sleep(10)
-            except ConnectionError as e:
-                if log:
-                    log.warning('Failed to get repositories list: %s', unicodify(e))
-                time.sleep(10)
-        return False
-
-
-def log_repository_install_error(repository, start, msg, log):
-    """
-    Log failed repository installations. Return a dictionary with information
-    """
-    end = dt.datetime.now()
-    log.error(
-        "\t* Error installing a repository (after %s seconds)! Name: %s," "owner: %s, ""revision: %s, error: %s",
-        str(end - start),
-        repository.get('name', ""),
-        repository.get('owner', ""),
-        repository.get('changeset_revision', ""),
-        msg)
-
-
-def log_repository_install_success(repository, start, log):
-    """
-    Log successful repository installation.
-    Repositories that finish in error still count as successful installs currently.
-    """
-    end = dt.datetime.now()
-    log.debug(
-        "\trepository %s installed successfully (in %s) at revision %s" % (
-            repository['name'],
-            str(end - start),
-            repository['changeset_revision']
-        )
-    )
-
-
-def log_repository_install_skip(repository, counter, total_num_repositories, log):
-    log.debug(
-        "({0}/{1}) repository {2} already installed at revision {3}. Skipping."
-        .format(
-            counter,
-            total_num_repositories,
-            repository['name'],
-            repository['changeset_revision']
-        )
-    )
-
-
-def log_repository_install_start(repository, counter, total_num_repositories, installation_start, log):
-    log.debug(
-        '(%s/%s) Installing repository %s from %s to section "%s" at revision %s (TRT: %s)' % (
-            counter, total_num_repositories,
-            repository['name'],
-            repository['owner'],
-            repository['tool_panel_section_id'] or repository['tool_panel_section_label'],
-            repository['changeset_revision'],
-            dt.datetime.now() - installation_start
-        )
-    )
-
-
-def args_to_repos(args):
-    if args.tool_list_file:
-        tool_list = load_yaml_file(args.tool_list_file)
-        repos = tool_list['tools']
-    elif args.tool_yaml:
-        repos = [yaml.safe_load(args.tool_yaml)]
-    elif args.name and args.owner:
-        repo = dict(
-            owner=args.owner,
-            name=args.name,
-            tool_panel_section_id=args.tool_panel_section_id,
-            tool_panel_section_label=args.tool_panel_section_label,
-            revisions=args.revisions
-        )
-        if args.tool_shed_url:
-            repo["tool_shed_url"] = args.tool_shed_url
-        repos = [repo]
-    else:
-        repos = []
-    return repos
-
-
-def main():
-    disable_external_library_logging()
-    args = parser().parse_args()
-    log = setup_global_logger(name=__name__, log_file=args.log_file)
-    gi = get_galaxy_connection(args, file=args.tool_list_file, log=log, login_required=True)
-    install_repository_manager = InstallRepositoryManager(gi)
-
-    repos = args_to_repos(args)
-
-    if args.tool_list_file:
-        tool_list = load_yaml_file(args.tool_list_file)
-    else:
-        tool_list = dict()
-
-    # Get some of the other installation arguments
-    kwargs = dict(
-        default_install_tool_dependencies=tool_list.get("install_tool_dependencies") or getattr(args,
-                                                                                                "install_tool_dependencies",
-                                                                                                False),
-        default_install_repository_dependencies=tool_list.get("install_repository_dependencies") or getattr(args,
-                                                                                                            "install_repository_dependencies",
-                                                                                                            False),
-        default_install_resolver_dependencies=tool_list.get("install_resolver_dependencies") or getattr(args,
-                                                                                                        "install_resolver_dependencies",
-                                                                                                        False))
-
-    # Start installing/updating and store the results in install_results.
-    # Or do testing if the action is `test`
-    install_results = None
-    if args.action == "update":
-        install_results = install_repository_manager.update_repositories(
-            repositories=repos,
-            log=log,
-            **kwargs)
-    elif args.action == "install":
-        install_results = install_repository_manager.install_repositories(
-            repos,
-            log=log,
-            force_latest_revision=args.force_latest_revision,
-            **kwargs)
-    elif args.action == "test":
-        install_repository_manager.test_tools(
-            test_json=args.test_json,
-            repositories=repos,
-            log=log,
-            test_user_api_key=args.test_user_api_key,
-            test_user=args.test_user,
-            parallel_tests=args.parallel_tests,
-        )
-    else:
-        raise NotImplementedError("This point in the code should not be reached. Please contact the developers.")
-
-    # Run tests on the install results if required.
-    if install_results and args.test or args.test_existing:
-        to_be_tested_repositories = install_results.installed_repositories
-        if args.test_existing:
-            to_be_tested_repositories.extend(install_results.skipped_repositories)
-        if to_be_tested_repositories:
-            install_repository_manager.test_tools(
-                test_json=args.test_json,
-                repositories=to_be_tested_repositories,
-                log=log,
-                test_user_api_key=args.test_user_api_key,
-                test_user=args.test_user,
-                parallel_tests=args.parallel_tests,
-            )
-
-
-if __name__ == "__main__":
-    main()