Mercurial repository: guerler / springsuite
diff planemo/lib/python3.7/site-packages/bioblend/_tests/TestGalaxyObjects.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"

author    guerler
date      Fri, 31 Jul 2020 00:18:57 -0400
parents
children
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/bioblend/_tests/TestGalaxyObjects.py	Fri Jul 31 00:18:57 2020 -0400
@@ -0,0 +1,836 @@
+# pylint: disable=C0103,E1101
+import json
+import os
+import shutil
+import socket
+import sys
+import tarfile
+import tempfile
+import uuid
+from ssl import SSLError
+from urllib.error import URLError
+from urllib.request import urlopen
+
+import bioblend
+import bioblend.galaxy.objects.galaxy_instance as galaxy_instance
+import bioblend.galaxy.objects.wrappers as wrappers
+from bioblend.galaxy import dataset_collections
+from . import test_util
+from .test_util import unittest
+
+bioblend.set_stream_logger('test', level='INFO')
+socket.setdefaulttimeout(10.0)
+SAMPLE_FN = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
+SAMPLE_WF_COLL_FN = test_util.get_abspath(os.path.join('data', 'paste_columns_collections.ga'))
+SAMPLE_WF_PARAMETER_INPUT_FN = test_util.get_abspath(os.path.join('data', 'workflow_with_parameter_input.ga'))
+FOO_DATA = 'foo\nbar\n'
+FOO_DATA_2 = 'foo2\nbar2\n'
+SAMPLE_WF_DICT = {
+    'deleted': False,
+    'id': '9005c5112febe774',
+    'inputs': {
+        '571': {'label': 'Input Dataset', 'value': ''},
+        '572': {'label': 'Input Dataset', 'value': ''},
+    },
+    'model_class': 'StoredWorkflow',
+    'name': 'paste_columns',
+    'published': False,
+    'steps': {
+        '571': {
+            'id': 571,
+            'input_steps': {},
+            'tool_id': None,
+            'tool_inputs': {'name': 'Input Dataset'},
+            'tool_version': None,
+            'type': 'data_input',
+        },
+        '572': {
+            'id': 572,
+            'input_steps': {},
+            'tool_id': None,
+            'tool_inputs': {'name': 'Input Dataset'},
+            'tool_version': None,
+            'type': 'data_input',
+        },
+        '573': {
+            'id': 573,
+            'input_steps': {
+                'input1': {'source_step': 571, 'step_output': 'output'},
+                'input2': {'source_step': 572, 'step_output': 'output'},
+            },
+            'tool_id': 'Paste1',
+            'tool_inputs': {
+                'delimiter': '"T"',
+                'input1': 'null',
+                'input2': 'null',
+            },
+            'tool_version': '1.0.0',
+            'type': 'tool',
+        }
+    },
+    'tags': [],
+    'url': '/api/workflows/9005c5112febe774',
+}
+
+
+def is_reachable(url):
+    res = None
+    try:
+        res = urlopen(url, timeout=5)
+    except (SSLError, URLError, socket.timeout):
+        return False
+    if res is not None:
+        res.close()
+    return True
+
+
+def upload_from_fs(lib, bnames, **kwargs):
+    tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
+    try:
+        fnames = [os.path.join(tempdir, _) for _ in bnames]
+        for fn in fnames:
+            with open(fn, 'w') as f:
+                f.write(FOO_DATA)
+        dss = lib.upload_from_galaxy_fs(fnames, **kwargs)
+    finally:
+        shutil.rmtree(tempdir)
+    return dss, fnames
+
+
+class MockWrapper(wrappers.Wrapper):
+    BASE_ATTRS = frozenset(['a', 'b'])
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+    @property
+    def gi_module(self):
+        return super().gi_module()
+
+
+class TestWrapper(unittest.TestCase):
+
+    def setUp(self):
+        self.d = {'a': 1, 'b': [2, 3], 'c': {'x': 4}}
+        self.assertRaises(TypeError, wrappers.Wrapper, self.d)
+        self.w = MockWrapper(self.d)
+
+    def test_initialize(self):
+        for k in MockWrapper.BASE_ATTRS:
+            self.assertEqual(getattr(self.w, k), self.d[k])
+        self.w.a = 222
+        self.w.b[0] = 222
+        self.assertEqual(self.w.a, 222)
+        self.assertEqual(self.w.b[0], 222)
+        self.assertEqual(self.d['a'], 1)
+        self.assertEqual(self.d['b'][0], 2)
+        self.assertRaises(AttributeError, getattr, self.w, 'foo')
+        self.assertRaises(AttributeError, setattr, self.w, 'foo', 0)
+
+    def test_taint(self):
+        self.assertFalse(self.w.is_modified)
+        self.w.a = 111  # pylint: disable=W0201
+        self.assertTrue(self.w.is_modified)
+
+    def test_serialize(self):
+        w = MockWrapper.from_json(self.w.to_json())
+        self.assertEqual(w.wrapped, self.w.wrapped)
+
+    def test_clone(self):
+        w = self.w.clone()
+        self.assertEqual(w.wrapped, self.w.wrapped)
+        w.b[0] = 111
+        self.assertEqual(self.w.b[0], 2)
+
+    def test_kwargs(self):
+        parent = MockWrapper({'a': 10})
+        w = MockWrapper(self.d, parent=parent)
+        self.assertIs(w.parent, parent)
+        self.assertRaises(AttributeError, setattr, w, 'parent', 0)
+
+
+class TestWorkflow(unittest.TestCase):
+
+    def setUp(self):
+        self.wf = wrappers.Workflow(SAMPLE_WF_DICT)
+
+    def test_initialize(self):
+        self.assertEqual(self.wf.id, '9005c5112febe774')
+        self.assertEqual(self.wf.name, 'paste_columns')
+        self.assertEqual(self.wf.deleted, False)
+        self.assertEqual(self.wf.published, False)
+        self.assertEqual(self.wf.tags, [])
+        self.assertEqual(
+            self.wf.input_labels_to_ids, {'Input Dataset': set(['571', '572'])})
+        self.assertEqual(self.wf.tool_labels_to_ids, {'Paste1': set(['573'])})
+        self.assertEqual(self.wf.data_input_ids, set(['571', '572']))
+        self.assertEqual(self.wf.source_ids, set(['571', '572']))
+        self.assertEqual(self.wf.sink_ids, set(['573']))
+
+    def test_dag(self):
+        inv_dag = {}
+        for h, tails in self.wf.dag.items():
+            for t in tails:
+                inv_dag.setdefault(str(t), set()).add(h)
+        self.assertEqual(self.wf.inv_dag, inv_dag)
+        heads = set(self.wf.dag)
+        self.assertEqual(heads, set.union(*self.wf.inv_dag.values()))
+        tails = set(self.wf.inv_dag)
+        self.assertEqual(tails, set.union(*self.wf.dag.values()))
+        ids = self.wf.sorted_step_ids()
+        self.assertEqual(set(ids), heads | tails)
+        for h, tails in self.wf.dag.items():
+            for t in tails:
+                self.assertLess(ids.index(h), ids.index(t))
+
+    def test_steps(self):
+        steps = SAMPLE_WF_DICT['steps']
+        for sid, s in self.wf.steps.items():
+            self.assertIsInstance(s, wrappers.Step)
+            self.assertEqual(s.id, sid)
+            self.assertIn(sid, steps)
+            self.assertIs(s.parent, self.wf)
+        self.assertEqual(self.wf.data_input_ids, set(['571', '572']))
+        self.assertEqual(self.wf.tool_ids, set(['573']))
+
+    def test_taint(self):
+        self.assertFalse(self.wf.is_modified)
+        self.wf.steps['571'].tool_id = 'foo'
+        self.assertTrue(self.wf.is_modified)
+
+    def test_input_map(self):
+        class DummyLD(object):
+            SRC = 'ld'
+
+            def __init__(self, id_):
+                self.id = id_
+
+        label = 'Input Dataset'
+        self.assertEqual(self.wf.input_labels, set([label]))
+        input_map = self.wf.convert_input_map(
+            {label: [DummyLD('a'), DummyLD('b')]})
+        # {'571': {'id': 'a', 'src': 'ld'}, '572': {'id': 'b', 'src': 'ld'}}
+        # OR
+        # {'571': {'id': 'b', 'src': 'ld'}, '572': {'id': 'a', 'src': 'ld'}}
+        self.assertEqual(set(input_map), set(['571', '572']))
+        for d in input_map.values():
+            self.assertEqual(set(d), set(['id', 'src']))
+            self.assertEqual(d['src'], 'ld')
+            self.assertIn(d['id'], 'ab')
+
+
+@test_util.skip_unless_galaxy()
+class GalaxyObjectsTestBase(unittest.TestCase):
+
+    def setUp(self):
+        galaxy_key = os.environ['BIOBLEND_GALAXY_API_KEY']
+        galaxy_url = os.environ['BIOBLEND_GALAXY_URL']
+        self.gi = galaxy_instance.GalaxyInstance(galaxy_url, galaxy_key)
+
+
+class TestGalaxyInstance(GalaxyObjectsTestBase):
+
+    def test_library(self):
+        name = 'test_%s' % uuid.uuid4().hex
+        description, synopsis = 'D', 'S'
+        lib = self.gi.libraries.create(
+            name, description=description, synopsis=synopsis)
+        self.assertEqual(lib.name, name)
+        self.assertEqual(lib.description, description)
+        self.assertEqual(lib.synopsis, synopsis)
+        self.assertEqual(len(lib.content_infos), 1)  # root folder
+        self.assertEqual(len(lib.folder_ids), 1)
+        self.assertEqual(len(lib.dataset_ids), 0)
+        self.assertIn(lib.id, [_.id for _ in self.gi.libraries.list()])
+        lib.delete()
+        self.assertFalse(lib.is_mapped)
+
+    def test_workflow_from_str(self):
+        with open(SAMPLE_FN) as f:
+            wf = self.gi.workflows.import_new(f.read())
+        self._check_and_del_workflow(wf)
+
+    def test_workflow_collections_from_str(self):
+        with open(SAMPLE_WF_COLL_FN) as f:
+            wf = self.gi.workflows.import_new(f.read())
+        self._check_and_del_workflow(wf)
+
+    @test_util.skip_unless_galaxy('release_19.01')
+    def test_workflow_parameter_input(self):
+        with open(SAMPLE_WF_PARAMETER_INPUT_FN) as f:
+            self.gi.workflows.import_new(f.read())
+
+    def test_workflow_from_dict(self):
+        with open(SAMPLE_FN) as f:
+            wf = self.gi.workflows.import_new(json.load(f))
+        self._check_and_del_workflow(wf)
+
+    def test_workflow_publish_from_dict(self):
+        with open(SAMPLE_FN) as f:
+            wf = self.gi.workflows.import_new(json.load(f), publish=True)
+        self._check_and_del_workflow(wf, check_is_public=True)
+
+    def test_workflow_missing_tools(self):
+        with open(SAMPLE_FN) as f:
+            wf_dump = json.load(f)
+        wf_info = self.gi.gi.workflows.import_workflow_dict(wf_dump)
+        wf_dict = self.gi.gi.workflows.show_workflow(wf_info['id'])
+        for id_, step in wf_dict['steps'].items():
+            if step['type'] == 'tool':
+                for k in 'tool_inputs', 'tool_version':
+                    wf_dict['steps'][id_][k] = None
+        wf = wrappers.Workflow(wf_dict, gi=self.gi)
+        self.assertFalse(wf.is_runnable)
+        self.assertRaises(RuntimeError, wf.run)
+        wf.delete()
+
+    def test_workflow_export(self):
+        with open(SAMPLE_FN) as f:
+            wf1 = self.gi.workflows.import_new(f.read())
+        wf2 = self.gi.workflows.import_new(wf1.export())
+        self.assertNotEqual(wf1.id, wf2.id)
+        for wf in wf1, wf2:
+            self._check_and_del_workflow(wf)
+
+    def _check_and_del_workflow(self, wf, check_is_public=False):
+        # Galaxy appends additional text to imported workflow names
+        self.assertTrue(wf.name.startswith('paste_columns'))
+        self.assertEqual(len(wf.steps), 3)
+        for step_id, step in wf.steps.items():
+            self.assertIsInstance(step, wrappers.Step)
+            self.assertEqual(step_id, step.id)
+            self.assertIsInstance(step.tool_inputs, dict)
+            if step.type == 'tool':
+                self.assertIsNotNone(step.tool_id)
+                self.assertIsNotNone(step.tool_version)
+                self.assertIsInstance(step.input_steps, dict)
+            elif step.type in ('data_collection_input', 'data_input'):
+                self.assertIsNone(step.tool_id)
+                self.assertIsNone(step.tool_version)
+                self.assertEqual(step.input_steps, {})
+        wf_ids = {_.id for _ in self.gi.workflows.list()}
+        self.assertIn(wf.id, wf_ids)
+        if check_is_public:
+            self.assertTrue(wf.published)
+        wf.delete()
+
+    # not very accurate:
+    # * we can't publish a wf from the API
+    # * we can't directly get another user's wf
+    def test_workflow_from_shared(self):
+        all_prevs = dict(
+            (_.id, _) for _ in self.gi.workflows.get_previews(published=True)
+        )
+        pub_only_ids = set(all_prevs).difference(
+            _.id for _ in self.gi.workflows.get_previews())
+        if pub_only_ids:
+            wf_id = pub_only_ids.pop()
+            imported = self.gi.workflows.import_shared(wf_id)
+            self.assertIsInstance(imported, wrappers.Workflow)
+            imported.delete()
+        else:
+            self.skipTest('no published workflows, manually publish a workflow to run this test')
+
+    def test_get_libraries(self):
+        self._test_multi_get('library')
+
+    def test_get_histories(self):
+        self._test_multi_get('history')
+
+    def test_get_workflows(self):
+        self._test_multi_get('workflow')
+
+    def _normalized_functions(self, obj_type):
+        if obj_type == 'library':
+            create = self.gi.libraries.create
+            get_objs = self.gi.libraries.list
+            get_prevs = self.gi.libraries.get_previews
+            del_kwargs = {}
+        elif obj_type == 'history':
+            create = self.gi.histories.create
+            get_objs = self.gi.histories.list
+            get_prevs = self.gi.histories.get_previews
+            del_kwargs = {'purge': True}
+        elif obj_type == 'workflow':
+            def create(name):
+                with open(SAMPLE_FN) as f:
+                    d = json.load(f)
+                d['name'] = name
+                return self.gi.workflows.import_new(d)
+
+            get_objs = self.gi.workflows.list
+            get_prevs = self.gi.workflows.get_previews
+            del_kwargs = {}
+        return create, get_objs, get_prevs, del_kwargs
+
+    def _test_multi_get(self, obj_type):
+        create, get_objs, get_prevs, del_kwargs = self._normalized_functions(
+            obj_type)
+
+        def ids(seq):
+            return set(_.id for _ in seq)
+
+        names = ['test_%s' % uuid.uuid4().hex for _ in range(2)]
+        objs = []
+        try:
+            objs = [create(_) for _ in names]
+            self.assertLessEqual(ids(objs), ids(get_objs()))
+            if obj_type != 'workflow':
+                filtered = get_objs(name=names[0])
+                self.assertEqual(len(filtered), 1)
+                self.assertEqual(filtered[0].id, objs[0].id)
+                del_id = objs[-1].id
+                objs.pop().delete(**del_kwargs)
+                self.assertIn(del_id, ids(get_prevs(deleted=True)))
+            else:
+                # Galaxy appends info strings to imported workflow names
+                prev = get_prevs()[0]
+                filtered = get_objs(name=prev.name)
+                self.assertEqual(len(filtered), 1)
+                self.assertEqual(filtered[0].id, prev.id)
+        finally:
+            for o in objs:
+                o.delete(**del_kwargs)
+
+    def test_delete_libraries_by_name(self):
+        self._test_delete_by_name('library')
+
+    def test_delete_histories_by_name(self):
+        self._test_delete_by_name('history')
+
+    def test_delete_workflows_by_name(self):
+        self._test_delete_by_name('workflow')
+
+    def _test_delete_by_name(self, obj_type):
+        create, _, get_prevs, del_kwargs = self._normalized_functions(
+            obj_type)
+        name = 'test_%s' % uuid.uuid4().hex
+        objs = [create(name) for _ in range(2)]  # noqa: F812
+        final_name = objs[0].name
+        prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
+        self.assertEqual(len(prevs), len(objs))
+        del_kwargs['name'] = final_name
+        objs[0].gi_module.delete(**del_kwargs)
+        prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
+        self.assertEqual(len(prevs), 0)
+
+
+class TestLibrary(GalaxyObjectsTestBase):
+    # just something that can be expected to be always up
+    DS_URL = 'https://tools.ietf.org/rfc/rfc1866.txt'
+
+    def setUp(self):
+        super().setUp()
+        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
+
+    def tearDown(self):
+        self.lib.delete()
+
+    def test_root_folder(self):
+        r = self.lib.root_folder
+        self.assertIsNone(r.parent)
+
+    def test_folder(self):
+        name, desc = 'test_%s' % uuid.uuid4().hex, 'D'
+        folder = self.lib.create_folder(name, description=desc)
+        self.assertEqual(folder.name, name)
+        self.assertEqual(folder.description, desc)
+        self.assertIs(folder.container, self.lib)
+        self.assertEqual(folder.parent.id, self.lib.root_folder.id)
+        self.assertEqual(len(self.lib.content_infos), 2)
+        self.assertEqual(len(self.lib.folder_ids), 2)
+        self.assertIn(folder.id, self.lib.folder_ids)
+        retrieved = self.lib.get_folder(folder.id)
+        self.assertEqual(folder.id, retrieved.id)
+
+    def _check_datasets(self, dss):
+        self.assertEqual(len(dss), len(self.lib.dataset_ids))
+        self.assertEqual(set(_.id for _ in dss), set(self.lib.dataset_ids))
+        for ds in dss:
+            self.assertIs(ds.container, self.lib)
+
+    def test_dataset(self):
+        folder = self.lib.create_folder('test_%s' % uuid.uuid4().hex)
+        ds = self.lib.upload_data(FOO_DATA, folder=folder)
+        self.assertEqual(len(self.lib.content_infos), 3)
+        self.assertEqual(len(self.lib.folder_ids), 2)
+        self._check_datasets([ds])
+
+    def test_dataset_from_url(self):
+        if is_reachable(self.DS_URL):
+            ds = self.lib.upload_from_url(self.DS_URL)
+            self._check_datasets([ds])
+        else:
+            self.skipTest('%s not reachable' % self.DS_URL)
+
+    def test_dataset_from_local(self):
+        with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
+            f.write(FOO_DATA)
+            f.flush()
+            ds = self.lib.upload_from_local(f.name)
+        self._check_datasets([ds])
+
+    def test_datasets_from_fs(self):
+        bnames = ['f%d.txt' % i for i in range(2)]
+        dss, fnames = upload_from_fs(self.lib, bnames)
+        self._check_datasets(dss)
+        dss, fnames = upload_from_fs(
+            self.lib, bnames, link_data_only='link_to_files')
+        for ds, fn in zip(dss, fnames):
+            self.assertEqual(ds.file_name, fn)
+
+    def test_copy_from_dataset(self):
+        hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
+        try:
+            hda = hist.paste_content(FOO_DATA)
+            ds = self.lib.copy_from_dataset(hda)
+        finally:
+            hist.delete(purge=True)
+        self._check_datasets([ds])
+
+    def test_get_dataset(self):
+        ds = self.lib.upload_data(FOO_DATA)
+        retrieved = self.lib.get_dataset(ds.id)
+        self.assertEqual(ds.id, retrieved.id)
+
+    def test_get_datasets(self):
+        bnames = ['f%d.txt' % _ for _ in range(2)]
+        dss, _ = upload_from_fs(self.lib, bnames)
+        retrieved = self.lib.get_datasets()
+        self.assertEqual(len(dss), len(retrieved))
+        self.assertEqual(set(_.id for _ in dss), set(_.id for _ in retrieved))
+        name = '/%s' % bnames[0]
+        selected = self.lib.get_datasets(name=name)
+        self.assertEqual(len(selected), 1)
+        self.assertEqual(selected[0].name, bnames[0])
+
+
+class TestLDContents(GalaxyObjectsTestBase):
+
+    def setUp(self):
+        super().setUp()
+        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
+        self.ds = self.lib.upload_data(FOO_DATA)
+        self.ds.wait()
+
+    def tearDown(self):
+        self.lib.delete()
+
+    def test_dataset_get_stream(self):
+        for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
+            self.assertEqual(FOO_DATA[idx].encode(), c)
+
+    def test_dataset_peek(self):
+        fetched_data = self.ds.peek(chunk_size=4)
+        self.assertEqual(FOO_DATA[0:4].encode(), fetched_data)
+
+    def test_dataset_download(self):
+        with tempfile.TemporaryFile() as f:
+            self.ds.download(f)
+            f.seek(0)
+            self.assertEqual(FOO_DATA.encode(), f.read())
+
+    def test_dataset_get_contents(self):
+        self.assertEqual(FOO_DATA.encode(), self.ds.get_contents())
+
+    def test_dataset_delete(self):
+        self.ds.delete()
+        # Cannot test this yet because the 'deleted' attribute is not exported
+        # by the API at the moment
+        # self.assertTrue(self.ds.deleted)
+
+    def test_dataset_update(self):
+        new_name = 'test_%s' % uuid.uuid4().hex
+        new_misc_info = 'Annotation for %s' % new_name
+        new_genome_build = 'hg19'
+        updated_ldda = self.ds.update(name=new_name, misc_info=new_misc_info, genome_build=new_genome_build)
+        self.assertEqual(self.ds.id, updated_ldda.id)
+        self.assertEqual(self.ds.name, new_name)
+        self.assertEqual(self.ds.misc_info, new_misc_info)
+        self.assertEqual(self.ds.genome_build, new_genome_build)
+
+
+class TestHistory(GalaxyObjectsTestBase):
+
+    def setUp(self):
+        super().setUp()
+        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
+
+    def tearDown(self):
+        self.hist.delete(purge=True)
+
+    def test_create_delete(self):
+        name = 'test_%s' % uuid.uuid4().hex
+        hist = self.gi.histories.create(name)
+        self.assertEqual(hist.name, name)
+        hist_id = hist.id
+        self.assertIn(hist_id, [_.id for _ in self.gi.histories.list()])
+        hist.delete(purge=True)
+        self.assertFalse(hist.is_mapped)
+        h = self.gi.histories.get(hist_id)
+        self.assertTrue(h.deleted)
+
+    def _check_dataset(self, hda):
+        self.assertIsInstance(hda, wrappers.HistoryDatasetAssociation)
+        self.assertIs(hda.container, self.hist)
+        self.assertEqual(len(self.hist.dataset_ids), 1)
+        self.assertEqual(self.hist.dataset_ids[0], hda.id)
+
+    def test_import_dataset(self):
+        lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
+        lds = lib.upload_data(FOO_DATA)
+        self.assertEqual(len(self.hist.dataset_ids), 0)
+        hda = self.hist.import_dataset(lds)
+        lib.delete()
+        self._check_dataset(hda)
+
+    def test_upload_file(self):
+        with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
+            f.write(FOO_DATA)
+            f.flush()
+            hda = self.hist.upload_file(f.name)
+        self._check_dataset(hda)
+
+    def test_paste_content(self):
+        hda = self.hist.paste_content(FOO_DATA)
+        self._check_dataset(hda)
+
+    def test_get_dataset(self):
+        hda = self.hist.paste_content(FOO_DATA)
+        retrieved = self.hist.get_dataset(hda.id)
+        self.assertEqual(hda.id, retrieved.id)
+
+    def test_get_datasets(self):
+        bnames = ['f%d.txt' % _ for _ in range(2)]
+        lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
+        lds = upload_from_fs(lib, bnames)[0]
+        hdas = [self.hist.import_dataset(_) for _ in lds]
+        lib.delete()
+        retrieved = self.hist.get_datasets()
+        self.assertEqual(len(hdas), len(retrieved))
+        self.assertEqual(set(_.id for _ in hdas), set(_.id for _ in retrieved))
+        selected = self.hist.get_datasets(name=bnames[0])
+        self.assertEqual(len(selected), 1)
+        self.assertEqual(selected[0].name, bnames[0])
+
+    def test_export_and_download(self):
+        jeha_id = self.hist.export(wait=True, maxwait=60)
+        self.assertTrue(jeha_id)
+        tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
+        temp_fn = os.path.join(tempdir, 'export.tar.gz')
+        try:
+            with open(temp_fn, 'wb') as fo:
+                self.hist.download(jeha_id, fo)
+            self.assertTrue(tarfile.is_tarfile(temp_fn))
+        finally:
+            shutil.rmtree(tempdir)
+
+    def test_update(self):
+        new_name = 'test_%s' % uuid.uuid4().hex
+        new_annotation = 'Annotation for %s' % new_name
+        new_tags = ['tag1', 'tag2']
+        updated_hist = self.hist.update(name=new_name, annotation=new_annotation, tags=new_tags)
+        self.assertEqual(self.hist.id, updated_hist.id)
+        self.assertEqual(self.hist.name, new_name)
+        self.assertEqual(self.hist.annotation, new_annotation)
+        self.assertEqual(self.hist.tags, new_tags)
+        updated_hist = self.hist.update(published=True)
+        self.assertEqual(self.hist.id, updated_hist.id)
+        self.assertTrue(self.hist.published)
+
+    def test_create_dataset_collection(self):
+        self._create_collection_description()
+        hdca = self.hist.create_dataset_collection(self.collection_description)
+        self.assertIsInstance(hdca, wrappers.HistoryDatasetCollectionAssociation)
+        self.assertEqual(hdca.collection_type, 'list')
+        self.assertIs(hdca.container, self.hist)
+        self.assertEqual(len(hdca.elements), 2)
+        self.assertEqual(self.dataset1.id, hdca.elements[0]['object']['id'])
+        self.assertEqual(self.dataset2.id, hdca.elements[1]['object']['id'])
+
+    def test_delete_dataset_collection(self):
+        self._create_collection_description()
+        hdca = self.hist.create_dataset_collection(self.collection_description)
+        hdca.delete()
+        self.assertTrue(hdca.deleted)
+
+    def _create_collection_description(self):
+        self.dataset1 = self.hist.paste_content(FOO_DATA)
+        self.dataset2 = self.hist.paste_content(FOO_DATA_2)
+        self.collection_description = dataset_collections.CollectionDescription(
+            name="MyDatasetList",
+            elements=[
+                dataset_collections.HistoryDatasetElement(name="sample1", id=self.dataset1.id),
+                dataset_collections.HistoryDatasetElement(name="sample2", id=self.dataset2.id),
+            ]
+        )
+
+
+class TestHDAContents(GalaxyObjectsTestBase):
+
+    def setUp(self):
+        super().setUp()
+        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
+        self.ds = self.hist.paste_content(FOO_DATA)
+        self.ds.wait()
+
+    def tearDown(self):
+        self.hist.delete(purge=True)
+
+    def test_dataset_get_stream(self):
+        for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
+            self.assertEqual(FOO_DATA[idx].encode(), c)
+
+    def test_dataset_peek(self):
+        fetched_data = self.ds.peek(chunk_size=4)
+        self.assertEqual(FOO_DATA[0:4].encode(), fetched_data)
+
+    def test_dataset_download(self):
+        with tempfile.TemporaryFile() as f:
+            self.ds.download(f)
+            f.seek(0)
+            self.assertEqual(FOO_DATA.encode(), f.read())
+
+    def test_dataset_get_contents(self):
+        self.assertEqual(FOO_DATA.encode(), self.ds.get_contents())
+
+    def test_dataset_update(self):
+        new_name = 'test_%s' % uuid.uuid4().hex
+        new_annotation = 'Annotation for %s' % new_name
+        new_genome_build = 'hg19'
+        updated_hda = self.ds.update(name=new_name, annotation=new_annotation, genome_build=new_genome_build)
+        self.assertEqual(self.ds.id, updated_hda.id)
+        self.assertEqual(self.ds.name, new_name)
+        self.assertEqual(self.ds.annotation, new_annotation)
+        self.assertEqual(self.ds.genome_build, new_genome_build)
+
+    def test_dataset_delete(self):
+        self.ds.delete()
+        self.assertTrue(self.ds.deleted)
+        self.assertFalse(self.ds.purged)
+
+    def test_dataset_purge(self):
+        self.ds.delete(purge=True)
+        self.assertTrue(self.ds.deleted)
+        self.assertTrue(self.ds.purged)
+
+
+class TestRunWorkflow(GalaxyObjectsTestBase):
+
+    def setUp(self):
+        super().setUp()
+        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
+        with open(SAMPLE_FN) as f:
+            self.wf = self.gi.workflows.import_new(f.read())
+        self.contents = ['one\ntwo\n', '1\n2\n']
+        self.inputs = [self.lib.upload_data(_) for _ in self.contents]
+
+    def tearDown(self):
+        self.wf.delete()
+        self.lib.delete()
+
+    def _test(self, existing_hist=False, params=False):
+        hist_name = 'test_%s' % uuid.uuid4().hex
+        if existing_hist:
+            hist = self.gi.histories.create(hist_name)
+        else:
+            hist = hist_name
+        if params:
+            params = {'Paste1': {'delimiter': 'U'}}
+            sep = '_'  # 'U' maps to '_' in the paste tool
+        else:
+            params = None
+            sep = '\t'  # default
+        input_map = {'Input 1': self.inputs[0], 'Input 2': self.inputs[1]}
+        sys.stderr.write(os.linesep)
+        outputs, out_hist = self.wf.run(
+            input_map, hist, params=params, wait=True, polling_interval=1)
+        self.assertEqual(len(outputs), 1)
+        out_ds = outputs[0]
+        self.assertIn(out_ds.id, out_hist.dataset_ids)
+        res = out_ds.get_contents()
+        exp_rows = zip(*(_.splitlines() for _ in self.contents))
+        exp_res = ("\n".join(sep.join(t) for t in exp_rows) + "\n").encode()
+        self.assertEqual(res, exp_res)
+        if existing_hist:
+            self.assertEqual(out_hist.id, hist.id)
+        out_hist.delete(purge=True)
+
+    def test_existing_history(self):
+        self._test(existing_hist=True)
+
+    def test_new_history(self):
+        self._test(existing_hist=False)
+
+    def test_params(self):
+        self._test(params=True)
+
+
+class TestRunDatasetCollectionWorkflow(GalaxyObjectsTestBase):
+
+    def setUp(self):
+        super().setUp()
+        with open(SAMPLE_WF_COLL_FN) as f:
+            self.wf = self.gi.workflows.import_new(f.read())
+        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
+
+    def tearDown(self):
+        self.wf.delete()
+        self.hist.delete(purge=True)
+
+    def test_run_workflow_with_dataset_collection(self):
+        dataset1 = self.hist.paste_content(FOO_DATA)
+        dataset2 = self.hist.paste_content(FOO_DATA_2)
+        collection_description = dataset_collections.CollectionDescription(
+            name="MyDatasetList",
+            elements=[
+                dataset_collections.HistoryDatasetElement(name="sample1", id=dataset1.id),
+                dataset_collections.HistoryDatasetElement(name="sample2", id=dataset2.id),
+            ]
+        )
+        dataset_collection = self.hist.create_dataset_collection(collection_description)
+        input_map = {"Input Dataset Collection": dataset_collection,
+                     "Input 2": dataset1}
+        outputs, out_hist = self.wf.run(input_map, self.hist, wait=True)
+        self.assertEqual(len(outputs), 1)
+        out_hdca = outputs[0]
+        self.assertIsInstance(out_hdca, wrappers.HistoryDatasetCollectionAssociation)
+        self.assertEqual(out_hdca.collection_type, 'list')
+        self.assertEqual(len(out_hdca.elements), 2)
+        self.assertEqual(out_hist.id, self.hist.id)
+
+
+class TestJob(GalaxyObjectsTestBase):
+
+    def test_get(self):
+        job_prevs = self.gi.jobs.get_previews()
+        if len(job_prevs) > 0:
+            job_prev = job_prevs[0]
+            self.assertIsInstance(job_prev, wrappers.JobPreview)
+            job = self.gi.jobs.get(job_prev.id)
+            self.assertIsInstance(job, wrappers.Job)
+            self.assertEqual(job.id, job_prev.id)
+        for job in self.gi.jobs.list():
+            self.assertIsInstance(job, wrappers.Job)
+
+
+def suite():
+    loader = unittest.TestLoader()
+    s = unittest.TestSuite()
+    s.addTests([loader.loadTestsFromTestCase(c) for c in (
+        TestWrapper,
+        TestWorkflow,
+        TestGalaxyInstance,
+        TestLibrary,
+        TestLDContents,
+        TestHistory,
+        TestHDAContents,
+        TestRunWorkflow,
+    )])
+    return s
+
+
+if __name__ == '__main__':
+    tests = suite()
+    RUNNER = unittest.TextTestRunner(verbosity=2)
+    RUNNER.run(tests)
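
Note on running the module: GalaxyObjectsTestBase.setUp() reads the target server from the BIOBLEND_GALAXY_URL and BIOBLEND_GALAXY_API_KEY environment variables, and the module exposes a suite() function plus a __main__ block that drives it with unittest.TextTestRunner. The snippet below is a minimal sketch of invoking the suite programmatically; the URL and API key values are placeholders, and the import path assumes bioblend is installed together with its _tests package, as in this planemo-bundled copy.

import os
import unittest

# The base test class reads these two variables in setUp(); the values here are placeholders.
os.environ["BIOBLEND_GALAXY_URL"] = "http://localhost:8080"
os.environ["BIOBLEND_GALAXY_API_KEY"] = "<your-api-key>"

# Import after setting the environment so the skip_unless_galaxy() helpers,
# which are applied when the module is imported, see a configured server.
from bioblend._tests import TestGalaxyObjects

# Same runner setup as the module's own __main__ block.
unittest.TextTestRunner(verbosity=2).run(TestGalaxyObjects.suite())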