diff env/lib/python3.7/site-packages/galaxy/containers/docker_model.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400 (2020-06-01)
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/galaxy/containers/docker_model.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,780 +0,0 @@
-"""
-Model objects for docker objects
-"""
-from __future__ import absolute_import
-
-import logging
-
-try:
-    import docker
-except ImportError:
-    from galaxy.util.bunch import Bunch
-    docker = Bunch(errors=Bunch(NotFound=None))
-from six.moves import shlex_quote
-
-from galaxy.containers import (
-    Container,
-    ContainerPort,
-    ContainerVolume
-)
-from galaxy.util import (
-    pretty_print_time_interval,
-    unicodify,
-)
-
-
-CPUS_LABEL = '_galaxy_cpus'
-IMAGE_LABEL = '_galaxy_image'
-CPUS_CONSTRAINT = 'node.labels.' + CPUS_LABEL
-IMAGE_CONSTRAINT = 'node.labels.' + IMAGE_LABEL
-
-log = logging.getLogger(__name__)
-
-
-class DockerAttributeContainer(object):
-
-    def __init__(self, members=None):
-        if members is None:
-            members = set()
-        self._members = members
-
-    def __eq__(self, other):
-        return self.members == other.members
-
-    def __ne__(self, other):
-        return not self.__eq__(other)
-
-    def __hash__(self):
-        return hash(tuple(sorted([repr(x) for x in self._members])))
-
-    def __str__(self):
-        return ', '.join([str(x) for x in self._members]) or 'None'
-
-    def __iter__(self):
-        return iter(self._members)
-
-    def __getitem__(self, name):
-        for member in self._members:
-            if member.name == name:
-                return member
-        else:
-            raise KeyError(name)
-
-    def __contains__(self, item):
-        return item in self._members
-
-    @property
-    def members(self):
-        return frozenset(self._members)
-
-    def hash(self):
-        return hex(self.__hash__())[2:]
-
-    def get(self, name, default):
-        try:
-            return self[name]
-        except KeyError:
-            return default
-
-
-class DockerVolume(ContainerVolume):
-    @classmethod
-    def from_str(cls, as_str):
-        """Construct an instance from a string as would be passed to `docker run --volume`.
-
-        A string in the format ``<host_path>:<mode>`` is supported for legacy purposes even though it is not valid
-        Docker volume syntax.
-        """
-        if not as_str:
-            raise ValueError("Failed to parse Docker volume from %s" % as_str)
-        parts = as_str.split(":", 2)
-        kwds = dict(host_path=parts[0])
-        if len(parts) == 1:
-            # auto-generated volume
-            kwds["path"] = kwds["host_path"]
-        elif len(parts) == 2:
-            # /host_path:mode is not (or is no longer?) valid Docker volume syntax
-            if parts[1] in DockerVolume.valid_modes:
-                kwds["mode"] = parts[1]
-                kwds["path"] = kwds["host_path"]
-            else:
-                kwds["path"] = parts[1]
-        elif len(parts) == 3:
-            kwds["path"] = parts[1]
-            kwds["mode"] = parts[2]
-        return cls(**kwds)
-
-    def __str__(self):
-        volume_str = ":".join(filter(lambda x: x is not None, (self.host_path, self.path, self.mode)))
-        if "$" not in volume_str:
-            volume_for_cmd_line = shlex_quote(volume_str)
-        else:
-            # e.g. $_GALAXY_JOB_TMP_DIR:$_GALAXY_JOB_TMP_DIR:rw so don't single quote.
-            volume_for_cmd_line = '"%s"' % volume_str
-        return volume_for_cmd_line
-
-    def to_native(self):
-        host_path = self.host_path or self.path
-        return (self.path, {host_path: {'bind': self.path, 'mode': self.mode}})
-
-
-class DockerContainer(Container):
-
-    def __init__(self, interface, id, name=None, inspect=None):
-        super(DockerContainer, self).__init__(interface, id, name=name)
-        self._inspect = inspect
-
-    @classmethod
-    def from_id(cls, interface, id):
-        inspect = interface.inspect(id)
-        return cls(interface, id, name=inspect['Name'], inspect=inspect)
-
-    @property
-    def ports(self):
-        # {
-        #     "NetworkSettings" : {
-        #         "Ports" : {
-        #             "3306/tcp" : [
-        #                 {
-        #                     "HostIp" : "127.0.0.1",
-        #                     "HostPort" : "3306"
-        #                 }
-        #             ]
-        rval = []
-        try:
-            port_mappings = self.inspect['NetworkSettings']['Ports']
-        except KeyError:
-            log.warning("Failed to get ports for container %s from `docker inspect` output at "
-                        "['NetworkSettings']['Ports']: %s: %s", self.id, exc_info=True)
-            return None
-        for port_name in port_mappings:
-            for binding in port_mappings[port_name]:
-                rval.append(ContainerPort(
-                    int(port_name.split('/')[0]),
-                    port_name.split('/')[1],
-                    self.address,
-                    int(binding['HostPort']),
-                ))
-        return rval
-
-    @property
-    def address(self):
-        if self._interface.host and self._interface.host.startswith('tcp://'):
-            return self._interface.host.replace('tcp://', '').split(':', 1)[0]
-        else:
-            return 'localhost'
-
-    def is_ready(self):
-        return self.inspect['State']['Running']
-
-    def __eq__(self, other):
-        return self._id == other.id
-
-    def __ne__(self, other):
-        return not self.__eq__(other)
-
-    def __hash__(self):
-        return hash(self._id)
-
-    @property
-    def inspect(self):
-        if not self._inspect:
-            self._inspect = self._interface.inspect(self._id)
-        return self._inspect
-
-
-class DockerService(Container):
-
-    def __init__(self, interface, id, name=None, image=None, inspect=None):
-        super(DockerService, self).__init__(interface, id, name=name)
-        self._image = image
-        self._inspect = inspect
-        self._env = {}
-        self._tasks = []
-        if inspect:
-            self._name = name or inspect['Spec']['Name']
-            self._image = image or inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']
-
-    @classmethod
-    def from_cli(cls, interface, s, task_list):
-        service = cls(interface, s['ID'], name=s['NAME'], image=s['IMAGE'])
-        for task_dict in task_list:
-            if task_dict['NAME'].strip().startswith(r'\_'):
-                continue    # historical task
-            service.task_add(DockerTask.from_cli(interface, task_dict, service=service))
-        return service
-
-    @classmethod
-    def from_id(cls, interface, id):
-        inspect = interface.service_inspect(id)
-        service = cls(interface, id, inspect=inspect)
-        for task in interface.service_tasks(service):
-            service.task_add(task)
-        return service
-
-    @property
-    def ports(self):
-        # {
-        #     "Endpoint": {
-        #         "Ports": [
-        #             {
-        #                 "Protocol": "tcp",
-        #                 "TargetPort": 8888,
-        #                 "PublishedPort": 30000,
-        #                 "PublishMode": "ingress"
-        #             }
-        #         ]
-        rval = []
-        try:
-            port_mappings = self.inspect['Endpoint']['Ports']
-        except (IndexError, KeyError):
-            log.warning("Failed to get ports for container %s from `docker service inspect` output at "
-                        "['Endpoint']['Ports']: %s: %s", self.id, exc_info=True)
-            return None
-        for binding in port_mappings:
-            rval.append(ContainerPort(
-                binding['TargetPort'],
-                binding['Protocol'],
-                self.address,               # use the routing mesh
-                binding['PublishedPort']
-            ))
-        return rval
-
-    @property
-    def address(self):
-        if self._interface.host and self._interface.host.startswith('tcp://'):
-            return self._interface.host.replace('tcp://', '').split(':', 1)[0]
-        else:
-            return 'localhost'
-
-    def is_ready(self):
-        return self.in_state('Running', 'Running')
-
-    def __eq__(self, other):
-        return self._id == other.id
-
-    def __ne__(self, other):
-        return not self.__eq__(other)
-
-    def __hash__(self):
-        return hash(self._id)
-
-    def task_add(self, task):
-        self._tasks.append(task)
-
-    @property
-    def inspect(self):
-        if not self._inspect:
-            self._inspect = self._interface.service_inspect(self._id)
-        return self._inspect
-
-    @property
-    def state(self):
-        """If one of this service's tasks desired state is running, return that task state, otherwise, return the state
-        of a non-running task.
-
-        This is imperfect because it doesn't attempt to provide useful information for replicas > 1 tasks, but it suits
-        our purposes for now.
-        """
-        state = None
-        for task in self.tasks:
-            state = task.state
-            if task.desired_state == 'running':
-                break
-        return state
-
-    @property
-    def env(self):
-        if not self._env:
-            try:
-                for env_str in self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Env']:
-                    try:
-                        self._env.update([env_str.split('=', 1)])
-                    except ValueError:
-                        self._env[env_str] = None
-            except KeyError as exc:
-                log.debug('Cannot retrieve container environment: KeyError: %s', unicodify(exc))
-        return self._env
-
-    @property
-    def terminal(self):
-        """Same caveats as :meth:`state`.
-        """
-        for task in self.tasks:
-            if task.desired_state == 'running':
-                return False
-        return True
-
-    @property
-    def node(self):
-        """Same caveats as :meth:`state`.
-        """
-        for task in self.tasks:
-            if task.node is not None:
-                return task.node
-        return None
-
-    @property
-    def image(self):
-        if self._image is None:
-            self._image = self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']
-        return self._image
-
-    @property
-    def cpus(self):
-        try:
-            cpus = self.inspect['Spec']['TaskTemplate']['Resources']['Limits']['NanoCPUs'] / 1000000000.0
-            if cpus == int(cpus):
-                cpus = int(cpus)
-            return cpus
-        except KeyError:
-            return 0
-
-    @property
-    def constraints(self):
-        constraints = self.inspect['Spec']['TaskTemplate']['Placement'].get('Constraints', [])
-        return DockerServiceConstraints.from_constraint_string_list(constraints)
-
-    @property
-    def tasks(self):
-        """A list of *all* tasks, including terminal ones.
-        """
-        if not self._tasks:
-            self._tasks = []
-            for task in self._interface.service_tasks(self):
-                self.task_add(task)
-        return self._tasks
-
-    @property
-    def task_count(self):
-        """A count of *all* tasks, including terminal ones.
-        """
-        return len(self.tasks)
-
-    def in_state(self, desired, current, tasks='any'):
-        """Indicate if one of this service's tasks matches the desired state.
-        """
-        for task in self.tasks:
-            if task.in_state(desired, current):
-                if tasks == 'any':
-                    # at least 1 task in desired state
-                    return True
-            elif tasks == 'all':
-                # at least 1 task not in desired state
-                return False
-        else:
-            return False if tasks == 'any' else True
-
-    def constraint_add(self, name, op, value):
-        self._interface.service_constraint_add(self.id, name, op, value)
-
-    def set_cpus(self):
-        self.constraint_add(CPUS_LABEL, '==', self.cpus)
-
-    def set_image(self):
-        self.constraint_add(IMAGE_LABEL, '==', self.image)
-
-
-class DockerServiceConstraint(object):
-
-    def __init__(self, name=None, op=None, value=None):
-        self._name = name
-        self._op = op
-        self._value = value
-
-    def __eq__(self, other):
-        return self._name == other._name and \
-            self._op == other._op and \
-            self._value == other._value
-
-    def __ne__(self, other):
-        return not self.__eq__(other)
-
-    def __hash__(self):
-        return hash((self._name, self._op, self._value))
-
-    def __repr__(self):
-        return '%s(%s%s%s)' % (self.__class__.__name__, self._name, self._op, self._value)
-
-    def __str__(self):
-        return '%s%s%s' % (self._name, self._op, self._value)
-
-    @staticmethod
-    def split_constraint_string(constraint_str):
-        constraint = (constraint_str, '', '')
-        for op in '==', '!=':
-            t = constraint_str.partition(op)
-            if len(t[0]) < len(constraint[0]):
-                constraint = t
-        if constraint[0] == constraint_str:
-            raise Exception('Unable to parse constraint string: %s' % constraint_str)
-        return [x.strip() for x in constraint]
-
-    @classmethod
-    def from_str(cls, constraint_str):
-        name, op, value = DockerServiceConstraint.split_constraint_string(constraint_str)
-        return cls(name=name, op=op, value=value)
-
-    @property
-    def name(self):
-        return self._name
-
-    @property
-    def op(self):
-        return self._op
-
-    @property
-    def value(self):
-        return self._value
-
-    @property
-    def label(self):
-        return DockerNodeLabel(
-            name=self.name.replace('node.labels.', ''),
-            value=self.value
-        )
-
-
-class DockerServiceConstraints(DockerAttributeContainer):
-
-    member_class = DockerServiceConstraint
-
-    @classmethod
-    def from_constraint_string_list(cls, inspect):
-        members = []
-        for member_str in inspect:
-            members.append(cls.member_class.from_str(member_str))
-        return cls(members=members)
-
-    @property
-    def labels(self):
-        return DockerNodeLabels(members=[x.label for x in self.members])
-
-
-class DockerNode(object):
-
-    def __init__(self, interface, id=None, name=None, status=None,
-                 availability=None, manager=False, inspect=None):
-        self._interface = interface
-        self._id = id
-        self._name = name
-        self._status = status
-        self._availability = availability
-        self._manager = manager
-        self._inspect = inspect
-        if inspect:
-            self._name = name or inspect['Description']['Hostname']
-            self._status = status or inspect['Status']['State']
-            self._availability = inspect['Spec']['Availability']
-            self._manager = manager or inspect['Spec']['Role'] == 'manager'
-        self._tasks = []
-
-    @classmethod
-    def from_cli(cls, interface, n, task_list):
-        node = cls(interface, id=n['ID'], name=n['HOSTNAME'], status=n['STATUS'],
-                   availability=n['AVAILABILITY'], manager=True if n['MANAGER STATUS'] else False)
-        for task_dict in task_list:
-            node.task_add(DockerTask.from_cli(interface, task_dict, node=node))
-        return node
-
-    @classmethod
-    def from_id(cls, interface, id):
-        inspect = interface.node_inspect(id)
-        node = cls(interface, id, inspect=inspect)
-        for task in interface.node_tasks(node):
-            node.task_add(task)
-        return node
-
-    def task_add(self, task):
-        self._tasks.append(task)
-
-    @property
-    def id(self):
-        return self._id
-
-    @property
-    def name(self):
-        return self._name
-
-    @property
-    def version(self):
-        # this changes on update so don't cache
-        return self._interface.node_inspect(self._id or self._name)['Version']['Index']
-
-    @property
-    def inspect(self):
-        if not self._inspect:
-            self._inspect = self._interface.node_inspect(self._id or self._name)
-        return self._inspect
-
-    @property
-    def state(self):
-        return ('%s-%s' % (self._status, self._availability)).lower()
-
-    @property
-    def cpus(self):
-        return self.inspect['Description']['Resources']['NanoCPUs'] / 1000000000
-
-    @property
-    def labels(self):
-        labels = self.inspect['Spec'].get('Labels', {}) or {}
-        return DockerNodeLabels.from_label_dictionary(labels)
-
-    def label_add(self, label, value):
-        self._interface.node_update(self.id, label_add={label: value})
-
-    @property
-    def labels_as_constraints(self):
-        constraints_strings = [x.constraint_string for x in self.labels]
-        return DockerServiceConstraints.from_constraint_string_list(constraints_strings)
-
-    def set_labels_for_constraints(self, constraints):
-        for label in self._constraints_to_label_args(constraints):
-            if label not in self.labels:
-                log.info("setting node '%s' label '%s' to '%s'", self.name, label.name, label.value)
-                self.label_add(label.name, label.value)
-
-    def _constraints_to_label_args(self, constraints):
-        constraints = filter(lambda x: x.name.startswith('node.labels.') and x.op == '==', constraints)
-        labels = map(lambda x: DockerNodeLabel(name=x.name.replace('node.labels.', '', 1), value=x.value), constraints)
-        return labels
-
-    @property
-    def tasks(self):
-        """A list of *all* tasks, including terminal ones.
-        """
-        if not self._tasks:
-            self._tasks = []
-            for task in self._interface.node_tasks(self):
-                self.task_add(task)
-        return self._tasks
-
-    @property
-    def non_terminal_tasks(self):
-        r = []
-        for task in self.tasks:
-            # ensure the task has a service - it is possible for "phantom" tasks to exist (service is removed, no
-            # container is running, but the task still shows up in the node's task list)
-            if not task.terminal and task.service is not None:
-                r.append(task)
-        return r
-
-    @property
-    def task_count(self):
-        """A count of *all* tasks, including terminal ones.
-        """
-        return len(self.tasks)
-
-    def in_state(self, status, availability):
-        return self._status.lower() == status.lower() and self._availability.lower() == availability.lower()
-
-    def is_ok(self):
-        return self.in_state('Ready', 'Active')
-
-    def is_managed(self):
-        return not self._manager
-
-    def destroyable(self):
-        return not self._manager and self.is_ok() and self.task_count == 0
-
-    def drain(self):
-        self._interface.node_update(self.id, availability='drain')
-
-
-class DockerNodeLabel(object):
-
-    def __init__(self, name=None, value=None):
-        self._name = name
-        self._value = value
-
-    def __eq__(self, other):
-        return self._name == other._name and \
-            self._value == other._value
-
-    def __ne__(self, other):
-        return not self.__eq__(other)
-
-    def __hash__(self):
-        return hash((self._name, self._value))
-
-    def __repr__(self):
-        return '%s(%s: %s)' % (self.__class__.__name__, self._name, self._value)
-
-    def __str__(self):
-        return '%s: %s' % (self._name, self._value)
-
-    @property
-    def name(self):
-        return self._name
-
-    @property
-    def value(self):
-        return self._value
-
-    @property
-    def constraint_string(self):
-        return 'node.labels.{name}=={value}'.format(name=self.name, value=self.value)
-
-    @property
-    def constraint(self):
-        return DockerServiceConstraint(
-            name='node.labels.{name}'.format(name=self.name),
-            op='==',
-            value=self.value
-        )
-
-
-class DockerNodeLabels(DockerAttributeContainer):
-
-    member_class = DockerNodeLabel
-
-    @classmethod
-    def from_label_dictionary(cls, inspect):
-        members = []
-        for k, v in inspect.items():
-            members.append(cls.member_class(name=k, value=v))
-        return cls(members=members)
-
-    @property
-    def constraints(self):
-        return DockerServiceConstraints(members=[x.constraint for x in self.members])
-
-
-class DockerTask(object):
-
-    # these are the possible *current* state terminal states
-    terminal_states = (
-        'shutdown',  # this is normally only a desired state but I've seen a task with it as current as well
-        'complete',
-        'failed',
-        'rejected',
-        'orphaned',
-    )
-
-    def __init__(self, interface, id=None, name=None, image=None, desired_state=None,
-                 state=None, error=None, ports=None, service=None, node=None):
-        self._interface = interface
-        self._id = id
-        self._name = name
-        self._image = image
-        self._desired_state = desired_state
-        self._state = state
-        self._error = error
-        self._ports = ports
-        self._service = service
-        self._node = node
-        self._inspect = None
-
-    @classmethod
-    def from_cli(cls, interface, t, service=None, node=None):
-        state = t['CURRENT STATE'].split()[0]
-        return cls(interface, id=t['ID'], name=t['NAME'], image=t['IMAGE'],
-                   desired_state=t['DESIRED STATE'], state=state, error=t['ERROR'],
-                   ports=t['PORTS'], service=service, node=node)
-
-    @classmethod
-    def from_api(cls, interface, t, service=None, node=None):
-        service = service or interface.service(id=t.get('ServiceID'))
-        node = node or interface.node(id=t.get('NodeID'))
-        if service:
-            name = service.name + '.' + str(t['Slot'])
-        else:
-            name = t['ID']
-        image = t['Spec']['ContainerSpec']['Image'].split('@', 1)[0],  # remove pin
-        return cls(interface, id=t['ID'], name=name, image=image, desired_state=t['DesiredState'],
-                   state=t['Status']['State'], ports=t['Status']['PortStatus'], error=t['Status']['Message'],
-                   service=service, node=node)
-
-    @property
-    def id(self):
-        return self._id
-
-    @property
-    def name(self):
-        return self._name
-
-    @property
-    def inspect(self):
-        if not self._inspect:
-            try:
-                self._inspect = self._interface.task_inspect(self._id)
-            except docker.errors.NotFound:
-                # This shouldn't be possible, appears to be some kind of Swarm bug (the node claims to have a task that
-                # does not actually exist anymore, nor does its service exist).
-                log.error('Task could not be inspected because Docker claims it does not exist: %s (%s)',
-                          self.name, self.id)
-                return None
-        return self._inspect
-
-    @property
-    def slot(self):
-        try:
-            return self.inspect['Slot']
-        except TypeError:
-            return None
-
-    @property
-    def node(self):
-        if not self._node:
-            try:
-                self._node = self._interface.node(id=self.inspect['NodeID'])
-            except TypeError:
-                return None
-        return self._node
-
-    @property
-    def service(self):
-        if not self._service:
-            try:
-                self._service = self._interface.service(id=self.inspect['ServiceID'])
-            except TypeError:
-                return None
-        return self._service
-
-    @property
-    def cpus(self):
-        try:
-            cpus = self.inspect['Spec']['Resources']['Reservations']['NanoCPUs'] / 1000000000.0
-            if cpus == int(cpus):
-                cpus = int(cpus)
-            return cpus
-        except TypeError:
-            return None
-        except KeyError:
-            return 0
-
-    @property
-    def state(self):
-        return ('%s-%s' % (self._desired_state, self._state)).lower()
-
-    @property
-    def current_state(self):
-        try:
-            return self._state.lower()
-        except TypeError:
-            log.warning("Current state of %s (%s) is not a string: %s", self.name, self.id, str(self._state))
-            return None
-
-    @property
-    def current_state_time(self):
-        # Docker API returns a stamp w/ higher second precision than Python takes
-        try:
-            stamp = self.inspect['Status']['Timestamp']
-        except TypeError:
-            return None
-        return pretty_print_time_interval(time=stamp[:stamp.index('.') + 7], precise=True, utc=stamp[-1] == 'Z')
-
-    @property
-    def desired_state(self):
-        try:
-            return self._desired_state.lower()
-        except TypeError:
-            log.warning("Desired state of %s (%s) is not a string: %s", self.name, self.id, str(self._desired_state))
-            return None
-
-    @property
-    def terminal(self):
-        return self.desired_state == 'shutdown' and self.current_state in self.terminal_states
-
-    def in_state(self, desired, current):
-        return self.desired_state == desired.lower() and self.current_state == current.lower()