diff env/lib/python3.7/site-packages/planemo/engine/galaxy.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/planemo/engine/galaxy.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,137 @@
+"""Module contianing the :class:`GalaxyEngine` implementation of :class:`Engine`."""
+from __future__ import absolute_import
+
+import abc
+import contextlib
+
+from galaxy.tool_util.verify import interactor
+
+from planemo.galaxy.activity import execute
+from planemo.galaxy.config import external_galaxy_config
+from planemo.galaxy.serve import serve_daemon
+from planemo.runnable import RunnableType
+from .interface import BaseEngine
+
+
+class GalaxyEngine(BaseEngine):
+    """An :class:`Engine` implementation backed by a managed Galaxy.
+
+    More information on Galaxy can be found at http://galaxyproject.org/.
+    """
+
+    __metaclass__ = abc.ABCMeta
+
+    handled_runnable_types = [
+        RunnableType.cwl_tool,
+        RunnableType.cwl_workflow,
+        RunnableType.galaxy_workflow,
+        RunnableType.galaxy_tool,
+        RunnableType.galaxy_datamanager,
+    ]
+
+    def _run(self, runnable, job_path):
+        """Run CWL job in Galaxy."""
+        self._ctx.vlog("Serving artifact [%s] with Galaxy." % (runnable,))
+        with self.ensure_runnables_served([runnable]) as config:
+            self._ctx.vlog("Running job path [%s]" % job_path)
+            run_response = execute(self._ctx, config, runnable, job_path, **self._kwds)
+
+        return run_response
+
+    @abc.abstractmethod
+    def ensure_runnables_served(self, runnables):
+        """Use a context manager and describe Galaxy instance with runnables being served."""
+
+    def _run_test_case(self, test_case):
+        if hasattr(test_case, "job_path"):
+            # Simple file-based job path.
+            return super(GalaxyEngine, self)._run_test_case(test_case)
+        else:
+            with self.ensure_runnables_served([test_case.runnable]) as config:
+                galaxy_interactor_kwds = {
+                    "galaxy_url": config.galaxy_url,
+                    "master_api_key": config.master_api_key,
+                    "api_key": config.user_api_key,
+                    "keep_outputs_dir": "",  # TODO: this...
+                }
+                tool_id = test_case.tool_id
+                test_index = test_case.test_index
+                tool_version = test_case.tool_version
+                galaxy_interactor = interactor.GalaxyInteractorApi(**galaxy_interactor_kwds)
+
+                test_results = []
+
+                def _register_job_data(job_data):
+                    test_results.append({
+                        'id': tool_id + "-" + str(test_index),
+                        'has_data': True,
+                        'data': job_data,
+                    })
+
+                verbose = self._ctx.verbose
+                try:
+                    if verbose:
+                        # TODO: this is pretty hacky, it'd be better to send a stream
+                        # and capture the output information somehow.
+                        interactor.VERBOSE_GALAXY_ERRORS = True
+
+                    interactor.verify_tool(
+                        tool_id,
+                        galaxy_interactor,
+                        test_index=test_index,
+                        tool_version=tool_version,
+                        register_job_data=_register_job_data,
+                        quiet=not verbose,
+                    )
+                except Exception:
+                    pass
+
+                return test_results[0]
+
+
+class LocalManagedGalaxyEngine(GalaxyEngine):
+    """An :class:`Engine` implementation backed by a managed Galaxy.
+
+    More information on Galaxy can be found at http://galaxyproject.org/.
+    """
+
+    @contextlib.contextmanager
+    def ensure_runnables_served(self, runnables):
+        # TODO: define an interface for this - not everything in config would make sense for a
+        # pre-existing Galaxy interface.
+        with serve_daemon(self._ctx, runnables, **self._serve_kwds()) as config:
+            yield config
+
+    def _serve_kwds(self):
+        return self._kwds.copy()
+
+
+class DockerizedManagedGalaxyEngine(LocalManagedGalaxyEngine):
+    """An :class:`Engine` implementation backed by Galaxy running in Docker.
+
+    More information on Galaxy can be found at http://galaxyproject.org/.
+    """
+
+    def _serve_kwds(self):
+        serve_kwds = self._kwds.copy()
+        serve_kwds["dockerize"] = True
+        return serve_kwds
+
+
+class ExternalGalaxyEngine(GalaxyEngine):
+    """An :class:`Engine` implementation backed by an external Galaxy instance.
+    """
+
+    @contextlib.contextmanager
+    def ensure_runnables_served(self, runnables):
+        # TODO: ensure tools are available
+        with external_galaxy_config(self._ctx, runnables, **self._kwds) as config:
+            config.install_workflows()
+            yield config
+
+
+__all__ = (
+    "DockerizedManagedGalaxyEngine",
+    "ExternalGalaxyEngine",
+    "LocalManagedGalaxyEngine",
+)