Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/bioblend/_tests/TestGalaxyObjects.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:d30785e31577 |
|---|---|
| 1 # pylint: disable=C0103,E1101 | |
| 2 import json | |
| 3 import os | |
| 4 import shutil | |
| 5 import socket | |
| 6 import sys | |
| 7 import tarfile | |
| 8 import tempfile | |
| 9 import uuid | |
| 10 from ssl import SSLError | |
| 11 from urllib.error import URLError | |
| 12 from urllib.request import urlopen | |
| 13 | |
| 14 import bioblend | |
| 15 import bioblend.galaxy.objects.galaxy_instance as galaxy_instance | |
| 16 import bioblend.galaxy.objects.wrappers as wrappers | |
| 17 from bioblend.galaxy import dataset_collections | |
| 18 from . import test_util | |
| 19 from .test_util import unittest | |
| 20 | |
# Module-level test configuration.
bioblend.set_stream_logger('test', level='INFO')
# Keep a hung connection from stalling the whole test run.
socket.setdefaulttimeout(10.0)
# Workflow definition files shipped with the test data.
SAMPLE_FN = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
SAMPLE_WF_COLL_FN = test_util.get_abspath(os.path.join('data', 'paste_columns_collections.ga'))
SAMPLE_WF_PARAMETER_INPUT_FN = test_util.get_abspath(os.path.join('data', 'workflow_with_parameter_input.ga'))
# Canned dataset contents used as upload payloads throughout the tests.
FOO_DATA = 'foo\nbar\n'
FOO_DATA_2 = 'foo2\nbar2\n'
# Static workflow dictionary mirroring 'paste_columns.ga': two data-input
# steps (571 and 572) feeding a single 'Paste1' tool step (573).
SAMPLE_WF_DICT = {
    'deleted': False,
    'id': '9005c5112febe774',
    'inputs': {
        '571': {'label': 'Input Dataset', 'value': ''},
        '572': {'label': 'Input Dataset', 'value': ''},
    },
    'model_class': 'StoredWorkflow',
    'name': 'paste_columns',
    'published': False,
    'steps': {
        '571': {
            'id': 571,
            'input_steps': {},
            'tool_id': None,
            'tool_inputs': {'name': 'Input Dataset'},
            'tool_version': None,
            'type': 'data_input',
        },
        '572': {
            'id': 572,
            'input_steps': {},
            'tool_id': None,
            'tool_inputs': {'name': 'Input Dataset'},
            'tool_version': None,
            'type': 'data_input',
        },
        '573': {
            'id': 573,
            'input_steps': {
                'input1': {'source_step': 571, 'step_output': 'output'},
                'input2': {'source_step': 572, 'step_output': 'output'},
            },
            'tool_id': 'Paste1',
            'tool_inputs': {
                'delimiter': '"T"',
                'input1': 'null',
                'input2': 'null',
            },
            'tool_version': '1.0.0',
            'type': 'tool',
        }
    },
    'tags': [],
    'url': '/api/workflows/9005c5112febe774',
}
| 74 | |
| 75 | |
def is_reachable(url):
    """Return True if *url* answers within 5 seconds, False otherwise.

    SSL, URL and socket-timeout errors are treated as "not reachable";
    any other exception propagates to the caller.
    """
    try:
        res = urlopen(url, timeout=5)
    except (SSLError, URLError, socket.timeout):
        return False
    # urlopen succeeded, so we hold an open response: release it.
    res.close()
    return True
| 85 | |
| 86 | |
def upload_from_fs(lib, bnames, **kwargs):
    """Write FOO_DATA into temp files named *bnames* and upload them.

    The files are created in a throwaway directory, passed to
    ``lib.upload_from_galaxy_fs`` (extra ``kwargs`` forwarded verbatim),
    and the directory is removed afterwards even on failure.
    Returns ``(datasets, filenames)``.
    """
    tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
    fnames = [os.path.join(tempdir, bn) for bn in bnames]
    try:
        for path in fnames:
            with open(path, 'w') as fh:
                fh.write(FOO_DATA)
        dss = lib.upload_from_galaxy_fs(fnames, **kwargs)
    finally:
        shutil.rmtree(tempdir)
    return dss, fnames
| 98 | |
| 99 | |
class MockWrapper(wrappers.Wrapper):
    """Minimal concrete Wrapper used to exercise the abstract base class.

    Exposes exactly two wrapped attributes (``a`` and ``b``) so the unit
    tests can probe attribute mapping, tainting and serialization.
    """
    BASE_ATTRS = frozenset(['a', 'b'])

    # NOTE: a redundant __init__ that only called super().__init__() was
    # removed; the inherited constructor is used directly, so the public
    # construction signature is unchanged.

    @property
    def gi_module(self):
        # Delegate to the base implementation (not meaningful for a mock).
        return super().gi_module()
| 110 | |
class TestWrapper(unittest.TestCase):
    """Offline unit tests for the generic Wrapper base, via MockWrapper."""

    def setUp(self):
        self.d = {'a': 1, 'b': [2, 3], 'c': {'x': 4}}
        # The abstract base class must not be instantiable directly.
        self.assertRaises(TypeError, wrappers.Wrapper, self.d)
        self.w = MockWrapper(self.d)

    def test_initialize(self):
        for attr in MockWrapper.BASE_ATTRS:
            self.assertEqual(getattr(self.w, attr), self.d[attr])
        self.w.a = 222
        self.w.b[0] = 222
        self.assertEqual(self.w.a, 222)
        self.assertEqual(self.w.b[0], 222)
        # Mutating the wrapper must not leak back into the source dict.
        self.assertEqual(self.d['a'], 1)
        self.assertEqual(self.d['b'][0], 2)
        # Attributes outside BASE_ATTRS are rejected for read and write.
        self.assertRaises(AttributeError, getattr, self.w, 'foo')
        self.assertRaises(AttributeError, setattr, self.w, 'foo', 0)

    def test_taint(self):
        self.assertFalse(self.w.is_modified)
        self.w.a = 111  # pylint: disable=W0201
        self.assertTrue(self.w.is_modified)

    def test_serialize(self):
        # A JSON round trip must reproduce the wrapped payload exactly.
        round_tripped = MockWrapper.from_json(self.w.to_json())
        self.assertEqual(round_tripped.wrapped, self.w.wrapped)

    def test_clone(self):
        duplicate = self.w.clone()
        self.assertEqual(duplicate.wrapped, self.w.wrapped)
        # A clone is fully independent of its source.
        duplicate.b[0] = 111
        self.assertEqual(self.w.b[0], 2)

    def test_kwargs(self):
        parent = MockWrapper({'a': 10})
        child = MockWrapper(self.d, parent=parent)
        self.assertIs(child.parent, parent)
        # The parent link is read-only once set.
        self.assertRaises(AttributeError, setattr, child, 'parent', 0)
| 150 | |
| 151 | |
class TestWorkflow(unittest.TestCase):
    """Offline tests for the Workflow wrapper, built from SAMPLE_WF_DICT."""

    def setUp(self):
        self.wf = wrappers.Workflow(SAMPLE_WF_DICT)

    def test_initialize(self):
        self.assertEqual(self.wf.id, '9005c5112febe774')
        self.assertEqual(self.wf.name, 'paste_columns')
        self.assertEqual(self.wf.deleted, False)
        self.assertEqual(self.wf.published, False)
        self.assertEqual(self.wf.tags, [])
        self.assertEqual(
            self.wf.input_labels_to_ids, {'Input Dataset': {'571', '572'}})
        self.assertEqual(self.wf.tool_labels_to_ids, {'Paste1': {'573'}})
        self.assertEqual(self.wf.data_input_ids, {'571', '572'})
        self.assertEqual(self.wf.source_ids, {'571', '572'})
        self.assertEqual(self.wf.sink_ids, {'573'})

    def test_dag(self):
        # Rebuild the inverse DAG by hand and compare with the wrapper's.
        expected_inv = {}
        for head, tail_set in self.wf.dag.items():
            for tail in tail_set:
                expected_inv.setdefault(str(tail), set()).add(head)
        self.assertEqual(self.wf.inv_dag, expected_inv)
        heads = set(self.wf.dag)
        self.assertEqual(heads, set.union(*self.wf.inv_dag.values()))
        tails = set(self.wf.inv_dag)
        self.assertEqual(tails, set.union(*self.wf.dag.values()))
        # The topological sort must cover every node...
        ids = self.wf.sorted_step_ids()
        self.assertEqual(set(ids), heads | tails)
        # ...and place every head before each of its tails.
        for head, tail_set in self.wf.dag.items():
            for tail in tail_set:
                self.assertLess(ids.index(head), ids.index(tail))

    def test_steps(self):
        raw_steps = SAMPLE_WF_DICT['steps']
        for sid, step in self.wf.steps.items():
            self.assertIsInstance(step, wrappers.Step)
            self.assertEqual(step.id, sid)
            self.assertIn(sid, raw_steps)
            self.assertIs(step.parent, self.wf)
        self.assertEqual(self.wf.data_input_ids, {'571', '572'})
        self.assertEqual(self.wf.tool_ids, {'573'})

    def test_taint(self):
        self.assertFalse(self.wf.is_modified)
        # Touching a nested step attribute must taint the whole workflow.
        self.wf.steps['571'].tool_id = 'foo'
        self.assertTrue(self.wf.is_modified)

    def test_input_map(self):
        class FakeLD(object):
            SRC = 'ld'

            def __init__(self, id_):
                self.id = id_

        label = 'Input Dataset'
        self.assertEqual(self.wf.input_labels, {label})
        input_map = self.wf.convert_input_map(
            {label: [FakeLD('a'), FakeLD('b')]})
        # Either dataset may be bound to either input step, i.e.
        # {'571': {'id': 'a', 'src': 'ld'}, '572': {'id': 'b', 'src': 'ld'}}
        # or the same with 'a' and 'b' swapped.
        self.assertEqual(set(input_map), {'571', '572'})
        for entry in input_map.values():
            self.assertEqual(set(entry), {'id', 'src'})
            self.assertEqual(entry['src'], 'ld')
            self.assertIn(entry['id'], 'ab')
| 220 | |
| 221 | |
@test_util.skip_unless_galaxy()
class GalaxyObjectsTestBase(unittest.TestCase):
    """Base class for tests that talk to a live Galaxy server.

    Connection details come from the ``BIOBLEND_GALAXY_URL`` and
    ``BIOBLEND_GALAXY_API_KEY`` environment variables; the decorator skips
    the whole class when no server is configured.
    """

    def setUp(self):
        # A KeyError here means the environment is missing a variable the
        # skip_unless_galaxy decorator was expected to check for.
        galaxy_key = os.environ['BIOBLEND_GALAXY_API_KEY']
        galaxy_url = os.environ['BIOBLEND_GALAXY_URL']
        self.gi = galaxy_instance.GalaxyInstance(galaxy_url, galaxy_key)
| 229 | |
| 230 | |
class TestGalaxyInstance(GalaxyObjectsTestBase):
    """Integration tests for top-level GalaxyInstance operations:
    libraries, histories and workflows (create/list/import/delete).
    """

    def test_library(self):
        name = 'test_%s' % uuid.uuid4().hex
        description, synopsis = 'D', 'S'
        lib = self.gi.libraries.create(
            name, description=description, synopsis=synopsis)
        self.assertEqual(lib.name, name)
        self.assertEqual(lib.description, description)
        self.assertEqual(lib.synopsis, synopsis)
        self.assertEqual(len(lib.content_infos), 1)  # root folder
        self.assertEqual(len(lib.folder_ids), 1)
        self.assertEqual(len(lib.dataset_ids), 0)
        self.assertIn(lib.id, [_.id for _ in self.gi.libraries.list()])
        lib.delete()
        self.assertFalse(lib.is_mapped)

    def test_workflow_from_str(self):
        with open(SAMPLE_FN) as f:
            wf = self.gi.workflows.import_new(f.read())
        self._check_and_del_workflow(wf)

    def test_workflow_collections_from_str(self):
        with open(SAMPLE_WF_COLL_FN) as f:
            wf = self.gi.workflows.import_new(f.read())
        self._check_and_del_workflow(wf)

    @test_util.skip_unless_galaxy('release_19.01')
    def test_workflow_parameter_input(self):
        # Parameter inputs require Galaxy release 19.01 or later.
        with open(SAMPLE_WF_PARAMETER_INPUT_FN) as f:
            self.gi.workflows.import_new(f.read())

    def test_workflow_from_dict(self):
        with open(SAMPLE_FN) as f:
            wf = self.gi.workflows.import_new(json.load(f))
        self._check_and_del_workflow(wf)

    def test_workflow_publish_from_dict(self):
        with open(SAMPLE_FN) as f:
            wf = self.gi.workflows.import_new(json.load(f), publish=True)
        self._check_and_del_workflow(wf, check_is_public=True)

    def test_workflow_missing_tools(self):
        # Import a workflow, then blank out its tool steps' metadata to
        # simulate tools that are missing from the server.
        with open(SAMPLE_FN) as f:
            wf_dump = json.load(f)
        wf_info = self.gi.gi.workflows.import_workflow_dict(wf_dump)
        wf_dict = self.gi.gi.workflows.show_workflow(wf_info['id'])
        for id_, step in wf_dict['steps'].items():
            if step['type'] == 'tool':
                for k in 'tool_inputs', 'tool_version':
                    wf_dict['steps'][id_][k] = None
        wf = wrappers.Workflow(wf_dict, gi=self.gi)
        # A workflow with unknown tools must refuse to run.
        self.assertFalse(wf.is_runnable)
        self.assertRaises(RuntimeError, wf.run)
        wf.delete()

    def test_workflow_export(self):
        with open(SAMPLE_FN) as f:
            wf1 = self.gi.workflows.import_new(f.read())
        # Re-importing an export must create a distinct workflow.
        wf2 = self.gi.workflows.import_new(wf1.export())
        self.assertNotEqual(wf1.id, wf2.id)
        for wf in wf1, wf2:
            self._check_and_del_workflow(wf)

    def _check_and_del_workflow(self, wf, check_is_public=False):
        # Galaxy appends additional text to imported workflow names
        self.assertTrue(wf.name.startswith('paste_columns'))
        self.assertEqual(len(wf.steps), 3)
        for step_id, step in wf.steps.items():
            self.assertIsInstance(step, wrappers.Step)
            self.assertEqual(step_id, step.id)
            self.assertIsInstance(step.tool_inputs, dict)
            if step.type == 'tool':
                self.assertIsNotNone(step.tool_id)
                self.assertIsNotNone(step.tool_version)
                self.assertIsInstance(step.input_steps, dict)
            elif step.type in ('data_collection_input', 'data_input'):
                self.assertIsNone(step.tool_id)
                self.assertIsNone(step.tool_version)
                self.assertEqual(step.input_steps, {})
        wf_ids = {_.id for _ in self.gi.workflows.list()}
        self.assertIn(wf.id, wf_ids)
        if check_is_public:
            self.assertTrue(wf.published)
        wf.delete()

    # not very accurate:
    # * we can't publish a wf from the API
    # * we can't directly get another user's wf
    def test_workflow_from_shared(self):
        all_prevs = dict(
            (_.id, _) for _ in self.gi.workflows.get_previews(published=True)
        )
        # Workflows visible only in the published listing belong to others.
        pub_only_ids = set(all_prevs).difference(
            _.id for _ in self.gi.workflows.get_previews())
        if pub_only_ids:
            wf_id = pub_only_ids.pop()
            imported = self.gi.workflows.import_shared(wf_id)
            self.assertIsInstance(imported, wrappers.Workflow)
            imported.delete()
        else:
            self.skipTest('no published workflows, manually publish a workflow to run this test')

    def test_get_libraries(self):
        self._test_multi_get('library')

    def test_get_histories(self):
        self._test_multi_get('history')

    def test_get_workflows(self):
        self._test_multi_get('workflow')

    def _normalized_functions(self, obj_type):
        # Map an object-type name to its (create, list, previews,
        # delete-kwargs) callables so the generic tests below can treat
        # libraries, histories and workflows uniformly.
        if obj_type == 'library':
            create = self.gi.libraries.create
            get_objs = self.gi.libraries.list
            get_prevs = self.gi.libraries.get_previews
            del_kwargs = {}
        elif obj_type == 'history':
            create = self.gi.histories.create
            get_objs = self.gi.histories.list
            get_prevs = self.gi.histories.get_previews
            del_kwargs = {'purge': True}
        elif obj_type == 'workflow':
            def create(name):
                with open(SAMPLE_FN) as f:
                    d = json.load(f)
                d['name'] = name
                return self.gi.workflows.import_new(d)

            get_objs = self.gi.workflows.list
            get_prevs = self.gi.workflows.get_previews
            del_kwargs = {}
        # NOTE(review): an unknown obj_type would raise UnboundLocalError
        # here; all callers pass one of the three handled values.
        return create, get_objs, get_prevs, del_kwargs

    def _test_multi_get(self, obj_type):
        create, get_objs, get_prevs, del_kwargs = self._normalized_functions(
            obj_type)

        def ids(seq):
            return set(_.id for _ in seq)

        names = ['test_%s' % uuid.uuid4().hex for _ in range(2)]
        objs = []
        try:
            objs = [create(_) for _ in names]
            self.assertLessEqual(ids(objs), ids(get_objs()))
            if obj_type != 'workflow':
                filtered = get_objs(name=names[0])
                self.assertEqual(len(filtered), 1)
                self.assertEqual(filtered[0].id, objs[0].id)
                del_id = objs[-1].id
                objs.pop().delete(**del_kwargs)
                self.assertIn(del_id, ids(get_prevs(deleted=True)))
            else:
                # Galaxy appends info strings to imported workflow names
                prev = get_prevs()[0]
                filtered = get_objs(name=prev.name)
                self.assertEqual(len(filtered), 1)
                self.assertEqual(filtered[0].id, prev.id)
        finally:
            # Always clean up whatever was successfully created.
            for o in objs:
                o.delete(**del_kwargs)

    def test_delete_libraries_by_name(self):
        self._test_delete_by_name('library')

    def test_delete_histories_by_name(self):
        self._test_delete_by_name('history')

    def test_delete_workflows_by_name(self):
        self._test_delete_by_name('workflow')

    def _test_delete_by_name(self, obj_type):
        create, _, get_prevs, del_kwargs = self._normalized_functions(
            obj_type)
        name = 'test_%s' % uuid.uuid4().hex
        objs = [create(name) for _ in range(2)]  # noqa: F812
        final_name = objs[0].name
        prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
        self.assertEqual(len(prevs), len(objs))
        del_kwargs['name'] = final_name
        # Deleting by name must remove every object sharing that name.
        objs[0].gi_module.delete(**del_kwargs)
        prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
        self.assertEqual(len(prevs), 0)
| 416 | |
| 417 | |
class TestLibrary(GalaxyObjectsTestBase):
    """Integration tests for Library folders and datasets.

    Each test gets a fresh library (created in setUp, deleted in tearDown).
    """
    # just something that can be expected to be always up
    DS_URL = 'https://tools.ietf.org/rfc/rfc1866.txt'

    def setUp(self):
        super().setUp()
        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)

    def tearDown(self):
        self.lib.delete()

    def test_root_folder(self):
        r = self.lib.root_folder
        self.assertIsNone(r.parent)

    def test_folder(self):
        name, desc = 'test_%s' % uuid.uuid4().hex, 'D'
        folder = self.lib.create_folder(name, description=desc)
        self.assertEqual(folder.name, name)
        self.assertEqual(folder.description, desc)
        self.assertIs(folder.container, self.lib)
        self.assertEqual(folder.parent.id, self.lib.root_folder.id)
        # root folder + the new one
        self.assertEqual(len(self.lib.content_infos), 2)
        self.assertEqual(len(self.lib.folder_ids), 2)
        self.assertIn(folder.id, self.lib.folder_ids)
        retrieved = self.lib.get_folder(folder.id)
        self.assertEqual(folder.id, retrieved.id)

    def _check_datasets(self, dss):
        # *dss* must be exactly the library's datasets, each linked back
        # to the library as its container.
        self.assertEqual(len(dss), len(self.lib.dataset_ids))
        self.assertEqual(set(_.id for _ in dss), set(self.lib.dataset_ids))
        for ds in dss:
            self.assertIs(ds.container, self.lib)

    def test_dataset(self):
        folder = self.lib.create_folder('test_%s' % uuid.uuid4().hex)
        ds = self.lib.upload_data(FOO_DATA, folder=folder)
        # root folder + new folder + dataset
        self.assertEqual(len(self.lib.content_infos), 3)
        self.assertEqual(len(self.lib.folder_ids), 2)
        self._check_datasets([ds])

    def test_dataset_from_url(self):
        if is_reachable(self.DS_URL):
            ds = self.lib.upload_from_url(self.DS_URL)
            self._check_datasets([ds])
        else:
            self.skipTest('%s not reachable' % self.DS_URL)

    def test_dataset_from_local(self):
        with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
            f.write(FOO_DATA)
            f.flush()
            ds = self.lib.upload_from_local(f.name)
        self._check_datasets([ds])

    def test_datasets_from_fs(self):
        bnames = ['f%d.txt' % i for i in range(2)]
        dss, fnames = upload_from_fs(self.lib, bnames)
        self._check_datasets(dss)
        # With link_data_only, Galaxy keeps the original file paths.
        dss, fnames = upload_from_fs(
            self.lib, bnames, link_data_only='link_to_files')
        for ds, fn in zip(dss, fnames):
            self.assertEqual(ds.file_name, fn)

    def test_copy_from_dataset(self):
        hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
        try:
            hda = hist.paste_content(FOO_DATA)
            ds = self.lib.copy_from_dataset(hda)
        finally:
            hist.delete(purge=True)
        self._check_datasets([ds])

    def test_get_dataset(self):
        ds = self.lib.upload_data(FOO_DATA)
        retrieved = self.lib.get_dataset(ds.id)
        self.assertEqual(ds.id, retrieved.id)

    def test_get_datasets(self):
        bnames = ['f%d.txt' % _ for _ in range(2)]
        dss, _ = upload_from_fs(self.lib, bnames)
        retrieved = self.lib.get_datasets()
        self.assertEqual(len(dss), len(retrieved))
        self.assertEqual(set(_.id for _ in dss), set(_.id for _ in retrieved))
        # Library dataset names are path-like; filter with a leading slash.
        name = '/%s' % bnames[0]
        selected = self.lib.get_datasets(name=name)
        self.assertEqual(len(selected), 1)
        self.assertEqual(selected[0].name, bnames[0])
| 506 | |
| 507 | |
class TestLDContents(GalaxyObjectsTestBase):
    """Integration tests for library dataset (LDDA) content access."""

    def setUp(self):
        super().setUp()
        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        self.ds = self.lib.upload_data(FOO_DATA)
        # Block until the upload job finishes so reads are deterministic.
        self.ds.wait()

    def tearDown(self):
        self.lib.delete()

    def test_dataset_get_stream(self):
        # Streaming in 1-byte chunks must reproduce the payload exactly.
        for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
            self.assertEqual(FOO_DATA[idx].encode(), c)

    def test_dataset_peek(self):
        fetched_data = self.ds.peek(chunk_size=4)
        self.assertEqual(FOO_DATA[0:4].encode(), fetched_data)

    def test_dataset_download(self):
        with tempfile.TemporaryFile() as f:
            self.ds.download(f)
            f.seek(0)
            self.assertEqual(FOO_DATA.encode(), f.read())

    def test_dataset_get_contents(self):
        self.assertEqual(FOO_DATA.encode(), self.ds.get_contents())

    def test_dataset_delete(self):
        self.ds.delete()
        # Cannot test this yet because the 'deleted' attribute is not exported
        # by the API at the moment
        # self.assertTrue(self.ds.deleted)

    def test_dataset_update(self):
        new_name = 'test_%s' % uuid.uuid4().hex
        new_misc_info = 'Annotation for %s' % new_name
        new_genome_build = 'hg19'
        updated_ldda = self.ds.update(name=new_name, misc_info=new_misc_info, genome_build=new_genome_build)
        # update() refreshes the wrapper in place and returns it.
        self.assertEqual(self.ds.id, updated_ldda.id)
        self.assertEqual(self.ds.name, new_name)
        self.assertEqual(self.ds.misc_info, new_misc_info)
        self.assertEqual(self.ds.genome_build, new_genome_build)
| 551 | |
| 552 | |
class TestHistory(GalaxyObjectsTestBase):
    """Integration tests for History operations and dataset collections.

    Each test gets a fresh history (created in setUp, purged in tearDown).
    """

    def setUp(self):
        super().setUp()
        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)

    def tearDown(self):
        self.hist.delete(purge=True)

    def test_create_delete(self):
        name = 'test_%s' % uuid.uuid4().hex
        hist = self.gi.histories.create(name)
        self.assertEqual(hist.name, name)
        hist_id = hist.id
        self.assertIn(hist_id, [_.id for _ in self.gi.histories.list()])
        hist.delete(purge=True)
        self.assertFalse(hist.is_mapped)
        # The server still knows the history, but marks it deleted.
        h = self.gi.histories.get(hist_id)
        self.assertTrue(h.deleted)

    def _check_dataset(self, hda):
        # *hda* must be the history's single dataset.
        self.assertIsInstance(hda, wrappers.HistoryDatasetAssociation)
        self.assertIs(hda.container, self.hist)
        self.assertEqual(len(self.hist.dataset_ids), 1)
        self.assertEqual(self.hist.dataset_ids[0], hda.id)

    def test_import_dataset(self):
        lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        lds = lib.upload_data(FOO_DATA)
        self.assertEqual(len(self.hist.dataset_ids), 0)
        hda = self.hist.import_dataset(lds)
        lib.delete()
        self._check_dataset(hda)

    def test_upload_file(self):
        with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
            f.write(FOO_DATA)
            f.flush()
            hda = self.hist.upload_file(f.name)
        self._check_dataset(hda)

    def test_paste_content(self):
        hda = self.hist.paste_content(FOO_DATA)
        self._check_dataset(hda)

    def test_get_dataset(self):
        hda = self.hist.paste_content(FOO_DATA)
        retrieved = self.hist.get_dataset(hda.id)
        self.assertEqual(hda.id, retrieved.id)

    def test_get_datasets(self):
        bnames = ['f%d.txt' % _ for _ in range(2)]
        lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        lds = upload_from_fs(lib, bnames)[0]
        hdas = [self.hist.import_dataset(_) for _ in lds]
        lib.delete()
        retrieved = self.hist.get_datasets()
        self.assertEqual(len(hdas), len(retrieved))
        self.assertEqual(set(_.id for _ in hdas), set(_.id for _ in retrieved))
        selected = self.hist.get_datasets(name=bnames[0])
        self.assertEqual(len(selected), 1)
        self.assertEqual(selected[0].name, bnames[0])

    def test_export_and_download(self):
        jeha_id = self.hist.export(wait=True, maxwait=60)
        self.assertTrue(jeha_id)
        tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
        temp_fn = os.path.join(tempdir, 'export.tar.gz')
        try:
            with open(temp_fn, 'wb') as fo:
                self.hist.download(jeha_id, fo)
            self.assertTrue(tarfile.is_tarfile(temp_fn))
        finally:
            shutil.rmtree(tempdir)

    def test_update(self):
        new_name = 'test_%s' % uuid.uuid4().hex
        new_annotation = 'Annotation for %s' % new_name
        new_tags = ['tag1', 'tag2']
        updated_hist = self.hist.update(name=new_name, annotation=new_annotation, tags=new_tags)
        # update() refreshes the wrapper in place and returns it.
        self.assertEqual(self.hist.id, updated_hist.id)
        self.assertEqual(self.hist.name, new_name)
        self.assertEqual(self.hist.annotation, new_annotation)
        self.assertEqual(self.hist.tags, new_tags)
        updated_hist = self.hist.update(published=True)
        self.assertEqual(self.hist.id, updated_hist.id)
        self.assertTrue(self.hist.published)

    def test_create_dataset_collection(self):
        self._create_collection_description()
        hdca = self.hist.create_dataset_collection(self.collection_description)
        self.assertIsInstance(hdca, wrappers.HistoryDatasetCollectionAssociation)
        self.assertEqual(hdca.collection_type, 'list')
        self.assertIs(hdca.container, self.hist)
        self.assertEqual(len(hdca.elements), 2)
        self.assertEqual(self.dataset1.id, hdca.elements[0]['object']['id'])
        self.assertEqual(self.dataset2.id, hdca.elements[1]['object']['id'])

    def test_delete_dataset_collection(self):
        self._create_collection_description()
        hdca = self.hist.create_dataset_collection(self.collection_description)
        hdca.delete()
        self.assertTrue(hdca.deleted)

    def _create_collection_description(self):
        # Paste two datasets and describe a 2-element list collection
        # over them; stored on self for the collection tests above.
        self.dataset1 = self.hist.paste_content(FOO_DATA)
        self.dataset2 = self.hist.paste_content(FOO_DATA_2)
        self.collection_description = dataset_collections.CollectionDescription(
            name="MyDatasetList",
            elements=[
                dataset_collections.HistoryDatasetElement(name="sample1", id=self.dataset1.id),
                dataset_collections.HistoryDatasetElement(name="sample2", id=self.dataset2.id),
            ]
        )
| 667 | |
| 668 | |
class TestHDAContents(GalaxyObjectsTestBase):
    """Integration tests for history dataset (HDA) content access."""

    def setUp(self):
        super().setUp()
        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
        self.ds = self.hist.paste_content(FOO_DATA)
        # Block until the paste job finishes so reads are deterministic.
        self.ds.wait()

    def tearDown(self):
        self.hist.delete(purge=True)

    def test_dataset_get_stream(self):
        # Streaming in 1-byte chunks must reproduce the payload exactly.
        for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
            self.assertEqual(FOO_DATA[idx].encode(), c)

    def test_dataset_peek(self):
        fetched_data = self.ds.peek(chunk_size=4)
        self.assertEqual(FOO_DATA[0:4].encode(), fetched_data)

    def test_dataset_download(self):
        with tempfile.TemporaryFile() as f:
            self.ds.download(f)
            f.seek(0)
            self.assertEqual(FOO_DATA.encode(), f.read())

    def test_dataset_get_contents(self):
        self.assertEqual(FOO_DATA.encode(), self.ds.get_contents())

    def test_dataset_update(self):
        new_name = 'test_%s' % uuid.uuid4().hex
        new_annotation = 'Annotation for %s' % new_name
        new_genome_build = 'hg19'
        updated_hda = self.ds.update(name=new_name, annotation=new_annotation, genome_build=new_genome_build)
        # update() refreshes the wrapper in place and returns it.
        self.assertEqual(self.ds.id, updated_hda.id)
        self.assertEqual(self.ds.name, new_name)
        self.assertEqual(self.ds.annotation, new_annotation)
        self.assertEqual(self.ds.genome_build, new_genome_build)

    def test_dataset_delete(self):
        self.ds.delete()
        # Plain delete marks the HDA deleted but keeps its data on disk.
        self.assertTrue(self.ds.deleted)
        self.assertFalse(self.ds.purged)

    def test_dataset_purge(self):
        self.ds.delete(purge=True)
        self.assertTrue(self.ds.deleted)
        self.assertTrue(self.ds.purged)
| 716 | |
| 717 | |
class TestRunWorkflow(GalaxyObjectsTestBase):
    """End-to-end run of the 'paste_columns' workflow on library inputs."""

    def setUp(self):
        super().setUp()
        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        with open(SAMPLE_FN) as f:
            self.wf = self.gi.workflows.import_new(f.read())
        # Two single-column inputs; the workflow pastes them side by side.
        self.contents = ['one\ntwo\n', '1\n2\n']
        self.inputs = [self.lib.upload_data(_) for _ in self.contents]

    def tearDown(self):
        self.wf.delete()
        self.lib.delete()

    def _test(self, existing_hist=False, params=False):
        # Run the workflow either into a pre-created history object or
        # into a new history identified only by name.
        hist_name = 'test_%s' % uuid.uuid4().hex
        if existing_hist:
            hist = self.gi.histories.create(hist_name)
        else:
            hist = hist_name
        if params:
            params = {'Paste1': {'delimiter': 'U'}}
            sep = '_'  # 'U' maps to '_' in the paste tool
        else:
            params = None
            sep = '\t'  # default
        input_map = {'Input 1': self.inputs[0], 'Input 2': self.inputs[1]}
        sys.stderr.write(os.linesep)
        outputs, out_hist = self.wf.run(
            input_map, hist, params=params, wait=True, polling_interval=1)
        self.assertEqual(len(outputs), 1)
        out_ds = outputs[0]
        self.assertIn(out_ds.id, out_hist.dataset_ids)
        res = out_ds.get_contents()
        # Expected output: the two inputs pasted row-wise with *sep*.
        exp_rows = zip(*(_.splitlines() for _ in self.contents))
        exp_res = ("\n".join(sep.join(t) for t in exp_rows) + "\n").encode()
        self.assertEqual(res, exp_res)
        if existing_hist:
            self.assertEqual(out_hist.id, hist.id)
        out_hist.delete(purge=True)

    def test_existing_history(self):
        self._test(existing_hist=True)

    def test_new_history(self):
        self._test(existing_hist=False)

    def test_params(self):
        self._test(params=True)
| 767 | |
| 768 | |
class TestRunDatasetCollectionWorkflow(GalaxyObjectsTestBase):
    """End-to-end run of a workflow taking a dataset collection input."""

    def setUp(self):
        super().setUp()
        with open(SAMPLE_WF_COLL_FN) as f:
            self.wf = self.gi.workflows.import_new(f.read())
        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)

    def tearDown(self):
        self.wf.delete()
        self.hist.delete(purge=True)

    def test_run_workflow_with_dataset_collection(self):
        # Build a 2-element list collection from two pasted datasets.
        dataset1 = self.hist.paste_content(FOO_DATA)
        dataset2 = self.hist.paste_content(FOO_DATA_2)
        collection_description = dataset_collections.CollectionDescription(
            name="MyDatasetList",
            elements=[
                dataset_collections.HistoryDatasetElement(name="sample1", id=dataset1.id),
                dataset_collections.HistoryDatasetElement(name="sample2", id=dataset2.id),
            ]
        )
        dataset_collection = self.hist.create_dataset_collection(collection_description)
        input_map = {"Input Dataset Collection": dataset_collection,
                     "Input 2": dataset1}
        outputs, out_hist = self.wf.run(input_map, self.hist, wait=True)
        # The workflow maps over the collection, yielding one output
        # collection with as many elements as the input had.
        self.assertEqual(len(outputs), 1)
        out_hdca = outputs[0]
        self.assertIsInstance(out_hdca, wrappers.HistoryDatasetCollectionAssociation)
        self.assertEqual(out_hdca.collection_type, 'list')
        self.assertEqual(len(out_hdca.elements), 2)
        self.assertEqual(out_hist.id, self.hist.id)
| 801 | |
| 802 | |
class TestJob(GalaxyObjectsTestBase):
    """Smoke tests for job retrieval against a live Galaxy instance."""

    def test_get(self):
        previews = self.gi.jobs.get_previews()
        if previews:
            # Fetch the full Job for the first preview and cross-check ids.
            first = previews[0]
            self.assertIsInstance(first, wrappers.JobPreview)
            fetched = self.gi.jobs.get(first.id)
            self.assertIsInstance(fetched, wrappers.Job)
            self.assertEqual(fetched.id, first.id)
        # Listing must yield fully-wrapped Job objects.
        for job in self.gi.jobs.list():
            self.assertIsInstance(job, wrappers.Job)
| 815 | |
| 816 | |
def suite():
    """Build the full test suite for this module.

    Returns a unittest.TestSuite covering every test class defined above.
    """
    loader = unittest.TestLoader()
    s = unittest.TestSuite()
    s.addTests([loader.loadTestsFromTestCase(c) for c in (
        TestWrapper,
        TestWorkflow,
        TestGalaxyInstance,
        TestLibrary,
        TestLDContents,
        TestHistory,
        TestHDAContents,
        TestRunWorkflow,
        # These two classes are defined in this module but were missing
        # from the suite; added so suite() runs every test class.
        TestRunDatasetCollectionWorkflow,
        TestJob,
    )])
    return s
| 831 | |
| 832 | |
if __name__ == '__main__':
    # Run the whole module's suite with per-test verbose output.
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite())
