Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/bioblend/galaxy/objects/wrappers.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
| author | shellac | 
|---|---|
| date | Thu, 14 May 2020 14:56:58 -0400 | 
| parents | 26e78fe6e8c4 | 
| children | 
line wrap: on
 line source
# pylint: disable=W0622,E1101 """ A basic object-oriented interface for Galaxy entities. """ import abc import collections import json import six import bioblend __all__ = ( 'Wrapper', 'Step', 'Workflow', 'ContentInfo', 'LibraryContentInfo', 'HistoryContentInfo', 'DatasetContainer', 'History', 'Library', 'Folder', 'Dataset', 'HistoryDatasetAssociation', 'DatasetCollection', 'HistoryDatasetCollectionAssociation', 'LibraryDatasetDatasetAssociation', 'LibraryDataset', 'Tool', 'Job', 'Preview', 'LibraryPreview', 'HistoryPreview', 'WorkflowPreview', ) @six.add_metaclass(abc.ABCMeta) class Wrapper(object): """ Abstract base class for Galaxy entity wrappers. Wrapper instances wrap deserialized JSON dictionaries such as the ones obtained by the Galaxy web API, converting key-based access to attribute-based access (e.g., ``library['name'] -> library.name``). Dict keys that are converted to attributes are listed in the ``BASE_ATTRS`` class variable: this is the 'stable' interface. Note that the wrapped dictionary is accessible via the ``wrapped`` attribute. """ BASE_ATTRS = ('id', 'name') @abc.abstractmethod def __init__(self, wrapped, parent=None, gi=None): """ :type wrapped: dict :param wrapped: JSON-serializable dictionary :type parent: :class:`Wrapper` :param parent: the parent of this wrapper :type gi: :class:`GalaxyInstance` :param gi: the GalaxyInstance through which we can access this wrapper """ if not isinstance(wrapped, collections.Mapping): raise TypeError('wrapped object must be a mapping type') # loads(dumps(x)) is a bit faster than deepcopy and allows type checks try: dumped = json.dumps(wrapped) except (TypeError, ValueError): raise ValueError('wrapped object must be JSON-serializable') object.__setattr__(self, 'wrapped', json.loads(dumped)) for k in self.BASE_ATTRS: object.__setattr__(self, k, self.wrapped.get(k)) object.__setattr__(self, '_cached_parent', parent) object.__setattr__(self, 'is_modified', False) object.__setattr__(self, 'gi', gi) @abc.abstractproperty def gi_module(self): """ The GalaxyInstance module that deals with objects of this type. """ pass @property def parent(self): """ The parent of this wrapper. """ return self._cached_parent @property def is_mapped(self): """ ``True`` if this wrapper is mapped to an actual Galaxy entity. """ return self.id is not None def unmap(self): """ Disconnect this wrapper from Galaxy. """ object.__setattr__(self, 'id', None) def clone(self): """ Return an independent copy of this wrapper. """ return self.__class__(self.wrapped) def touch(self): """ Mark this wrapper as having been modified since its creation. """ object.__setattr__(self, 'is_modified', True) if self.parent: self.parent.touch() def to_json(self): """ Return a JSON dump of this wrapper. """ return json.dumps(self.wrapped) @classmethod def from_json(cls, jdef): """ Build a new wrapper from a JSON dump. """ return cls(json.loads(jdef)) # FIXME: things like self.x[0] = 'y' do NOT call self.__setattr__ def __setattr__(self, name, value): if name not in self.wrapped: raise AttributeError("can't set attribute") else: self.wrapped[name] = value object.__setattr__(self, name, value) self.touch() def __repr__(self): return "%s(%r)" % (self.__class__.__name__, self.wrapped) class Step(Wrapper): """ Abstract base class for workflow steps. Steps are the main building blocks of a Galaxy workflow. A step can be: an input (type 'data_collection_input` or 'data_input`), a computational tool (type 'tool`) or a pause (type 'pause`). """ BASE_ATTRS = Wrapper.BASE_ATTRS + ( 'input_steps', 'tool_id', 'tool_inputs', 'tool_version', 'type' ) def __init__(self, step_dict, parent): super(Step, self).__init__(step_dict, parent=parent, gi=parent.gi) try: stype = step_dict['type'] except KeyError: raise ValueError('not a step dict') if stype not in set(['data_collection_input', 'data_input', 'pause', 'tool']): raise ValueError('Unknown step type: %r' % stype) if self.type == 'tool' and self.tool_inputs: for k, v in six.iteritems(self.tool_inputs): # In Galaxy before release_17.05, v is a JSON-encoded string if not isinstance(v, six.string_types): break try: self.tool_inputs[k] = json.loads(v) except ValueError: break @property def gi_module(self): return self.gi.workflows class Workflow(Wrapper): """ Workflows represent ordered sequences of computations on Galaxy. A workflow defines a sequence of steps that produce one or more results from an input dataset. """ BASE_ATTRS = Wrapper.BASE_ATTRS + ( 'deleted', 'inputs', 'published', 'steps', 'tags' ) POLLING_INTERVAL = 10 # for output state monitoring def __init__(self, wf_dict, gi=None): super(Workflow, self).__init__(wf_dict, gi=gi) missing_ids = [] if gi: tools_list_by_id = [t.id for t in gi.tools.get_previews()] else: tools_list_by_id = [] tool_labels_to_ids = {} for k, v in six.iteritems(self.steps): # convert step ids to str for consistency with outer keys v['id'] = str(v['id']) for i in six.itervalues(v['input_steps']): i['source_step'] = str(i['source_step']) step = Step(v, self) self.steps[k] = step if step.type == 'tool': if not step.tool_inputs or step.tool_id not in tools_list_by_id: missing_ids.append(k) tool_labels_to_ids.setdefault(step.tool_id, set()).add(step.id) input_labels_to_ids = {} for id_, d in six.iteritems(self.inputs): input_labels_to_ids.setdefault(d['label'], set()).add(id_) object.__setattr__(self, 'input_labels_to_ids', input_labels_to_ids) object.__setattr__(self, 'tool_labels_to_ids', tool_labels_to_ids) dag, inv_dag = self._get_dag() heads, tails = set(dag), set(inv_dag) object.__setattr__(self, 'dag', dag) object.__setattr__(self, 'inv_dag', inv_dag) object.__setattr__(self, 'source_ids', heads - tails) assert set(self.inputs) == self.data_collection_input_ids | self.data_input_ids, \ "inputs is %r, while data_collection_input_ids is %r and data_input_ids is %r" % (self.inputs, self.data_collection_input_ids, self.data_input_ids) object.__setattr__(self, 'sink_ids', tails - heads) object.__setattr__(self, 'missing_ids', missing_ids) @property def gi_module(self): return self.gi.workflows def _get_dag(self): """ Return the workflow's DAG. For convenience, this method computes a 'direct' (step => successors) and an 'inverse' (step => predecessors) representation of the same DAG. For instance, a workflow with a single tool *c*, two inputs *a, b* and three outputs *d, e, f* is represented by (direct):: {'a': {'c'}, 'b': {'c'}, 'c': set(['d', 'e', 'f'])} and by (inverse):: {'c': set(['a', 'b']), 'd': {'c'}, 'e': {'c'}, 'f': {'c'}} """ dag, inv_dag = {}, {} for s in six.itervalues(self.steps): for i in six.itervalues(s.input_steps): head, tail = i['source_step'], s.id dag.setdefault(head, set()).add(tail) inv_dag.setdefault(tail, set()).add(head) return dag, inv_dag def sorted_step_ids(self): """ Return a topological sort of the workflow's DAG. """ ids = [] source_ids = self.source_ids.copy() inv_dag = dict((k, v.copy()) for k, v in six.iteritems(self.inv_dag)) while source_ids: head = source_ids.pop() ids.append(head) for tail in self.dag.get(head, []): incoming = inv_dag[tail] incoming.remove(head) if not incoming: source_ids.add(tail) return ids @property def data_input_ids(self): """ Return the ids of data input steps for this workflow. """ return set(id_ for id_, s in six.iteritems(self.steps) if s.type == 'data_input') @property def data_collection_input_ids(self): """ Return the ids of data collection input steps for this workflow. """ return set(id_ for id_, s in six.iteritems(self.steps) if s.type == 'data_collection_input') @property def tool_ids(self): """ Return the ids of tool steps for this workflow. """ return set(id_ for id_, s in six.iteritems(self.steps) if s.type == 'tool') @property def input_labels(self): """ Return the labels of this workflow's input steps. """ return set(self.input_labels_to_ids) @property def is_runnable(self): """ Return True if the workflow can be run on Galaxy. A workflow is considered runnable on a Galaxy instance if all of the tools it uses are installed in that instance. """ return not self.missing_ids def convert_input_map(self, input_map): """ Convert ``input_map`` to the format required by the Galaxy web API. :type input_map: dict :param input_map: a mapping from input labels to datasets :rtype: dict :return: a mapping from input slot ids to dataset ids in the format required by the Galaxy web API. """ m = {} for label, slot_ids in six.iteritems(self.input_labels_to_ids): datasets = input_map.get(label, []) if not isinstance(datasets, collections.Iterable): datasets = [datasets] if len(datasets) < len(slot_ids): raise RuntimeError('not enough datasets for "%s"' % label) for id_, ds in zip(slot_ids, datasets): m[id_] = {'id': ds.id, 'src': ds.SRC} return m def preview(self): getf = self.gi.workflows.get_previews try: p = [_ for _ in getf(published=True) if _.id == self.id][0] except IndexError: raise ValueError('no object for id %s' % self.id) return p def run(self, input_map=None, history='', params=None, import_inputs=False, replacement_params=None, wait=False, polling_interval=POLLING_INTERVAL, break_on_error=True): """ Run the workflow in the current Galaxy instance. :type input_map: dict :param input_map: a mapping from workflow input labels to datasets, e.g.: ``dict(zip(workflow.input_labels, library.get_datasets()))`` :type history: :class:`History` or str :param history: either a valid history object (results will be stored there) or a string (a new history will be created with the given name). :type params: dict :param params: a mapping of non-datasets tool parameters (see below) :type import_inputs: bool :param import_inputs: If ``True``, workflow inputs will be imported into the history; if ``False``, only workflow outputs will be visible in the history. :type replacement_params: dict :param replacement_params: pattern-based replacements for post-job actions (see the docs for :meth:`~bioblend.galaxy.workflows.WorkflowClient.invoke_workflow`) :type wait: bool :param wait: whether to wait while the returned datasets are in a pending state :type polling_interval: float :param polling_interval: polling interval in seconds :type break_on_error: bool :param break_on_error: whether to break as soon as at least one of the returned datasets is in the 'error' state :rtype: tuple :return: list of output datasets, output history The ``params`` dict should be specified as follows:: {STEP_ID: PARAM_DICT, ...} where PARAM_DICT is:: {PARAM_NAME: VALUE, ...} For backwards compatibility, the following (deprecated) format is also supported for ``params``:: {TOOL_ID: PARAM_DICT, ...} in which case PARAM_DICT affects all steps with the given tool id. If both by-tool-id and by-step-id specifications are used, the latter takes precedence. Finally (again, for backwards compatibility), PARAM_DICT can also be specified as:: {'param': PARAM_NAME, 'value': VALUE} Note that this format allows only one parameter to be set per step. Example: set 'a' to 1 for the third workflow step:: params = {workflow.steps[2].id: {'a': 1}} .. warning:: This is a blocking operation that can take a very long time. If ``wait`` is set to ``False``, the method will return as soon as the workflow has been *scheduled*, otherwise it will wait until the workflow has been *run*. With a large number of steps, however, the delay may not be negligible even in the former case (e.g. minutes for 100 steps). """ if not self.is_mapped: raise RuntimeError('workflow is not mapped to a Galaxy object') if not self.is_runnable: raise RuntimeError('workflow has missing tools: %s' % ', '.join( '%s[%s]' % (self.steps[_].tool_id, _) for _ in self.missing_ids)) kwargs = { 'dataset_map': self.convert_input_map(input_map or {}), 'params': params, 'import_inputs_to_history': import_inputs, 'replacement_params': replacement_params, } if isinstance(history, History): try: kwargs['history_id'] = history.id except AttributeError: raise RuntimeError('history does not have an id') elif isinstance(history, six.string_types): kwargs['history_name'] = history else: raise TypeError( 'history must be either a history wrapper or a string') res = self.gi.gi.workflows.run_workflow(self.id, **kwargs) # res structure: {'history': HIST_ID, 'outputs': [CI_ID, CI_ID, ...]} out_hist = self.gi.histories.get(res['history']) content_infos_dict = dict() for ci in out_hist.content_infos: content_infos_dict[ci.id] = ci outputs = [] for output_id in res['outputs']: if content_infos_dict[output_id].type == 'file': outputs.append(out_hist.get_dataset(output_id)) elif content_infos_dict[output_id].type == 'collection': outputs.append(out_hist.get_dataset_collection(output_id)) if wait: self.gi._wait_datasets(outputs, polling_interval=polling_interval, break_on_error=break_on_error) return outputs, out_hist def export(self): """ Export a re-importable representation of the workflow. :rtype: dict :return: a JSON-serializable dump of the workflow """ return self.gi.gi.workflows.export_workflow_dict(self.id) def delete(self): """ Delete this workflow. .. warning:: Deleting a workflow is irreversible - all of the data from the workflow will be permanently deleted. """ self.gi.workflows.delete(id_=self.id) self.unmap() @six.add_metaclass(abc.ABCMeta) class Dataset(Wrapper): """ Abstract base class for Galaxy datasets. """ BASE_ATTRS = Wrapper.BASE_ATTRS + ( 'data_type', 'file_ext', 'file_name', 'file_size', 'genome_build', 'misc_info', 'state' ) POLLING_INTERVAL = 1 # for state monitoring @abc.abstractmethod def __init__(self, ds_dict, container, gi=None): super(Dataset, self).__init__(ds_dict, gi=gi) object.__setattr__(self, 'container', container) @property def container_id(self): """ Deprecated property. Id of the dataset container. Use :attr:`.container.id` instead. """ return self.container.id @abc.abstractproperty def _stream_url(self): """ Return the URL to stream this dataset. """ pass def get_stream(self, chunk_size=bioblend.CHUNK_SIZE): """ Open dataset for reading and return an iterator over its contents. :type chunk_size: int :param chunk_size: read this amount of bytes at a time """ kwargs = {'stream': True} if isinstance(self, LibraryDataset): kwargs['params'] = {'ld_ids%5B%5D': self.id} r = self.gi.gi.make_get_request(self._stream_url, **kwargs) if isinstance(self, LibraryDataset) and r.status_code == 500: # compatibility with older Galaxy releases kwargs['params'] = {'ldda_ids%5B%5D': self.id} r = self.gi.gi.make_get_request(self._stream_url, **kwargs) r.raise_for_status() return r.iter_content(chunk_size) # FIXME: client can't close r def peek(self, chunk_size=bioblend.CHUNK_SIZE): """ Open dataset for reading and return the first chunk. See :meth:`.get_stream` for param info. """ try: return next(self.get_stream(chunk_size=chunk_size)) except StopIteration: return b'' def download(self, file_object, chunk_size=bioblend.CHUNK_SIZE): """ Open dataset for reading and save its contents to ``file_object``. :type file_object: file :param file_object: output file object See :meth:`.get_stream` for info on other params. """ for chunk in self.get_stream(chunk_size=chunk_size): file_object.write(chunk) def get_contents(self, chunk_size=bioblend.CHUNK_SIZE): """ Open dataset for reading and return its **full** contents. See :meth:`.get_stream` for param info. """ return b''.join(self.get_stream(chunk_size=chunk_size)) def refresh(self): """ Re-fetch the attributes pertaining to this object. Returns: self """ gi_client = getattr(self.gi.gi, self.container.API_MODULE) ds_dict = gi_client.show_dataset(self.container.id, self.id) self.__init__(ds_dict, self.container, self.gi) return self def wait(self, polling_interval=POLLING_INTERVAL, break_on_error=True): """ Wait for this dataset to come out of the pending states. :type polling_interval: float :param polling_interval: polling interval in seconds :type break_on_error: bool :param break_on_error: if ``True``, raise a RuntimeError exception if the dataset ends in the 'error' state. .. warning:: This is a blocking operation that can take a very long time. Also, note that this method does not return anything; however, this dataset is refreshed (possibly multiple times) during the execution. """ self.gi._wait_datasets([self], polling_interval=polling_interval, break_on_error=break_on_error) class HistoryDatasetAssociation(Dataset): """ Maps to a Galaxy ``HistoryDatasetAssociation``. """ BASE_ATTRS = Dataset.BASE_ATTRS + ('annotation', 'deleted', 'purged', 'tags', 'visible') SRC = 'hda' def __init__(self, ds_dict, container, gi=None): super(HistoryDatasetAssociation, self).__init__( ds_dict, container, gi=gi) @property def gi_module(self): return self.gi.histories @property def _stream_url(self): base_url = self.gi.gi._make_url( self.gi.gi.histories, module_id=self.container.id, contents=True) return "%s/%s/display" % (base_url, self.id) def update(self, **kwds): """ Update this history dataset metadata. Some of the attributes that can be modified are documented below. :type name: str :param name: Replace history dataset name with the given string :type genome_build: str :param genome_build: Replace history dataset genome build (dbkey) :type annotation: str :param annotation: Replace history dataset annotation with given string :type deleted: bool :param deleted: Mark or unmark history dataset as deleted :type visible: bool :param visible: Mark or unmark history dataset as visible """ res = self.gi.gi.histories.update_dataset(self.container.id, self.id, **kwds) # Refresh also the history because the dataset may have been (un)deleted self.container.refresh() if 'id' in res: self.__init__(res, self.container, gi=self.gi) else: # for Galaxy < release_15.03 res contains only the updated fields self.refresh() return self def delete(self, purge=False): """ Delete this history dataset. :type purge: bool :param purge: if ``True``, also purge (permanently delete) the dataset .. note:: For the purge option to work, the Galaxy instance must have the ``allow_user_dataset_purge`` option set to ``true`` in the ``config/galaxy.yml`` configuration file. .. warning:: If you purge a dataset which has not been previously deleted, Galaxy from release_15.03 to release_17.01 does not set the ``deleted`` attribute of the dataset to ``True``, see https://github.com/galaxyproject/galaxy/issues/3548 """ self.gi.gi.histories.delete_dataset(self.container.id, self.id, purge=purge) self.container.refresh() self.refresh() @six.add_metaclass(abc.ABCMeta) class DatasetCollection(Wrapper): """ Abstract base class for Galaxy dataset collections. """ BASE_ATTRS = Wrapper.BASE_ATTRS + ( 'state', 'deleted', 'collection_type' ) @abc.abstractmethod def __init__(self, dsc_dict, container, gi=None): super(DatasetCollection, self).__init__(dsc_dict, gi=gi) object.__setattr__(self, 'container', container) def refresh(self): """ Re-fetch the attributes pertaining to this object. Returns: self """ gi_client = getattr(self.gi.gi, self.container.API_MODULE) dsc_dict = gi_client.show_dataset_collection(self.container.id, self.id) self.__init__(dsc_dict, self.container, self.gi) return self class HistoryDatasetCollectionAssociation(DatasetCollection): """ Maps to a Galaxy ``HistoryDatasetCollectionAssociation``. """ BASE_ATTRS = DatasetCollection.BASE_ATTRS + ('tags', 'visible', 'elements') SRC = 'hdca' def __init__(self, dsc_dict, container, gi=None): super(HistoryDatasetCollectionAssociation, self).__init__( dsc_dict, container, gi=gi) @property def gi_module(self): return self.gi.histories def delete(self): """ Delete this dataset collection. """ self.gi.gi.histories.delete_dataset_collection(self.container.id, self.id) self.container.refresh() self.refresh() class LibRelatedDataset(Dataset): """ Base class for LibraryDatasetDatasetAssociation and LibraryDataset classes. """ def __init__(self, ds_dict, container, gi=None): super(LibRelatedDataset, self).__init__(ds_dict, container, gi=gi) @property def gi_module(self): return self.gi.libraries @property def _stream_url(self): base_url = self.gi.gi._make_url(self.gi.gi.libraries) return "%s/datasets/download/uncompressed" % base_url class LibraryDatasetDatasetAssociation(LibRelatedDataset): """ Maps to a Galaxy ``LibraryDatasetDatasetAssociation``. """ BASE_ATTRS = LibRelatedDataset.BASE_ATTRS + ('deleted',) SRC = 'ldda' class LibraryDataset(LibRelatedDataset): """ Maps to a Galaxy ``LibraryDataset``. """ SRC = 'ld' def delete(self, purged=False): """ Delete this library dataset. :type purged: bool :param purged: if ``True``, also purge (permanently delete) the dataset """ self.gi.gi.libraries.delete_library_dataset( self.container.id, self.id, purged=purged) self.container.refresh() self.refresh() def update(self, **kwds): """ Update this library dataset metadata. Some of the attributes that can be modified are documented below. :type name: str :param name: Replace history dataset name with the given string :type genome_build: str :param genome_build: Replace history dataset genome build (dbkey) """ res = self.gi.gi.libraries.update_library_dataset(self.id, **kwds) self.container.refresh() self.__init__(res, self.container, gi=self.gi) return self @six.add_metaclass(abc.ABCMeta) class ContentInfo(Wrapper): """ Instances of this class wrap dictionaries obtained by getting ``/api/{histories,libraries}/<ID>/contents`` from Galaxy. """ BASE_ATTRS = Wrapper.BASE_ATTRS + ('type',) @abc.abstractmethod def __init__(self, info_dict, gi=None): super(ContentInfo, self).__init__(info_dict, gi=gi) class LibraryContentInfo(ContentInfo): """ Instances of this class wrap dictionaries obtained by getting ``/api/libraries/<ID>/contents`` from Galaxy. """ def __init__(self, info_dict, gi=None): super(LibraryContentInfo, self).__init__(info_dict, gi=gi) @property def gi_module(self): return self.gi.libraries class HistoryContentInfo(ContentInfo): """ Instances of this class wrap dictionaries obtained by getting ``/api/histories/<ID>/contents`` from Galaxy. """ BASE_ATTRS = ContentInfo.BASE_ATTRS + ('deleted', 'state', 'visible') def __init__(self, info_dict, gi=None): super(HistoryContentInfo, self).__init__(info_dict, gi=gi) @property def gi_module(self): return self.gi.histories @six.add_metaclass(abc.ABCMeta) class DatasetContainer(Wrapper): """ Abstract base class for dataset containers (histories and libraries). """ BASE_ATTRS = Wrapper.BASE_ATTRS + ('deleted',) @abc.abstractmethod def __init__(self, c_dict, content_infos=None, gi=None): """ :type content_infos: list of :class:`ContentInfo` :param content_infos: info objects for the container's contents """ super(DatasetContainer, self).__init__(c_dict, gi=gi) if content_infos is None: content_infos = [] object.__setattr__(self, 'content_infos', content_infos) @property def dataset_ids(self): """ Return the ids of the contained datasets. """ return [_.id for _ in self.content_infos if _.type == 'file'] def preview(self): getf = self.gi_module.get_previews # self.state could be stale: check both regular and deleted containers try: p = [_ for _ in getf() if _.id == self.id][0] except IndexError: try: p = [_ for _ in getf(deleted=True) if _.id == self.id][0] except IndexError: raise ValueError('no object for id %s' % self.id) return p def refresh(self): """ Re-fetch the attributes pertaining to this object. Returns: self """ fresh = self.gi_module.get(self.id) self.__init__( fresh.wrapped, content_infos=fresh.content_infos, gi=self.gi) return self def get_dataset(self, ds_id): """ Retrieve the dataset corresponding to the given id. :type ds_id: str :param ds_id: dataset id :rtype: :class:`~.HistoryDatasetAssociation` or :class:`~.LibraryDataset` :return: the dataset corresponding to ``ds_id`` """ gi_client = getattr(self.gi.gi, self.API_MODULE) ds_dict = gi_client.show_dataset(self.id, ds_id) return self.DS_TYPE(ds_dict, self, gi=self.gi) def get_datasets(self, name=None): """ Get all datasets contained inside this dataset container. :type name: str :param name: return only datasets with this name :rtype: list of :class:`~.HistoryDatasetAssociation` or list of :class:`~.LibraryDataset` :return: datasets with the given name contained inside this container .. note:: when filtering library datasets by name, specify their full paths starting from the library's root folder, e.g., ``/seqdata/reads.fastq``. Full paths are available through the ``content_infos`` attribute of :class:`~.Library` objects. """ if name is None: ds_ids = self.dataset_ids else: ds_ids = [_.id for _ in self.content_infos if _.name == name] return [self.get_dataset(_) for _ in ds_ids] class History(DatasetContainer): """ Maps to a Galaxy history. """ BASE_ATTRS = DatasetContainer.BASE_ATTRS + ('annotation', 'published', 'state', 'state_ids', 'state_details', 'tags') DS_TYPE = HistoryDatasetAssociation DSC_TYPE = HistoryDatasetCollectionAssociation CONTENT_INFO_TYPE = HistoryContentInfo API_MODULE = 'histories' def __init__(self, hist_dict, content_infos=None, gi=None): super(History, self).__init__( hist_dict, content_infos=content_infos, gi=gi) @property def gi_module(self): return self.gi.histories def update(self, **kwds): """ Update history metadata information. Some of the attributes that can be modified are documented below. :type name: str :param name: Replace history name with the given string :type annotation: str :param annotation: Replace history annotation with the given string :type deleted: bool :param deleted: Mark or unmark history as deleted :type purged: bool :param purged: If True, mark history as purged (permanently deleted). Ignored on Galaxy release_15.01 and earlier :type published: bool :param published: Mark or unmark history as published :type importable: bool :param importable: Mark or unmark history as importable :type tags: list :param tags: Replace history tags with the given list """ # TODO: wouldn't it be better if name and annotation were attributes? self.gi.gi.histories.update_history(self.id, **kwds) self.refresh() return self def delete(self, purge=False): """ Delete this history. :type purge: bool :param purge: if ``True``, also purge (permanently delete) the history .. note:: For the purge option to work, the Galaxy instance must have the ``allow_user_dataset_purge`` option set to ``true`` in the ``config/galaxy.yml`` configuration file. """ self.gi.histories.delete(id_=self.id, purge=purge) try: self.refresh() except Exception: # Galaxy release_15.01 and earlier requires passing 'deleted=False' # when getting the details of a deleted history pass self.unmap() def import_dataset(self, lds): """ Import a dataset into the history from a library. :type lds: :class:`~.LibraryDataset` :param lds: the library dataset to import :rtype: :class:`~.HistoryDatasetAssociation` :return: the imported history dataset """ if not self.is_mapped: raise RuntimeError('history is not mapped to a Galaxy object') if not isinstance(lds, LibraryDataset): raise TypeError('lds is not a LibraryDataset') res = self.gi.gi.histories.upload_dataset_from_library(self.id, lds.id) if not isinstance(res, collections.Mapping): raise RuntimeError( 'upload_dataset_from_library: unexpected reply: %r' % res) self.refresh() return self.get_dataset(res['id']) def upload_file(self, path, **kwargs): """ Upload the file specified by ``path`` to this history. :type path: str :param path: path of the file to upload See :meth:`~bioblend.galaxy.tools.ToolClient.upload_file` for the optional parameters. :rtype: :class:`~.HistoryDatasetAssociation` :return: the uploaded dataset """ out_dict = self.gi.gi.tools.upload_file(path, self.id, **kwargs) self.refresh() return self.get_dataset(out_dict['outputs'][0]['id']) upload_dataset = upload_file def upload_from_ftp(self, path, **kwargs): """ Upload the file specified by ``path`` from the user's FTP directory to this history. :type path: str :param path: path of the file in the user's FTP directory See :meth:`~bioblend.galaxy.tools.ToolClient.upload_file` for the optional parameters. :rtype: :class:`~.HistoryDatasetAssociation` :return: the uploaded dataset """ out_dict = self.gi.gi.tools.upload_from_ftp(path, self.id, **kwargs) self.refresh() return self.get_dataset(out_dict['outputs'][0]['id']) def paste_content(self, content, **kwargs): """ Upload a string to a new dataset in this history. :type content: str :param content: content of the new dataset to upload See :meth:`~bioblend.galaxy.tools.ToolClient.upload_file` for the optional parameters (except file_name). :rtype: :class:`~.HistoryDatasetAssociation` :return: the uploaded dataset """ out_dict = self.gi.gi.tools.paste_content(content, self.id, **kwargs) self.refresh() return self.get_dataset(out_dict['outputs'][0]['id']) def export(self, gzip=True, include_hidden=False, include_deleted=False, wait=False, maxwait=None): """ Start a job to create an export archive for this history. See :meth:`~bioblend.galaxy.histories.HistoryClient.export_history` for parameter and return value info. """ return self.gi.gi.histories.export_history( self.id, gzip=gzip, include_hidden=include_hidden, include_deleted=include_deleted, wait=wait, maxwait=maxwait) def download(self, jeha_id, outf, chunk_size=bioblend.CHUNK_SIZE): """ Download an export archive for this history. Use :meth:`export` to create an export and get the required ``jeha_id``. See :meth:`~bioblend.galaxy.histories.HistoryClient.download_history` for parameter and return value info. """ return self.gi.gi.histories.download_history( self.id, jeha_id, outf, chunk_size=chunk_size) def create_dataset_collection(self, collection_description): """ Create a new dataset collection in the history by providing a collection description. :type collection_description: bioblend.galaxy.dataset_collections.CollectionDescription :param collection_description: a description of the dataset collection :rtype: :class:`~.HistoryDatasetCollectionAssociation` :return: the new dataset collection """ dataset_collection = self.gi.gi.histories.create_dataset_collection(self.id, collection_description) self.refresh() return self.get_dataset_collection(dataset_collection['id']) def get_dataset_collection(self, dsc_id): """ Retrieve the dataset collection corresponding to the given id. :type dsc_id: str :param dsc_id: dataset collection id :rtype: :class:`~.HistoryDatasetCollectionAssociation` :return: the dataset collection corresponding to ``dsc_id`` """ dsc_dict = self.gi.gi.histories.show_dataset_collection(self.id, dsc_id) return self.DSC_TYPE(dsc_dict, self, gi=self.gi) class Library(DatasetContainer): """ Maps to a Galaxy library. """ BASE_ATTRS = DatasetContainer.BASE_ATTRS + ('description', 'synopsis') DS_TYPE = LibraryDataset CONTENT_INFO_TYPE = LibraryContentInfo API_MODULE = 'libraries' def __init__(self, lib_dict, content_infos=None, gi=None): super(Library, self).__init__( lib_dict, content_infos=content_infos, gi=gi) @property def gi_module(self): return self.gi.libraries @property def folder_ids(self): """ Return the ids of the contained folders. """ return [_.id for _ in self.content_infos if _.type == 'folder'] def delete(self): """ Delete this library. """ self.gi.libraries.delete(id_=self.id) self.refresh() self.unmap() def _pre_upload(self, folder): """ Return the id of the given folder, after sanity checking. """ if not self.is_mapped: raise RuntimeError('library is not mapped to a Galaxy object') return None if folder is None else folder.id def upload_data(self, data, folder=None, **kwargs): """ Upload data to this library. :type data: str :param data: dataset contents :type folder: :class:`~.Folder` :param folder: a folder object, or ``None`` to upload to the root folder :rtype: :class:`~.LibraryDataset` :return: the dataset object that represents the uploaded content Optional keyword arguments: ``file_type``, ``dbkey``. """ fid = self._pre_upload(folder) res = self.gi.gi.libraries.upload_file_contents( self.id, data, folder_id=fid, **kwargs) self.refresh() return self.get_dataset(res[0]['id']) def upload_from_url(self, url, folder=None, **kwargs): """ Upload data to this library from the given URL. :type url: str :param url: URL from which data should be read See :meth:`.upload_data` for info on other params. """ fid = self._pre_upload(folder) res = self.gi.gi.libraries.upload_file_from_url( self.id, url, folder_id=fid, **kwargs) self.refresh() return self.get_dataset(res[0]['id']) def upload_from_local(self, path, folder=None, **kwargs): """ Upload data to this library from a local file. :type path: str :param path: local file path from which data should be read See :meth:`.upload_data` for info on other params. """ fid = self._pre_upload(folder) res = self.gi.gi.libraries.upload_file_from_local_path( self.id, path, folder_id=fid, **kwargs) self.refresh() return self.get_dataset(res[0]['id']) def upload_from_galaxy_fs(self, paths, folder=None, link_data_only=None, **kwargs): """ Upload data to this library from filesystem paths on the server. .. note:: For this method to work, the Galaxy instance must have the ``allow_path_paste`` (``allow_library_path_paste`` in Galaxy ``release_17.05`` and earlier) option set to ``true`` in the ``config/galaxy.yml`` configuration file. :type paths: str or :class:`~collections.Iterable` of str :param paths: server-side file paths from which data should be read :type link_data_only: str :param link_data_only: either 'copy_files' (default) or 'link_to_files'. Setting to 'link_to_files' symlinks instead of copying the files :rtype: list of :class:`~.LibraryDataset` :return: the dataset objects that represent the uploaded content See :meth:`.upload_data` for info on other params. """ fid = self._pre_upload(folder) if isinstance(paths, six.string_types): paths = (paths,) paths = '\n'.join(paths) res = self.gi.gi.libraries.upload_from_galaxy_filesystem( self.id, paths, folder_id=fid, link_data_only=link_data_only, **kwargs) if res is None: raise RuntimeError('upload_from_galaxy_filesystem: no reply') if not isinstance(res, collections.Sequence): raise RuntimeError( 'upload_from_galaxy_filesystem: unexpected reply: %r' % res) new_datasets = [ self.get_dataset(ds_info['id']) for ds_info in res ] self.refresh() return new_datasets def copy_from_dataset(self, hda, folder=None, message=''): """ Copy a history dataset into this library. :type hda: :class:`~.HistoryDatasetAssociation` :param hda: history dataset to copy into the library See :meth:`.upload_data` for info on other params. """ fid = self._pre_upload(folder) res = self.gi.gi.libraries.copy_from_dataset( self.id, hda.id, folder_id=fid, message=message) self.refresh() return self.get_dataset(res['library_dataset_id']) def create_folder(self, name, description=None, base_folder=None): """ Create a folder in this library. :type name: str :param name: folder name :type description: str :param description: optional folder description :type base_folder: :class:`~.Folder` :param base_folder: parent folder, or ``None`` to create in the root folder :rtype: :class:`~.Folder` :return: the folder just created """ bfid = None if base_folder is None else base_folder.id res = self.gi.gi.libraries.create_folder( self.id, name, description=description, base_folder_id=bfid) self.refresh() return self.get_folder(res[0]['id']) def get_folder(self, f_id): """ Retrieve the folder corresponding to the given id. :rtype: :class:`~.Folder` :return: the folder corresponding to ``f_id`` """ f_dict = self.gi.gi.libraries.show_folder(self.id, f_id) return Folder(f_dict, self, gi=self.gi) @property def root_folder(self): """ The root folder of this library. :rtype: :class:`~.Folder` :return: the root folder of this library """ return self.get_folder(self.gi.gi.libraries._get_root_folder_id(self.id)) class Folder(Wrapper): """ Maps to a folder in a Galaxy library. """ BASE_ATTRS = Wrapper.BASE_ATTRS + ('description', 'deleted', 'item_count') def __init__(self, f_dict, container, gi=None): super(Folder, self).__init__(f_dict, gi=gi) object.__setattr__(self, 'container', container) @property def parent(self): """ The parent folder of this folder. The parent of the root folder is ``None``. :rtype: :class:`~.Folder` :return: the parent of this folder """ if self._cached_parent is None: object.__setattr__(self, '_cached_parent', self._get_parent()) return self._cached_parent def _get_parent(self): """ Return the parent folder of this folder. """ # Galaxy release_13.04 and earlier does not have parent_id in the folder # dictionary, may be implemented by searching for the folder with the # correct name if 'parent_id' not in self.wrapped: raise NotImplementedError('This method has not been implemented for Galaxy release_13.04 and earlier') parent_id = self.wrapped['parent_id'] if parent_id is None: return None # Galaxy from release_14.02 to release_15.01 returns a dummy parent_id # for the root folder instead of None, so check if this is the root if self.id == self.gi.gi.libraries._get_root_folder_id(self.container.id): return None # Galaxy release_13.11 and earlier returns a parent_id without the # initial 'F' if not parent_id.startswith('F'): parent_id = 'F' + parent_id return self.container.get_folder(parent_id) @property def gi_module(self): return self.gi.libraries @property def container_id(self): """ Deprecated property. Id of the folder container. Use :attr:`.container.id` instead. """ return self.container.id def refresh(self): """ Re-fetch the attributes pertaining to this object. Returns: self """ f_dict = self.gi.gi.libraries.show_folder(self.container.id, self.id) self.__init__(f_dict, self.container, gi=self.gi) return self class Tool(Wrapper): """ Maps to a Galaxy tool. """ BASE_ATTRS = Wrapper.BASE_ATTRS + ('version',) POLLING_INTERVAL = 10 # for output state monitoring def __init__(self, t_dict, gi=None): super(Tool, self).__init__(t_dict, gi=gi) @property def gi_module(self): return self.gi.tools def run(self, inputs, history, wait=False, polling_interval=POLLING_INTERVAL): """ Execute this tool in the given history with inputs from dict ``inputs``. :type inputs: dict :param inputs: dictionary of input datasets and parameters for the tool (see below) :type history: :class:`History` :param history: the history where to execute the tool :type wait: bool :param wait: whether to wait while the returned datasets are in a pending state :type polling_interval: float :param polling_interval: polling interval in seconds :rtype: list of :class:`HistoryDatasetAssociation` :return: list of output datasets The ``inputs`` dict should contain input datasets and parameters in the (largely undocumented) format used by the Galaxy API. Some examples can be found in `Galaxy's API test suite <https://github.com/galaxyproject/galaxy/blob/dev/test/api/test_tools.py>`_. The value of an input dataset can also be a :class:`Dataset` object, which will be automatically converted to the needed format. """ for k, v in six.iteritems(inputs): if isinstance(v, Dataset): inputs[k] = {'src': v.SRC, 'id': v.id} out_dict = self.gi.gi.tools.run_tool(history.id, self.id, inputs) outputs = [history.get_dataset(_['id']) for _ in out_dict['outputs']] if wait: self.gi._wait_datasets(outputs, polling_interval=polling_interval) return outputs class Job(Wrapper): """ Maps to a Galaxy job. """ BASE_ATTRS = ('id', 'state') def __init__(self, j_dict, gi=None): super(Job, self).__init__(j_dict, gi=gi) @property def gi_module(self): return self.gi.jobs @six.add_metaclass(abc.ABCMeta) class Preview(Wrapper): """ Abstract base class for Galaxy entity 'previews'. Classes derived from this one model the short summaries returned by global getters such as ``/api/libraries``. """ BASE_ATTRS = Wrapper.BASE_ATTRS + ('deleted',) @abc.abstractmethod def __init__(self, pw_dict, gi=None): super(Preview, self).__init__(pw_dict, gi=gi) class LibraryPreview(Preview): """ Models Galaxy library 'previews'. Instances of this class wrap dictionaries obtained by getting ``/api/libraries`` from Galaxy. """ def __init__(self, pw_dict, gi=None): super(LibraryPreview, self).__init__(pw_dict, gi=gi) @property def gi_module(self): return self.gi.libraries class HistoryPreview(Preview): """ Models Galaxy history 'previews'. Instances of this class wrap dictionaries obtained by getting ``/api/histories`` from Galaxy. """ BASE_ATTRS = Preview.BASE_ATTRS + ('tags',) def __init__(self, pw_dict, gi=None): super(HistoryPreview, self).__init__(pw_dict, gi=gi) @property def gi_module(self): return self.gi.histories class WorkflowPreview(Preview): """ Models Galaxy workflow 'previews'. Instances of this class wrap dictionaries obtained by getting ``/api/workflows`` from Galaxy. """ BASE_ATTRS = Preview.BASE_ATTRS + ('published', 'tags') def __init__(self, pw_dict, gi=None): super(WorkflowPreview, self).__init__(pw_dict, gi=gi) @property def gi_module(self): return self.gi.workflows class JobPreview(Preview): """ Models Galaxy job 'previews'. Instances of this class wrap dictionaries obtained by getting ``/api/jobs`` from Galaxy. """ BASE_ATTRS = ('id', 'state') def __init__(self, pw_dict, gi=None): super(JobPreview, self).__init__(pw_dict, gi=gi) @property def gi_module(self): return self.gi.jobs
