Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/bioblend/_tests/TestGalaxyObjects.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26e78fe6e8c4 |
---|---|
1 # pylint: disable=C0103,E1101 | |
2 import json | |
3 import os | |
4 import shutil | |
5 import socket | |
6 import sys | |
7 import tarfile | |
8 import tempfile | |
9 import uuid | |
10 from ssl import SSLError | |
11 | |
12 import six | |
13 from six.moves.urllib.error import URLError | |
14 from six.moves.urllib.request import urlopen | |
15 | |
16 import bioblend | |
17 import bioblend.galaxy.objects.galaxy_instance as galaxy_instance | |
18 import bioblend.galaxy.objects.wrappers as wrappers | |
19 from bioblend import ConnectionError | |
20 from bioblend.galaxy import dataset_collections | |
21 from . import test_util | |
22 from .test_util import unittest | |
23 | |
# Enable INFO-level logging for the test run and guard every socket
# operation against hanging on an unresponsive server.
bioblend.set_stream_logger('test', level='INFO')
socket.setdefaulttimeout(10.0)
# Workflow definition files shipped with the test data directory.
SAMPLE_FN = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
SAMPLE_WF_COLL_FN = test_util.get_abspath(os.path.join('data', 'paste_columns_collections.ga'))
# Small text payloads used as dataset contents throughout the tests.
FOO_DATA = 'foo\nbar\n'
FOO_DATA_2 = 'foo2\nbar2\n'
# Canned workflow dictionary (as returned by the Galaxy workflow "show"
# API): two data-input steps ('571', '572') feeding one 'Paste1' tool
# step ('573').  Used by the offline TestWorkflow unit tests.
SAMPLE_WF_DICT = {
    'deleted': False,
    'id': '9005c5112febe774',
    'inputs': {
        '571': {'label': 'Input Dataset', 'value': ''},
        '572': {'label': 'Input Dataset', 'value': ''},
    },
    'model_class': 'StoredWorkflow',
    'name': 'paste_columns',
    'published': False,
    'steps': {
        '571': {
            'id': 571,
            'input_steps': {},
            'tool_id': None,
            'tool_inputs': {'name': 'Input Dataset'},
            'tool_version': None,
            'type': 'data_input',
        },
        '572': {
            'id': 572,
            'input_steps': {},
            'tool_id': None,
            'tool_inputs': {'name': 'Input Dataset'},
            'tool_version': None,
            'type': 'data_input',
        },
        '573': {
            'id': 573,
            'input_steps': {
                'input1': {'source_step': 571, 'step_output': 'output'},
                'input2': {'source_step': 572, 'step_output': 'output'},
            },
            'tool_id': 'Paste1',
            'tool_inputs': {
                'delimiter': '"T"',
                'input1': 'null',
                'input2': 'null',
            },
            'tool_version': '1.0.0',
            'type': 'tool',
        }
    },
    'tags': [],
    'url': '/api/workflows/9005c5112febe774',
}
76 | |
77 | |
def is_reachable(url):
    """Return whether *url* answers an HTTP request within 5 seconds.

    Any SSL, URL or socket-timeout failure is treated as "not reachable".
    """
    try:
        handle = urlopen(url, timeout=5)
    except (SSLError, URLError, socket.timeout):
        return False
    # urlopen succeeded; release the connection before reporting success.
    if handle is not None:
        handle.close()
    return True
87 | |
88 | |
def upload_from_fs(lib, bnames, **kwargs):
    """Write one FOO_DATA file per name in *bnames* into a fresh temporary
    directory, upload them to *lib* through ``upload_from_galaxy_fs`` and
    return the ``(datasets, file_paths)`` pair.

    The temporary directory is removed even if the upload fails.
    """
    tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
    try:
        paths = []
        for bname in bnames:
            path = os.path.join(tempdir, bname)
            with open(path, 'w') as out:
                out.write(FOO_DATA)
            paths.append(path)
        datasets = lib.upload_from_galaxy_fs(paths, **kwargs)
    finally:
        shutil.rmtree(tempdir)
    return datasets, paths
100 | |
101 | |
class MockWrapper(wrappers.Wrapper):
    """Minimal concrete Wrapper exporting exactly two attributes, 'a' and 'b'.

    The original class also defined an ``__init__`` that did nothing but call
    ``super().__init__`` with the same arguments (pylint W0235, "useless
    super delegation"); it has been removed — behavior is unchanged.
    """
    BASE_ATTRS = frozenset(['a', 'b'])

    @property
    def gi_module(self):
        # Delegate to the base-class implementation so the abstract
        # property is satisfied for the unit tests.
        return super(MockWrapper, self).gi_module()
111 | |
112 | |
class TestWrapper(unittest.TestCase):
    """Offline unit tests for the generic Wrapper machinery."""

    def setUp(self):
        self.d = {'a': 1, 'b': [2, 3], 'c': {'x': 4}}
        # Wrapper itself is abstract: direct instantiation must fail.
        self.assertRaises(TypeError, wrappers.Wrapper, self.d)
        self.w = MockWrapper(self.d)

    def test_initialize(self):
        # Every BASE_ATTRS key is exported as an attribute.
        for k in MockWrapper.BASE_ATTRS:
            self.assertEqual(getattr(self.w, k), self.d[k])
        self.w.a = 222
        self.w.b[0] = 222
        self.assertEqual(self.w.a, 222)
        self.assertEqual(self.w.b[0], 222)
        # Mutating the wrapper must not leak back into the source dict.
        self.assertEqual(self.d['a'], 1)
        self.assertEqual(self.d['b'][0], 2)
        # Attributes outside BASE_ATTRS are rejected both ways.
        self.assertRaises(AttributeError, getattr, self.w, 'foo')
        self.assertRaises(AttributeError, setattr, self.w, 'foo', 0)

    def test_taint(self):
        # Setting an exported attribute flags the wrapper as modified.
        self.assertFalse(self.w.is_modified)
        self.w.a = 111  # pylint: disable=W0201
        self.assertTrue(self.w.is_modified)

    def test_serialize(self):
        # to_json/from_json must round-trip the wrapped dict.
        w = MockWrapper.from_json(self.w.to_json())
        self.assertEqual(w.wrapped, self.w.wrapped)

    def test_clone(self):
        # A clone compares equal but is independent of the original.
        w = self.w.clone()
        self.assertEqual(w.wrapped, self.w.wrapped)
        w.b[0] = 111
        self.assertEqual(self.w.b[0], 2)

    def test_kwargs(self):
        parent = MockWrapper({'a': 10})
        w = MockWrapper(self.d, parent=parent)
        self.assertIs(w.parent, parent)
        # 'parent' is read-only once set.
        self.assertRaises(AttributeError, setattr, w, 'parent', 0)
152 | |
153 | |
class TestWorkflow(unittest.TestCase):
    """Offline unit tests for the Workflow wrapper, using SAMPLE_WF_DICT."""

    def setUp(self):
        self.wf = wrappers.Workflow(SAMPLE_WF_DICT)

    def test_initialize(self):
        # Scalar attributes and derived id/label mappings must match the
        # values encoded in SAMPLE_WF_DICT.
        self.assertEqual(self.wf.id, '9005c5112febe774')
        self.assertEqual(self.wf.name, 'paste_columns')
        self.assertEqual(self.wf.deleted, False)
        self.assertEqual(self.wf.published, False)
        self.assertEqual(self.wf.tags, [])
        self.assertEqual(
            self.wf.input_labels_to_ids, {'Input Dataset': set(['571', '572'])})
        self.assertEqual(self.wf.tool_labels_to_ids, {'Paste1': set(['573'])})
        self.assertEqual(self.wf.data_input_ids, set(['571', '572']))
        self.assertEqual(self.wf.source_ids, set(['571', '572']))
        self.assertEqual(self.wf.sink_ids, set(['573']))

    def test_dag(self):
        # inv_dag must be the edge-reversed form of dag.
        inv_dag = {}
        for h, tails in six.iteritems(self.wf.dag):
            for t in tails:
                inv_dag.setdefault(str(t), set()).add(h)
        self.assertEqual(self.wf.inv_dag, inv_dag)
        heads = set(self.wf.dag)
        self.assertEqual(heads, set.union(*self.wf.inv_dag.values()))
        tails = set(self.wf.inv_dag)
        self.assertEqual(tails, set.union(*self.wf.dag.values()))
        # sorted_step_ids() must cover every node and respect edge order
        # (i.e. be a topological sort of the DAG).
        ids = self.wf.sorted_step_ids()
        self.assertEqual(set(ids), heads | tails)
        for h, tails in six.iteritems(self.wf.dag):
            for t in tails:
                self.assertLess(ids.index(h), ids.index(t))

    def test_steps(self):
        steps = SAMPLE_WF_DICT['steps']
        for sid, s in six.iteritems(self.wf.steps):
            self.assertIsInstance(s, wrappers.Step)
            self.assertEqual(s.id, sid)
            self.assertIn(sid, steps)
            self.assertIs(s.parent, self.wf)
        self.assertEqual(self.wf.data_input_ids, set(['571', '572']))
        self.assertEqual(self.wf.tool_ids, set(['573']))

    def test_taint(self):
        # Modifying a step must mark the whole workflow as modified.
        self.assertFalse(self.wf.is_modified)
        self.wf.steps['571'].tool_id = 'foo'
        self.assertTrue(self.wf.is_modified)

    def test_input_map(self):
        class DummyLD(object):
            # Stand-in for a library dataset: only SRC and id are consumed
            # by convert_input_map.
            SRC = 'ld'

            def __init__(self, id_):
                self.id = id_

        label = 'Input Dataset'
        self.assertEqual(self.wf.input_labels, set([label]))
        input_map = self.wf.convert_input_map(
            {label: [DummyLD('a'), DummyLD('b')]})
        # {'571': {'id': 'a', 'src': 'ld'}, '572': {'id': 'b', 'src': 'ld'}}
        # OR
        # {'571': {'id': 'b', 'src': 'ld'}, '572': {'id': 'a', 'src': 'ld'}}
        self.assertEqual(set(input_map), set(['571', '572']))
        for d in six.itervalues(input_map):
            self.assertEqual(set(d), set(['id', 'src']))
            self.assertEqual(d['src'], 'ld')
            self.assertIn(d['id'], 'ab')
222 | |
223 | |
@test_util.skip_unless_galaxy()
class GalaxyObjectsTestBase(unittest.TestCase):
    """Base for the integration tests: connects to the Galaxy instance
    configured through the BIOBLEND_GALAXY_URL and BIOBLEND_GALAXY_API_KEY
    environment variables (tests are skipped when Galaxy is unavailable).
    """

    def setUp(self):
        galaxy_key = os.environ['BIOBLEND_GALAXY_API_KEY']
        galaxy_url = os.environ['BIOBLEND_GALAXY_URL']
        self.gi = galaxy_instance.GalaxyInstance(galaxy_url, galaxy_key)
231 | |
232 | |
class TestGalaxyInstance(GalaxyObjectsTestBase):
    """Integration tests for library, history and workflow management
    through the object-oriented GalaxyInstance API."""

    def test_library(self):
        name = 'test_%s' % uuid.uuid4().hex
        description, synopsis = 'D', 'S'
        lib = self.gi.libraries.create(
            name, description=description, synopsis=synopsis)
        self.assertEqual(lib.name, name)
        self.assertEqual(lib.description, description)
        self.assertEqual(lib.synopsis, synopsis)
        self.assertEqual(len(lib.content_infos), 1)  # root folder
        self.assertEqual(len(lib.folder_ids), 1)
        self.assertEqual(len(lib.dataset_ids), 0)
        self.assertIn(lib.id, [_.id for _ in self.gi.libraries.list()])
        lib.delete()
        self.assertFalse(lib.is_mapped)

    def test_workflow_from_str(self):
        with open(SAMPLE_FN) as f:
            wf = self.gi.workflows.import_new(f.read())
        self._check_and_del_workflow(wf)

    def test_workflow_collections_from_str(self):
        with open(SAMPLE_WF_COLL_FN) as f:
            wf = self.gi.workflows.import_new(f.read())
        self._check_and_del_workflow(wf)

    def test_workflow_from_dict(self):
        with open(SAMPLE_FN) as f:
            wf = self.gi.workflows.import_new(json.load(f))
        self._check_and_del_workflow(wf)

    def test_workflow_publish_from_dict(self):
        with open(SAMPLE_FN) as f:
            wf = self.gi.workflows.import_new(json.load(f), publish=True)
        self._check_and_del_workflow(wf, check_is_public=True)

    def test_workflow_missing_tools(self):
        # Import a workflow, then blank out the tool info of every tool
        # step: the resulting wrapper must report itself as not runnable.
        with open(SAMPLE_FN) as f:
            wf_dump = json.load(f)
        wf_info = self.gi.gi.workflows.import_workflow_dict(wf_dump)
        wf_dict = self.gi.gi.workflows.show_workflow(wf_info['id'])
        for id_, step in six.iteritems(wf_dict['steps']):
            if step['type'] == 'tool':
                for k in 'tool_inputs', 'tool_version':
                    wf_dict['steps'][id_][k] = None
        wf = wrappers.Workflow(wf_dict, gi=self.gi)
        self.assertFalse(wf.is_runnable)
        self.assertRaises(RuntimeError, wf.run)
        wf.delete()

    def test_workflow_export(self):
        # Export/re-import must produce a distinct but equivalent workflow.
        with open(SAMPLE_FN) as f:
            wf1 = self.gi.workflows.import_new(f.read())
        wf2 = self.gi.workflows.import_new(wf1.export())
        self.assertNotEqual(wf1.id, wf2.id)
        for wf in wf1, wf2:
            self._check_and_del_workflow(wf)

    def _check_and_del_workflow(self, wf, check_is_public=False):
        # Shared sanity checks for an imported workflow; always deletes it.
        # Galaxy appends additional text to imported workflow names
        self.assertTrue(wf.name.startswith('paste_columns'))
        self.assertEqual(len(wf.steps), 3)
        for step_id, step in six.iteritems(wf.steps):
            self.assertIsInstance(step, wrappers.Step)
            self.assertEqual(step_id, step.id)
            self.assertIsInstance(step.tool_inputs, dict)
            if step.type == 'tool':
                self.assertIsNotNone(step.tool_id)
                self.assertIsNotNone(step.tool_version)
                self.assertIsInstance(step.input_steps, dict)
            elif step.type in ('data_collection_input', 'data_input'):
                self.assertIsNone(step.tool_id)
                self.assertIsNone(step.tool_version)
                self.assertEqual(step.input_steps, {})
        wf_ids = set(_.id for _ in self.gi.workflows.list())
        self.assertIn(wf.id, wf_ids)
        if check_is_public:
            self.assertTrue(wf.published)
        wf.delete()

    # not very accurate:
    # * we can't publish a wf from the API
    # * we can't directly get another user's wf
    def test_workflow_from_shared(self):
        all_prevs = dict(
            (_.id, _) for _ in self.gi.workflows.get_previews(published=True)
        )
        pub_only_ids = set(all_prevs).difference(
            _.id for _ in self.gi.workflows.get_previews())
        if pub_only_ids:
            wf_id = pub_only_ids.pop()
            imported = self.gi.workflows.import_shared(wf_id)
            self.assertIsInstance(imported, wrappers.Workflow)
            imported.delete()
        else:
            self.skipTest('no published workflows, manually publish a workflow to run this test')

    def test_get_libraries(self):
        self._test_multi_get('library')

    def test_get_histories(self):
        self._test_multi_get('history')

    def test_get_workflows(self):
        self._test_multi_get('workflow')

    def _normalized_functions(self, obj_type):
        # Map an object type name to its (create, list, previews,
        # delete-kwargs) quadruple so the multi-get/delete tests can be
        # written generically.
        if obj_type == 'library':
            create = self.gi.libraries.create
            get_objs = self.gi.libraries.list
            get_prevs = self.gi.libraries.get_previews
            del_kwargs = {}
        elif obj_type == 'history':
            create = self.gi.histories.create
            get_objs = self.gi.histories.list
            get_prevs = self.gi.histories.get_previews
            del_kwargs = {'purge': True}
        elif obj_type == 'workflow':
            def create(name):
                with open(SAMPLE_FN) as f:
                    d = json.load(f)
                d['name'] = name
                return self.gi.workflows.import_new(d)

            get_objs = self.gi.workflows.list
            get_prevs = self.gi.workflows.get_previews
            del_kwargs = {}
        return create, get_objs, get_prevs, del_kwargs

    def _test_multi_get(self, obj_type):
        create, get_objs, get_prevs, del_kwargs = self._normalized_functions(
            obj_type)

        def ids(seq):
            return set(_.id for _ in seq)

        names = ['test_%s' % uuid.uuid4().hex for _ in range(2)]
        objs = []
        try:
            objs = [create(_) for _ in names]
            self.assertLessEqual(ids(objs), ids(get_objs()))
            if obj_type != 'workflow':
                filtered = get_objs(name=names[0])
                self.assertEqual(len(filtered), 1)
                self.assertEqual(filtered[0].id, objs[0].id)
                del_id = objs[-1].id
                objs.pop().delete(**del_kwargs)
                self.assertIn(del_id, ids(get_prevs(deleted=True)))
            else:
                # Galaxy appends info strings to imported workflow names
                prev = get_prevs()[0]
                filtered = get_objs(name=prev.name)
                self.assertEqual(len(filtered), 1)
                self.assertEqual(filtered[0].id, prev.id)
        finally:
            for o in objs:
                o.delete(**del_kwargs)

    def test_delete_libraries_by_name(self):
        self._test_delete_by_name('library')

    def test_delete_histories_by_name(self):
        self._test_delete_by_name('history')

    def test_delete_workflows_by_name(self):
        self._test_delete_by_name('workflow')

    def _test_delete_by_name(self, obj_type):
        # Create two same-named objects, then delete both via a single
        # name-based module-level delete call.
        create, _, get_prevs, del_kwargs = self._normalized_functions(
            obj_type)
        name = 'test_%s' % uuid.uuid4().hex
        objs = [create(name) for _ in range(2)]  # noqa: F812
        final_name = objs[0].name
        prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
        self.assertEqual(len(prevs), len(objs))
        del_kwargs['name'] = final_name
        objs[0].gi_module.delete(**del_kwargs)
        prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
        self.assertEqual(len(prevs), 0)
413 | |
414 | |
class TestLibrary(GalaxyObjectsTestBase):
    """Integration tests for data-library folders and datasets; each test
    runs against a freshly created library that is deleted afterwards."""
    # just something that can be expected to be always up
    DS_URL = 'https://tools.ietf.org/rfc/rfc1866.txt'

    def setUp(self):
        super(TestLibrary, self).setUp()
        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)

    def tearDown(self):
        self.lib.delete()

    def test_root_folder(self):
        r = self.lib.root_folder
        self.assertIsNone(r.parent)

    def test_folder(self):
        name, desc = 'test_%s' % uuid.uuid4().hex, 'D'
        folder = self.lib.create_folder(name, description=desc)
        self.assertEqual(folder.name, name)
        self.assertEqual(folder.description, desc)
        self.assertIs(folder.container, self.lib)
        self.assertEqual(folder.parent.id, self.lib.root_folder.id)
        # root folder + the new one
        self.assertEqual(len(self.lib.content_infos), 2)
        self.assertEqual(len(self.lib.folder_ids), 2)
        self.assertIn(folder.id, self.lib.folder_ids)
        retrieved = self.lib.get_folder(folder.id)
        self.assertEqual(folder.id, retrieved.id)

    def _check_datasets(self, dss):
        # Shared checks: *dss* must be exactly the library's datasets.
        self.assertEqual(len(dss), len(self.lib.dataset_ids))
        self.assertEqual(set(_.id for _ in dss), set(self.lib.dataset_ids))
        for ds in dss:
            self.assertIs(ds.container, self.lib)

    def test_dataset(self):
        folder = self.lib.create_folder('test_%s' % uuid.uuid4().hex)
        ds = self.lib.upload_data(FOO_DATA, folder=folder)
        self.assertEqual(len(self.lib.content_infos), 3)
        self.assertEqual(len(self.lib.folder_ids), 2)
        self._check_datasets([ds])

    def test_dataset_from_url(self):
        if is_reachable(self.DS_URL):
            ds = self.lib.upload_from_url(self.DS_URL)
            self._check_datasets([ds])
        else:
            self.skipTest('%s not reachable' % self.DS_URL)

    def test_dataset_from_local(self):
        with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
            f.write(FOO_DATA)
            f.flush()
            ds = self.lib.upload_from_local(f.name)
        self._check_datasets([ds])

    def test_datasets_from_fs(self):
        bnames = ['f%d.txt' % i for i in range(2)]
        dss, fnames = upload_from_fs(self.lib, bnames)
        self._check_datasets(dss)
        # With link_data_only, the datasets keep pointing at the source
        # files instead of being copied into Galaxy.
        dss, fnames = upload_from_fs(
            self.lib, bnames, link_data_only='link_to_files')
        for ds, fn in zip(dss, fnames):
            self.assertEqual(ds.file_name, fn)

    def test_copy_from_dataset(self):
        hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
        try:
            hda = hist.paste_content(FOO_DATA)
            ds = self.lib.copy_from_dataset(hda)
        finally:
            hist.delete(purge=True)
        self._check_datasets([ds])

    def test_get_dataset(self):
        ds = self.lib.upload_data(FOO_DATA)
        retrieved = self.lib.get_dataset(ds.id)
        self.assertEqual(ds.id, retrieved.id)

    def test_get_datasets(self):
        bnames = ['f%d.txt' % _ for _ in range(2)]
        dss, _ = upload_from_fs(self.lib, bnames)
        retrieved = self.lib.get_datasets()
        self.assertEqual(len(dss), len(retrieved))
        self.assertEqual(set(_.id for _ in dss), set(_.id for _ in retrieved))
        # Library dataset names are path-like ('/<basename>').
        name = '/%s' % bnames[0]
        selected = self.lib.get_datasets(name=name)
        self.assertEqual(len(selected), 1)
        self.assertEqual(selected[0].name, bnames[0])
503 | |
504 | |
class TestLDContents(GalaxyObjectsTestBase):
    """Integration tests for library-dataset content access (stream,
    peek, download, update, delete)."""

    def setUp(self):
        super(TestLDContents, self).setUp()
        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        self.ds = self.lib.upload_data(FOO_DATA)
        # Wait until the dataset is fully processed before reading it.
        self.ds.wait()

    def tearDown(self):
        self.lib.delete()

    def test_dataset_get_stream(self):
        # Streaming one byte at a time must reproduce FOO_DATA exactly.
        for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
            self.assertEqual(six.b(FOO_DATA[idx]), c)

    def test_dataset_peek(self):
        fetched_data = self.ds.peek(chunk_size=4)
        self.assertEqual(six.b(FOO_DATA[0:4]), fetched_data)

    def test_dataset_download(self):
        with tempfile.TemporaryFile() as f:
            self.ds.download(f)
            f.seek(0)
            self.assertEqual(six.b(FOO_DATA), f.read())

    def test_dataset_get_contents(self):
        self.assertEqual(six.b(FOO_DATA), self.ds.get_contents())

    def test_dataset_delete(self):
        self.ds.delete()
        # Cannot test this yet because the 'deleted' attribute is not exported
        # by the API at the moment
        # self.assertTrue(self.ds.deleted)

    @test_util.skip_unless_galaxy('release_17.09')
    def test_dataset_update(self):
        # update() must refresh the wrapper in place and return it.
        new_name = 'test_%s' % uuid.uuid4().hex
        new_misc_info = 'Annotation for %s' % new_name
        new_genome_build = 'hg19'
        updated_ldda = self.ds.update(name=new_name, misc_info=new_misc_info, genome_build=new_genome_build)
        self.assertEqual(self.ds.id, updated_ldda.id)
        self.assertEqual(self.ds.name, new_name)
        self.assertEqual(self.ds.misc_info, new_misc_info)
        self.assertEqual(self.ds.genome_build, new_genome_build)
549 | |
550 | |
class TestHistory(GalaxyObjectsTestBase):
    """Integration tests for history management, dataset import/upload
    and dataset collections; each test gets a fresh, purged-on-teardown
    history."""

    def setUp(self):
        super(TestHistory, self).setUp()
        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)

    def tearDown(self):
        self.hist.delete(purge=True)

    def test_create_delete(self):
        name = 'test_%s' % uuid.uuid4().hex
        hist = self.gi.histories.create(name)
        self.assertEqual(hist.name, name)
        hist_id = hist.id
        self.assertIn(hist_id, [_.id for _ in self.gi.histories.list()])
        hist.delete(purge=True)
        self.assertFalse(hist.is_mapped)
        try:
            h = self.gi.histories.get(hist_id)
            self.assertTrue(h.deleted)
        except ConnectionError:
            # Galaxy up to release_2015.01.13 gives a ConnectionError
            pass

    def _check_dataset(self, hda):
        # Shared checks: *hda* must be the history's only dataset.
        self.assertIsInstance(hda, wrappers.HistoryDatasetAssociation)
        self.assertIs(hda.container, self.hist)
        self.assertEqual(len(self.hist.dataset_ids), 1)
        self.assertEqual(self.hist.dataset_ids[0], hda.id)

    def test_import_dataset(self):
        lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        lds = lib.upload_data(FOO_DATA)
        self.assertEqual(len(self.hist.dataset_ids), 0)
        hda = self.hist.import_dataset(lds)
        lib.delete()
        self._check_dataset(hda)

    def test_upload_file(self):
        with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
            f.write(FOO_DATA)
            f.flush()
            hda = self.hist.upload_file(f.name)
        self._check_dataset(hda)

    def test_paste_content(self):
        hda = self.hist.paste_content(FOO_DATA)
        self._check_dataset(hda)

    def test_get_dataset(self):
        hda = self.hist.paste_content(FOO_DATA)
        retrieved = self.hist.get_dataset(hda.id)
        self.assertEqual(hda.id, retrieved.id)

    def test_get_datasets(self):
        bnames = ['f%d.txt' % _ for _ in range(2)]
        lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        lds = upload_from_fs(lib, bnames)[0]
        hdas = [self.hist.import_dataset(_) for _ in lds]
        lib.delete()
        retrieved = self.hist.get_datasets()
        self.assertEqual(len(hdas), len(retrieved))
        self.assertEqual(set(_.id for _ in hdas), set(_.id for _ in retrieved))
        selected = self.hist.get_datasets(name=bnames[0])
        self.assertEqual(len(selected), 1)
        self.assertEqual(selected[0].name, bnames[0])

    def test_export_and_download(self):
        # Export the history, then download the archive and verify it is
        # a valid tar file.
        jeha_id = self.hist.export(wait=True, maxwait=60)
        self.assertTrue(jeha_id)
        tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
        temp_fn = os.path.join(tempdir, 'export.tar.gz')
        try:
            with open(temp_fn, 'wb') as fo:
                self.hist.download(jeha_id, fo)
            self.assertTrue(tarfile.is_tarfile(temp_fn))
        finally:
            shutil.rmtree(tempdir)

    def test_update(self):
        # update() must refresh the wrapper in place and return it.
        new_name = 'test_%s' % uuid.uuid4().hex
        new_annotation = 'Annotation for %s' % new_name
        new_tags = ['tag1', 'tag2']
        updated_hist = self.hist.update(name=new_name, annotation=new_annotation, tags=new_tags)
        self.assertEqual(self.hist.id, updated_hist.id)
        self.assertEqual(self.hist.name, new_name)
        self.assertEqual(self.hist.annotation, new_annotation)
        self.assertEqual(self.hist.tags, new_tags)
        updated_hist = self.hist.update(published=True)
        self.assertEqual(self.hist.id, updated_hist.id)
        self.assertTrue(self.hist.published)

    def test_create_dataset_collection(self):
        self._create_collection_description()
        hdca = self.hist.create_dataset_collection(self.collection_description)
        self.assertIsInstance(hdca, wrappers.HistoryDatasetCollectionAssociation)
        self.assertEqual(hdca.collection_type, 'list')
        self.assertIs(hdca.container, self.hist)
        self.assertEqual(len(hdca.elements), 2)
        self.assertEqual(self.dataset1.id, hdca.elements[0]['object']['id'])
        self.assertEqual(self.dataset2.id, hdca.elements[1]['object']['id'])

    def test_delete_dataset_collection(self):
        self._create_collection_description()
        hdca = self.hist.create_dataset_collection(self.collection_description)
        hdca.delete()
        self.assertTrue(hdca.deleted)

    def _create_collection_description(self):
        # Paste two datasets and build a two-element list-collection
        # description referencing them (stored on self for the tests).
        self.dataset1 = self.hist.paste_content(FOO_DATA)
        self.dataset2 = self.hist.paste_content(FOO_DATA_2)
        self.collection_description = dataset_collections.CollectionDescription(
            name="MyDatasetList",
            elements=[
                dataset_collections.HistoryDatasetElement(name="sample1", id=self.dataset1.id),
                dataset_collections.HistoryDatasetElement(name="sample2", id=self.dataset2.id),
            ]
        )
669 | |
670 | |
class TestHDAContents(GalaxyObjectsTestBase):
    """Integration tests for history-dataset content access (stream,
    peek, download, update, delete, purge)."""

    def setUp(self):
        super(TestHDAContents, self).setUp()
        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
        self.ds = self.hist.paste_content(FOO_DATA)
        # Wait until the dataset is fully processed before reading it.
        self.ds.wait()

    def tearDown(self):
        self.hist.delete(purge=True)

    def test_dataset_get_stream(self):
        # Streaming one byte at a time must reproduce FOO_DATA exactly.
        for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
            self.assertEqual(six.b(FOO_DATA[idx]), c)

    def test_dataset_peek(self):
        fetched_data = self.ds.peek(chunk_size=4)
        self.assertEqual(six.b(FOO_DATA[0:4]), fetched_data)

    def test_dataset_download(self):
        with tempfile.TemporaryFile() as f:
            self.ds.download(f)
            f.seek(0)
            self.assertEqual(six.b(FOO_DATA), f.read())

    def test_dataset_get_contents(self):
        self.assertEqual(six.b(FOO_DATA), self.ds.get_contents())

    def test_dataset_update(self):
        # update() must refresh the wrapper in place and return it.
        new_name = 'test_%s' % uuid.uuid4().hex
        new_annotation = 'Annotation for %s' % new_name
        new_genome_build = 'hg19'
        updated_hda = self.ds.update(name=new_name, annotation=new_annotation, genome_build=new_genome_build)
        self.assertEqual(self.ds.id, updated_hda.id)
        self.assertEqual(self.ds.name, new_name)
        self.assertEqual(self.ds.annotation, new_annotation)
        self.assertEqual(self.ds.genome_build, new_genome_build)

    def test_dataset_delete(self):
        self.ds.delete()
        self.assertTrue(self.ds.deleted)
        self.assertFalse(self.ds.purged)

    @test_util.skip_unless_galaxy("release_17.05")
    def test_dataset_purge(self):
        self.ds.delete(purge=True)
        # Galaxy from release_15.03 to release_17.01 wrongly reports ds.deleted as False, see https://github.com/galaxyproject/galaxy/issues/3548
        self.assertTrue(self.ds.deleted)
        self.assertTrue(self.ds.purged)
720 | |
721 | |
class TestRunWorkflow(GalaxyObjectsTestBase):
    """Integration tests that actually run the paste_columns workflow on
    two uploaded library datasets and verify the pasted output."""

    def setUp(self):
        super(TestRunWorkflow, self).setUp()
        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        with open(SAMPLE_FN) as f:
            self.wf = self.gi.workflows.import_new(f.read())
        self.contents = ['one\ntwo\n', '1\n2\n']
        self.inputs = [self.lib.upload_data(_) for _ in self.contents]

    def tearDown(self):
        self.wf.delete()
        self.lib.delete()

    def _test(self, existing_hist=False, params=False):
        # Run the workflow either into an existing history object or into
        # a new history given by name; optionally override the Paste1
        # delimiter parameter.
        hist_name = 'test_%s' % uuid.uuid4().hex
        if existing_hist:
            hist = self.gi.histories.create(hist_name)
        else:
            hist = hist_name
        if params:
            params = {'Paste1': {'delimiter': 'U'}}
            sep = '_'  # 'U' maps to '_' in the paste tool
        else:
            params = None
            sep = '\t'  # default
        input_map = {'Input 1': self.inputs[0], 'Input 2': self.inputs[1]}
        sys.stderr.write(os.linesep)
        outputs, out_hist = self.wf.run(
            input_map, hist, params=params, wait=True, polling_interval=1)
        self.assertEqual(len(outputs), 1)
        out_ds = outputs[0]
        self.assertIn(out_ds.id, out_hist.dataset_ids)
        res = out_ds.get_contents()
        # Expected output: the two inputs pasted line-by-line, joined by sep.
        exp_rows = zip(*(_.splitlines() for _ in self.contents))
        exp_res = six.b("\n".join(sep.join(t) for t in exp_rows) + "\n")
        self.assertEqual(res, exp_res)
        if existing_hist:
            self.assertEqual(out_hist.id, hist.id)
        out_hist.delete(purge=True)

    def test_existing_history(self):
        self._test(existing_hist=True)

    def test_new_history(self):
        self._test(existing_hist=False)

    def test_params(self):
        self._test(params=True)
771 | |
772 | |
class TestRunDatasetCollectionWorkflow(GalaxyObjectsTestBase):
    """Integration test running a workflow whose input is a dataset
    collection (paste_columns_collections.ga)."""

    def setUp(self):
        super(TestRunDatasetCollectionWorkflow, self).setUp()
        with open(SAMPLE_WF_COLL_FN) as f:
            self.wf = self.gi.workflows.import_new(f.read())
        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)

    def tearDown(self):
        self.wf.delete()
        self.hist.delete(purge=True)

    def test_run_workflow_with_dataset_collection(self):
        # Build a two-element list collection and feed it to the workflow
        # alongside a plain dataset input.
        dataset1 = self.hist.paste_content(FOO_DATA)
        dataset2 = self.hist.paste_content(FOO_DATA_2)
        collection_description = dataset_collections.CollectionDescription(
            name="MyDatasetList",
            elements=[
                dataset_collections.HistoryDatasetElement(name="sample1", id=dataset1.id),
                dataset_collections.HistoryDatasetElement(name="sample2", id=dataset2.id),
            ]
        )
        dataset_collection = self.hist.create_dataset_collection(collection_description)
        input_map = {"Input Dataset Collection": dataset_collection,
                     "Input 2": dataset1}
        outputs, out_hist = self.wf.run(input_map, self.hist, wait=True)
        self.assertEqual(len(outputs), 1)
        out_hdca = outputs[0]
        self.assertIsInstance(out_hdca, wrappers.HistoryDatasetCollectionAssociation)
        self.assertEqual(out_hdca.collection_type, 'list')
        self.assertEqual(len(out_hdca.elements), 2)
        self.assertEqual(out_hist.id, self.hist.id)
805 | |
806 | |
class TestJob(GalaxyObjectsTestBase):
    """Integration tests for job previews and full job wrappers.

    The original class defined a ``setUp`` that only called
    ``super().setUp()`` with no additional work (pylint W0235); it has
    been removed — the inherited setUp runs identically.
    """

    def test_get(self):
        job_prevs = self.gi.jobs.get_previews()
        # The server may legitimately have no jobs yet; only check the
        # preview/full-wrapper round-trip when at least one exists.
        if len(job_prevs) > 0:
            job_prev = job_prevs[0]
            self.assertIsInstance(job_prev, wrappers.JobPreview)
            job = self.gi.jobs.get(job_prev.id)
            self.assertIsInstance(job, wrappers.Job)
            self.assertEqual(job.id, job_prev.id)
        for job in self.gi.jobs.list():
            self.assertIsInstance(job, wrappers.Job)
822 | |
823 | |
def suite():
    """Return the full test suite for this module.

    Fix: TestRunDatasetCollectionWorkflow and TestJob are defined above
    but were missing from the suite, so running this file directly
    silently skipped them; they are now included.
    """
    loader = unittest.TestLoader()
    s = unittest.TestSuite()
    s.addTests([loader.loadTestsFromTestCase(c) for c in (
        TestWrapper,
        TestWorkflow,
        TestGalaxyInstance,
        TestLibrary,
        TestLDContents,
        TestHistory,
        TestHDAContents,
        TestRunWorkflow,
        TestRunDatasetCollectionWorkflow,
        TestJob,
    )])
    return s
838 | |
839 | |
if __name__ == '__main__':
    # By default, run all tests. To run specific tests, do the following:
    #   python -m unittest <module>.<class>.<test_method>
    tests = suite()
    # verbosity=2 prints one line per test, useful for slow server runs.
    RUNNER = unittest.TextTestRunner(verbosity=2)
    RUNNER.run(tests)