comparison env/lib/python3.7/site-packages/gxformat2/interface.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 """This module contains an interface and implementation describing Galaxy interactions used by gxformat2.
2
3 The interface is :class:`ImporterGalaxyInterface` and the default
4 implementation based on `BioBlend <https://bioblend.readthedocs.io/>`__
5 is :class:`BioBlendImporterGalaxyInterface`.
6 """
7 import abc
8
9 import bioblend
10 import six
11
12
class ImporterGalaxyInterface(six.with_metaclass(abc.ABCMeta, object)):
    """Abstract description of the Galaxy operations gxformat2 relies on.

    Concrete subclasses supply the operations needed to load workflows
    into a Galaxy server.
    """

    @abc.abstractmethod
    def import_workflow(self, workflow, **kwds):
        """Load ``workflow`` into Galaxy, e.g. via POST /api/workflows or a comparable interface."""

    def import_tool(self, tool):
        """Load a new dynamically defined tool into Galaxy.

        Vanilla Galaxy does not implement this yet; only the cwl branch of
        Galaxy uses it, so the default simply raises.
        """
        raise NotImplementedError()
31
32
class BioBlendImporterGalaxyInterface(object):
    """Implementation of :class:`ImporterGalaxyInterface` using bioblend.

    Two Galaxy connections are held: an admin one (``_admin_gi``) and a
    user one (``_user_gi``). Workflows are imported through the user
    connection.
    """

    def __init__(self, **kwds):
        """Adopt or build the admin and user Galaxy connections.

        For each role (``admin`` and ``user``) the connection is taken from
        the first of the following that is supplied and not ``None``:

        * ``admin_gi`` / ``user_gi`` - a ready-made connection for that role.
        * ``gi`` - a single connection shared by both roles.
        * ``url`` together with ``admin_key`` / ``user_key`` - used to build
          a new :class:`bioblend.galaxy.GalaxyInstance`.

        :raises ValueError: if none of the above is available for a role.
        """
        self._admin_gi = self._resolve_gi(kwds, "admin")
        self._user_gi = self._resolve_gi(kwds, "user")

    @staticmethod
    def _resolve_gi(kwds, role):
        """Return the Galaxy connection for ``role`` ("admin" or "user") described by ``kwds``."""
        gi_name = role + "_gi"
        key_name = role + "_key"

        # A role-specific instance wins over the shared "gi" instance.
        for name in (gi_name, "gi"):
            candidate = kwds.get(name)
            if candidate is not None:
                return candidate

        url = kwds.get("url")
        key = kwds.get(key_name)
        if url is None or key is None:
            # Raise explicitly rather than assert: assertions are stripped
            # under ``python -O`` and would let None credentials through.
            raise ValueError(
                "Cannot build the {role} Galaxy connection: supply '{gi_name}', "
                "'gi', or both 'url' and '{key_name}'.".format(
                    role=role, gi_name=gi_name, key_name=key_name
                )
            )

        # GalaxyInstance lives in the bioblend.galaxy subpackage, which a bare
        # ``import bioblend`` does not load; import it only when it is needed.
        from bioblend.galaxy import GalaxyInstance
        return GalaxyInstance(url=url, key=key)

    def import_workflow(self, workflow, **kwds):
        """Import a Galaxy workflow through the user connection.

        ``workflow`` and any extra keyword arguments are forwarded unchanged
        to ``workflows.import_workflow_json`` and its result is returned.
        """
        return self._user_gi.workflows.import_workflow_json(workflow, **kwds)

    def import_tool(self, tool_representation):
        """Import a Galaxy tool - not supported by this implementation.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError()