Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/planemo/engine/galaxy.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/planemo/engine/galaxy.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,137 +0,0 @@ -"""Module contianing the :class:`GalaxyEngine` implementation of :class:`Engine`.""" -from __future__ import absolute_import - -import abc -import contextlib - -from galaxy.tool_util.verify import interactor - -from planemo.galaxy.activity import execute -from planemo.galaxy.config import external_galaxy_config -from planemo.galaxy.serve import serve_daemon -from planemo.runnable import RunnableType -from .interface import BaseEngine - - -class GalaxyEngine(BaseEngine): - """An :class:`Engine` implementation backed by a managed Galaxy. - - More information on Galaxy can be found at http://galaxyproject.org/. - """ - - __metaclass__ = abc.ABCMeta - - handled_runnable_types = [ - RunnableType.cwl_tool, - RunnableType.cwl_workflow, - RunnableType.galaxy_workflow, - RunnableType.galaxy_tool, - RunnableType.galaxy_datamanager, - ] - - def _run(self, runnable, job_path): - """Run CWL job in Galaxy.""" - self._ctx.vlog("Serving artifact [%s] with Galaxy." % (runnable,)) - with self.ensure_runnables_served([runnable]) as config: - self._ctx.vlog("Running job path [%s]" % job_path) - run_response = execute(self._ctx, config, runnable, job_path, **self._kwds) - - return run_response - - @abc.abstractmethod - def ensure_runnables_served(self, runnables): - """Use a context manager and describe Galaxy instance with runnables being served.""" - - def _run_test_case(self, test_case): - if hasattr(test_case, "job_path"): - # Simple file-based job path. - return super(GalaxyEngine, self)._run_test_case(test_case) - else: - with self.ensure_runnables_served([test_case.runnable]) as config: - galaxy_interactor_kwds = { - "galaxy_url": config.galaxy_url, - "master_api_key": config.master_api_key, - "api_key": config.user_api_key, - "keep_outputs_dir": "", # TODO: this... - } - tool_id = test_case.tool_id - test_index = test_case.test_index - tool_version = test_case.tool_version - galaxy_interactor = interactor.GalaxyInteractorApi(**galaxy_interactor_kwds) - - test_results = [] - - def _register_job_data(job_data): - test_results.append({ - 'id': tool_id + "-" + str(test_index), - 'has_data': True, - 'data': job_data, - }) - - verbose = self._ctx.verbose - try: - if verbose: - # TODO: this is pretty hacky, it'd be better to send a stream - # and capture the output information somehow. - interactor.VERBOSE_GALAXY_ERRORS = True - - interactor.verify_tool( - tool_id, - galaxy_interactor, - test_index=test_index, - tool_version=tool_version, - register_job_data=_register_job_data, - quiet=not verbose, - ) - except Exception: - pass - - return test_results[0] - - -class LocalManagedGalaxyEngine(GalaxyEngine): - """An :class:`Engine` implementation backed by a managed Galaxy. - - More information on Galaxy can be found at http://galaxyproject.org/. - """ - - @contextlib.contextmanager - def ensure_runnables_served(self, runnables): - # TODO: define an interface for this - not everything in config would make sense for a - # pre-existing Galaxy interface. - with serve_daemon(self._ctx, runnables, **self._serve_kwds()) as config: - yield config - - def _serve_kwds(self): - return self._kwds.copy() - - -class DockerizedManagedGalaxyEngine(LocalManagedGalaxyEngine): - """An :class:`Engine` implementation backed by Galaxy running in Docker. - - More information on Galaxy can be found at http://galaxyproject.org/. - """ - - def _serve_kwds(self): - serve_kwds = self._kwds.copy() - serve_kwds["dockerize"] = True - return serve_kwds - - -class ExternalGalaxyEngine(GalaxyEngine): - """An :class:`Engine` implementation backed by an external Galaxy instance. - """ - - @contextlib.contextmanager - def ensure_runnables_served(self, runnables): - # TODO: ensure tools are available - with external_galaxy_config(self._ctx, runnables, **self._kwds) as config: - config.install_workflows() - yield config - - -__all__ = ( - "DockerizedManagedGalaxyEngine", - "ExternalGalaxyEngine", - "LocalManagedGalaxyEngine", -)