Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/galaxy/containers/docker_swarm.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:18:57 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/galaxy/containers/docker_swarm.py Fri Jul 31 00:18:57 2020 -0400 @@ -0,0 +1,498 @@ +""" +Docker Swarm mode interface +""" +from __future__ import absolute_import + +import logging +import os.path +import subprocess +from functools import partial + +try: + import docker.types +except ImportError: + from galaxy.util.bunch import Bunch + docker = Bunch(types=Bunch( + ContainerSpec=None, + RestartPolicy=None, + Resources=None, + Placement=None, + )) + +from galaxy.containers.docker import ( + DockerAPIInterface, + DockerCLIInterface, + DockerInterface +) +from galaxy.containers.docker_decorators import docker_columns, docker_json +from galaxy.containers.docker_model import ( + CPUS_CONSTRAINT, + DockerNode, + DockerService, + DockerTask, + IMAGE_CONSTRAINT +) +from galaxy.exceptions import ContainerRunError +from galaxy.util import unicodify +from galaxy.util.json import safe_dumps_formatted + +log = logging.getLogger(__name__) + +SWARM_MANAGER_PATH = os.path.abspath( + os.path.join( + os.path.dirname(__file__), + os.path.pardir, + os.path.pardir, + os.path.pardir, + 'scripts', + 'docker_swarm_manager.py')) + + +class DockerSwarmInterface(DockerInterface): + + container_class = DockerService + conf_defaults = { + 'ignore_volumes': False, + 'node_prefix': None, + 'service_create_image_constraint': False, + 'service_create_cpus_constraint': False, + 'resolve_image_digest': False, + 'managed': True, + 'manager_autostart': True, + } + publish_port_list_required = True + supports_volumes = False + + def validate_config(self): + super(DockerSwarmInterface, self).validate_config() + self._node_prefix = self._conf.node_prefix + + def run_in_container(self, command, image=None, **kwopts): + """Run a service like a detached container + """ + kwopts['replicas'] = 1 + kwopts['restart_condition'] = 'none' + if kwopts.get('publish_all_ports', False): + # not supported for services + # TODO: inspect image (or query registry if possible) for port list + if kwopts.get('publish_port_random', False) or kwopts.get('ports', False): + # assume this covers for publish_all_ports + del kwopts['publish_all_ports'] + else: + raise ContainerRunError( + "Publishing all ports is not supported in Docker swarm" + " mode, use `publish_port_random` or `ports`", + image=image, + command=command + ) + if not kwopts.get('detach', True): + raise ContainerRunError( + "Running attached containers is not supported in Docker swarm mode", + image=image, + command=command + ) + elif kwopts.get('detach', None): + del kwopts['detach'] + if kwopts.get('volumes', None): + if self._conf.ignore_volumes: + log.warning( + "'volumes' kwopt is set and not supported in Docker swarm " + "mode, volumes will not be passed (set 'ignore_volumes: " + "False' in containers config to fail instead): %s" % kwopts['volumes'] + ) + else: + raise ContainerRunError( + "'volumes' kwopt is set and not supported in Docker swarm " + "mode (set 'ignore_volumes: True' in containers config to " + "warn instead): %s" % kwopts['volumes'], + image=image, + command=command + ) + # ensure the volumes key is removed from kwopts + kwopts.pop('volumes', None) + service = self.service_create(command, image=image, **kwopts) + self._run_swarm_manager() + return service + + # + # helpers + # + + def _run_swarm_manager(self): + if self._conf.managed and self._conf.manager_autostart: + try: + # sys.exectuable would be preferable to using $PATH, but sys.executable is probably uwsgi + subprocess.check_call(['python', SWARM_MANAGER_PATH, '--containers-config-file', + self.containers_config_file, '--swarm', self.key]) + except subprocess.CalledProcessError as exc: + log.error('Failed to launch swarm manager: %s', unicodify(exc)) + + def _get_image(self, image): + """Get the image string, either from the argument, or from the + configured interface default if ``image`` is ``None``. Optionally + resolve the image to its digest if ``resolve_image_digest`` is set in + the interface configuration. + + If the image has not been pulled, the repo digest cannot be determined + and the image name will be returned. + + :type image: str or None + :param image: image id or name + + :returns: image name or image repo digest + """ + if not image: + image = self._conf.image + assert image is not None, "No image supplied as parameter and no image set as default in config, cannot create service" + if self._conf.resolve_image_digest: + image = self.image_repodigest(image) + return image + + def _objects_by_attribute(self, generator, attribute_name): + rval = {} + for obj in generator: + attr = getattr(obj, attribute_name) + if attr not in rval: + rval[attr] = [] + rval[attr].append(obj) + return rval + + # + # docker object generators + # + + def services(self, id=None, name=None): + for service_dict in self.service_ls(id=id, name=name): + service_id = service_dict['ID'] + service = DockerService(self, service_id, inspect=service_dict) + if service.name.startswith(self._name_prefix): + yield service + + def service(self, id=None, name=None): + try: + return next(self.services(id=id, name=name)) + except StopIteration: + return None + + def services_in_state(self, desired, current, tasks='any'): + for service in self.services(): + if service.in_state(desired, current, tasks=tasks): + yield service + + def service_tasks(self, service): + for task_dict in self.service_ps(service.id): + yield DockerTask.from_api(self, task_dict, service=service) + + def nodes(self, id=None, name=None): + for node_dict in self.node_ls(id=id, name=name): + node_id = node_dict['ID'] + node = DockerNode(self, node_id, inspect=node_dict) + if self._node_prefix and not node.name.startswith(self._node_prefix): + continue + yield node + + def node(self, id=None, name=None): + try: + return next(self.nodes(id=id, name=name)) + except StopIteration: + return None + + def nodes_in_state(self, status, availability): + for node in self.nodes(): + if node.in_state(status, availability): + yield node + + def node_tasks(self, node): + for task_dict in self.node_ps(node.id): + yield DockerTask.from_api(self, task_dict, node=node) + + # + # higher level queries + # + + def services_waiting(self): + return self.services_in_state('Running', 'Pending') + + def services_waiting_by_constraints(self): + return self._objects_by_attribute(self.services_waiting(), 'constraints') + + def services_completed(self): + return self.services_in_state('Shutdown', 'Complete', tasks='all') + + def services_terminal(self): + return [s for s in self.services() if s.terminal] + + def nodes_active(self): + return self.nodes_in_state('Ready', 'Active') + + def nodes_active_by_constraints(self): + return self._objects_by_attribute(self.nodes_active(), 'labels_as_constraints') + + # + # operations + # + + def services_clean(self): + cleaned_service_ids = [] + completed_services = list(self.services_completed()) # returns a generator, should probably fix this + if completed_services: + cleaned_service_ids.extend(self.service_rm([x.id for x in completed_services])) + terminal_services = list(self.services_terminal()) + for service in terminal_services: + log.warning('cleaned service in abnormal terminal state: %s (%s). state: %s', service.name, service.id, service.state) + if terminal_services: + cleaned_service_ids.extend(self.service_rm([x.id for x in terminal_services])) + return filter(lambda x: x.id in cleaned_service_ids, completed_services + terminal_services) + + +class DockerSwarmCLIInterface(DockerSwarmInterface, DockerCLIInterface): + + container_type = 'docker_swarm_cli' + option_map = { + # `service create` options + 'constraint': {'flag': '--constraint', 'type': 'list_of_kovtrips'}, + 'replicas': {'flag': '--replicas', 'type': 'string'}, + 'restart_condition': {'flag': '--restart-condition', 'type': 'string'}, + 'environment': {'flag': '--env', 'type': 'list_of_kvpairs'}, + 'name': {'flag': '--name', 'type': 'string'}, + 'publish_port_random': {'flag': '--publish', 'type': 'string'}, + 'cpu_limit': {'flag': '--limit-cpu', 'type': 'string'}, + 'mem_limit': {'flag': '--limit-memory', 'type': 'string'}, + 'cpu_reservation': {'flag': '--reserve-cpu', 'type': 'string'}, + 'mem_reservation': {'flag': '--reserve-memory', 'type': 'string'}, + # `service update` options + 'label_add': {'flag': '--label-add', 'type': 'list_of_kvpairs'}, + 'label_rm': {'flag': '--label-rm', 'type': 'list_of_kvpairs'}, + 'availability': {'flag': '--availability', 'type': 'string'}, + } + + # + # docker object generators + # + + def services(self, id=None, name=None): + for service_dict in self.service_ls(id=id, name=name): + service_id = service_dict['ID'] + service_name = service_dict['NAME'] + if not service_name.startswith(self._name_prefix): + continue + task_list = self.service_ps(service_id) + yield DockerService.from_cli(self, service_dict, task_list) + + def service_tasks(self, service): + for task_dict in self.service_ps(service.id): + if task_dict['NAME'].strip().startswith(r'\_'): + continue # historical task + yield DockerTask.from_cli(self, task_dict, service=service) + + def nodes(self, id=None, name=None): + for node_dict in self.node_ls(id=id, name=name): + node_id = node_dict['ID'].strip(' *') + node_name = node_dict['HOSTNAME'] + if self._node_prefix and not node_name.startswith(self._node_prefix): + continue + task_list = filter(lambda x: x['NAME'].startswith(self._name_prefix), self.node_ps(node_id)) + yield DockerNode.from_cli(self, node_dict, task_list) + + # + # docker subcommands + # + + def service_create(self, command, image=None, **kwopts): + if ('service_create_image_constraint' in self._conf or 'service_create_cpus_constraint' in self._conf) and 'constraint' not in kwopts: + kwopts['constraint'] = [] + image = self._get_image(image) + if self._conf.service_create_image_constraint: + kwopts['constraint'].append((IMAGE_CONSTRAINT, '==', image)) + if self._conf.service_create_cpus_constraint: + cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1')) + kwopts['constraint'].append((CPUS_CONSTRAINT, '==', cpus)) + if self._conf.cpus: + kwopts['cpu_limit'] = self._conf.cpus + kwopts['cpu_reservation'] = self._conf.cpus + if self._conf.memory: + kwopts['mem_limit'] = self._conf.memory + kwopts['mem_reservation'] = self._conf.memory + self.set_kwopts_name(kwopts) + args = '{kwopts} {image} {command}'.format( + kwopts=self._stringify_kwopts(kwopts), + image=image if image else '', + command=command if command else '', + ).strip() + service_id = self._run_docker(subcommand='service create', args=args, verbose=True) + return DockerService.from_id(self, service_id) + + @docker_json + def service_inspect(self, service_id): + return self._run_docker(subcommand='service inspect', args=service_id)[0] + + @docker_columns + def service_ls(self, id=None, name=None): + return self._run_docker(subcommand='service ls', args=self._filter_by_id_or_name(id, name)) + + @docker_columns + def service_ps(self, service_id): + return self._run_docker(subcommand='service ps', args='--no-trunc {}'.format(service_id)) + + def service_rm(self, service_ids): + service_ids = ' '.join(service_ids) + return self._run_docker(subcommand='service rm', args=service_ids).splitlines() + + @docker_json + def node_inspect(self, node_id): + return self._run_docker(subcommand='node inspect', args=node_id)[0] + + @docker_columns + def node_ls(self, id=None, name=None): + return self._run_docker(subcommand='node ls', args=self._filter_by_id_or_name(id, name)) + + @docker_columns + def node_ps(self, node_id): + return self._run_docker(subcommand='node ps', args='--no-trunc {}'.format(node_id)) + + def node_update(self, node_id, **kwopts): + return self._run_docker(subcommand='node update', args='{kwopts} {node_id}'.format( + kwopts=self._stringify_kwopts(kwopts), + node_id=node_id + )) + + @docker_json + def task_inspect(self, task_id): + return self._run_docker(subcommand="inspect", args=task_id) + + +class DockerSwarmAPIInterface(DockerSwarmInterface, DockerAPIInterface): + + container_type = 'docker_swarm' + placement_option_map = { + 'constraint': {'param': 'constraints'}, + } + service_mode_option_map = { + 'service_mode': {'param': 0, 'default': 'replicated'}, + 'replicas': {'default': 1}, + } + endpoint_spec_option_map = { + 'ports': {}, + } + resources_option_map = { + 'cpus': {'params': ('cpu_limit', 'cpu_reservation'), 'map': lambda x: int(x * 1000000000)}, + 'memory': {'params': ('mem_limit', 'mem_reservation')}, + } + container_spec_option_map = { + 'image': {'param': 0}, + 'command': {}, + 'environment': {'param': 'env'}, + 'labels': {}, + } + restart_policy_option_map = { + 'restart_condition': {'param': 'condition', 'default': 'none'}, + 'restart_delay': {'param': 'delay'}, + 'restart_max_attempts': {'param': 'max_attemps'}, + } + task_template_option_map = { + '_container_spec': {'spec_class': docker.types.ContainerSpec, 'required': True}, + '_resources': {'spec_class': docker.types.Resources}, + '_restart_policy': {'spec_class': docker.types.RestartPolicy}, + '_placement': {'spec_class': docker.types.Placement}, + } + node_spec_option_map = { + 'availability': {'param': 'Availability'}, + 'name': {'param': 'Name'}, + 'role': {'param': 'Role'}, + 'labels': {'param': 'Labels'}, + } + + @staticmethod + def create_random_port_spec(port): + return { + 'Protocol': 'tcp', + 'PublishedPort': None, + 'TargetPort': port, + } + + # + # docker subcommands + # + + def service_create(self, command, image=None, **kwopts): + # TODO: some of this should probably move to run_in_container when the CLI interface is removed + log.debug("Creating docker service with image '%s' for command: %s", image, command) + # insert run kwopts from config + for opt in self.conf_run_kwopts: + if self._conf[opt]: + kwopts[opt] = self._conf[opt] + # image is part of the container spec + kwopts['image'] = self._get_image(image) + # service constraints + kwopts['constraint'] = kwopts.get('constraint', []) + if self._conf.service_create_image_constraint: + kwopts['constraint'].append((IMAGE_CONSTRAINT + '==' + image)) + if self._conf.service_create_cpus_constraint: + cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1')) + kwopts['constraint'].append((CPUS_CONSTRAINT + '==' + cpus)) + # ports + if 'publish_port_random' in kwopts: + kwopts['ports'] = [DockerSwarmAPIInterface.create_random_port_spec(kwopts.pop('publish_port_random'))] + # create specs + service_mode = self._create_docker_api_spec('service_mode', docker.types.ServiceMode, kwopts) + endpoint_spec = self._create_docker_api_spec('endpoint_spec', docker.types.EndpointSpec, kwopts) + task_template = self._create_docker_api_spec('task_template', docker.types.TaskTemplate, kwopts) + self.set_kwopts_name(kwopts) + log.debug("Docker service task template:\n%s", safe_dumps_formatted(task_template)) + log.debug("Docker service endpoint specification:\n%s", safe_dumps_formatted(endpoint_spec)) + log.debug("Docker service mode:\n%s", safe_dumps_formatted(service_mode)) + log.debug("Docker service creation parameters:\n%s", safe_dumps_formatted(kwopts)) + success_test = partial(self._first, self.service_ls, name=kwopts['name']) + # this can raise exceptions, if necessary we could wrap them in a more generic "creation failed" exception class + service = self._client.create_service( + task_template, + mode=service_mode, + endpoint_spec=endpoint_spec, + success_test=success_test, + max_tries=5, + **kwopts) + service_id = service.get('ID') + log.debug('Created service: %s (%s)', kwopts['name'], service_id) + return DockerService.from_id(self, service_id) + + def service_inspect(self, service_id): + return self._client.inspect_service(service_id) + + def service_ls(self, id=None, name=None): + return self._client.services(filters=self._filter_by_id_or_name(id, name)) + + # roughly `docker service ps` + def service_ps(self, service_id): + return self.task_ls(filters={'service': service_id}) + + def service_rm(self, service_ids): + r = [] + for service_id in service_ids: + self._client.remove_service(service_id) + r.append(service_id) + return r + + def node_inspect(self, node_id): + return self._client.inspect_node(node_id) + + def node_ls(self, id=None, name=None): + return self._client.nodes(filters=self._filter_by_id_or_name(id, name)) + + # roughly `docker node ps` + def node_ps(self, node_id): + return self.task_ls(filters={'node': node_id}) + + def node_update(self, node_id, **kwopts): + node = DockerNode.from_id(self, node_id) + spec = node.inspect['Spec'] + if 'label_add' in kwopts: + kwopts['labels'] = spec.get('Labels', {}) + kwopts['labels'].update(kwopts.pop('label_add')) + spec.update(self._create_docker_api_spec('node_spec', dict, kwopts)) + return self._client.update_node(node.id, node.version, node_spec=spec) + + def task_inspect(self, task_id): + return self._client.inspect_task(task_id) + + def task_ls(self, filters=None): + return self._client.tasks(filters=filters)