comparison planemo/lib/python3.7/site-packages/bioblend/_tests/TestGalaxyObjects.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d30785e31577
1 # pylint: disable=C0103,E1101
2 import json
3 import os
4 import shutil
5 import socket
6 import sys
7 import tarfile
8 import tempfile
9 import uuid
10 from ssl import SSLError
11 from urllib.error import URLError
12 from urllib.request import urlopen
13
14 import bioblend
15 import bioblend.galaxy.objects.galaxy_instance as galaxy_instance
16 import bioblend.galaxy.objects.wrappers as wrappers
17 from bioblend.galaxy import dataset_collections
18 from . import test_util
19 from .test_util import unittest
20
21 bioblend.set_stream_logger('test', level='INFO')
22 socket.setdefaulttimeout(10.0)
23 SAMPLE_FN = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
24 SAMPLE_WF_COLL_FN = test_util.get_abspath(os.path.join('data', 'paste_columns_collections.ga'))
25 SAMPLE_WF_PARAMETER_INPUT_FN = test_util.get_abspath(os.path.join('data', 'workflow_with_parameter_input.ga'))
26 FOO_DATA = 'foo\nbar\n'
27 FOO_DATA_2 = 'foo2\nbar2\n'
28 SAMPLE_WF_DICT = {
29 'deleted': False,
30 'id': '9005c5112febe774',
31 'inputs': {
32 '571': {'label': 'Input Dataset', 'value': ''},
33 '572': {'label': 'Input Dataset', 'value': ''},
34 },
35 'model_class': 'StoredWorkflow',
36 'name': 'paste_columns',
37 'published': False,
38 'steps': {
39 '571': {
40 'id': 571,
41 'input_steps': {},
42 'tool_id': None,
43 'tool_inputs': {'name': 'Input Dataset'},
44 'tool_version': None,
45 'type': 'data_input',
46 },
47 '572': {
48 'id': 572,
49 'input_steps': {},
50 'tool_id': None,
51 'tool_inputs': {'name': 'Input Dataset'},
52 'tool_version': None,
53 'type': 'data_input',
54 },
55 '573': {
56 'id': 573,
57 'input_steps': {
58 'input1': {'source_step': 571, 'step_output': 'output'},
59 'input2': {'source_step': 572, 'step_output': 'output'},
60 },
61 'tool_id': 'Paste1',
62 'tool_inputs': {
63 'delimiter': '"T"',
64 'input1': 'null',
65 'input2': 'null',
66 },
67 'tool_version': '1.0.0',
68 'type': 'tool',
69 }
70 },
71 'tags': [],
72 'url': '/api/workflows/9005c5112febe774',
73 }
74
75
76 def is_reachable(url):
77 res = None
78 try:
79 res = urlopen(url, timeout=5)
80 except (SSLError, URLError, socket.timeout):
81 return False
82 if res is not None:
83 res.close()
84 return True
85
86
87 def upload_from_fs(lib, bnames, **kwargs):
88 tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
89 try:
90 fnames = [os.path.join(tempdir, _) for _ in bnames]
91 for fn in fnames:
92 with open(fn, 'w') as f:
93 f.write(FOO_DATA)
94 dss = lib.upload_from_galaxy_fs(fnames, **kwargs)
95 finally:
96 shutil.rmtree(tempdir)
97 return dss, fnames
98
99
100 class MockWrapper(wrappers.Wrapper):
101 BASE_ATTRS = frozenset(['a', 'b'])
102
103 def __init__(self, *args, **kwargs):
104 super().__init__(*args, **kwargs)
105
106 @property
107 def gi_module(self):
108 return super().gi_module()
109
110
111 class TestWrapper(unittest.TestCase):
112
113 def setUp(self):
114 self.d = {'a': 1, 'b': [2, 3], 'c': {'x': 4}}
115 self.assertRaises(TypeError, wrappers.Wrapper, self.d)
116 self.w = MockWrapper(self.d)
117
118 def test_initialize(self):
119 for k in MockWrapper.BASE_ATTRS:
120 self.assertEqual(getattr(self.w, k), self.d[k])
121 self.w.a = 222
122 self.w.b[0] = 222
123 self.assertEqual(self.w.a, 222)
124 self.assertEqual(self.w.b[0], 222)
125 self.assertEqual(self.d['a'], 1)
126 self.assertEqual(self.d['b'][0], 2)
127 self.assertRaises(AttributeError, getattr, self.w, 'foo')
128 self.assertRaises(AttributeError, setattr, self.w, 'foo', 0)
129
130 def test_taint(self):
131 self.assertFalse(self.w.is_modified)
132 self.w.a = 111 # pylint: disable=W0201
133 self.assertTrue(self.w.is_modified)
134
135 def test_serialize(self):
136 w = MockWrapper.from_json(self.w.to_json())
137 self.assertEqual(w.wrapped, self.w.wrapped)
138
139 def test_clone(self):
140 w = self.w.clone()
141 self.assertEqual(w.wrapped, self.w.wrapped)
142 w.b[0] = 111
143 self.assertEqual(self.w.b[0], 2)
144
145 def test_kwargs(self):
146 parent = MockWrapper({'a': 10})
147 w = MockWrapper(self.d, parent=parent)
148 self.assertIs(w.parent, parent)
149 self.assertRaises(AttributeError, setattr, w, 'parent', 0)
150
151
152 class TestWorkflow(unittest.TestCase):
153
154 def setUp(self):
155 self.wf = wrappers.Workflow(SAMPLE_WF_DICT)
156
157 def test_initialize(self):
158 self.assertEqual(self.wf.id, '9005c5112febe774')
159 self.assertEqual(self.wf.name, 'paste_columns')
160 self.assertEqual(self.wf.deleted, False)
161 self.assertEqual(self.wf.published, False)
162 self.assertEqual(self.wf.tags, [])
163 self.assertEqual(
164 self.wf.input_labels_to_ids, {'Input Dataset': set(['571', '572'])})
165 self.assertEqual(self.wf.tool_labels_to_ids, {'Paste1': set(['573'])})
166 self.assertEqual(self.wf.data_input_ids, set(['571', '572']))
167 self.assertEqual(self.wf.source_ids, set(['571', '572']))
168 self.assertEqual(self.wf.sink_ids, set(['573']))
169
170 def test_dag(self):
171 inv_dag = {}
172 for h, tails in self.wf.dag.items():
173 for t in tails:
174 inv_dag.setdefault(str(t), set()).add(h)
175 self.assertEqual(self.wf.inv_dag, inv_dag)
176 heads = set(self.wf.dag)
177 self.assertEqual(heads, set.union(*self.wf.inv_dag.values()))
178 tails = set(self.wf.inv_dag)
179 self.assertEqual(tails, set.union(*self.wf.dag.values()))
180 ids = self.wf.sorted_step_ids()
181 self.assertEqual(set(ids), heads | tails)
182 for h, tails in self.wf.dag.items():
183 for t in tails:
184 self.assertLess(ids.index(h), ids.index(t))
185
186 def test_steps(self):
187 steps = SAMPLE_WF_DICT['steps']
188 for sid, s in self.wf.steps.items():
189 self.assertIsInstance(s, wrappers.Step)
190 self.assertEqual(s.id, sid)
191 self.assertIn(sid, steps)
192 self.assertIs(s.parent, self.wf)
193 self.assertEqual(self.wf.data_input_ids, set(['571', '572']))
194 self.assertEqual(self.wf.tool_ids, set(['573']))
195
196 def test_taint(self):
197 self.assertFalse(self.wf.is_modified)
198 self.wf.steps['571'].tool_id = 'foo'
199 self.assertTrue(self.wf.is_modified)
200
201 def test_input_map(self):
202 class DummyLD(object):
203 SRC = 'ld'
204
205 def __init__(self, id_):
206 self.id = id_
207
208 label = 'Input Dataset'
209 self.assertEqual(self.wf.input_labels, set([label]))
210 input_map = self.wf.convert_input_map(
211 {label: [DummyLD('a'), DummyLD('b')]})
212 # {'571': {'id': 'a', 'src': 'ld'}, '572': {'id': 'b', 'src': 'ld'}}
213 # OR
214 # {'571': {'id': 'b', 'src': 'ld'}, '572': {'id': 'a', 'src': 'ld'}}
215 self.assertEqual(set(input_map), set(['571', '572']))
216 for d in input_map.values():
217 self.assertEqual(set(d), set(['id', 'src']))
218 self.assertEqual(d['src'], 'ld')
219 self.assertIn(d['id'], 'ab')
220
221
222 @test_util.skip_unless_galaxy()
223 class GalaxyObjectsTestBase(unittest.TestCase):
224
225 def setUp(self):
226 galaxy_key = os.environ['BIOBLEND_GALAXY_API_KEY']
227 galaxy_url = os.environ['BIOBLEND_GALAXY_URL']
228 self.gi = galaxy_instance.GalaxyInstance(galaxy_url, galaxy_key)
229
230
231 class TestGalaxyInstance(GalaxyObjectsTestBase):
232
233 def test_library(self):
234 name = 'test_%s' % uuid.uuid4().hex
235 description, synopsis = 'D', 'S'
236 lib = self.gi.libraries.create(
237 name, description=description, synopsis=synopsis)
238 self.assertEqual(lib.name, name)
239 self.assertEqual(lib.description, description)
240 self.assertEqual(lib.synopsis, synopsis)
241 self.assertEqual(len(lib.content_infos), 1) # root folder
242 self.assertEqual(len(lib.folder_ids), 1)
243 self.assertEqual(len(lib.dataset_ids), 0)
244 self.assertIn(lib.id, [_.id for _ in self.gi.libraries.list()])
245 lib.delete()
246 self.assertFalse(lib.is_mapped)
247
248 def test_workflow_from_str(self):
249 with open(SAMPLE_FN) as f:
250 wf = self.gi.workflows.import_new(f.read())
251 self._check_and_del_workflow(wf)
252
253 def test_workflow_collections_from_str(self):
254 with open(SAMPLE_WF_COLL_FN) as f:
255 wf = self.gi.workflows.import_new(f.read())
256 self._check_and_del_workflow(wf)
257
258 @test_util.skip_unless_galaxy('release_19.01')
259 def test_workflow_parameter_input(self):
260 with open(SAMPLE_WF_PARAMETER_INPUT_FN) as f:
261 self.gi.workflows.import_new(f.read())
262
263 def test_workflow_from_dict(self):
264 with open(SAMPLE_FN) as f:
265 wf = self.gi.workflows.import_new(json.load(f))
266 self._check_and_del_workflow(wf)
267
268 def test_workflow_publish_from_dict(self):
269 with open(SAMPLE_FN) as f:
270 wf = self.gi.workflows.import_new(json.load(f), publish=True)
271 self._check_and_del_workflow(wf, check_is_public=True)
272
273 def test_workflow_missing_tools(self):
274 with open(SAMPLE_FN) as f:
275 wf_dump = json.load(f)
276 wf_info = self.gi.gi.workflows.import_workflow_dict(wf_dump)
277 wf_dict = self.gi.gi.workflows.show_workflow(wf_info['id'])
278 for id_, step in wf_dict['steps'].items():
279 if step['type'] == 'tool':
280 for k in 'tool_inputs', 'tool_version':
281 wf_dict['steps'][id_][k] = None
282 wf = wrappers.Workflow(wf_dict, gi=self.gi)
283 self.assertFalse(wf.is_runnable)
284 self.assertRaises(RuntimeError, wf.run)
285 wf.delete()
286
287 def test_workflow_export(self):
288 with open(SAMPLE_FN) as f:
289 wf1 = self.gi.workflows.import_new(f.read())
290 wf2 = self.gi.workflows.import_new(wf1.export())
291 self.assertNotEqual(wf1.id, wf2.id)
292 for wf in wf1, wf2:
293 self._check_and_del_workflow(wf)
294
295 def _check_and_del_workflow(self, wf, check_is_public=False):
296 # Galaxy appends additional text to imported workflow names
297 self.assertTrue(wf.name.startswith('paste_columns'))
298 self.assertEqual(len(wf.steps), 3)
299 for step_id, step in wf.steps.items():
300 self.assertIsInstance(step, wrappers.Step)
301 self.assertEqual(step_id, step.id)
302 self.assertIsInstance(step.tool_inputs, dict)
303 if step.type == 'tool':
304 self.assertIsNotNone(step.tool_id)
305 self.assertIsNotNone(step.tool_version)
306 self.assertIsInstance(step.input_steps, dict)
307 elif step.type in ('data_collection_input', 'data_input'):
308 self.assertIsNone(step.tool_id)
309 self.assertIsNone(step.tool_version)
310 self.assertEqual(step.input_steps, {})
311 wf_ids = {_.id for _ in self.gi.workflows.list()}
312 self.assertIn(wf.id, wf_ids)
313 if check_is_public:
314 self.assertTrue(wf.published)
315 wf.delete()
316
317 # not very accurate:
318 # * we can't publish a wf from the API
319 # * we can't directly get another user's wf
320 def test_workflow_from_shared(self):
321 all_prevs = dict(
322 (_.id, _) for _ in self.gi.workflows.get_previews(published=True)
323 )
324 pub_only_ids = set(all_prevs).difference(
325 _.id for _ in self.gi.workflows.get_previews())
326 if pub_only_ids:
327 wf_id = pub_only_ids.pop()
328 imported = self.gi.workflows.import_shared(wf_id)
329 self.assertIsInstance(imported, wrappers.Workflow)
330 imported.delete()
331 else:
332 self.skipTest('no published workflows, manually publish a workflow to run this test')
333
334 def test_get_libraries(self):
335 self._test_multi_get('library')
336
337 def test_get_histories(self):
338 self._test_multi_get('history')
339
340 def test_get_workflows(self):
341 self._test_multi_get('workflow')
342
343 def _normalized_functions(self, obj_type):
344 if obj_type == 'library':
345 create = self.gi.libraries.create
346 get_objs = self.gi.libraries.list
347 get_prevs = self.gi.libraries.get_previews
348 del_kwargs = {}
349 elif obj_type == 'history':
350 create = self.gi.histories.create
351 get_objs = self.gi.histories.list
352 get_prevs = self.gi.histories.get_previews
353 del_kwargs = {'purge': True}
354 elif obj_type == 'workflow':
355 def create(name):
356 with open(SAMPLE_FN) as f:
357 d = json.load(f)
358 d['name'] = name
359 return self.gi.workflows.import_new(d)
360
361 get_objs = self.gi.workflows.list
362 get_prevs = self.gi.workflows.get_previews
363 del_kwargs = {}
364 return create, get_objs, get_prevs, del_kwargs
365
366 def _test_multi_get(self, obj_type):
367 create, get_objs, get_prevs, del_kwargs = self._normalized_functions(
368 obj_type)
369
370 def ids(seq):
371 return set(_.id for _ in seq)
372
373 names = ['test_%s' % uuid.uuid4().hex for _ in range(2)]
374 objs = []
375 try:
376 objs = [create(_) for _ in names]
377 self.assertLessEqual(ids(objs), ids(get_objs()))
378 if obj_type != 'workflow':
379 filtered = get_objs(name=names[0])
380 self.assertEqual(len(filtered), 1)
381 self.assertEqual(filtered[0].id, objs[0].id)
382 del_id = objs[-1].id
383 objs.pop().delete(**del_kwargs)
384 self.assertIn(del_id, ids(get_prevs(deleted=True)))
385 else:
386 # Galaxy appends info strings to imported workflow names
387 prev = get_prevs()[0]
388 filtered = get_objs(name=prev.name)
389 self.assertEqual(len(filtered), 1)
390 self.assertEqual(filtered[0].id, prev.id)
391 finally:
392 for o in objs:
393 o.delete(**del_kwargs)
394
395 def test_delete_libraries_by_name(self):
396 self._test_delete_by_name('library')
397
398 def test_delete_histories_by_name(self):
399 self._test_delete_by_name('history')
400
401 def test_delete_workflows_by_name(self):
402 self._test_delete_by_name('workflow')
403
404 def _test_delete_by_name(self, obj_type):
405 create, _, get_prevs, del_kwargs = self._normalized_functions(
406 obj_type)
407 name = 'test_%s' % uuid.uuid4().hex
408 objs = [create(name) for _ in range(2)] # noqa: F812
409 final_name = objs[0].name
410 prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
411 self.assertEqual(len(prevs), len(objs))
412 del_kwargs['name'] = final_name
413 objs[0].gi_module.delete(**del_kwargs)
414 prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
415 self.assertEqual(len(prevs), 0)
416
417
418 class TestLibrary(GalaxyObjectsTestBase):
419 # just something that can be expected to be always up
420 DS_URL = 'https://tools.ietf.org/rfc/rfc1866.txt'
421
422 def setUp(self):
423 super().setUp()
424 self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
425
426 def tearDown(self):
427 self.lib.delete()
428
429 def test_root_folder(self):
430 r = self.lib.root_folder
431 self.assertIsNone(r.parent)
432
433 def test_folder(self):
434 name, desc = 'test_%s' % uuid.uuid4().hex, 'D'
435 folder = self.lib.create_folder(name, description=desc)
436 self.assertEqual(folder.name, name)
437 self.assertEqual(folder.description, desc)
438 self.assertIs(folder.container, self.lib)
439 self.assertEqual(folder.parent.id, self.lib.root_folder.id)
440 self.assertEqual(len(self.lib.content_infos), 2)
441 self.assertEqual(len(self.lib.folder_ids), 2)
442 self.assertIn(folder.id, self.lib.folder_ids)
443 retrieved = self.lib.get_folder(folder.id)
444 self.assertEqual(folder.id, retrieved.id)
445
446 def _check_datasets(self, dss):
447 self.assertEqual(len(dss), len(self.lib.dataset_ids))
448 self.assertEqual(set(_.id for _ in dss), set(self.lib.dataset_ids))
449 for ds in dss:
450 self.assertIs(ds.container, self.lib)
451
452 def test_dataset(self):
453 folder = self.lib.create_folder('test_%s' % uuid.uuid4().hex)
454 ds = self.lib.upload_data(FOO_DATA, folder=folder)
455 self.assertEqual(len(self.lib.content_infos), 3)
456 self.assertEqual(len(self.lib.folder_ids), 2)
457 self._check_datasets([ds])
458
459 def test_dataset_from_url(self):
460 if is_reachable(self.DS_URL):
461 ds = self.lib.upload_from_url(self.DS_URL)
462 self._check_datasets([ds])
463 else:
464 self.skipTest('%s not reachable' % self.DS_URL)
465
466 def test_dataset_from_local(self):
467 with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
468 f.write(FOO_DATA)
469 f.flush()
470 ds = self.lib.upload_from_local(f.name)
471 self._check_datasets([ds])
472
473 def test_datasets_from_fs(self):
474 bnames = ['f%d.txt' % i for i in range(2)]
475 dss, fnames = upload_from_fs(self.lib, bnames)
476 self._check_datasets(dss)
477 dss, fnames = upload_from_fs(
478 self.lib, bnames, link_data_only='link_to_files')
479 for ds, fn in zip(dss, fnames):
480 self.assertEqual(ds.file_name, fn)
481
482 def test_copy_from_dataset(self):
483 hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
484 try:
485 hda = hist.paste_content(FOO_DATA)
486 ds = self.lib.copy_from_dataset(hda)
487 finally:
488 hist.delete(purge=True)
489 self._check_datasets([ds])
490
491 def test_get_dataset(self):
492 ds = self.lib.upload_data(FOO_DATA)
493 retrieved = self.lib.get_dataset(ds.id)
494 self.assertEqual(ds.id, retrieved.id)
495
496 def test_get_datasets(self):
497 bnames = ['f%d.txt' % _ for _ in range(2)]
498 dss, _ = upload_from_fs(self.lib, bnames)
499 retrieved = self.lib.get_datasets()
500 self.assertEqual(len(dss), len(retrieved))
501 self.assertEqual(set(_.id for _ in dss), set(_.id for _ in retrieved))
502 name = '/%s' % bnames[0]
503 selected = self.lib.get_datasets(name=name)
504 self.assertEqual(len(selected), 1)
505 self.assertEqual(selected[0].name, bnames[0])
506
507
508 class TestLDContents(GalaxyObjectsTestBase):
509
510 def setUp(self):
511 super().setUp()
512 self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
513 self.ds = self.lib.upload_data(FOO_DATA)
514 self.ds.wait()
515
516 def tearDown(self):
517 self.lib.delete()
518
519 def test_dataset_get_stream(self):
520 for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
521 self.assertEqual(FOO_DATA[idx].encode(), c)
522
523 def test_dataset_peek(self):
524 fetched_data = self.ds.peek(chunk_size=4)
525 self.assertEqual(FOO_DATA[0:4].encode(), fetched_data)
526
527 def test_dataset_download(self):
528 with tempfile.TemporaryFile() as f:
529 self.ds.download(f)
530 f.seek(0)
531 self.assertEqual(FOO_DATA.encode(), f.read())
532
533 def test_dataset_get_contents(self):
534 self.assertEqual(FOO_DATA.encode(), self.ds.get_contents())
535
536 def test_dataset_delete(self):
537 self.ds.delete()
538 # Cannot test this yet because the 'deleted' attribute is not exported
539 # by the API at the moment
540 # self.assertTrue(self.ds.deleted)
541
542 def test_dataset_update(self):
543 new_name = 'test_%s' % uuid.uuid4().hex
544 new_misc_info = 'Annotation for %s' % new_name
545 new_genome_build = 'hg19'
546 updated_ldda = self.ds.update(name=new_name, misc_info=new_misc_info, genome_build=new_genome_build)
547 self.assertEqual(self.ds.id, updated_ldda.id)
548 self.assertEqual(self.ds.name, new_name)
549 self.assertEqual(self.ds.misc_info, new_misc_info)
550 self.assertEqual(self.ds.genome_build, new_genome_build)
551
552
553 class TestHistory(GalaxyObjectsTestBase):
554
555 def setUp(self):
556 super().setUp()
557 self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
558
559 def tearDown(self):
560 self.hist.delete(purge=True)
561
562 def test_create_delete(self):
563 name = 'test_%s' % uuid.uuid4().hex
564 hist = self.gi.histories.create(name)
565 self.assertEqual(hist.name, name)
566 hist_id = hist.id
567 self.assertIn(hist_id, [_.id for _ in self.gi.histories.list()])
568 hist.delete(purge=True)
569 self.assertFalse(hist.is_mapped)
570 h = self.gi.histories.get(hist_id)
571 self.assertTrue(h.deleted)
572
573 def _check_dataset(self, hda):
574 self.assertIsInstance(hda, wrappers.HistoryDatasetAssociation)
575 self.assertIs(hda.container, self.hist)
576 self.assertEqual(len(self.hist.dataset_ids), 1)
577 self.assertEqual(self.hist.dataset_ids[0], hda.id)
578
579 def test_import_dataset(self):
580 lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
581 lds = lib.upload_data(FOO_DATA)
582 self.assertEqual(len(self.hist.dataset_ids), 0)
583 hda = self.hist.import_dataset(lds)
584 lib.delete()
585 self._check_dataset(hda)
586
587 def test_upload_file(self):
588 with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
589 f.write(FOO_DATA)
590 f.flush()
591 hda = self.hist.upload_file(f.name)
592 self._check_dataset(hda)
593
594 def test_paste_content(self):
595 hda = self.hist.paste_content(FOO_DATA)
596 self._check_dataset(hda)
597
598 def test_get_dataset(self):
599 hda = self.hist.paste_content(FOO_DATA)
600 retrieved = self.hist.get_dataset(hda.id)
601 self.assertEqual(hda.id, retrieved.id)
602
603 def test_get_datasets(self):
604 bnames = ['f%d.txt' % _ for _ in range(2)]
605 lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
606 lds = upload_from_fs(lib, bnames)[0]
607 hdas = [self.hist.import_dataset(_) for _ in lds]
608 lib.delete()
609 retrieved = self.hist.get_datasets()
610 self.assertEqual(len(hdas), len(retrieved))
611 self.assertEqual(set(_.id for _ in hdas), set(_.id for _ in retrieved))
612 selected = self.hist.get_datasets(name=bnames[0])
613 self.assertEqual(len(selected), 1)
614 self.assertEqual(selected[0].name, bnames[0])
615
616 def test_export_and_download(self):
617 jeha_id = self.hist.export(wait=True, maxwait=60)
618 self.assertTrue(jeha_id)
619 tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
620 temp_fn = os.path.join(tempdir, 'export.tar.gz')
621 try:
622 with open(temp_fn, 'wb') as fo:
623 self.hist.download(jeha_id, fo)
624 self.assertTrue(tarfile.is_tarfile(temp_fn))
625 finally:
626 shutil.rmtree(tempdir)
627
628 def test_update(self):
629 new_name = 'test_%s' % uuid.uuid4().hex
630 new_annotation = 'Annotation for %s' % new_name
631 new_tags = ['tag1', 'tag2']
632 updated_hist = self.hist.update(name=new_name, annotation=new_annotation, tags=new_tags)
633 self.assertEqual(self.hist.id, updated_hist.id)
634 self.assertEqual(self.hist.name, new_name)
635 self.assertEqual(self.hist.annotation, new_annotation)
636 self.assertEqual(self.hist.tags, new_tags)
637 updated_hist = self.hist.update(published=True)
638 self.assertEqual(self.hist.id, updated_hist.id)
639 self.assertTrue(self.hist.published)
640
641 def test_create_dataset_collection(self):
642 self._create_collection_description()
643 hdca = self.hist.create_dataset_collection(self.collection_description)
644 self.assertIsInstance(hdca, wrappers.HistoryDatasetCollectionAssociation)
645 self.assertEqual(hdca.collection_type, 'list')
646 self.assertIs(hdca.container, self.hist)
647 self.assertEqual(len(hdca.elements), 2)
648 self.assertEqual(self.dataset1.id, hdca.elements[0]['object']['id'])
649 self.assertEqual(self.dataset2.id, hdca.elements[1]['object']['id'])
650
651 def test_delete_dataset_collection(self):
652 self._create_collection_description()
653 hdca = self.hist.create_dataset_collection(self.collection_description)
654 hdca.delete()
655 self.assertTrue(hdca.deleted)
656
657 def _create_collection_description(self):
658 self.dataset1 = self.hist.paste_content(FOO_DATA)
659 self.dataset2 = self.hist.paste_content(FOO_DATA_2)
660 self.collection_description = dataset_collections.CollectionDescription(
661 name="MyDatasetList",
662 elements=[
663 dataset_collections.HistoryDatasetElement(name="sample1", id=self.dataset1.id),
664 dataset_collections.HistoryDatasetElement(name="sample2", id=self.dataset2.id),
665 ]
666 )
667
668
669 class TestHDAContents(GalaxyObjectsTestBase):
670
671 def setUp(self):
672 super().setUp()
673 self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
674 self.ds = self.hist.paste_content(FOO_DATA)
675 self.ds.wait()
676
677 def tearDown(self):
678 self.hist.delete(purge=True)
679
680 def test_dataset_get_stream(self):
681 for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
682 self.assertEqual(FOO_DATA[idx].encode(), c)
683
684 def test_dataset_peek(self):
685 fetched_data = self.ds.peek(chunk_size=4)
686 self.assertEqual(FOO_DATA[0:4].encode(), fetched_data)
687
688 def test_dataset_download(self):
689 with tempfile.TemporaryFile() as f:
690 self.ds.download(f)
691 f.seek(0)
692 self.assertEqual(FOO_DATA.encode(), f.read())
693
694 def test_dataset_get_contents(self):
695 self.assertEqual(FOO_DATA.encode(), self.ds.get_contents())
696
697 def test_dataset_update(self):
698 new_name = 'test_%s' % uuid.uuid4().hex
699 new_annotation = 'Annotation for %s' % new_name
700 new_genome_build = 'hg19'
701 updated_hda = self.ds.update(name=new_name, annotation=new_annotation, genome_build=new_genome_build)
702 self.assertEqual(self.ds.id, updated_hda.id)
703 self.assertEqual(self.ds.name, new_name)
704 self.assertEqual(self.ds.annotation, new_annotation)
705 self.assertEqual(self.ds.genome_build, new_genome_build)
706
707 def test_dataset_delete(self):
708 self.ds.delete()
709 self.assertTrue(self.ds.deleted)
710 self.assertFalse(self.ds.purged)
711
712 def test_dataset_purge(self):
713 self.ds.delete(purge=True)
714 self.assertTrue(self.ds.deleted)
715 self.assertTrue(self.ds.purged)
716
717
718 class TestRunWorkflow(GalaxyObjectsTestBase):
719
720 def setUp(self):
721 super().setUp()
722 self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
723 with open(SAMPLE_FN) as f:
724 self.wf = self.gi.workflows.import_new(f.read())
725 self.contents = ['one\ntwo\n', '1\n2\n']
726 self.inputs = [self.lib.upload_data(_) for _ in self.contents]
727
728 def tearDown(self):
729 self.wf.delete()
730 self.lib.delete()
731
732 def _test(self, existing_hist=False, params=False):
733 hist_name = 'test_%s' % uuid.uuid4().hex
734 if existing_hist:
735 hist = self.gi.histories.create(hist_name)
736 else:
737 hist = hist_name
738 if params:
739 params = {'Paste1': {'delimiter': 'U'}}
740 sep = '_' # 'U' maps to '_' in the paste tool
741 else:
742 params = None
743 sep = '\t' # default
744 input_map = {'Input 1': self.inputs[0], 'Input 2': self.inputs[1]}
745 sys.stderr.write(os.linesep)
746 outputs, out_hist = self.wf.run(
747 input_map, hist, params=params, wait=True, polling_interval=1)
748 self.assertEqual(len(outputs), 1)
749 out_ds = outputs[0]
750 self.assertIn(out_ds.id, out_hist.dataset_ids)
751 res = out_ds.get_contents()
752 exp_rows = zip(*(_.splitlines() for _ in self.contents))
753 exp_res = ("\n".join(sep.join(t) for t in exp_rows) + "\n").encode()
754 self.assertEqual(res, exp_res)
755 if existing_hist:
756 self.assertEqual(out_hist.id, hist.id)
757 out_hist.delete(purge=True)
758
759 def test_existing_history(self):
760 self._test(existing_hist=True)
761
762 def test_new_history(self):
763 self._test(existing_hist=False)
764
765 def test_params(self):
766 self._test(params=True)
767
768
769 class TestRunDatasetCollectionWorkflow(GalaxyObjectsTestBase):
770
771 def setUp(self):
772 super().setUp()
773 with open(SAMPLE_WF_COLL_FN) as f:
774 self.wf = self.gi.workflows.import_new(f.read())
775 self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
776
777 def tearDown(self):
778 self.wf.delete()
779 self.hist.delete(purge=True)
780
781 def test_run_workflow_with_dataset_collection(self):
782 dataset1 = self.hist.paste_content(FOO_DATA)
783 dataset2 = self.hist.paste_content(FOO_DATA_2)
784 collection_description = dataset_collections.CollectionDescription(
785 name="MyDatasetList",
786 elements=[
787 dataset_collections.HistoryDatasetElement(name="sample1", id=dataset1.id),
788 dataset_collections.HistoryDatasetElement(name="sample2", id=dataset2.id),
789 ]
790 )
791 dataset_collection = self.hist.create_dataset_collection(collection_description)
792 input_map = {"Input Dataset Collection": dataset_collection,
793 "Input 2": dataset1}
794 outputs, out_hist = self.wf.run(input_map, self.hist, wait=True)
795 self.assertEqual(len(outputs), 1)
796 out_hdca = outputs[0]
797 self.assertIsInstance(out_hdca, wrappers.HistoryDatasetCollectionAssociation)
798 self.assertEqual(out_hdca.collection_type, 'list')
799 self.assertEqual(len(out_hdca.elements), 2)
800 self.assertEqual(out_hist.id, self.hist.id)
801
802
803 class TestJob(GalaxyObjectsTestBase):
804
805 def test_get(self):
806 job_prevs = self.gi.jobs.get_previews()
807 if len(job_prevs) > 0:
808 job_prev = job_prevs[0]
809 self.assertIsInstance(job_prev, wrappers.JobPreview)
810 job = self.gi.jobs.get(job_prev.id)
811 self.assertIsInstance(job, wrappers.Job)
812 self.assertEqual(job.id, job_prev.id)
813 for job in self.gi.jobs.list():
814 self.assertIsInstance(job, wrappers.Job)
815
816
817 def suite():
818 loader = unittest.TestLoader()
819 s = unittest.TestSuite()
820 s.addTests([loader.loadTestsFromTestCase(c) for c in (
821 TestWrapper,
822 TestWorkflow,
823 TestGalaxyInstance,
824 TestLibrary,
825 TestLDContents,
826 TestHistory,
827 TestHDAContents,
828 TestRunWorkflow,
829 )])
830 return s
831
832
833 if __name__ == '__main__':
834 tests = suite()
835 RUNNER = unittest.TextTestRunner(verbosity=2)
836 RUNNER.run(tests)