diff env/lib/python3.7/site-packages/gxformat2/interface.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/gxformat2/interface.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,82 @@
+"""This module contains an interface and implementation describing Galaxy interactions used by gxformat2.
+
+The interface is :class:`ImporterGalaxyInterface` and the default
+implementation based on `BioBlend <https://bioblend.readthedocs.io/>`__
+is :class:`BioBlendImporterGalaxyInterface`.
+"""
+import abc
+
+import bioblend
+import six
+
+
+@six.add_metaclass(abc.ABCMeta)
+class ImporterGalaxyInterface(object):
+    """An abstract interface describing Galaxy operations used by gxformat2.
+
+    Specifically containing definitions of operations required to load
+    workflows into Galaxy.
+    """
+
+    @abc.abstractmethod
+    def import_workflow(self, workflow, **kwds):
+        """Import a workflow via POST /api/workflows or comparable interface into Galaxy."""
+
+    def import_tool(self, tool):
+        """Import a new dynamically defined tool.
+
+        Not yet implemented in vanilla Galaxy - used only in the cwl branch of Galaxy.
+        """
+        raise NotImplementedError()
+
+
+class BioBlendImporterGalaxyInterface(object):
+    """Implementation of :class:`ImporterGalaxyInterface` using bioblend."""
+
+    def __init__(self, **kwds):
+        """Build a :class:`bioblend.GalaxyInstance` from supplied arguments."""
+        url = None
+
+        admin_key = None
+        admin_gi = None
+        if "admin_gi" in kwds:
+            admin_gi = kwds["admin_gi"]
+        elif "gi" in kwds:
+            admin_gi = kwds["gi"]
+        elif "url" in kwds and "admin_key" in kwds:
+            url = kwds["url"]
+            admin_key = kwds["admin_key"]
+
+        if admin_gi is None:
+            assert url is not None
+            assert admin_key is not None
+            admin_gi = bioblend.GalaxyInstance(url=url, key=admin_key)
+
+        user_key = None
+        user_gi = None
+        if "user_gi" in kwds:
+            user_gi = kwds["user_gi"]
+        elif "gi" in kwds:
+            user_gi = kwds["gi"]
+        elif "url" in kwds and "user_key" in kwds:
+            url = kwds["url"]
+            user_key = kwds["user_key"]
+
+        if user_gi is None:
+            assert url is not None
+            assert user_key is not None
+            user_gi = bioblend.GalaxyInstance(url=url, key=user_key)
+
+        self._admin_gi = admin_gi
+        self._user_gi = user_gi
+
+    def import_workflow(self, workflow, **kwds):
+        """Import Galaxy workflow using instance :class:`bioblend.GalaxyInstance` object."""
+        return self._user_gi.workflows.import_workflow_json(
+            workflow,
+            **kwds
+        )
+
+    def import_tool(self, tool_representation):
+        """Import Galaxy tool using instance :class:`bioblend.GalaxyInstance` object."""
+        raise NotImplementedError()