comparison env/lib/python3.7/site-packages/bioblend/_tests/TestGalaxyObjects.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 # pylint: disable=C0103,E1101
2 import json
3 import os
4 import shutil
5 import socket
6 import sys
7 import tarfile
8 import tempfile
9 import uuid
10 from ssl import SSLError
11
12 import six
13 from six.moves.urllib.error import URLError
14 from six.moves.urllib.request import urlopen
15
16 import bioblend
17 import bioblend.galaxy.objects.galaxy_instance as galaxy_instance
18 import bioblend.galaxy.objects.wrappers as wrappers
19 from bioblend import ConnectionError
20 from bioblend.galaxy import dataset_collections
21 from . import test_util
22 from .test_util import unittest
23
# Module-wide test configuration: enable bioblend's 'test' stream logger at
# INFO level and give every newly created socket a 10 second default timeout.
bioblend.set_stream_logger('test', level='INFO')
socket.setdefaulttimeout(10.0)

# Absolute paths of the sample workflow files, resolved by test_util.
SAMPLE_FN = test_util.get_abspath(
    os.path.join('data', 'paste_columns.ga'))
SAMPLE_WF_COLL_FN = test_util.get_abspath(
    os.path.join('data', 'paste_columns_collections.ga'))

# Small text payloads used as dataset contents throughout the tests.
FOO_DATA = 'foo\nbar\n'
FOO_DATA_2 = 'foo2\nbar2\n'
# Dictionary description of the 'paste_columns' workflow used by the offline
# TestWorkflow tests: two data input steps (571, 572) feeding one tool step
# (573, tool 'Paste1').
SAMPLE_WF_DICT = {
    'deleted': False,
    'id': '9005c5112febe774',
    'inputs': {
        '571': {'label': 'Input Dataset', 'value': ''},
        '572': {'label': 'Input Dataset', 'value': ''},
    },
    'model_class': 'StoredWorkflow',
    'name': 'paste_columns',
    'published': False,
    'steps': {
        # first data input
        '571': {
            'id': 571,
            'input_steps': {},
            'tool_id': None,
            'tool_inputs': {'name': 'Input Dataset'},
            'tool_version': None,
            'type': 'data_input',
        },
        # second data input
        '572': {
            'id': 572,
            'input_steps': {},
            'tool_id': None,
            'tool_inputs': {'name': 'Input Dataset'},
            'tool_version': None,
            'type': 'data_input',
        },
        # tool step consuming the output of both inputs
        '573': {
            'id': 573,
            'input_steps': {
                'input1': {'source_step': 571, 'step_output': 'output'},
                'input2': {'source_step': 572, 'step_output': 'output'},
            },
            'tool_id': 'Paste1',
            'tool_inputs': {
                'delimiter': '"T"',
                'input1': 'null',
                'input2': 'null',
            },
            'tool_version': '1.0.0',
            'type': 'tool',
        },
    },
    'tags': [],
    'url': '/api/workflows/9005c5112febe774',
}
76
77
def is_reachable(url):
    """Return True if ``url`` can be opened, False otherwise.

    The URL is opened with a 5 second timeout. SSL errors, URL errors and
    socket timeouts are reported as "not reachable"; any other exception
    propagates to the caller.
    """
    try:
        response = urlopen(url, timeout=5)
    except (SSLError, URLError, socket.timeout):
        return False
    # Only close what was actually returned by urlopen().
    if response is None:
        return True
    response.close()
    return True
87
88
def upload_from_fs(lib, bnames, **kwargs):
    """Upload freshly written temporary files to ``lib`` from the filesystem.

    One file per entry of ``bnames`` is created inside a new temporary
    directory and filled with FOO_DATA; the resulting paths are passed to
    ``lib.upload_from_galaxy_fs`` together with ``kwargs``. The temporary
    directory is removed afterwards, whether or not the upload succeeded.

    Returns a ``(datasets, paths)`` tuple: the value returned by
    ``upload_from_galaxy_fs`` and the list of (now removed) file paths.
    """
    workdir = tempfile.mkdtemp(prefix='bioblend_test_')
    try:
        paths = [os.path.join(workdir, base) for base in bnames]
        for path in paths:
            with open(path, 'w') as out:
                out.write(FOO_DATA)
        datasets = lib.upload_from_galaxy_fs(paths, **kwargs)
    finally:
        shutil.rmtree(workdir)
    return datasets, paths
100
101
class MockWrapper(wrappers.Wrapper):
    """Minimal Wrapper subclass used by TestWrapper, exposing 'a' and 'b'."""

    BASE_ATTRS = frozenset(('a', 'b'))

    def __init__(self, *args, **kwargs):
        # Pure delegation to the parent initializer.
        # NOTE(review): presumably needed to make this class instantiable
        # (TestWrapper.setUp expects wrappers.Wrapper itself to raise
        # TypeError) -- confirm before removing.
        parent = super(MockWrapper, self)
        parent.__init__(*args, **kwargs)

    @property
    def gi_module(self):
        # NOTE(review): this *calls* whatever the parent exposes as
        # gi_module; if the parent defines it as a property the call would
        # fail -- confirm against wrappers.Wrapper.
        parent = super(MockWrapper, self)
        return parent.gi_module()
111
112
class TestWrapper(unittest.TestCase):
    """Tests of the generic Wrapper behaviour, exercised through MockWrapper."""

    def setUp(self):
        self.d = {'a': 1, 'b': [2, 3], 'c': {'x': 4}}
        # Instantiating the base class directly must be rejected.
        with self.assertRaises(TypeError):
            wrappers.Wrapper(self.d)
        self.w = MockWrapper(self.d)

    def test_initialize(self):
        # Base attributes are exposed with the values of the source dict.
        for attr in MockWrapper.BASE_ATTRS:
            self.assertEqual(getattr(self.w, attr), self.d[attr])
        # Changes made through the wrapper are visible on the wrapper...
        self.w.a = 222
        self.w.b[0] = 222
        self.assertEqual(self.w.a, 222)
        self.assertEqual(self.w.b[0], 222)
        # ...but do not leak back into the dict it was built from.
        self.assertEqual(self.d['a'], 1)
        self.assertEqual(self.d['b'][0], 2)
        # Attributes outside BASE_ATTRS can be neither read nor written.
        with self.assertRaises(AttributeError):
            getattr(self.w, 'foo')
        with self.assertRaises(AttributeError):
            setattr(self.w, 'foo', 0)

    def test_taint(self):
        # Assigning an attribute flags the wrapper as modified.
        self.assertFalse(self.w.is_modified)
        self.w.a = 111  # pylint: disable=W0201
        self.assertTrue(self.w.is_modified)

    def test_serialize(self):
        # A JSON round trip preserves the wrapped data.
        restored = MockWrapper.from_json(self.w.to_json())
        self.assertEqual(restored.wrapped, self.w.wrapped)

    def test_clone(self):
        # A clone starts equal to the original and is independent of it.
        copy = self.w.clone()
        self.assertEqual(copy.wrapped, self.w.wrapped)
        copy.b[0] = 111
        self.assertEqual(self.w.b[0], 2)

    def test_kwargs(self):
        # 'parent' is accepted as a keyword argument and is read-only.
        parent = MockWrapper({'a': 10})
        child = MockWrapper(self.d, parent=parent)
        self.assertIs(child.parent, parent)
        with self.assertRaises(AttributeError):
            setattr(child, 'parent', 0)
152
153
class TestWorkflow(unittest.TestCase):
    """Offline tests of wrappers.Workflow built from SAMPLE_WF_DICT."""

    def setUp(self):
        self.wf = wrappers.Workflow(SAMPLE_WF_DICT)

    def test_initialize(self):
        # Scalar attributes mirror the source dictionary.
        self.assertEqual(self.wf.id, '9005c5112febe774')
        self.assertEqual(self.wf.name, 'paste_columns')
        self.assertEqual(self.wf.deleted, False)
        self.assertEqual(self.wf.published, False)
        self.assertEqual(self.wf.tags, [])
        # Derived id sets: both inputs are sources, the tool step is the sink.
        input_ids = set(['571', '572'])
        tool_ids = set(['573'])
        self.assertEqual(
            self.wf.input_labels_to_ids, {'Input Dataset': input_ids})
        self.assertEqual(self.wf.tool_labels_to_ids, {'Paste1': tool_ids})
        self.assertEqual(self.wf.data_input_ids, input_ids)
        self.assertEqual(self.wf.source_ids, input_ids)
        self.assertEqual(self.wf.sink_ids, tool_ids)

    def test_dag(self):
        # inv_dag must be the edge-wise inversion of dag.
        inv_dag = {}
        for head, step_tails in six.iteritems(self.wf.dag):
            for tail in step_tails:
                inv_dag.setdefault(str(tail), set()).add(head)
        self.assertEqual(self.wf.inv_dag, inv_dag)
        heads = set(self.wf.dag)
        self.assertEqual(heads, set.union(*self.wf.inv_dag.values()))
        tails = set(self.wf.inv_dag)
        self.assertEqual(tails, set.union(*self.wf.dag.values()))
        # The sorted ids cover every node and place heads before their tails.
        ids = self.wf.sorted_step_ids()
        self.assertEqual(set(ids), heads | tails)
        for head, step_tails in six.iteritems(self.wf.dag):
            for tail in step_tails:
                self.assertLess(ids.index(head), ids.index(tail))

    def test_steps(self):
        steps = SAMPLE_WF_DICT['steps']
        for sid, step in six.iteritems(self.wf.steps):
            self.assertIsInstance(step, wrappers.Step)
            self.assertEqual(step.id, sid)
            self.assertIn(sid, steps)
            self.assertIs(step.parent, self.wf)
        self.assertEqual(self.wf.data_input_ids, set(['571', '572']))
        self.assertEqual(self.wf.tool_ids, set(['573']))

    def test_taint(self):
        # Modifying a step marks the owning workflow as modified.
        self.assertFalse(self.wf.is_modified)
        self.wf.steps['571'].tool_id = 'foo'
        self.assertTrue(self.wf.is_modified)

    def test_input_map(self):
        class DummyLD(object):
            """Stand-in for a library dataset: only SRC and id are provided."""
            SRC = 'ld'

            def __init__(self, id_):
                self.id = id_

        label = 'Input Dataset'
        self.assertEqual(self.wf.input_labels, set([label]))
        input_map = self.wf.convert_input_map(
            {label: [DummyLD('a'), DummyLD('b')]})
        # {'571': {'id': 'a', 'src': 'ld'}, '572': {'id': 'b', 'src': 'ld'}}
        # OR
        # {'571': {'id': 'b', 'src': 'ld'}, '572': {'id': 'a', 'src': 'ld'}}
        self.assertEqual(set(input_map), set(['571', '572']))
        seen_ids = []
        for d in six.itervalues(input_map):
            self.assertEqual(set(d), set(['id', 'src']))
            self.assertEqual(d['src'], 'ld')
            # Compare against a tuple: a plain 'ab' string would turn this
            # into a substring test that also accepts '' and 'ab'.
            self.assertIn(d['id'], ('a', 'b'))
            seen_ids.append(d['id'])
        # Either assignment is fine, but each dataset must be used once.
        self.assertEqual(sorted(seen_ids), ['a', 'b'])
222
223
@test_util.skip_unless_galaxy()
class GalaxyObjectsTestBase(unittest.TestCase):
    """Base class of the tests that talk to a Galaxy server.

    The connection parameters are read from the BIOBLEND_GALAXY_API_KEY and
    BIOBLEND_GALAXY_URL environment variables and the resulting
    GalaxyInstance is stored in ``self.gi``.
    """

    def setUp(self):
        env = os.environ
        api_key = env['BIOBLEND_GALAXY_API_KEY']
        url = env['BIOBLEND_GALAXY_URL']
        self.gi = galaxy_instance.GalaxyInstance(url, api_key)
231
232
class TestGalaxyInstance(GalaxyObjectsTestBase):
    """Tests of the library, history and workflow clients of a GalaxyInstance.

    Objects created on the server are deleted in ``finally`` blocks so that a
    failing assertion does not leave them behind.
    """

    def test_library(self):
        name = 'test_%s' % uuid.uuid4().hex
        description, synopsis = 'D', 'S'
        lib = self.gi.libraries.create(
            name, description=description, synopsis=synopsis)
        try:
            self.assertEqual(lib.name, name)
            self.assertEqual(lib.description, description)
            self.assertEqual(lib.synopsis, synopsis)
            self.assertEqual(len(lib.content_infos), 1)  # root folder
            self.assertEqual(len(lib.folder_ids), 1)
            self.assertEqual(len(lib.dataset_ids), 0)
            self.assertIn(lib.id, [_.id for _ in self.gi.libraries.list()])
        finally:
            # Delete even if a check above failed.
            lib.delete()
        self.assertFalse(lib.is_mapped)

    def test_workflow_from_str(self):
        with open(SAMPLE_FN) as f:
            wf = self.gi.workflows.import_new(f.read())
        self._check_and_del_workflow(wf)

    def test_workflow_collections_from_str(self):
        with open(SAMPLE_WF_COLL_FN) as f:
            wf = self.gi.workflows.import_new(f.read())
        self._check_and_del_workflow(wf)

    def test_workflow_from_dict(self):
        with open(SAMPLE_FN) as f:
            wf = self.gi.workflows.import_new(json.load(f))
        self._check_and_del_workflow(wf)

    def test_workflow_publish_from_dict(self):
        with open(SAMPLE_FN) as f:
            wf = self.gi.workflows.import_new(json.load(f), publish=True)
        self._check_and_del_workflow(wf, check_is_public=True)

    def test_workflow_missing_tools(self):
        with open(SAMPLE_FN) as f:
            wf_dump = json.load(f)
        wf_info = self.gi.gi.workflows.import_workflow_dict(wf_dump)
        wf_dict = self.gi.gi.workflows.show_workflow(wf_info['id'])
        # Blank the tool details of every tool step before wrapping the
        # description; the wrapper is then expected to be non-runnable.
        for id_, step in six.iteritems(wf_dict['steps']):
            if step['type'] == 'tool':
                for k in 'tool_inputs', 'tool_version':
                    wf_dict['steps'][id_][k] = None
        wf = wrappers.Workflow(wf_dict, gi=self.gi)
        try:
            self.assertFalse(wf.is_runnable)
            self.assertRaises(RuntimeError, wf.run)
        finally:
            wf.delete()

    def test_workflow_export(self):
        with open(SAMPLE_FN) as f:
            wf1 = self.gi.workflows.import_new(f.read())
        wf2 = self.gi.workflows.import_new(wf1.export())
        self.assertNotEqual(wf1.id, wf2.id)
        for wf in wf1, wf2:
            self._check_and_del_workflow(wf)

    def _check_and_del_workflow(self, wf, check_is_public=False):
        """Check an imported sample workflow, then delete it.

        ``wf`` is deleted whether or not the checks pass. When
        ``check_is_public`` is true the workflow must also be published.
        """
        try:
            # Galaxy appends additional text to imported workflow names
            self.assertTrue(wf.name.startswith('paste_columns'))
            self.assertEqual(len(wf.steps), 3)
            for step_id, step in six.iteritems(wf.steps):
                self.assertIsInstance(step, wrappers.Step)
                self.assertEqual(step_id, step.id)
                self.assertIsInstance(step.tool_inputs, dict)
                if step.type == 'tool':
                    self.assertIsNotNone(step.tool_id)
                    self.assertIsNotNone(step.tool_version)
                    self.assertIsInstance(step.input_steps, dict)
                elif step.type in ('data_collection_input', 'data_input'):
                    self.assertIsNone(step.tool_id)
                    self.assertIsNone(step.tool_version)
                    self.assertEqual(step.input_steps, {})
            wf_ids = set(_.id for _ in self.gi.workflows.list())
            self.assertIn(wf.id, wf_ids)
            if check_is_public:
                self.assertTrue(wf.published)
        finally:
            wf.delete()

    # not very accurate:
    #   * we can't publish a wf from the API
    #   * we can't directly get another user's wf
    def test_workflow_from_shared(self):
        all_prevs = dict(
            (_.id, _) for _ in self.gi.workflows.get_previews(published=True)
        )
        pub_only_ids = set(all_prevs).difference(
            _.id for _ in self.gi.workflows.get_previews())
        if pub_only_ids:
            wf_id = pub_only_ids.pop()
            imported = self.gi.workflows.import_shared(wf_id)
            self.assertIsInstance(imported, wrappers.Workflow)
            imported.delete()
        else:
            self.skipTest('no published workflows, manually publish a workflow to run this test')

    def test_get_libraries(self):
        self._test_multi_get('library')

    def test_get_histories(self):
        self._test_multi_get('history')

    def test_get_workflows(self):
        self._test_multi_get('workflow')

    def _normalized_functions(self, obj_type):
        """Return ``(create, get_objs, get_prevs, del_kwargs)`` for a type.

        ``obj_type`` is 'library', 'history' or 'workflow'. ``create`` takes
        a name, ``get_objs`` / ``get_prevs`` are the client's ``list`` and
        ``get_previews`` methods, and ``del_kwargs`` holds the keyword
        arguments to pass when deleting objects of that type.

        Raises ValueError for any other ``obj_type``.
        """
        if obj_type == 'library':
            create = self.gi.libraries.create
            get_objs = self.gi.libraries.list
            get_prevs = self.gi.libraries.get_previews
            del_kwargs = {}
        elif obj_type == 'history':
            create = self.gi.histories.create
            get_objs = self.gi.histories.list
            get_prevs = self.gi.histories.get_previews
            del_kwargs = {'purge': True}
        elif obj_type == 'workflow':
            def create(name):
                with open(SAMPLE_FN) as f:
                    d = json.load(f)
                d['name'] = name
                return self.gi.workflows.import_new(d)

            get_objs = self.gi.workflows.list
            get_prevs = self.gi.workflows.get_previews
            del_kwargs = {}
        else:
            # Fail with a clear message instead of an UnboundLocalError.
            raise ValueError('unsupported object type: %r' % (obj_type,))
        return create, get_objs, get_prevs, del_kwargs

    def _test_multi_get(self, obj_type):
        """Create two objects of ``obj_type`` and check listing/filtering."""
        create, get_objs, get_prevs, del_kwargs = self._normalized_functions(
            obj_type)

        def ids(seq):
            return set(_.id for _ in seq)

        names = ['test_%s' % uuid.uuid4().hex for _ in range(2)]
        objs = []
        try:
            # Append one by one so that an object created before a failing
            # create() call is still cleaned up below.
            for name in names:
                objs.append(create(name))
            self.assertLessEqual(ids(objs), ids(get_objs()))
            if obj_type != 'workflow':
                filtered = get_objs(name=names[0])
                self.assertEqual(len(filtered), 1)
                self.assertEqual(filtered[0].id, objs[0].id)
                del_id = objs[-1].id
                objs.pop().delete(**del_kwargs)
                self.assertIn(del_id, ids(get_prevs(deleted=True)))
            else:
                # Galaxy appends info strings to imported workflow names
                prev = get_prevs()[0]
                filtered = get_objs(name=prev.name)
                self.assertEqual(len(filtered), 1)
                self.assertEqual(filtered[0].id, prev.id)
        finally:
            for o in objs:
                o.delete(**del_kwargs)

    def test_delete_libraries_by_name(self):
        self._test_delete_by_name('library')

    def test_delete_histories_by_name(self):
        self._test_delete_by_name('history')

    def test_delete_workflows_by_name(self):
        self._test_delete_by_name('workflow')

    def _test_delete_by_name(self, obj_type):
        """Create two same-named objects and delete both of them by name."""
        create, _get_objs, get_prevs, del_kwargs = self._normalized_functions(
            obj_type)
        name = 'test_%s' % uuid.uuid4().hex
        objs = []
        bulk_delete_attempted = False
        try:
            for _ in range(2):
                objs.append(create(name))
            final_name = objs[0].name
            prevs = [p for p in get_prevs(name=final_name) if not p.deleted]
            self.assertEqual(len(prevs), len(objs))
            bulk_delete_attempted = True
            objs[0].gi_module.delete(name=final_name, **del_kwargs)
            prevs = [p for p in get_prevs(name=final_name) if not p.deleted]
            self.assertEqual(len(prevs), 0)
        finally:
            # If the by-name deletion was never reached, remove the created
            # objects one by one; afterwards their state is unknown, so they
            # are left alone.
            if not bulk_delete_attempted:
                for o in objs:
                    o.delete(**del_kwargs)
413
414
class TestLibrary(GalaxyObjectsTestBase):
    """Tests of library folders and datasets, run on a fresh library."""

    # just something that can be expected to be always up
    DS_URL = 'https://tools.ietf.org/rfc/rfc1866.txt'

    def setUp(self):
        super(TestLibrary, self).setUp()
        lib_name = 'test_%s' % uuid.uuid4().hex
        self.lib = self.gi.libraries.create(lib_name)

    def tearDown(self):
        self.lib.delete()

    def test_root_folder(self):
        root = self.lib.root_folder
        self.assertIsNone(root.parent)

    def test_folder(self):
        name = 'test_%s' % uuid.uuid4().hex
        desc = 'D'
        folder = self.lib.create_folder(name, description=desc)
        self.assertEqual(folder.name, name)
        self.assertEqual(folder.description, desc)
        self.assertIs(folder.container, self.lib)
        self.assertEqual(folder.parent.id, self.lib.root_folder.id)
        # root folder + the new one
        self.assertEqual(len(self.lib.content_infos), 2)
        self.assertEqual(len(self.lib.folder_ids), 2)
        self.assertIn(folder.id, self.lib.folder_ids)
        fetched = self.lib.get_folder(folder.id)
        self.assertEqual(folder.id, fetched.id)

    def _check_datasets(self, dss):
        """Check that ``dss`` are exactly the datasets of the test library."""
        lib_ids = self.lib.dataset_ids
        self.assertEqual(len(dss), len(lib_ids))
        self.assertEqual(set(ds.id for ds in dss), set(lib_ids))
        for ds in dss:
            self.assertIs(ds.container, self.lib)

    def test_dataset(self):
        folder_name = 'test_%s' % uuid.uuid4().hex
        folder = self.lib.create_folder(folder_name)
        ds = self.lib.upload_data(FOO_DATA, folder=folder)
        # root folder + new folder + uploaded dataset
        self.assertEqual(len(self.lib.content_infos), 3)
        self.assertEqual(len(self.lib.folder_ids), 2)
        self._check_datasets([ds])

    def test_dataset_from_url(self):
        if not is_reachable(self.DS_URL):
            self.skipTest('%s not reachable' % self.DS_URL)
        else:
            ds = self.lib.upload_from_url(self.DS_URL)
            self._check_datasets([ds])

    def test_dataset_from_local(self):
        with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as tmp:
            tmp.write(FOO_DATA)
            # make sure the content is on disk before uploading by path
            tmp.flush()
            ds = self.lib.upload_from_local(tmp.name)
        self._check_datasets([ds])

    def test_datasets_from_fs(self):
        bnames = ['f%d.txt' % idx for idx in range(2)]
        dss, fnames = upload_from_fs(self.lib, bnames)
        self._check_datasets(dss)
        dss, fnames = upload_from_fs(
            self.lib, bnames, link_data_only='link_to_files')
        for ds, path in zip(dss, fnames):
            self.assertEqual(ds.file_name, path)

    def test_copy_from_dataset(self):
        hist_name = 'test_%s' % uuid.uuid4().hex
        hist = self.gi.histories.create(hist_name)
        try:
            hda = hist.paste_content(FOO_DATA)
            ds = self.lib.copy_from_dataset(hda)
        finally:
            hist.delete(purge=True)
        self._check_datasets([ds])

    def test_get_dataset(self):
        ds = self.lib.upload_data(FOO_DATA)
        fetched = self.lib.get_dataset(ds.id)
        self.assertEqual(ds.id, fetched.id)

    def test_get_datasets(self):
        bnames = ['f%d.txt' % idx for idx in range(2)]
        dss, _ = upload_from_fs(self.lib, bnames)
        fetched = self.lib.get_datasets()
        self.assertEqual(len(dss), len(fetched))
        self.assertEqual(
            set(ds.id for ds in dss), set(ds.id for ds in fetched))
        # selection uses the name prefixed with '/'
        name = '/%s' % bnames[0]
        selected = self.lib.get_datasets(name=name)
        self.assertEqual(len(selected), 1)
        self.assertEqual(selected[0].name, bnames[0])
503
504
class TestLDContents(GalaxyObjectsTestBase):
    """Tests of content access and updates on a library dataset.

    Each test runs on a fresh library holding one dataset with FOO_DATA.
    """

    def setUp(self):
        super(TestLDContents, self).setUp()
        lib_name = 'test_%s' % uuid.uuid4().hex
        self.lib = self.gi.libraries.create(lib_name)
        self.ds = self.lib.upload_data(FOO_DATA)
        self.ds.wait()

    def tearDown(self):
        self.lib.delete()

    def test_dataset_get_stream(self):
        # With chunk_size=1 every chunk must match one character of FOO_DATA.
        stream = self.ds.get_stream(chunk_size=1)
        for pos, chunk in enumerate(stream):
            self.assertEqual(six.b(FOO_DATA[pos]), chunk)

    def test_dataset_peek(self):
        head = self.ds.peek(chunk_size=4)
        self.assertEqual(six.b(FOO_DATA[0:4]), head)

    def test_dataset_download(self):
        with tempfile.TemporaryFile() as tmp:
            self.ds.download(tmp)
            tmp.seek(0)
            self.assertEqual(six.b(FOO_DATA), tmp.read())

    def test_dataset_get_contents(self):
        expected = six.b(FOO_DATA)
        self.assertEqual(expected, self.ds.get_contents())

    def test_dataset_delete(self):
        self.ds.delete()
        # Cannot test this yet because the 'deleted' attribute is not exported
        # by the API at the moment
        # self.assertTrue(self.ds.deleted)

    @test_util.skip_unless_galaxy('release_17.09')
    def test_dataset_update(self):
        new_name = 'test_%s' % uuid.uuid4().hex
        new_misc_info = 'Annotation for %s' % new_name
        new_genome_build = 'hg19'
        updated_ldda = self.ds.update(
            name=new_name,
            misc_info=new_misc_info,
            genome_build=new_genome_build,
        )
        self.assertEqual(self.ds.id, updated_ldda.id)
        # the changes must be visible on the object update() was called on
        self.assertEqual(self.ds.name, new_name)
        self.assertEqual(self.ds.misc_info, new_misc_info)
        self.assertEqual(self.ds.genome_build, new_genome_build)
549
550
class TestHistory(GalaxyObjectsTestBase):
    """Tests of history-level operations, each run on a fresh history.

    Helper libraries and histories created inside a test are deleted in
    ``finally`` blocks so that a failing step does not leave them behind.
    """

    def setUp(self):
        super(TestHistory, self).setUp()
        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)

    def tearDown(self):
        self.hist.delete(purge=True)

    def test_create_delete(self):
        name = 'test_%s' % uuid.uuid4().hex
        hist = self.gi.histories.create(name)
        try:
            self.assertEqual(hist.name, name)
            hist_id = hist.id
            self.assertIn(hist_id, [_.id for _ in self.gi.histories.list()])
        finally:
            # Purge even if a check above failed.
            hist.delete(purge=True)
        self.assertFalse(hist.is_mapped)
        try:
            h = self.gi.histories.get(hist_id)
            self.assertTrue(h.deleted)
        except ConnectionError:
            # Galaxy up to release_2015.01.13 gives a ConnectionError
            pass

    def _check_dataset(self, hda):
        """Check that ``hda`` is the only dataset of the test history."""
        self.assertIsInstance(hda, wrappers.HistoryDatasetAssociation)
        self.assertIs(hda.container, self.hist)
        self.assertEqual(len(self.hist.dataset_ids), 1)
        self.assertEqual(self.hist.dataset_ids[0], hda.id)

    def test_import_dataset(self):
        lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        try:
            lds = lib.upload_data(FOO_DATA)
            self.assertEqual(len(self.hist.dataset_ids), 0)
            hda = self.hist.import_dataset(lds)
        finally:
            # The source library is only needed for the import itself.
            lib.delete()
        self._check_dataset(hda)

    def test_upload_file(self):
        with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
            f.write(FOO_DATA)
            f.flush()
            hda = self.hist.upload_file(f.name)
        self._check_dataset(hda)

    def test_paste_content(self):
        hda = self.hist.paste_content(FOO_DATA)
        self._check_dataset(hda)

    def test_get_dataset(self):
        hda = self.hist.paste_content(FOO_DATA)
        retrieved = self.hist.get_dataset(hda.id)
        self.assertEqual(hda.id, retrieved.id)

    def test_get_datasets(self):
        bnames = ['f%d.txt' % _ for _ in range(2)]
        lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        try:
            lds = upload_from_fs(lib, bnames)[0]
            hdas = [self.hist.import_dataset(_) for _ in lds]
        finally:
            lib.delete()
        retrieved = self.hist.get_datasets()
        self.assertEqual(len(hdas), len(retrieved))
        self.assertEqual(set(_.id for _ in hdas), set(_.id for _ in retrieved))
        selected = self.hist.get_datasets(name=bnames[0])
        self.assertEqual(len(selected), 1)
        self.assertEqual(selected[0].name, bnames[0])

    def test_export_and_download(self):
        jeha_id = self.hist.export(wait=True, maxwait=60)
        self.assertTrue(jeha_id)
        tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
        temp_fn = os.path.join(tempdir, 'export.tar.gz')
        try:
            with open(temp_fn, 'wb') as fo:
                self.hist.download(jeha_id, fo)
            self.assertTrue(tarfile.is_tarfile(temp_fn))
        finally:
            shutil.rmtree(tempdir)

    def test_update(self):
        new_name = 'test_%s' % uuid.uuid4().hex
        new_annotation = 'Annotation for %s' % new_name
        new_tags = ['tag1', 'tag2']
        updated_hist = self.hist.update(name=new_name, annotation=new_annotation, tags=new_tags)
        self.assertEqual(self.hist.id, updated_hist.id)
        self.assertEqual(self.hist.name, new_name)
        self.assertEqual(self.hist.annotation, new_annotation)
        self.assertEqual(self.hist.tags, new_tags)
        updated_hist = self.hist.update(published=True)
        self.assertEqual(self.hist.id, updated_hist.id)
        self.assertTrue(self.hist.published)

    def test_create_dataset_collection(self):
        self._create_collection_description()
        hdca = self.hist.create_dataset_collection(self.collection_description)
        self.assertIsInstance(hdca, wrappers.HistoryDatasetCollectionAssociation)
        self.assertEqual(hdca.collection_type, 'list')
        self.assertIs(hdca.container, self.hist)
        self.assertEqual(len(hdca.elements), 2)
        self.assertEqual(self.dataset1.id, hdca.elements[0]['object']['id'])
        self.assertEqual(self.dataset2.id, hdca.elements[1]['object']['id'])

    def test_delete_dataset_collection(self):
        self._create_collection_description()
        hdca = self.hist.create_dataset_collection(self.collection_description)
        hdca.delete()
        self.assertTrue(hdca.deleted)

    def _create_collection_description(self):
        """Paste two datasets and describe a collection made of them.

        Sets ``self.dataset1``, ``self.dataset2`` and
        ``self.collection_description``.
        """
        self.dataset1 = self.hist.paste_content(FOO_DATA)
        self.dataset2 = self.hist.paste_content(FOO_DATA_2)
        self.collection_description = dataset_collections.CollectionDescription(
            name="MyDatasetList",
            elements=[
                dataset_collections.HistoryDatasetElement(name="sample1", id=self.dataset1.id),
                dataset_collections.HistoryDatasetElement(name="sample2", id=self.dataset2.id),
            ]
        )
669
670
class TestHDAContents(GalaxyObjectsTestBase):
    """Tests of content access, update and deletion on a history dataset.

    Each test runs on a fresh history holding one dataset with FOO_DATA.
    """

    def setUp(self):
        super(TestHDAContents, self).setUp()
        hist_name = 'test_%s' % uuid.uuid4().hex
        self.hist = self.gi.histories.create(hist_name)
        self.ds = self.hist.paste_content(FOO_DATA)
        self.ds.wait()

    def tearDown(self):
        self.hist.delete(purge=True)

    def test_dataset_get_stream(self):
        # With chunk_size=1 every chunk must match one character of FOO_DATA.
        stream = self.ds.get_stream(chunk_size=1)
        for pos, chunk in enumerate(stream):
            self.assertEqual(six.b(FOO_DATA[pos]), chunk)

    def test_dataset_peek(self):
        head = self.ds.peek(chunk_size=4)
        self.assertEqual(six.b(FOO_DATA[0:4]), head)

    def test_dataset_download(self):
        with tempfile.TemporaryFile() as tmp:
            self.ds.download(tmp)
            tmp.seek(0)
            self.assertEqual(six.b(FOO_DATA), tmp.read())

    def test_dataset_get_contents(self):
        expected = six.b(FOO_DATA)
        self.assertEqual(expected, self.ds.get_contents())

    def test_dataset_update(self):
        new_name = 'test_%s' % uuid.uuid4().hex
        new_annotation = 'Annotation for %s' % new_name
        new_genome_build = 'hg19'
        updated_hda = self.ds.update(
            name=new_name,
            annotation=new_annotation,
            genome_build=new_genome_build,
        )
        self.assertEqual(self.ds.id, updated_hda.id)
        # the changes must be visible on the object update() was called on
        self.assertEqual(self.ds.name, new_name)
        self.assertEqual(self.ds.annotation, new_annotation)
        self.assertEqual(self.ds.genome_build, new_genome_build)

    def test_dataset_delete(self):
        # A plain delete marks the dataset as deleted but not purged.
        self.ds.delete()
        self.assertTrue(self.ds.deleted)
        self.assertFalse(self.ds.purged)

    @test_util.skip_unless_galaxy("release_17.05")
    def test_dataset_purge(self):
        self.ds.delete(purge=True)
        # Galaxy from release_15.03 to release_17.01 wrongly reports ds.deleted as False, see https://github.com/galaxyproject/galaxy/issues/3548
        self.assertTrue(self.ds.deleted)
        self.assertTrue(self.ds.purged)
720
721
class TestRunWorkflow(GalaxyObjectsTestBase):
    """Tests running the sample 'paste_columns' workflow on library datasets."""

    def setUp(self):
        super(TestRunWorkflow, self).setUp()
        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        with open(SAMPLE_FN) as f:
            self.wf = self.gi.workflows.import_new(f.read())
        self.contents = ['one\ntwo\n', '1\n2\n']
        self.inputs = [self.lib.upload_data(_) for _ in self.contents]

    def tearDown(self):
        self.wf.delete()
        self.lib.delete()

    def _test(self, existing_hist=False, params=False):
        """Run the workflow and compare its single output with the inputs.

        With ``existing_hist`` the workflow runs in a history created
        beforehand, otherwise a history name is passed to ``run``. With
        ``params`` the 'Paste1' delimiter is overridden with 'U'.

        The history used by the run is purged at the end, also when the run
        or one of the checks fails.
        """
        hist_name = 'test_%s' % uuid.uuid4().hex
        if existing_hist:
            hist = self.gi.histories.create(hist_name)
        else:
            hist = hist_name
        if params:
            run_params = {'Paste1': {'delimiter': 'U'}}
            sep = '_'  # 'U' maps to '_' in the paste tool
        else:
            run_params = None
            sep = '\t'  # default
        input_map = {'Input 1': self.inputs[0], 'Input 2': self.inputs[1]}
        sys.stderr.write(os.linesep)
        # History to purge at the end: the pre-created one (if any) until
        # run() hands back the history it actually used.
        to_purge = hist if existing_hist else None
        try:
            outputs, out_hist = self.wf.run(
                input_map, hist, params=run_params, wait=True,
                polling_interval=1)
            to_purge = out_hist
            self.assertEqual(len(outputs), 1)
            out_ds = outputs[0]
            self.assertIn(out_ds.id, out_hist.dataset_ids)
            res = out_ds.get_contents()
            # Expected output: the lines of the two inputs pasted side by side.
            exp_rows = zip(*(_.splitlines() for _ in self.contents))
            exp_res = six.b("\n".join(sep.join(t) for t in exp_rows) + "\n")
            self.assertEqual(res, exp_res)
            if existing_hist:
                self.assertEqual(out_hist.id, hist.id)
        finally:
            if to_purge is not None:
                to_purge.delete(purge=True)

    def test_existing_history(self):
        self._test(existing_hist=True)

    def test_new_history(self):
        self._test(existing_hist=False)

    def test_params(self):
        self._test(params=True)
771
772
class TestRunDatasetCollectionWorkflow(GalaxyObjectsTestBase):
    """Tests running the sample collection workflow inside a fresh history."""

    def setUp(self):
        super(TestRunDatasetCollectionWorkflow, self).setUp()
        with open(SAMPLE_WF_COLL_FN) as wf_file:
            wf_text = wf_file.read()
            self.wf = self.gi.workflows.import_new(wf_text)
        hist_name = 'test_%s' % uuid.uuid4().hex
        self.hist = self.gi.histories.create(hist_name)

    def tearDown(self):
        self.wf.delete()
        self.hist.delete(purge=True)

    def test_run_workflow_with_dataset_collection(self):
        # Build a two-element list collection from pasted datasets.
        dataset1 = self.hist.paste_content(FOO_DATA)
        dataset2 = self.hist.paste_content(FOO_DATA_2)
        elements = [
            dataset_collections.HistoryDatasetElement(name="sample1", id=dataset1.id),
            dataset_collections.HistoryDatasetElement(name="sample2", id=dataset2.id),
        ]
        collection_description = dataset_collections.CollectionDescription(
            name="MyDatasetList",
            elements=elements
        )
        dataset_collection = self.hist.create_dataset_collection(collection_description)
        input_map = {
            "Input Dataset Collection": dataset_collection,
            "Input 2": dataset1,
        }
        outputs, out_hist = self.wf.run(input_map, self.hist, wait=True)
        # One list collection with two elements, produced in the same history.
        self.assertEqual(len(outputs), 1)
        out_hdca = outputs[0]
        self.assertIsInstance(out_hdca, wrappers.HistoryDatasetCollectionAssociation)
        self.assertEqual(out_hdca.collection_type, 'list')
        self.assertEqual(len(out_hdca.elements), 2)
        self.assertEqual(out_hist.id, self.hist.id)
805
806
class TestJob(GalaxyObjectsTestBase):
    """Tests of the jobs client: previews, single lookup and full listing."""

    def setUp(self):
        super(TestJob, self).setUp()

    def test_get(self):
        previews = self.gi.jobs.get_previews()
        # The single-job lookup can only be checked when a job exists.
        if len(previews) > 0:
            first = previews[0]
            self.assertIsInstance(first, wrappers.JobPreview)
            fetched = self.gi.jobs.get(first.id)
            self.assertIsInstance(fetched, wrappers.Job)
            self.assertEqual(fetched.id, first.id)
        for listed in self.gi.jobs.list():
            self.assertIsInstance(listed, wrappers.Job)
822
823
def suite():
    """Return a TestSuite holding the tests of every test case in this module.

    The test cases are loaded in the order in which they are defined.
    """
    loader = unittest.TestLoader()
    s = unittest.TestSuite()
    s.addTests([loader.loadTestsFromTestCase(c) for c in (
        TestWrapper,
        TestWorkflow,
        TestGalaxyInstance,
        TestLibrary,
        TestLDContents,
        TestHistory,
        TestHDAContents,
        TestRunWorkflow,
        # These two were defined in the module but never added to the suite.
        TestRunDatasetCollectionWorkflow,
        TestJob,
    )])
    return s
838
839
if __name__ == '__main__':
    # By default, run all tests. To run specific tests, do the following:
    #   python -m unittest <module>.<class>.<test_method>
    tests = suite()
    RUNNER = unittest.TextTestRunner(verbosity=2)
    RESULT = RUNNER.run(tests)
    # Report failures through the exit status so that callers (shell, CI)
    # can tell a failed run from a successful one.
    sys.exit(0 if RESULT.wasSuccessful() else 1)