view planemo/lib/python3.7/site-packages/bioblend/galaxy/objects/wrappers.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
parents
children
line wrap: on
line source

# pylint: disable=W0622,E1101

"""
A basic object-oriented interface for Galaxy entities.
"""

import abc
import json
from collections.abc import (
    Iterable,
    Mapping,
    Sequence,
)

import bioblend


# Names exported by ``from <this module> import *``.
# ``LibRelatedDataset`` (the shared base of the two library dataset
# wrappers defined in this module) is not listed here.
__all__ = (
    'Wrapper',
    'Step',
    'Workflow',
    'ContentInfo',
    'LibraryContentInfo',
    'HistoryContentInfo',
    'DatasetContainer',
    'History',
    'Library',
    'Folder',
    'Dataset',
    'HistoryDatasetAssociation',
    'DatasetCollection',
    'HistoryDatasetCollectionAssociation',
    'LibraryDatasetDatasetAssociation',
    'LibraryDataset',
    'Tool',
    'Job',
    'Preview',
    'LibraryPreview',
    'HistoryPreview',
    'WorkflowPreview',
)


class Wrapper(object, metaclass=abc.ABCMeta):
    """
    Abstract base class for Galaxy entity wrappers.

    Wrapper instances wrap deserialized JSON dictionaries such as the
    ones obtained by the Galaxy web API, converting key-based access to
    attribute-based access (e.g., ``library['name'] -> library.name``).

    Dict keys that are converted to attributes are listed in the
    ``BASE_ATTRS`` class variable: this is the 'stable' interface.
    Note that the wrapped dictionary is accessible via the ``wrapped``
    attribute.
    """
    BASE_ATTRS = ('id', 'name')

    @abc.abstractmethod
    def __init__(self, wrapped, parent=None, gi=None):
        """
        :type wrapped: dict
        :param wrapped: JSON-serializable dictionary

        :type parent: :class:`Wrapper`
        :param parent: the parent of this wrapper

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this wrapper

        :raises TypeError: if ``wrapped`` is not a mapping
        :raises ValueError: if ``wrapped`` cannot be serialized to JSON
        """
        if not isinstance(wrapped, Mapping):
            raise TypeError('wrapped object must be a mapping type')
        # loads(dumps(x)) is a bit faster than deepcopy and allows type checks
        try:
            dumped = json.dumps(wrapped)
        except (TypeError, ValueError) as err:
            # Keep the serializer's error as the explicit cause so that the
            # offending value can still be identified from the traceback.
            raise ValueError('wrapped object must be JSON-serializable') from err
        # object.__setattr__ is used throughout because our own __setattr__
        # only accepts names that are keys of the wrapped dictionary.
        object.__setattr__(self, 'wrapped', json.loads(dumped))
        for k in self.BASE_ATTRS:
            # Keys missing from the wrapped dict become None attributes.
            object.__setattr__(self, k, self.wrapped.get(k))
        object.__setattr__(self, '_cached_parent', parent)
        object.__setattr__(self, 'is_modified', False)
        object.__setattr__(self, 'gi', gi)

    @property
    @abc.abstractmethod
    def gi_module(self):
        """
        The GalaxyInstance module that deals with objects of this type.
        """

    @property
    def parent(self):
        """
        The parent of this wrapper.
        """
        return self._cached_parent

    @property
    def is_mapped(self):
        """
        ``True`` if this wrapper is mapped to an actual Galaxy entity.
        """
        return self.id is not None

    def unmap(self):
        """
        Disconnect this wrapper from Galaxy.

        Only the ``id`` attribute is reset to ``None``; the wrapped
        dictionary is left untouched.
        """
        object.__setattr__(self, 'id', None)

    def clone(self):
        """
        Return an independent copy of this wrapper.

        The copy is built from the wrapped dictionary only: ``parent`` and
        ``gi`` are not passed on to the new instance.
        """
        return self.__class__(self.wrapped)

    def touch(self):
        """
        Mark this wrapper as having been modified since its creation.

        The modification flag is propagated up the chain of parents.
        """
        object.__setattr__(self, 'is_modified', True)
        if self.parent:
            self.parent.touch()

    def to_json(self):
        """
        Return a JSON dump of this wrapper.
        """
        return json.dumps(self.wrapped)

    @classmethod
    def from_json(cls, jdef):
        """
        Build a new wrapper from a JSON dump.
        """
        return cls(json.loads(jdef))

    # FIXME: things like self.x[0] = 'y' do NOT call self.__setattr__
    def __setattr__(self, name, value):
        """
        Set ``name`` on both the wrapped dictionary and the instance.

        Only names that are already keys of the wrapped dictionary can be
        set; anything else raises :exc:`AttributeError`. A successful
        assignment marks the wrapper (and its parents) as modified.
        """
        if name not in self.wrapped:
            raise AttributeError("can't set attribute")
        self.wrapped[name] = value
        object.__setattr__(self, name, value)
        self.touch()

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self.wrapped)


class Step(Wrapper):
    """
    Abstract base class for workflow steps.

    Steps are the main building blocks of a Galaxy workflow. A step can be: an
    input (type ``data_collection_input``, ``data_input`` or
    ``parameter_input``), a computational tool (type ``tool``), a subworkflow
    (type ``subworkflow``) or a pause (type ``pause``).
    """
    BASE_ATTRS = Wrapper.BASE_ATTRS + (
        'input_steps', 'tool_id', 'tool_inputs', 'tool_version', 'type'
    )

    def __init__(self, step_dict, parent):
        """
        :type step_dict: dict
        :param step_dict: step dictionary; it must have a ``type`` key whose
          value is one of the step types listed in the class docstring

        :type parent: :class:`Wrapper`
        :param parent: the wrapper owning this step; its ``gi`` attribute is
          reused as this step's ``gi``
        """
        known_types = {
            'data_collection_input',
            'data_input',
            'parameter_input',
            'pause',
            'subworkflow',
            'tool',
        }
        super().__init__(step_dict, parent=parent, gi=parent.gi)
        try:
            step_type = step_dict['type']
        except KeyError:
            raise ValueError('not a step dict')
        if step_type not in known_types:
            raise ValueError('Unknown step type: %r' % step_type)

    @property
    def gi_module(self):
        """
        The ``workflows`` module of the owning Galaxy instance.
        """
        return self.gi.workflows


class Workflow(Wrapper):
    """
    Workflows represent ordered sequences of computations on Galaxy.

    A workflow defines a sequence of steps that produce one or more
    results from an input dataset.
    """
    BASE_ATTRS = Wrapper.BASE_ATTRS + (
        'deleted', 'inputs', 'published', 'steps', 'tags'
    )
    POLLING_INTERVAL = 10  # for output state monitoring

    def __init__(self, wf_dict, gi=None):
        """
        :type wf_dict: dict
        :param wf_dict: workflow dictionary; its ``steps`` and ``inputs``
          entries are read while building the wrapper

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this
          workflow. When given, its tool previews are used to work out which
          tool steps refer to tools that are not available.

        Besides the ``BASE_ATTRS``, the following attributes are set:
        ``input_labels_to_ids``, ``tool_labels_to_ids``, ``dag``,
        ``inv_dag``, ``source_ids``, ``sink_ids`` and ``missing_ids``.
        """
        super().__init__(wf_dict, gi=gi)
        missing_ids = []
        # A set makes the per-step membership test below O(1).
        if gi:
            installed_tool_ids = {t.id for t in gi.tools.get_previews()}
        else:
            installed_tool_ids = set()
        tool_labels_to_ids = {}
        for k, v in self.steps.items():
            # convert step ids to str for consistency with outer keys
            v['id'] = str(v['id'])
            for i in v['input_steps'].values():
                i['source_step'] = str(i['source_step'])
            step = Step(v, self)
            # Replace the raw step dict with its wrapper (values only, so
            # mutating while iterating over items() is safe).
            self.steps[k] = step
            if step.type == 'tool':
                if not step.tool_inputs or step.tool_id not in installed_tool_ids:
                    missing_ids.append(k)
                tool_labels_to_ids.setdefault(step.tool_id, set()).add(step.id)
        input_labels_to_ids = {}
        for id_, d in self.inputs.items():
            input_labels_to_ids.setdefault(d['label'], set()).add(id_)
        # Bypass Wrapper.__setattr__, which only accepts wrapped-dict keys.
        object.__setattr__(self, 'input_labels_to_ids', input_labels_to_ids)
        object.__setattr__(self, 'tool_labels_to_ids', tool_labels_to_ids)
        dag, inv_dag = self._get_dag()
        heads, tails = set(dag), set(inv_dag)
        object.__setattr__(self, 'dag', dag)
        object.__setattr__(self, 'inv_dag', inv_dag)
        # Sources have successors but no predecessors; sinks the opposite.
        object.__setattr__(self, 'source_ids', heads - tails)
        assert set(self.inputs) == self.data_collection_input_ids | self.data_input_ids | self.parameter_input_ids, \
            "inputs is %r, while data_collection_input_ids is %r, data_input_ids is %r and parameter_input_ids is %r" % (
                self.inputs, self.data_collection_input_ids, self.data_input_ids, self.parameter_input_ids)
        object.__setattr__(self, 'sink_ids', tails - heads)
        object.__setattr__(self, 'missing_ids', missing_ids)

    @property
    def gi_module(self):
        """
        The ``workflows`` module of the owning Galaxy instance.
        """
        return self.gi.workflows

    def _get_dag(self):
        """
        Return the workflow's DAG.

        For convenience, this method computes a 'direct' (step =>
        successors) and an 'inverse' (step => predecessors)
        representation of the same DAG.

        For instance, a workflow with a single tool *c*, two inputs
        *a, b* and three outputs *d, e, f* is represented by (direct)::

          {'a': {'c'}, 'b': {'c'}, 'c': {'d', 'e', 'f'}}

        and by (inverse)::

          {'c': {'a', 'b'}, 'd': {'c'}, 'e': {'c'}, 'f': {'c'}}
        """
        dag, inv_dag = {}, {}
        for s in self.steps.values():
            for i in s.input_steps.values():
                head, tail = i['source_step'], s.id
                dag.setdefault(head, set()).add(tail)
                inv_dag.setdefault(tail, set()).add(head)
        return dag, inv_dag

    def sorted_step_ids(self):
        """
        Return a topological sort of the workflow's DAG.

        :rtype: list
        :return: step ids ordered so that every step comes after all of its
          predecessors
        """
        ids = []
        source_ids = self.source_ids.copy()
        # Work on copies so that the stored inverse DAG is left intact.
        inv_dag = {k: v.copy() for k, v in self.inv_dag.items()}
        while source_ids:
            head = source_ids.pop()
            ids.append(head)
            for tail in self.dag.get(head, []):
                incoming = inv_dag[tail]
                incoming.remove(head)
                # A step becomes available once all its inputs are emitted.
                if not incoming:
                    source_ids.add(tail)
        return ids

    @property
    def data_input_ids(self):
        """
        Return the ids of data input steps for this workflow.
        """
        return {id_ for id_, s in self.steps.items() if s.type == 'data_input'}

    @property
    def data_collection_input_ids(self):
        """
        Return the ids of data collection input steps for this workflow.
        """
        return {id_ for id_, s in self.steps.items() if s.type == 'data_collection_input'}

    @property
    def parameter_input_ids(self):
        """
        Return the ids of parameter input steps for this workflow.
        """
        return {id_ for id_, s in self.steps.items() if s.type == 'parameter_input'}

    @property
    def tool_ids(self):
        """
        Return the ids of tool steps for this workflow.
        """
        return {id_ for id_, s in self.steps.items() if s.type == 'tool'}

    @property
    def input_labels(self):
        """
        Return the labels of this workflow's input steps.
        """
        return set(self.input_labels_to_ids)

    @property
    def is_runnable(self):
        """
        Return True if the workflow can be run on Galaxy.

        A workflow is considered runnable on a Galaxy instance if all
        of the tools it uses are installed in that instance.
        """
        return not self.missing_ids

    def convert_input_map(self, input_map):
        """
        Convert ``input_map`` to the format required by the Galaxy web API.

        :type input_map: dict
        :param input_map: a mapping from input labels to datasets. Each value
          can be a single dataset or any iterable of datasets (including
          one-shot iterators such as generators).

        :rtype: dict
        :return: a mapping from input slot ids to dataset ids in the
          format required by the Galaxy web API.

        :raises RuntimeError: if fewer datasets than input slots are
          provided for a label
        """
        m = {}
        for label, slot_ids in self.input_labels_to_ids.items():
            datasets = input_map.get(label, [])
            if isinstance(datasets, Iterable):
                # Materialize: generators and other unsized iterables have no
                # len(), which is needed for the check below.
                datasets = list(datasets)
            else:
                datasets = [datasets]
            if len(datasets) < len(slot_ids):
                raise RuntimeError('not enough datasets for "%s"' % label)
            for id_, ds in zip(slot_ids, datasets):
                m[id_] = {'id': ds.id, 'src': ds.SRC}
        return m

    def preview(self):
        """
        Return the preview object whose id matches this workflow's id.

        The previews are obtained from ``get_previews(published=True)`` on
        the Galaxy instance's ``workflows`` module.

        :raises ValueError: if no preview has this workflow's id
        """
        for candidate in self.gi.workflows.get_previews(published=True):
            if candidate.id == self.id:
                return candidate
        raise ValueError('no object for id %s' % self.id)

    def run(self, input_map=None, history='', params=None, import_inputs=False,
            replacement_params=None, wait=False,
            polling_interval=POLLING_INTERVAL, break_on_error=True):
        """
        Run the workflow in the current Galaxy instance.

        :type input_map: dict
        :param input_map: a mapping from workflow input labels to
          datasets, e.g.: ``dict(zip(workflow.input_labels,
          library.get_datasets()))``

        :type history: :class:`History` or str
        :param history: either a valid history object (results will be
          stored there) or a string (a new history will be created with
          the given name).

        :type params: dict
        :param params: a mapping of non-datasets tool parameters (see below)

        :type import_inputs: bool
        :param import_inputs: If ``True``, workflow inputs will be imported into
          the history; if ``False``, only workflow outputs will be visible in
          the history.

        :type replacement_params: dict
        :param replacement_params: pattern-based replacements for
          post-job actions (see the docs for
          :meth:`~bioblend.galaxy.workflows.WorkflowClient.invoke_workflow`)

        :type wait: bool
        :param wait: whether to wait while the returned datasets are
          in a pending state

        :type polling_interval: float
        :param polling_interval: polling interval in seconds

        :type break_on_error: bool
        :param break_on_error: whether to break as soon as at least one
          of the returned datasets is in the 'error' state

        :rtype: tuple
        :return: list of output datasets, output history

        :raises RuntimeError: if the workflow is not mapped to a Galaxy
          object or has missing tools
        :raises TypeError: if ``history`` is neither a history wrapper nor a
          string

        The ``params`` dict should be specified as follows::

          {STEP_ID: PARAM_DICT, ...}

        where PARAM_DICT is::

          {PARAM_NAME: VALUE, ...}

        For backwards compatibility, the following (deprecated) format is
        also supported for ``params``::

          {TOOL_ID: PARAM_DICT, ...}

        in which case PARAM_DICT affects all steps with the given tool id.
        If both by-tool-id and by-step-id specifications are used, the
        latter takes precedence.

        Finally (again, for backwards compatibility), PARAM_DICT can also
        be specified as::

          {'param': PARAM_NAME, 'value': VALUE}

        Note that this format allows only one parameter to be set per step.

        Example: set 'a' to 1 for the third workflow step::

          params = {workflow.steps[2].id: {'a': 1}}

        .. warning::

          This is a blocking operation that can take a very long time. If
          ``wait`` is set to ``False``, the method will return as soon as the
          workflow has been *scheduled*, otherwise it will wait until the
          workflow has been *run*. With a large number of steps, however, the
          delay may not be negligible even in the former case (e.g. minutes for
          100 steps).
        """
        if not self.is_mapped:
            raise RuntimeError('workflow is not mapped to a Galaxy object')
        if not self.is_runnable:
            raise RuntimeError('workflow has missing tools: %s' % ', '.join(
                '%s[%s]' % (self.steps[_].tool_id, _)
                for _ in self.missing_ids))
        kwargs = {
            'dataset_map': self.convert_input_map(input_map or {}),
            'params': params,
            'import_inputs_to_history': import_inputs,
            'replacement_params': replacement_params,
        }
        if isinstance(history, History):
            try:
                kwargs['history_id'] = history.id
            except AttributeError as err:
                raise RuntimeError('history does not have an id') from err
        elif isinstance(history, str):
            kwargs['history_name'] = history
        else:
            raise TypeError(
                'history must be either a history wrapper or a string')
        res = self.gi.gi.workflows.run_workflow(self.id, **kwargs)
        # res structure: {'history': HIST_ID, 'outputs': [CI_ID, CI_ID, ...]}
        out_hist = self.gi.histories.get(res['history'])
        content_infos_dict = {ci.id: ci for ci in out_hist.content_infos}
        outputs = []
        for output_id in res['outputs']:
            output_type = content_infos_dict[output_id].type
            if output_type == 'file':
                outputs.append(out_hist.get_dataset(output_id))
            elif output_type == 'collection':
                outputs.append(out_hist.get_dataset_collection(output_id))

        if wait:
            self.gi._wait_datasets(outputs, polling_interval=polling_interval,
                                   break_on_error=break_on_error)
        return outputs, out_hist

    def export(self):
        """
        Export a re-importable representation of the workflow.

        :rtype: dict
        :return: a JSON-serializable dump of the workflow
        """
        return self.gi.gi.workflows.export_workflow_dict(self.id)

    def delete(self):
        """
        Delete this workflow.

        .. warning::
          Deleting a workflow is irreversible - all of the data from
          the workflow will be permanently deleted.
        """
        self.gi.workflows.delete(id_=self.id)
        self.unmap()


class Dataset(Wrapper, metaclass=abc.ABCMeta):
    """
    Abstract base class for Galaxy datasets.
    """
    BASE_ATTRS = Wrapper.BASE_ATTRS + (
        'data_type', 'file_ext', 'file_name', 'file_size', 'genome_build', 'misc_info', 'state'
    )
    POLLING_INTERVAL = 1  # for state monitoring

    @abc.abstractmethod
    def __init__(self, ds_dict, container, gi=None):
        """
        :type ds_dict: dict
        :param ds_dict: dataset dictionary

        :type container: :class:`DatasetContainer`
        :param container: the container this dataset belongs to; stored as
          the ``container`` attribute

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this dataset
        """
        super().__init__(ds_dict, gi=gi)
        # Bypass Wrapper.__setattr__, which only accepts wrapped-dict keys.
        object.__setattr__(self, 'container', container)

    @property
    def container_id(self):
        """
        Deprecated property.

        Id of the dataset container. Use :attr:`.container.id` instead.
        """
        return self.container.id

    @property
    @abc.abstractmethod
    def _stream_url(self):
        """
        Return the URL to stream this dataset.
        """

    def get_stream(self, chunk_size=bioblend.CHUNK_SIZE):
        """
        Open dataset for reading and return an iterator over its contents.

        :type chunk_size: int
        :param chunk_size: read this amount of bytes at a time
        """
        kwargs = {'stream': True}
        is_library_dataset = isinstance(self, LibraryDataset)
        if is_library_dataset:
            kwargs['params'] = {'ld_ids%5B%5D': self.id}
        r = self.gi.gi.make_get_request(self._stream_url, **kwargs)
        if is_library_dataset and r.status_code == 500:
            # compatibility with older Galaxy releases
            kwargs['params'] = {'ldda_ids%5B%5D': self.id}
            r = self.gi.gi.make_get_request(self._stream_url, **kwargs)
        r.raise_for_status()
        return r.iter_content(chunk_size)  # FIXME: client can't close r

    def peek(self, chunk_size=bioblend.CHUNK_SIZE):
        """
        Open dataset for reading and return the first chunk.

        An empty bytes object is returned when the stream yields nothing.

        See :meth:`.get_stream` for param info.
        """
        return next(self.get_stream(chunk_size=chunk_size), b'')

    def download(self, file_object, chunk_size=bioblend.CHUNK_SIZE):
        """
        Open dataset for reading and save its contents to ``file_object``.

        :type file_object: file
        :param file_object: output file object

        See :meth:`.get_stream` for info on other params.
        """
        for chunk in self.get_stream(chunk_size=chunk_size):
            file_object.write(chunk)

    def get_contents(self, chunk_size=bioblend.CHUNK_SIZE):
        """
        Open dataset for reading and return its **full** contents.

        See :meth:`.get_stream` for param info.
        """
        return b''.join(self.get_stream(chunk_size=chunk_size))

    def refresh(self):
        """
        Re-fetch the attributes pertaining to this object.

        Returns: self
        """
        # The container names the low-level client module to query.
        gi_client = getattr(self.gi.gi, self.container.API_MODULE)
        ds_dict = gi_client.show_dataset(self.container.id, self.id)
        # Re-run the constructor in place so existing references stay valid.
        self.__init__(ds_dict, self.container, self.gi)
        return self

    def wait(self, polling_interval=POLLING_INTERVAL, break_on_error=True):
        """
        Wait for this dataset to come out of the pending states.

        :type polling_interval: float
        :param polling_interval: polling interval in seconds

        :type break_on_error: bool
        :param break_on_error: if ``True``, raise a RuntimeError exception if
          the dataset ends in the 'error' state.

        .. warning::

          This is a blocking operation that can take a very long time. Also,
          note that this method does not return anything; however, this dataset
          is refreshed (possibly multiple times) during the execution.
        """
        self.gi._wait_datasets([self], polling_interval=polling_interval,
                               break_on_error=break_on_error)


class HistoryDatasetAssociation(Dataset):
    """
    Maps to a Galaxy ``HistoryDatasetAssociation``.
    """
    BASE_ATTRS = Dataset.BASE_ATTRS + ('annotation', 'deleted', 'purged', 'tags', 'visible')
    SRC = 'hda'

    def __init__(self, ds_dict, container, gi=None):
        """
        Concrete constructor; all arguments are forwarded to
        :class:`Dataset`.
        """
        super().__init__(ds_dict, container, gi=gi)

    @property
    def gi_module(self):
        """
        The ``histories`` module of the owning Galaxy instance.
        """
        return self.gi.histories

    @property
    def _stream_url(self):
        """
        URL of the ``display`` endpoint for this dataset, built on top of
        the contents URL of the containing history.
        """
        contents_url = self.gi.gi.histories._make_url(
            module_id=self.container.id, contents=True)
        return "%s/%s/display" % (contents_url, self.id)

    def update(self, **kwds):
        """
        Update this history dataset metadata. Some of the attributes that can be
        modified are documented below.

        :type name: str
        :param name: Replace history dataset name with the given string

        :type genome_build: str
        :param genome_build: Replace history dataset genome build (dbkey)

        :type annotation: str
        :param annotation: Replace history dataset annotation with given string

        :type deleted: bool
        :param deleted: Mark or unmark history dataset as deleted

        :type visible: bool
        :param visible: Mark or unmark history dataset as visible

        Returns: self
        """
        history = self.container
        updated = self.gi.gi.histories.update_dataset(history.id, self.id, **kwds)
        # The dataset may have been (un)deleted, so the history is stale too
        history.refresh()
        self.__init__(updated, history, gi=self.gi)
        return self

    def delete(self, purge=False):
        """
        Delete this history dataset.

        :type purge: bool
        :param purge: if ``True``, also purge (permanently delete) the dataset

        .. note::
            For the purge option to work, the Galaxy instance must have the
            ``allow_user_dataset_purge`` option set to ``true`` in the
            ``config/galaxy.yml`` configuration file.
        """
        history = self.container
        self.gi.gi.histories.delete_dataset(history.id, self.id, purge=purge)
        # Bring both the history and this wrapper back in sync with Galaxy
        history.refresh()
        self.refresh()


class DatasetCollection(Wrapper, metaclass=abc.ABCMeta):
    """
    Abstract base class for Galaxy dataset collections.
    """
    BASE_ATTRS = Wrapper.BASE_ATTRS + (
        'state', 'deleted', 'collection_type'
    )

    @abc.abstractmethod
    def __init__(self, dsc_dict, container, gi=None):
        """
        :type dsc_dict: dict
        :param dsc_dict: dataset collection dictionary

        :param container: the container this collection belongs to; stored
          as the ``container`` attribute

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this
          collection
        """
        super().__init__(dsc_dict, gi=gi)
        # Wrapper.__setattr__ rejects names that are not wrapped-dict keys
        object.__setattr__(self, 'container', container)

    def refresh(self):
        """
        Re-fetch the attributes pertaining to this object.

        Returns: self
        """
        owner = self.container
        client = getattr(self.gi.gi, owner.API_MODULE)
        fresh_dict = client.show_dataset_collection(owner.id, self.id)
        # Re-initialize in place so that existing references stay valid
        self.__init__(fresh_dict, self.container, self.gi)
        return self


class HistoryDatasetCollectionAssociation(DatasetCollection):
    """
    Maps to a Galaxy ``HistoryDatasetCollectionAssociation``.
    """
    BASE_ATTRS = DatasetCollection.BASE_ATTRS + ('tags', 'visible', 'elements')
    SRC = 'hdca'

    def __init__(self, dsc_dict, container, gi=None):
        """
        Concrete constructor; all arguments are forwarded to
        :class:`DatasetCollection`.
        """
        super().__init__(dsc_dict, container, gi=gi)

    @property
    def gi_module(self):
        """
        The ``histories`` module of the owning Galaxy instance.
        """
        return self.gi.histories

    def delete(self):
        """
        Delete this dataset collection.

        Both the containing history and this wrapper are refreshed
        afterwards.
        """
        history = self.container
        self.gi.gi.histories.delete_dataset_collection(history.id, self.id)
        history.refresh()
        self.refresh()


class LibRelatedDataset(Dataset):
    """
    Base class for LibraryDatasetDatasetAssociation and LibraryDataset classes.
    """

    def __init__(self, ds_dict, container, gi=None):
        """
        Concrete constructor; all arguments are forwarded to
        :class:`Dataset`.
        """
        super().__init__(ds_dict, container, gi=gi)

    @property
    def gi_module(self):
        """
        The ``libraries`` module of the owning Galaxy instance.
        """
        return self.gi.libraries

    @property
    def _stream_url(self):
        """
        URL of the uncompressed download endpoint, built on top of the
        base URL of the libraries client.
        """
        return "%s/datasets/download/uncompressed" % self.gi.gi.libraries._make_url()


class LibraryDatasetDatasetAssociation(LibRelatedDataset):
    """
    Maps to a Galaxy ``LibraryDatasetDatasetAssociation``.

    Adds ``deleted`` to the attributes exposed by the base class.
    """
    BASE_ATTRS = LibRelatedDataset.BASE_ATTRS + ('deleted',)
    # Source tag; Workflow.convert_input_map sends it as the 'src' value.
    SRC = 'ldda'


class LibraryDataset(LibRelatedDataset):
    """
    Maps to a Galaxy ``LibraryDataset``.
    """
    SRC = 'ld'

    def delete(self, purged=False):
        """
        Delete this library dataset.

        :type purged: bool
        :param purged: if ``True``, also purge (permanently delete) the dataset

        Both the containing library and this wrapper are refreshed
        afterwards.
        """
        library = self.container
        self.gi.gi.libraries.delete_library_dataset(
            library.id, self.id, purged=purged)
        library.refresh()
        self.refresh()

    def update(self, **kwds):
        """
        Update this library dataset metadata. Some of the attributes that can be
        modified are documented below.

        :type name: str
        :param name: Replace library dataset name with the given string

        :type genome_build: str
        :param genome_build: Replace library dataset genome build (dbkey)

        Returns: self
        """
        updated = self.gi.gi.libraries.update_library_dataset(self.id, **kwds)
        library = self.container
        library.refresh()
        # Re-initialize in place from the dictionary returned by the update
        self.__init__(updated, library, gi=self.gi)
        return self


class ContentInfo(Wrapper, metaclass=abc.ABCMeta):
    """
    Instances of this class wrap dictionaries obtained by getting
    ``/api/{histories,libraries}/<ID>/contents`` from Galaxy.

    Adds ``type`` to the attributes exposed by :class:`Wrapper`.
    """
    BASE_ATTRS = Wrapper.BASE_ATTRS + ('type',)

    @abc.abstractmethod
    def __init__(self, info_dict, gi=None):
        """
        :type info_dict: dict
        :param info_dict: content info dictionary

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this object
        """
        super().__init__(info_dict, gi=gi)


class LibraryContentInfo(ContentInfo):
    """
    Instances of this class wrap dictionaries obtained by getting
    ``/api/libraries/<ID>/contents`` from Galaxy.
    """
    def __init__(self, info_dict, gi=None):
        """
        Concrete constructor; all arguments are forwarded to
        :class:`ContentInfo`.
        """
        super().__init__(info_dict, gi=gi)

    @property
    def gi_module(self):
        """
        The ``libraries`` module of the owning Galaxy instance.
        """
        return self.gi.libraries


class HistoryContentInfo(ContentInfo):
    """
    Instances of this class wrap dictionaries obtained by getting
    ``/api/histories/<ID>/contents`` from Galaxy.

    Adds ``deleted``, ``state`` and ``visible`` to the attributes exposed
    by :class:`ContentInfo`.
    """
    BASE_ATTRS = ContentInfo.BASE_ATTRS + ('deleted', 'state', 'visible')

    def __init__(self, info_dict, gi=None):
        """
        Concrete constructor; all arguments are forwarded to
        :class:`ContentInfo`.
        """
        super().__init__(info_dict, gi=gi)

    @property
    def gi_module(self):
        """
        The ``histories`` module of the owning Galaxy instance.
        """
        return self.gi.histories


class DatasetContainer(Wrapper, metaclass=abc.ABCMeta):
    """
    Abstract base class for dataset containers (histories and libraries).

    The methods below read the ``API_MODULE`` and ``DS_TYPE`` class
    attributes, which are not defined here and must be supplied by
    subclasses.
    """
    BASE_ATTRS = Wrapper.BASE_ATTRS + ('deleted',)

    @abc.abstractmethod
    def __init__(self, c_dict, content_infos=None, gi=None):
        """
        :type c_dict: dict
        :param c_dict: container dictionary

        :type content_infos: list of :class:`ContentInfo`
        :param content_infos: info objects for the container's contents

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this
          container
        """
        super().__init__(c_dict, gi=gi)
        infos = [] if content_infos is None else content_infos
        object.__setattr__(self, 'content_infos', infos)

    @property
    def dataset_ids(self):
        """
        Return the ids of the contained datasets.
        """
        ids = []
        for info in self.content_infos:
            if info.type == 'file':
                ids.append(info.id)
        return ids

    def preview(self):
        """
        Return the preview object whose id matches this container's id.

        :raises ValueError: if no matching preview is found among either
          the regular or the deleted containers
        """
        getf = self.gi_module.get_previews

        def first_match(previews):
            # Raises IndexError when nothing matches
            return [candidate for candidate in previews if candidate.id == self.id][0]

        # self.state could be stale: check both regular and deleted containers
        try:
            return first_match(getf())
        except IndexError:
            try:
                return first_match(getf(deleted=True))
            except IndexError:
                raise ValueError('no object for id %s' % self.id)

    def refresh(self):
        """
        Re-fetch the attributes pertaining to this object.

        Returns: self
        """
        latest = self.gi_module.get(self.id)
        # Re-initialize in place so that existing references stay valid
        self.__init__(
            latest.wrapped, content_infos=latest.content_infos, gi=self.gi)
        return self

    def get_dataset(self, ds_id):
        """
        Retrieve the dataset corresponding to the given id.

        :type ds_id: str
        :param ds_id: dataset id

        :rtype: :class:`~.HistoryDatasetAssociation` or
          :class:`~.LibraryDataset`
        :return: the dataset corresponding to ``ds_id``
        """
        client = getattr(self.gi.gi, self.API_MODULE)
        return self.DS_TYPE(client.show_dataset(self.id, ds_id), self, gi=self.gi)

    def get_datasets(self, name=None):
        """
        Get all datasets contained inside this dataset container.

        :type name: str
        :param name: return only datasets with this name

        :rtype: list of :class:`~.HistoryDatasetAssociation` or list of
          :class:`~.LibraryDataset`
        :return: datasets with the given name contained inside this
          container

        .. note::

          when filtering library datasets by name, specify their full
          paths starting from the library's root folder, e.g.,
          ``/seqdata/reads.fastq``.  Full paths are available through
          the ``content_infos`` attribute of
          :class:`~.Library` objects.
        """
        if name is not None:
            wanted_ids = [info.id for info in self.content_infos if info.name == name]
        else:
            wanted_ids = self.dataset_ids
        datasets = []
        for wanted_id in wanted_ids:
            datasets.append(self.get_dataset(wanted_id))
        return datasets


class History(DatasetContainer):
    """
    Wrapper for a Galaxy history.

    In addition to the attributes inherited from ``DatasetContainer``,
    the ``annotation``, ``published``, ``state``, ``state_ids``,
    ``state_details`` and ``tags`` keys are listed in ``BASE_ATTRS``.
    """
    BASE_ATTRS = DatasetContainer.BASE_ATTRS + (
        'annotation',
        'published',
        'state',
        'state_ids',
        'state_details',
        'tags',
    )
    DS_TYPE = HistoryDatasetAssociation
    DSC_TYPE = HistoryDatasetCollectionAssociation
    CONTENT_INFO_TYPE = HistoryContentInfo
    API_MODULE = 'histories'

    def __init__(self, hist_dict, content_infos=None, gi=None):
        """
        :type hist_dict: dict
        :param hist_dict: dictionary describing the history to wrap

        :param content_infos: forwarded unchanged to the parent class

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this
          wrapper
        """
        super().__init__(hist_dict, content_infos=content_infos, gi=gi)

    @property
    def gi_module(self):
        """
        The ``histories`` attribute of ``self.gi``.
        """
        return self.gi.histories

    def _first_output_dataset(self, out_dict):
        """
        Refresh this history, then return the dataset whose id is found
        in the first entry of ``out_dict['outputs']``.
        """
        # The refresh happens before the reply is inspected.
        self.refresh()
        first_output = out_dict['outputs'][0]
        return self.get_dataset(first_output['id'])

    def update(self, **kwds):
        """
        Change metadata of this history, then refresh this object.

        All keyword arguments are passed to ``update_history`` of the
        underlying histories client.  Some of the attributes that can be
        modified are documented below.

        :type name: str
        :param name: Replace history name with the given string

        :type annotation: str
        :param annotation: Replace history annotation with the given string

        :type deleted: bool
        :param deleted: Mark or unmark history as deleted

        :type purged: bool
        :param purged: If True, mark history as purged (permanently deleted).

        :type published: bool
        :param published: Mark or unmark history as published

        :type importable: bool
        :param importable: Mark or unmark history as importable

        :type tags: list
        :param tags: Replace history tags with the given list

        :return: this same object
        """
        # TODO: wouldn't it be better if name and annotation were attributes?
        raw_histories = self.gi.gi.histories
        raw_histories.update_history(self.id, **kwds)
        self.refresh()
        return self

    def delete(self, purge=False):
        """
        Delete this history, then refresh and unmap this object.

        :type purge: bool
        :param purge: if ``True``, also purge (permanently delete) the history

        .. note::
          For the purge option to work, the Galaxy instance must have the
          ``allow_user_dataset_purge`` option set to ``true`` in the
          ``config/galaxy.yml`` configuration file.
        """
        histories = self.gi.histories
        histories.delete(id_=self.id, purge=purge)
        self.refresh()
        self.unmap()

    def import_dataset(self, lds):
        """
        Copy a library dataset into this history.

        :type lds: :class:`~.LibraryDataset`
        :param lds: the library dataset to import

        :rtype: :class:`~.HistoryDatasetAssociation`
        :return: the imported history dataset

        :raises RuntimeError: if this history is not mapped, or if the
          reply of the upload call is not a mapping
        :raises TypeError: if ``lds`` is not a ``LibraryDataset``
        """
        if not self.is_mapped:
            raise RuntimeError('history is not mapped to a Galaxy object')
        if not isinstance(lds, LibraryDataset):
            raise TypeError('lds is not a LibraryDataset')
        reply = self.gi.gi.histories.upload_dataset_from_library(
            self.id, lds.id)
        if isinstance(reply, Mapping):
            self.refresh()
            return self.get_dataset(reply['id'])
        raise RuntimeError(
            'upload_dataset_from_library: unexpected reply: %r' % reply)

    def upload_file(self, path, **kwargs):
        """
        Upload a file to this history.

        :type path: str
        :param path: path of the file to upload

        See :meth:`~bioblend.galaxy.tools.ToolClient.upload_file` for
        the optional parameters.

        :rtype: :class:`~.HistoryDatasetAssociation`
        :return: the uploaded dataset
        """
        reply = self.gi.gi.tools.upload_file(path, self.id, **kwargs)
        return self._first_output_dataset(reply)

    # Alias: same function object under a second name.
    upload_dataset = upload_file

    def upload_from_ftp(self, path, **kwargs):
        """
        Upload a file from the user's FTP directory to this history.

        :type path: str
        :param path: path of the file in the user's FTP directory

        See :meth:`~bioblend.galaxy.tools.ToolClient.upload_file` for
        the optional parameters.

        :rtype: :class:`~.HistoryDatasetAssociation`
        :return: the uploaded dataset
        """
        reply = self.gi.gi.tools.upload_from_ftp(path, self.id, **kwargs)
        return self._first_output_dataset(reply)

    def paste_content(self, content, **kwargs):
        """
        Create a new dataset in this history from a string.

        :type content: str
        :param content: content of the new dataset to upload

        See :meth:`~bioblend.galaxy.tools.ToolClient.upload_file` for
        the optional parameters (except file_name).

        :rtype: :class:`~.HistoryDatasetAssociation`
        :return: the uploaded dataset
        """
        reply = self.gi.gi.tools.paste_content(content, self.id, **kwargs)
        return self._first_output_dataset(reply)

    def export(self, gzip=True, include_hidden=False, include_deleted=False,
               wait=False, maxwait=None):
        """
        Start a job that creates an export archive for this history.

        Every argument is forwarded by keyword to
        :meth:`~bioblend.galaxy.histories.HistoryClient.export_history`;
        see that method for parameter and return value info.
        """
        options = {
            'gzip': gzip,
            'include_hidden': include_hidden,
            'include_deleted': include_deleted,
            'wait': wait,
            'maxwait': maxwait,
        }
        return self.gi.gi.histories.export_history(self.id, **options)

    def download(self, jeha_id, outf, chunk_size=bioblend.CHUNK_SIZE):
        """
        Download an export archive of this history.

        Use :meth:`export` to create an export and get the required
        ``jeha_id``.  See
        :meth:`~bioblend.galaxy.histories.HistoryClient.download_history`
        for parameter and return value info.
        """
        raw_histories = self.gi.gi.histories
        return raw_histories.download_history(
            self.id, jeha_id, outf, chunk_size=chunk_size)

    def create_dataset_collection(self, collection_description):
        """
        Create a dataset collection in this history from a description.

        :type collection_description: bioblend.galaxy.dataset_collections.CollectionDescription
        :param collection_description: a description of the dataset collection

        :rtype: :class:`~.HistoryDatasetCollectionAssociation`
        :return: the new dataset collection
        """
        created = self.gi.gi.histories.create_dataset_collection(
            self.id, collection_description)
        self.refresh()
        new_id = created['id']
        return self.get_dataset_collection(new_id)

    def get_dataset_collection(self, dsc_id):
        """
        Fetch a single dataset collection of this history and wrap it.

        :type dsc_id: str
        :param dsc_id: dataset collection id

        :rtype: :class:`~.HistoryDatasetCollectionAssociation`
        :return: the dataset collection corresponding to ``dsc_id``
        """
        raw_histories = self.gi.gi.histories
        info = raw_histories.show_dataset_collection(self.id, dsc_id)
        return self.DSC_TYPE(info, self, gi=self.gi)


class Library(DatasetContainer):
    """
    Wrapper for a Galaxy data library.

    In addition to the attributes inherited from ``DatasetContainer``,
    the ``description`` and ``synopsis`` keys are listed in
    ``BASE_ATTRS``.
    """
    BASE_ATTRS = DatasetContainer.BASE_ATTRS + (
        'description',
        'synopsis',
    )
    DS_TYPE = LibraryDataset
    CONTENT_INFO_TYPE = LibraryContentInfo
    API_MODULE = 'libraries'

    def __init__(self, lib_dict, content_infos=None, gi=None):
        """
        :type lib_dict: dict
        :param lib_dict: dictionary describing the library to wrap

        :param content_infos: forwarded unchanged to the parent class

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this
          wrapper
        """
        super().__init__(lib_dict, content_infos=content_infos, gi=gi)

    @property
    def gi_module(self):
        """
        The ``libraries`` attribute of ``self.gi``.
        """
        return self.gi.libraries

    @property
    def folder_ids(self):
        """
        Ids of the entries of ``content_infos`` whose type is ``'folder'``.
        """
        folder_infos = (
            info for info in self.content_infos if info.type == 'folder'
        )
        return [info.id for info in folder_infos]

    def delete(self):
        """
        Delete this library, then refresh and unmap this object.
        """
        libraries = self.gi.libraries
        libraries.delete(id_=self.id)
        self.refresh()
        self.unmap()

    def _pre_upload(self, folder):
        """
        Check that this library is mapped and resolve the target folder.

        :return: ``folder.id``, or ``None`` when ``folder`` is ``None``
        :raises RuntimeError: if this library is not mapped
        """
        if self.is_mapped:
            if folder is None:
                return None
            return folder.id
        raise RuntimeError('library is not mapped to a Galaxy object')

    def upload_data(self, data, folder=None, **kwargs):
        """
        Create a dataset in this library from the given contents.

        :type data: str
        :param data: dataset contents

        :type folder: :class:`~.Folder`
        :param folder: a folder object, or ``None`` to upload to the root folder

        :rtype: :class:`~.LibraryDataset`
        :return: the dataset object that represents the uploaded content

        Optional keyword arguments: ``file_type``, ``dbkey``.
        """
        target_id = self._pre_upload(folder)
        raw_libraries = self.gi.gi.libraries
        reply = raw_libraries.upload_file_contents(
            self.id, data, folder_id=target_id, **kwargs)
        self.refresh()
        new_id = reply[0]['id']
        return self.get_dataset(new_id)

    def upload_from_url(self, url, folder=None, **kwargs):
        """
        Create a dataset in this library from the content at a URL.

        :type url: str
        :param url: URL from which data should be read

        See :meth:`.upload_data` for info on other params.
        """
        target_id = self._pre_upload(folder)
        raw_libraries = self.gi.gi.libraries
        reply = raw_libraries.upload_file_from_url(
            self.id, url, folder_id=target_id, **kwargs)
        self.refresh()
        new_id = reply[0]['id']
        return self.get_dataset(new_id)

    def upload_from_local(self, path, folder=None, **kwargs):
        """
        Create a dataset in this library from a local file.

        :type path: str
        :param path: local file path from which data should be read

        See :meth:`.upload_data` for info on other params.
        """
        target_id = self._pre_upload(folder)
        raw_libraries = self.gi.gi.libraries
        reply = raw_libraries.upload_file_from_local_path(
            self.id, path, folder_id=target_id, **kwargs)
        self.refresh()
        new_id = reply[0]['id']
        return self.get_dataset(new_id)

    def upload_from_galaxy_fs(self, paths, folder=None, link_data_only=None, **kwargs):
        """
        Create datasets in this library from files on the Galaxy server.

        .. note::
          For this method to work, the Galaxy instance must have the
          ``allow_path_paste`` option set to ``true`` in the
          ``config/galaxy.yml`` configuration file.

        :type paths: str or :class:`~collections.abc.Iterable` of str
        :param paths: server-side file paths from which data should be read

        :type link_data_only: str
        :param link_data_only: either 'copy_files' (default) or
          'link_to_files'. Setting to 'link_to_files' symlinks instead of
          copying the files

        :rtype: list of :class:`~.LibraryDataset`
        :return: the dataset objects that represent the uploaded content

        :raises RuntimeError: if the reply is ``None`` or not a sequence

        See :meth:`.upload_data` for info on other params.
        """
        target_id = self._pre_upload(folder)
        # A lone string is treated as a single path; several paths are
        # sent as one newline-separated string.
        joined_paths = '\n'.join(
            (paths,) if isinstance(paths, str) else paths)
        reply = self.gi.gi.libraries.upload_from_galaxy_filesystem(
            self.id, joined_paths, folder_id=target_id,
            link_data_only=link_data_only, **kwargs)
        if reply is None:
            raise RuntimeError('upload_from_galaxy_filesystem: no reply')
        if not isinstance(reply, Sequence):
            raise RuntimeError(
                'upload_from_galaxy_filesystem: unexpected reply: %r' % reply)
        uploaded = []
        for entry in reply:
            uploaded.append(self.get_dataset(entry['id']))
        self.refresh()
        return uploaded

    def copy_from_dataset(self, hda, folder=None, message=''):
        """
        Copy a dataset from a history into this library.

        :type hda: :class:`~.HistoryDatasetAssociation`
        :param hda: history dataset to copy into the library

        See :meth:`.upload_data` for info on other params.
        """
        target_id = self._pre_upload(folder)
        raw_libraries = self.gi.gi.libraries
        reply = raw_libraries.copy_from_dataset(
            self.id, hda.id, folder_id=target_id, message=message)
        self.refresh()
        new_id = reply['library_dataset_id']
        return self.get_dataset(new_id)

    def create_folder(self, name, description=None, base_folder=None):
        """
        Add a new folder to this library.

        :type name: str
        :param name: folder name

        :type description: str
        :param description: optional folder description

        :type base_folder: :class:`~.Folder`
        :param base_folder: parent folder, or ``None`` to create in the root
          folder

        :rtype: :class:`~.Folder`
        :return: the folder just created
        """
        if base_folder is None:
            parent_id = None
        else:
            parent_id = base_folder.id
        reply = self.gi.gi.libraries.create_folder(
            self.id, name, description=description, base_folder_id=parent_id)
        self.refresh()
        new_id = reply[0]['id']
        return self.get_folder(new_id)

    def get_folder(self, f_id):
        """
        Fetch a single folder of this library and wrap it.

        :param f_id: folder id

        :rtype: :class:`~.Folder`
        :return: the folder corresponding to ``f_id``
        """
        info = self.gi.gi.libraries.show_folder(self.id, f_id)
        return Folder(info, self, gi=self.gi)

    @property
    def root_folder(self):
        """
        The root folder of this library.

        :rtype: :class:`~.Folder`
        :return: the root folder of this library
        """
        root_id = self.gi.gi.libraries._get_root_folder_id(self.id)
        return self.get_folder(root_id)


class Folder(Wrapper):
    """
    Wrapper for a folder of a Galaxy library.
    """
    BASE_ATTRS = Wrapper.BASE_ATTRS + (
        'description',
        'deleted',
        'item_count',
    )

    def __init__(self, f_dict, container, gi=None):
        """
        :type f_dict: dict
        :param f_dict: dictionary describing the folder to wrap

        :param container: the object this folder belongs to; stored as
          the ``container`` attribute

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this
          wrapper
        """
        super().__init__(f_dict, gi=gi)
        # object.__setattr__ bypasses any __setattr__ override in this
        # class hierarchy.
        object.__setattr__(self, 'container', container)

    @property
    def parent(self):
        """
        The parent folder of this folder. The parent of the root folder is
        ``None``.

        The result of the first successful lookup is stored in
        ``_cached_parent`` and reused afterwards.

        :rtype: :class:`~.Folder`
        :return: the parent of this folder
        """
        cached = self._cached_parent
        if cached is None:
            cached = self._get_parent()
            object.__setattr__(self, '_cached_parent', cached)
        return cached

    def _get_parent(self):
        """
        Look up the parent folder through the container.

        :return: the folder whose id is ``wrapped['parent_id']``, or
          ``None`` when that id is ``None``
        """
        pid = self.wrapped['parent_id']
        return None if pid is None else self.container.get_folder(pid)

    @property
    def gi_module(self):
        """
        The ``libraries`` attribute of ``self.gi``.
        """
        return self.gi.libraries

    @property
    def container_id(self):
        """
        Deprecated property.

        Id of the folder container. Use :attr:`.container.id` instead.
        """
        owner = self.container
        return owner.id

    def refresh(self):
        """
        Reload this folder's state from Galaxy.

        The folder dictionary is fetched again and this instance is
        re-initialized with it, keeping the same container and ``gi``.

        :return: this same object
        """
        owner = self.container
        latest = self.gi.gi.libraries.show_folder(owner.id, self.id)
        self.__init__(latest, self.container, gi=self.gi)
        return self


class Tool(Wrapper):
    """
    Maps to a Galaxy tool.
    """
    BASE_ATTRS = Wrapper.BASE_ATTRS + ('version',)
    POLLING_INTERVAL = 10  # for output state monitoring

    def __init__(self, t_dict, gi=None):
        """
        :type t_dict: dict
        :param t_dict: dictionary describing the tool to wrap

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this
          wrapper
        """
        super().__init__(t_dict, gi=gi)

    @property
    def gi_module(self):
        """
        The ``tools`` attribute of ``self.gi``.
        """
        return self.gi.tools

    def run(self, inputs, history, wait=False,
            polling_interval=POLLING_INTERVAL):
        """
        Execute this tool in the given history with inputs from dict
        ``inputs``.

        :type inputs: dict
        :param inputs: dictionary of input datasets and parameters for
          the tool (see below); it is not modified by this method

        :type history: :class:`History`
        :param history: the history where to execute the tool

        :type wait: bool
        :param wait: whether to wait while the returned datasets are
          in a pending state

        :type polling_interval: float
        :param polling_interval: polling interval in seconds

        :rtype: list of :class:`HistoryDatasetAssociation`
        :return: list of output datasets

        The ``inputs`` dict should contain input datasets and parameters
        in the (largely undocumented) format used by the Galaxy API.
        Some examples can be found in `Galaxy's API test suite
        <https://github.com/galaxyproject/galaxy/blob/dev/lib/galaxy_test/api/test_tools.py>`_.
        The value of an input dataset can also be a :class:`Dataset`
        object, which will be automatically converted to the needed
        format.  Only top-level values of ``inputs`` are converted.
        """
        # Build a separate payload instead of rewriting the caller's
        # dict: the Dataset objects passed in must survive the call.
        payload = {
            key: ({'src': value.SRC, 'id': value.id}
                  if isinstance(value, Dataset) else value)
            for key, value in inputs.items()
        }
        out_dict = self.gi.gi.tools.run_tool(history.id, self.id, payload)
        outputs = [history.get_dataset(_['id']) for _ in out_dict['outputs']]
        if wait:
            self.gi._wait_datasets(outputs, polling_interval=polling_interval)
        return outputs


class Job(Wrapper):
    """
    Wrapper for a Galaxy job.

    Only the ``id`` and ``state`` keys are listed in ``BASE_ATTRS``.
    """
    BASE_ATTRS = (
        'id',
        'state',
    )

    def __init__(self, j_dict, gi=None):
        """
        :type j_dict: dict
        :param j_dict: dictionary describing the job to wrap

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this
          wrapper
        """
        super().__init__(j_dict, gi=gi)

    @property
    def gi_module(self):
        """
        The ``jobs`` attribute of ``self.gi``.
        """
        return self.gi.jobs


class Preview(Wrapper, metaclass=abc.ABCMeta):
    """
    Abstract base class for 'previews' of Galaxy entities.

    Subclasses model the short summaries returned by global getters
    such as ``/api/libraries``.
    """
    BASE_ATTRS = (*Wrapper.BASE_ATTRS, 'deleted')

    @abc.abstractmethod
    def __init__(self, pw_dict, gi=None):
        """
        :type pw_dict: dict
        :param pw_dict: preview dictionary to wrap

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this
          wrapper
        """
        super().__init__(pw_dict, gi=gi)


class LibraryPreview(Preview):
    """
    Preview of a Galaxy library.

    Instances wrap the dictionaries obtained by getting
    ``/api/libraries`` from Galaxy.
    """

    def __init__(self, pw_dict, gi=None):
        """
        :type pw_dict: dict
        :param pw_dict: library preview dictionary to wrap

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this
          wrapper
        """
        super().__init__(pw_dict, gi=gi)

    @property
    def gi_module(self):
        """
        The ``libraries`` attribute of ``self.gi``.
        """
        return self.gi.libraries


class HistoryPreview(Preview):
    """
    Preview of a Galaxy history.

    Instances wrap the dictionaries obtained by getting
    ``/api/histories`` from Galaxy.
    """
    BASE_ATTRS = (*Preview.BASE_ATTRS, 'tags')

    def __init__(self, pw_dict, gi=None):
        """
        :type pw_dict: dict
        :param pw_dict: history preview dictionary to wrap

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this
          wrapper
        """
        super().__init__(pw_dict, gi=gi)

    @property
    def gi_module(self):
        """
        The ``histories`` attribute of ``self.gi``.
        """
        return self.gi.histories


class WorkflowPreview(Preview):
    """
    Preview of a Galaxy workflow.

    Instances wrap the dictionaries obtained by getting
    ``/api/workflows`` from Galaxy.
    """
    BASE_ATTRS = (*Preview.BASE_ATTRS, 'published', 'tags')

    def __init__(self, pw_dict, gi=None):
        """
        :type pw_dict: dict
        :param pw_dict: workflow preview dictionary to wrap

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this
          wrapper
        """
        super().__init__(pw_dict, gi=gi)

    @property
    def gi_module(self):
        """
        The ``workflows`` attribute of ``self.gi``.
        """
        return self.gi.workflows


class JobPreview(Preview):
    """
    Preview of a Galaxy job.

    Instances wrap the dictionaries obtained by getting ``/api/jobs``
    from Galaxy.  ``BASE_ATTRS`` is replaced rather than extended: only
    ``id`` and ``state`` are listed.
    """
    BASE_ATTRS = (
        'id',
        'state',
    )

    def __init__(self, pw_dict, gi=None):
        """
        :type pw_dict: dict
        :param pw_dict: job preview dictionary to wrap

        :type gi: :class:`GalaxyInstance`
        :param gi: the GalaxyInstance through which we can access this
          wrapper
        """
        super().__init__(pw_dict, gi=gi)

    @property
    def gi_module(self):
        """
        The ``jobs`` attribute of ``self.gi``.
        """
        return self.gi.jobs