comparison env/lib/python3.7/site-packages/ephemeris/shed_tools.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 """
2 A tool to automate installation of tool repositories from a Galaxy Tool Shed
3 into an instance of Galaxy.
4
5 Shed-tools has three commands: update, test and install.
6
7 Update simply updates all the tools in a Galaxy given connection details on the command line.
8
9 Test tests the specified tools in the Galaxy Instance.
10
11 Install allows installation of tools in multiple ways.
12 Galaxy instance details and the installed tools can be provided in one of three
13 ways:
14
15 1. In the YAML format via dedicated files (a sample can be found
16 `here <https://github.com/galaxyproject/ansible-galaxy-tools/blob/master/files/tool_list.yaml.sample>`_).
17 2. On the command line as dedicated script options (see the usage help).
18 3. As a single composite parameter to the script. The parameter must be a
19 single, YAML-formatted string with the keys corresponding to the keys
20 available for use in the YAML formatted file (for example:
21 `--yaml_tool "{'owner': 'kellrott', 'tool_shed_url':
22 'https://testtoolshed.g2.bx.psu.edu', 'tool_panel_section_id':
23 'peak_calling', 'name': 'synapse_interface'}"`).
24
25 Only one of the methods can be used with each invocation of the script, but if
26 more than one is provided, precedence will correspond to the order
27 of the items in the list above.
28 When installing tools, Galaxy expects any `tool_panel_section_id` provided when
29 installing a tool to already exist in the configuration. If the section
30 does not exist, the tool will be installed outside any section. See
31 `shed_tool_conf.xml.sample` in this directory for a sample of such a file. Before
32 running this script to install the tools, make sure to place such a file into
33 Galaxy's configuration directory and set the Galaxy configuration option
34 `tool_config_file` to include it.
35 """
36 import datetime as dt
37 import json
38 import os
39 import re
40 import time
41 from collections import namedtuple
42 from concurrent.futures import thread, ThreadPoolExecutor
43
44 import requests
45 import yaml
46 from bioblend.galaxy.client import ConnectionError
47 from bioblend.galaxy.toolshed import ToolShedClient
48 from galaxy.tool_util.verify.interactor import (
49 GalaxyInteractorApi,
50 verify_tool,
51 )
52 from galaxy.util import unicodify
53
54 from . import get_galaxy_connection, load_yaml_file
55 from .ephemeris_log import disable_external_library_logging, setup_global_logger
56 from .get_tool_list_from_galaxy import GiToToolYaml, the_same_repository, tools_for_repository
57 from .shed_tools_args import parser
58 from .shed_tools_methods import complete_repo_information, flatten_repo_info, VALID_KEYS
59
60
class InstallRepositoryManager(object):
    """Install, update and test Tool Shed repositories on a Galaxy instance.

    The manager works through the Galaxy connection object handed to the
    constructor and a ``ToolShedClient`` created from it.
    """

    def __init__(self,
                 galaxy_instance):
        """Store the Galaxy connection and create the tool shed client for it."""
        self.gi = galaxy_instance
        self.tool_shed_client = ToolShedClient(self.gi)

    def installed_repositories(self):
        """Return the ``tools`` entry of the tool list ``GiToToolYaml`` builds for this Galaxy.

        Data managers and all tools are requested and tool panel section
        names are not skipped.
        """
        return GiToToolYaml(
            gi=self.gi,
            skip_tool_panel_section_name=False,
            get_data_managers=True,
            get_all_tools=True
        ).tool_list.get("tools")

    def filter_installed_repos(self, repos, check_revision=True):
        """Split ``repos`` into already installed and not yet installed ones.

        :param repos: iterable of repository dictionaries to look up.
        :param check_revision: forwarded to ``the_same_repository``. When true
            the installed list is flattened first, so each repository/revision
            combination gets its own entry; when false the flattening is
            skipped to limit the number of comparisons.
        :return: ``FilterResults`` namedtuple with the fields
            ``not_installed_repos`` and ``already_installed_repos``.
        """
        # TODO: Find a speedier algorithm.
        installed_repos = self.installed_repositories()
        if check_revision:
            installed_repos = flatten_repo_info(installed_repos)

        not_installed_repos = []
        already_installed_repos = []
        for repo in repos:
            # any() stops at the first match, like the original break.
            if any(the_same_repository(installed_repo, repo, check_revision)
                   for installed_repo in installed_repos):
                already_installed_repos.append(repo)
            else:
                not_installed_repos.append(repo)

        FilterResults = namedtuple("FilterResults", ["not_installed_repos", "already_installed_repos"])
        return FilterResults(already_installed_repos=already_installed_repos,
                             not_installed_repos=not_installed_repos)

    def install_repositories(self,
                             repositories,
                             log=None,
                             force_latest_revision=False,
                             default_toolshed='https://toolshed.g2.bx.psu.edu/',
                             default_install_tool_dependencies=False,
                             default_install_resolver_dependencies=True,
                             default_install_repository_dependencies=True):
        """Install a list of repositories on the current Galaxy.

        :param repositories: list of repository dictionaries; must not be empty.
        :param log: optional logger; nothing is logged when it is falsy.
        :param force_latest_revision: forwarded to ``complete_repo_information``.
        :param default_toolshed: tool shed URL forwarded to
            ``complete_repo_information`` as ``default_toolshed_url``.
        :param default_install_tool_dependencies: forwarded to
            ``complete_repo_information``.
        :param default_install_resolver_dependencies: forwarded to
            ``complete_repo_information``.
        :param default_install_repository_dependencies: forwarded to
            ``complete_repo_information``.
        :return: ``InstallResults`` namedtuple with the lists
            ``installed_repositories``, ``errored_repositories`` and
            ``skipped_repositories``.
        :raises ValueError: if ``repositories`` is empty or ``None``.
        """
        if not repositories:
            raise ValueError("Empty list of tools was given")
        installation_start = dt.datetime.now()
        installed_repositories = []
        skipped_repositories = []
        errored_repositories = []
        counter = 0

        # Check repos for invalid keys
        if log:
            for repo in repositories:
                for key in repo:
                    if key not in VALID_KEYS and key != 'revisions':
                        log.warning("'{0}' not a valid key. Will be skipped during parsing".format(key))

        # Start by flattening the repo list per revision
        flattened_repos = flatten_repo_info(repositories)
        total_num_repositories = len(flattened_repos)

        # Complete the repo information, and make sure each repository has a revision
        repository_list = []
        for repository in flattened_repos:
            start = dt.datetime.now()
            try:
                complete_repo = complete_repo_information(
                    repository,
                    default_toolshed_url=default_toolshed,
                    require_tool_panel_info=True,
                    default_install_tool_dependencies=default_install_tool_dependencies,
                    default_install_resolver_dependencies=default_install_resolver_dependencies,
                    default_install_repository_dependencies=default_install_repository_dependencies,
                    force_latest_revision=force_latest_revision)
                repository_list.append(complete_repo)
            except Exception as e:
                # We'll run through the loop come whatever may, we log the errored repositories at the end anyway.
                if log:
                    log_repository_install_error(repository, start, unicodify(e), log)
                errored_repositories.append(repository)

        # Filter out already installed repos
        filtered_repos = self.filter_installed_repos(repository_list)

        for skipped_repo in filtered_repos.already_installed_repos:
            counter += 1
            if log:
                log_repository_install_skip(skipped_repo, counter, total_num_repositories, log)
            skipped_repositories.append(skipped_repo)

        # Install repos
        for repository in filtered_repos.not_installed_repos:
            counter += 1
            if log:
                log_repository_install_start(repository, counter=counter, installation_start=installation_start,
                                             log=log, total_num_repositories=total_num_repositories)
            result = self.install_repository_revision(repository, log)
            if result == "error":
                errored_repositories.append(repository)
            elif result == "skipped":
                skipped_repositories.append(repository)
            elif result == "installed":
                installed_repositories.append(repository)

        # Log results
        if log:
            log.info("Installed repositories ({0}): {1}".format(
                len(installed_repositories),
                [(t['name'], t.get('changeset_revision')) for t in installed_repositories])
            )
            log.info("Skipped repositories ({0}): {1}".format(
                len(skipped_repositories),
                [(t['name'], t.get('changeset_revision')) for t in skipped_repositories])
            )
            # Errored entries may be incomplete (completion can fail before all
            # keys are filled in), so never index them directly here.
            log.info("Errored repositories ({0}): {1}".format(
                len(errored_repositories),
                [(t.get('name', ""), t.get('changeset_revision', "")) for t in errored_repositories])
            )
            log.info("All repositories have been installed.")
            log.info("Total run time: {0}".format(dt.datetime.now() - installation_start))
        InstallResults = namedtuple("InstallResults",
                                    ["installed_repositories", "errored_repositories", "skipped_repositories"])
        return InstallResults(installed_repositories=installed_repositories,
                              skipped_repositories=skipped_repositories,
                              errored_repositories=errored_repositories)

    def update_repositories(self, repositories=None, log=None, **kwargs):
        """Update repositories to their latest revision.

        Without ``repositories`` (``None`` or empty) every installed
        repository is updated. Otherwise only the given repositories that are
        already installed (revision ignored) are updated and the others are
        reported with a warning.

        :param kwargs: forwarded to :meth:`install_repositories`.
        :return: the result of :meth:`install_repositories` called with
            ``force_latest_revision=True``.
        """
        if not repositories:  # Repositories None or empty list
            repositories = self.installed_repositories()
        else:
            filtered_repos = self.filter_installed_repos(repositories, check_revision=False)
            if filtered_repos.not_installed_repos and log:
                log.warning("The following tools are not installed and will not be upgraded: {0}".format(
                    filtered_repos.not_installed_repos))
            repositories = filtered_repos.already_installed_repos
        return self.install_repositories(repositories, force_latest_revision=True, log=log, **kwargs)

    def test_tools(self,
                   test_json,
                   repositories=None,
                   log=None,
                   test_user_api_key=None,
                   test_user="ephemeris@galaxyproject.org",
                   parallel_tests=1,
                   ):
        """Run tool tests for all tools in each repository in supplied tool list or ``self.installed_repositories()``.

        :param test_json: path of the JSON report file to write.
        :param repositories: repositories whose tools are tested; all
            installed repositories when ``None`` or empty.
        :param log: optional logger.
        :param test_user_api_key: API key handed to :meth:`_get_interactor`.
        :param test_user: test user handed to :meth:`_get_interactor`.
        :param parallel_tests: ``max_workers`` of the thread pool running the tests.
        """
        tool_test_start = dt.datetime.now()
        tests_passed = []
        test_exceptions = []

        if not repositories:  # If repositories is None or empty list
            # Consider a variant of this that doesn't even consume a tool list YAML? target
            # something like installed_repository_revisions(self.gi)
            repositories = self.installed_repositories()

        target_repositories = flatten_repo_info(repositories)

        installed_tools = []
        for target_repository in target_repositories:
            installed_tools.extend(tools_for_repository(self.gi, target_repository))

        all_test_results = []
        galaxy_interactor = self._get_interactor(test_user, test_user_api_key)
        test_history = galaxy_interactor.new_history()

        with ThreadPoolExecutor(max_workers=parallel_tests) as executor:
            try:
                for tool in installed_tools:
                    self._test_tool(executor=executor,
                                    tool=tool,
                                    galaxy_interactor=galaxy_interactor,
                                    test_history=test_history,
                                    log=log,
                                    tool_test_results=all_test_results,
                                    tests_passed=tests_passed,
                                    test_exceptions=test_exceptions,
                                    )
            finally:
                # Always write report, even if test was cancelled.
                try:
                    executor.shutdown(wait=True)
                except KeyboardInterrupt:
                    # Drop the executor's private worker bookkeeping so the
                    # interrupted run does not wait for the workers again.
                    executor._threads.clear()
                    thread._threads_queues.clear()
                n_passed = len(tests_passed)
                n_failed = len(test_exceptions)
                report_obj = {
                    'version': '0.1',
                    'suitename': 'Ephemeris tool tests targeting %s' % self.gi.base_url,
                    'results': {
                        'total': n_passed + n_failed,
                        'errors': n_failed,
                        'failures': 0,
                        'skips': 0,
                    },
                    'tests': sorted(all_test_results, key=lambda el: el['id']),
                }
                with open(test_json, "w") as f:
                    json.dump(report_obj, f)
                if log:
                    log.info("Report written to '%s'", os.path.abspath(test_json))
                    log.info("Passed tool tests ({0}): {1}".format(
                        n_passed,
                        list(tests_passed))
                    )
                    log.info("Failed tool tests ({0}): {1}".format(
                        n_failed,
                        [t[0] for t in test_exceptions])
                    )
                    log.info("Total tool test time: {0}".format(dt.datetime.now() - tool_test_start))

    def _get_interactor(self, test_user, test_user_api_key):
        """Build the ``GalaxyInteractorApi`` used to run tool tests.

        When no ``test_user_api_key`` is given and the ``whoami`` endpoint
        returns a non-null body, the connection's own key is used for the
        tests. ``test_user`` is only passed on when no API key is available.
        """
        if test_user_api_key is None:
            whoami = self.gi.make_get_request(self.gi.url + "/whoami").json()
            if whoami is not None:
                test_user_api_key = self.gi.key
        galaxy_interactor_kwds = {
            # Strip only the trailing API suffix; an unanchored pattern would
            # also mangle hosts or prefixes that merely contain "/api".
            "galaxy_url": re.sub(r'/api/?$', '', self.gi.url),
            "master_api_key": self.gi.key,
            "api_key": test_user_api_key,  # TODO
            "keep_outputs_dir": '',
        }
        if test_user_api_key is None:
            galaxy_interactor_kwds["test_user"] = test_user
        return GalaxyInteractorApi(**galaxy_interactor_kwds)

    @staticmethod
    def _test_tool(executor,
                   tool,
                   galaxy_interactor,
                   tool_test_results,
                   tests_passed,
                   test_exceptions,
                   log,
                   test_history=None,
                   ):
        """Submit every test of one tool to ``executor``.

        The three list arguments are shared accumulators that the submitted
        jobs append to: job data goes to ``tool_test_results``, ids of passed
        tests to ``tests_passed`` and ``(test_id, exception)`` tuples to
        ``test_exceptions``. If the test definitions cannot be fetched, a
        ``(tool_id, exception)`` tuple is recorded and nothing is submitted.

        :return: ``Results`` namedtuple wrapping the three accumulator lists.
            Submitted tests may still be running when it is returned.
        """
        Results = namedtuple("Results", ["tool_test_results", "tests_passed", "test_exceptions"])
        results = Results(tool_test_results=tool_test_results,
                          tests_passed=tests_passed,
                          test_exceptions=test_exceptions)
        if test_history is None:
            test_history = galaxy_interactor.new_history()
        tool_id = tool["id"]
        tool_version = tool["version"]
        try:
            tool_test_dicts = galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version)
        except Exception as e:
            if log:
                log.warning("Fetching test definition for tool '%s' failed", tool_id, exc_info=True)
            test_exceptions.append((tool_id, e))
            return results

        def run_test(index, test_id):
            """Run a single tool test and record its outcome."""

            def register(job_data):
                tool_test_results.append({
                    'id': test_id,
                    'has_data': True,
                    'data': job_data,
                })

            try:
                if log:
                    log.info("Executing test '%s'", test_id)
                verify_tool(
                    tool_id, galaxy_interactor, test_index=index, tool_version=tool_version,
                    register_job_data=register, quiet=True, test_history=test_history,
                )
                tests_passed.append(test_id)
                if log:
                    log.info("Test '%s' passed", test_id)
            except Exception as e:
                if log:
                    log.warning("Test '%s' failed", test_id, exc_info=True)
                test_exceptions.append((test_id, e))

        for test_index in range(len(tool_test_dicts)):
            executor.submit(run_test, test_index, tool_id + "-" + str(test_index))
        return results

    def install_repository_revision(self, repository, log):
        """Install one repository revision through the tool shed client.

        Note that ``repository`` is modified in place: its
        ``tool_panel_section_label`` entry is renamed to
        ``new_tool_panel_section_label`` before the dictionary is passed as
        keyword arguments to the client.

        :param repository: completed repository dictionary.
        :param log: optional logger.
        :return: ``"installed"``, ``"skipped"`` or ``"error"``.
        """
        default_err_msg = ('All repositories that you are attempting to install '
                           'have been previously installed.')
        start = dt.datetime.now()
        try:
            repository['new_tool_panel_section_label'] = repository.pop('tool_panel_section_label')
            response = self.tool_shed_client.install_repository_revision(**repository)
            if isinstance(response, dict) and response.get('status', None) == 'ok':
                # This rare case happens if a repository is already installed but
                # was not recognised as such in the above check. In such a
                # case the return value looks like this:
                # {u'status': u'ok', u'message': u'No repositories were
                # installed, possibly because the selected repository has
                # already been installed.'}
                if log:
                    log.debug("\tRepository {0} is already installed.".format(repository['name']))
            if log:
                log_repository_install_success(
                    repository=repository,
                    start=start,
                    log=log)
            return "installed"
        except (ConnectionError, requests.exceptions.ConnectionError) as e:
            error_text = unicodify(e)
            # requests' ConnectionError has no ``body`` attribute, so fall back
            # to the exception text instead of failing inside the handler.
            error_msg = getattr(e, 'body', None) or error_text
            if default_err_msg in error_text:
                # THIS SHOULD NOT HAPPEN DUE TO THE CHECKS EARLIER
                if log:
                    log.debug("\tRepository %s already installed (at revision %s)" %
                              (repository['name'], repository['changeset_revision']))
                return "skipped"
            if "504" in error_text or 'Connection aborted' in error_text:
                if log:
                    log.debug("Timeout during install of %s, extending wait to 1h", repository['name'])
                if self.wait_for_install(repository=repository, log=log, timeout=3600):
                    if log:
                        log_repository_install_success(
                            repository=repository,
                            start=start,
                            log=log)
                    return "installed"
            if log:
                log_repository_install_error(
                    repository=repository,
                    start=start, msg=error_msg,
                    log=log)
            return "error"

    def wait_for_install(self, repository, log=None, timeout=3600, poll_interval=10):
        """
        If nginx times out, we look into the list of installed repositories
        and try to determine if a repository of the same name/owner is still installing.
        Returns True if install finished successfully,
        returns False when timeout is exceeded or installation has failed.

        :param repository: dictionary with ``name``, ``owner`` and ``changeset_revision``.
        :param log: optional logger used to report failed listing requests.
        :param timeout: maximum number of seconds to keep polling.
        :param poll_interval: seconds to sleep between two polls.
        """
        start = dt.datetime.now()
        while (dt.datetime.now() - start) < dt.timedelta(seconds=timeout):
            try:
                for installing_repo in self.tool_shed_client.get_repositories():
                    if (installing_repo['name'] == repository['name']) and (
                            installing_repo['owner'] == repository['owner']) and (
                            installing_repo['changeset_revision'] == repository['changeset_revision']):
                        status = installing_repo['status']
                        if status == 'Installed':
                            return True
                        if status == 'Error':
                            return False
            except ConnectionError as e:
                if log:
                    log.warning('Failed to get repositories list: %s', unicodify(e))
            # Sleep once per poll, also when the repository is not listed
            # (yet); otherwise the loop would hammer Galaxy without pause.
            time.sleep(poll_interval)
        return False
445
446
def log_repository_install_error(repository, start, msg, log):
    """
    Log a failed repository installation at error level.

    :param repository: repository dictionary; missing ``name``, ``owner`` or
        ``changeset_revision`` entries are logged as empty strings.
    :param start: ``datetime`` at which the installation attempt began.
    :param msg: error description to include in the log line.
    :param log: logger to write to.
    :return: None
    """
    end = dt.datetime.now()
    # Arguments are handed to the logger unformatted so that the message is
    # only rendered when the error level is enabled.
    log.error(
        "\t* Error installing a repository (after %s seconds)! Name: %s, owner: %s, revision: %s, error: %s",
        str(end - start),
        repository.get('name', ""),
        repository.get('owner', ""),
        repository.get('changeset_revision', ""),
        msg)
459
460
def log_repository_install_success(repository, start, log):
    """
    Write a debug line reporting that a repository was installed.

    The line contains the repository name, the time elapsed since ``start``
    and the changeset revision.
    Repositories that finish in error still count as successful installs currently.
    """
    name = repository['name']
    elapsed = str(dt.datetime.now() - start)
    revision = repository['changeset_revision']
    message = "\trepository %s installed successfully (in %s) at revision %s" % (name, elapsed, revision)
    log.debug(message)
474
475
def log_repository_install_skip(repository, counter, total_num_repositories, log):
    """Write a debug line reporting that an already installed repository is skipped.

    ``counter`` and ``total_num_repositories`` form the ``(n/total)`` progress
    prefix of the line.
    """
    template = "({0}/{1}) repository {2} already installed at revision {3}. Skipping."
    name = repository['name']
    revision = repository['changeset_revision']
    log.debug(template.format(counter, total_num_repositories, name, revision))
486
487
def log_repository_install_start(repository, counter, total_num_repositories, installation_start, log):
    """Write a debug line announcing the installation of a repository.

    The line shows the ``(n/total)`` progress, the repository name, owner,
    target section (the section id, or the section label when the id is
    falsy), revision and the time elapsed since ``installation_start``.
    """
    name = repository['name']
    owner = repository['owner']
    section = repository['tool_panel_section_id'] or repository['tool_panel_section_label']
    revision = repository['changeset_revision']
    total_run_time = dt.datetime.now() - installation_start
    fields = (counter, total_num_repositories, name, owner, section, revision, total_run_time)
    log.debug('(%s/%s) Installing repository %s from %s to section "%s" at revision %s (TRT: %s)' % fields)
499
500
def args_to_repos(args):
    """Build the list of repository dictionaries described by the parsed arguments.

    The sources are tried in this order: the ``tools`` entry of the YAML tool
    list file, the single YAML string in ``args.tool_yaml``, and finally the
    individual ``name``/``owner`` options. An empty list is returned when
    none of them is set.
    """
    if args.tool_list_file:
        return load_yaml_file(args.tool_list_file)['tools']

    if args.tool_yaml:
        return [yaml.safe_load(args.tool_yaml)]

    if not (args.name and args.owner):
        return []

    single_repo = {
        'owner': args.owner,
        'name': args.name,
        'tool_panel_section_id': args.tool_panel_section_id,
        'tool_panel_section_label': args.tool_panel_section_label,
        'revisions': args.revisions,
    }
    # The tool shed URL is only recorded when it was given explicitly.
    if args.tool_shed_url:
        single_repo["tool_shed_url"] = args.tool_shed_url
    return [single_repo]
521
522
def main():
    """Command line entry point: run the ``update``, ``install`` or ``test`` action.

    After an install or update, the installed repositories (plus the skipped
    ones when ``test_existing`` is set) are tested if ``test`` or
    ``test_existing`` was requested.

    :raises NotImplementedError: if the parsed action is none of the three
        known actions.
    """
    disable_external_library_logging()
    args = parser().parse_args()
    log = setup_global_logger(name=__name__, log_file=args.log_file)
    gi = get_galaxy_connection(args, file=args.tool_list_file, log=log, login_required=True)
    install_repository_manager = InstallRepositoryManager(gi)

    repos = args_to_repos(args)

    if args.tool_list_file:
        tool_list = load_yaml_file(args.tool_list_file)
    else:
        tool_list = dict()

    # Get some of the other installation arguments; a truthy value in the
    # tool list file wins over the command line option.
    kwargs = {
        "default_" + option: tool_list.get(option) or getattr(args, option, False)
        for option in ("install_tool_dependencies",
                       "install_repository_dependencies",
                       "install_resolver_dependencies")
    }

    # Start installing/updating and store the results in install_results.
    # Or do testing if the action is `test`
    install_results = None
    if args.action == "update":
        install_results = install_repository_manager.update_repositories(
            repositories=repos,
            log=log,
            **kwargs)
    elif args.action == "install":
        install_results = install_repository_manager.install_repositories(
            repos,
            log=log,
            force_latest_revision=args.force_latest_revision,
            **kwargs)
    elif args.action == "test":
        install_repository_manager.test_tools(
            test_json=args.test_json,
            repositories=repos,
            log=log,
            test_user_api_key=args.test_user_api_key,
            test_user=args.test_user,
            parallel_tests=args.parallel_tests,
        )
    else:
        raise NotImplementedError("This point in the code should not be reached. Please contact the developers.")

    # Run tests on the install results if required.
    # ``and`` binds tighter than ``or``: without the explicit grouping,
    # ``test_existing`` alone would enter this branch even when there are no
    # install results and then fail on ``None``.
    test_existing = getattr(args, "test_existing", False)
    test_requested = getattr(args, "test", False) or test_existing
    if install_results is not None and test_requested:
        # Work on a copy so the lists inside install_results stay untouched.
        to_be_tested_repositories = list(install_results.installed_repositories)
        if test_existing:
            to_be_tested_repositories.extend(install_results.skipped_repositories)
        if to_be_tested_repositories:
            install_repository_manager.test_tools(
                test_json=args.test_json,
                repositories=to_be_tested_repositories,
                log=log,
                test_user_api_key=args.test_user_api_key,
                test_user=args.test_user,
                parallel_tests=args.parallel_tests,
            )
589
590
591 if __name__ == "__main__":
592 main()