shellac / guppy_basecaller: comparison of env/lib/python3.7/site-packages/galaxy/containers/docker_swarm.py @ 5:9b1c78e6ba9c (draft, default, tip)

commit message: "planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"

author:   shellac
date:     Mon, 01 Jun 2020 08:59:25 -0400
parents:  79f47841a781
children: (none)
comparing revisions: 4:79f47841a781 -> 5:9b1c78e6ba9c
"""
Docker Swarm mode interface
"""
from __future__ import absolute_import

import logging
import os.path
import subprocess
from functools import partial

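# docker-py is optional: if it is not installed, stub out the spec classes
# referenced below so that this module can still be imported (only the API
# interface actually needs them at call time).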
try:
    import docker.types
except ImportError:
    from galaxy.util.bunch import Bunch
    docker = Bunch(types=Bunch(
        ContainerSpec=None,
        RestartPolicy=None,
        Resources=None,
        Placement=None,
    ))

from galaxy.containers.docker import (
    DockerAPIInterface,
    DockerCLIInterface,
    DockerInterface
)
from galaxy.containers.docker_decorators import docker_columns, docker_json
from galaxy.containers.docker_model import (
    CPUS_CONSTRAINT,
    DockerNode,
    DockerService,
    DockerTask,
    IMAGE_CONSTRAINT
)
from galaxy.exceptions import ContainerRunError
from galaxy.util import unicodify
from galaxy.util.json import safe_dumps_formatted

log = logging.getLogger(__name__)

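# Absolute path to the standalone swarm manager script: three directories up
# from this module (above the ``galaxy`` package), in scripts/docker_swarm_manager.py.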
SWARM_MANAGER_PATH = os.path.abspath(
    os.path.join(
        os.path.dirname(__file__),
        os.path.pardir,
        os.path.pardir,
        os.path.pardir,
        'scripts',
        'docker_swarm_manager.py'))


class DockerSwarmInterface(DockerInterface):

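    # Swarm "containers" are Docker services; the options below are the
    # swarm-specific configuration defaults for this interface.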
    container_class = DockerService
    conf_defaults = {
        'ignore_volumes': False,
        'node_prefix': None,
        'service_create_image_constraint': False,
        'service_create_cpus_constraint': False,
        'resolve_image_digest': False,
        'managed': True,
        'manager_autostart': True,
    }
    publish_port_list_required = True
    supports_volumes = False

    def validate_config(self):
        super(DockerSwarmInterface, self).validate_config()
        self._node_prefix = self._conf.node_prefix

    def run_in_container(self, command, image=None, **kwopts):
        """Run a service like a detached container
        """
        kwopts['replicas'] = 1
        kwopts['restart_condition'] = 'none'
        if kwopts.get('publish_all_ports', False):
            # not supported for services
            # TODO: inspect image (or query registry if possible) for port list
            if kwopts.get('publish_port_random', False) or kwopts.get('ports', False):
                # assume this covers for publish_all_ports
                del kwopts['publish_all_ports']
            else:
                raise ContainerRunError(
                    "Publishing all ports is not supported in Docker swarm"
                    " mode, use `publish_port_random` or `ports`",
                    image=image,
                    command=command
                )
        if not kwopts.get('detach', True):
            raise ContainerRunError(
                "Running attached containers is not supported in Docker swarm mode",
                image=image,
                command=command
            )
        elif kwopts.get('detach', None):
            del kwopts['detach']
        if kwopts.get('volumes', None):
            if self._conf.ignore_volumes:
                log.warning(
                    "'volumes' kwopt is set and not supported in Docker swarm "
                    "mode, volumes will not be passed (set 'ignore_volumes: "
                    "False' in containers config to fail instead): %s" % kwopts['volumes']
                )
            else:
                raise ContainerRunError(
                    "'volumes' kwopt is set and not supported in Docker swarm "
                    "mode (set 'ignore_volumes: True' in containers config to "
                    "warn instead): %s" % kwopts['volumes'],
                    image=image,
                    command=command
                )
        # ensure the volumes key is removed from kwopts
        kwopts.pop('volumes', None)
        service = self.service_create(command, image=image, **kwopts)
        self._run_swarm_manager()
        return service

    #
    # helpers
    #

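    # Spawn the external swarm manager script for this swarm, but only when the
    # swarm is configured as managed with manager autostart enabled.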
    def _run_swarm_manager(self):
        if self._conf.managed and self._conf.manager_autostart:
            try:
                # sys.executable would be preferable to using $PATH, but sys.executable is probably uwsgi
                subprocess.check_call(['python', SWARM_MANAGER_PATH, '--containers-config-file',
                                       self.containers_config_file, '--swarm', self.key])
            except subprocess.CalledProcessError as exc:
                log.error('Failed to launch swarm manager: %s', unicodify(exc))

    def _get_image(self, image):
        """Get the image string, either from the argument, or from the
        configured interface default if ``image`` is ``None``. Optionally
        resolve the image to its digest if ``resolve_image_digest`` is set in
        the interface configuration.

        If the image has not been pulled, the repo digest cannot be determined
        and the image name will be returned.

        :type image: str or None
        :param image: image id or name

        :returns: image name or image repo digest
        """
        if not image:
            image = self._conf.image
        assert image is not None, "No image supplied as parameter and no image set as default in config, cannot create service"
        if self._conf.resolve_image_digest:
            image = self.image_repodigest(image)
        return image

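    # Group the objects produced by ``generator`` into a dict keyed on the value
    # of the named attribute (e.g. services grouped by their constraints).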
    def _objects_by_attribute(self, generator, attribute_name):
        rval = {}
        for obj in generator:
            attr = getattr(obj, attribute_name)
            if attr not in rval:
                rval[attr] = []
            rval[attr].append(obj)
        return rval

    #
    # docker object generators
    #

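    # Only services whose names carry this interface's name prefix are yielded,
    # i.e. those presumed to have been created by this Galaxy instance.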
    def services(self, id=None, name=None):
        for service_dict in self.service_ls(id=id, name=name):
            service_id = service_dict['ID']
            service = DockerService(self, service_id, inspect=service_dict)
            if service.name.startswith(self._name_prefix):
                yield service

    def service(self, id=None, name=None):
        try:
            return next(self.services(id=id, name=name))
        except StopIteration:
            return None

    def services_in_state(self, desired, current, tasks='any'):
        for service in self.services():
            if service.in_state(desired, current, tasks=tasks):
                yield service

    def service_tasks(self, service):
        for task_dict in self.service_ps(service.id):
            yield DockerTask.from_api(self, task_dict, service=service)

    def nodes(self, id=None, name=None):
        for node_dict in self.node_ls(id=id, name=name):
            node_id = node_dict['ID']
            node = DockerNode(self, node_id, inspect=node_dict)
            if self._node_prefix and not node.name.startswith(self._node_prefix):
                continue
            yield node

    def node(self, id=None, name=None):
        try:
            return next(self.nodes(id=id, name=name))
        except StopIteration:
            return None

    def nodes_in_state(self, status, availability):
        for node in self.nodes():
            if node.in_state(status, availability):
                yield node

    def node_tasks(self, node):
        for task_dict in self.node_ps(node.id):
            yield DockerTask.from_api(self, task_dict, node=node)

    #
    # higher level queries
    #

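    # A service is "waiting" when its tasks want to be Running but are still
    # Pending (e.g. no node currently satisfies the service's constraints).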
    def services_waiting(self):
        return self.services_in_state('Running', 'Pending')

    def services_waiting_by_constraints(self):
        return self._objects_by_attribute(self.services_waiting(), 'constraints')

    def services_completed(self):
        return self.services_in_state('Shutdown', 'Complete', tasks='all')

    def services_terminal(self):
        return [s for s in self.services() if s.terminal]

    def nodes_active(self):
        return self.nodes_in_state('Ready', 'Active')

    def nodes_active_by_constraints(self):
        return self._objects_by_attribute(self.nodes_active(), 'labels_as_constraints')

    #
    # operations
    #

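    # Remove services whose tasks have all completed, plus any services stuck in
    # an abnormal terminal state, and return the service objects that were removed.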
    def services_clean(self):
        cleaned_service_ids = []
        completed_services = list(self.services_completed())  # returns a generator, should probably fix this
        if completed_services:
            cleaned_service_ids.extend(self.service_rm([x.id for x in completed_services]))
        terminal_services = list(self.services_terminal())
        for service in terminal_services:
            log.warning('cleaned service in abnormal terminal state: %s (%s). state: %s', service.name, service.id, service.state)
        if terminal_services:
            cleaned_service_ids.extend(self.service_rm([x.id for x in terminal_services]))
        return filter(lambda x: x.id in cleaned_service_ids, completed_services + terminal_services)


class DockerSwarmCLIInterface(DockerSwarmInterface, DockerCLIInterface):

    container_type = 'docker_swarm_cli'
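    # Maps service kwopts onto `docker service create`/`docker service update`
    # CLI flags; the 'type' key selects how each value is rendered as arguments
    # (strings, key=value pairs, or key/operator/value constraint triples).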
    option_map = {
        # `service create` options
        'constraint': {'flag': '--constraint', 'type': 'list_of_kovtrips'},
        'replicas': {'flag': '--replicas', 'type': 'string'},
        'restart_condition': {'flag': '--restart-condition', 'type': 'string'},
        'environment': {'flag': '--env', 'type': 'list_of_kvpairs'},
        'name': {'flag': '--name', 'type': 'string'},
        'publish_port_random': {'flag': '--publish', 'type': 'string'},
        'cpu_limit': {'flag': '--limit-cpu', 'type': 'string'},
        'mem_limit': {'flag': '--limit-memory', 'type': 'string'},
        'cpu_reservation': {'flag': '--reserve-cpu', 'type': 'string'},
        'mem_reservation': {'flag': '--reserve-memory', 'type': 'string'},
        # `service update` options
        'label_add': {'flag': '--label-add', 'type': 'list_of_kvpairs'},
        'label_rm': {'flag': '--label-rm', 'type': 'list_of_kvpairs'},
        'availability': {'flag': '--availability', 'type': 'string'},
    }

    #
    # docker object generators
    #

    def services(self, id=None, name=None):
        for service_dict in self.service_ls(id=id, name=name):
            service_id = service_dict['ID']
            service_name = service_dict['NAME']
            if not service_name.startswith(self._name_prefix):
                continue
            task_list = self.service_ps(service_id)
            yield DockerService.from_cli(self, service_dict, task_list)

    def service_tasks(self, service):
        for task_dict in self.service_ps(service.id):
            if task_dict['NAME'].strip().startswith(r'\_'):
                continue  # historical task
            yield DockerTask.from_cli(self, task_dict, service=service)

    def nodes(self, id=None, name=None):
        for node_dict in self.node_ls(id=id, name=name):
            node_id = node_dict['ID'].strip(' *')
            node_name = node_dict['HOSTNAME']
            if self._node_prefix and not node_name.startswith(self._node_prefix):
                continue
            task_list = filter(lambda x: x['NAME'].startswith(self._name_prefix), self.node_ps(node_id))
            yield DockerNode.from_cli(self, node_dict, task_list)

    #
    # docker subcommands
    #

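    # Build the `docker service create` argument string from kwopts, adding
    # image/CPU placement constraints and resource limits/reservations from the
    # interface configuration before shelling out.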
    def service_create(self, command, image=None, **kwopts):
        if ('service_create_image_constraint' in self._conf or 'service_create_cpus_constraint' in self._conf) and 'constraint' not in kwopts:
            kwopts['constraint'] = []
        image = self._get_image(image)
        if self._conf.service_create_image_constraint:
            kwopts['constraint'].append((IMAGE_CONSTRAINT, '==', image))
        if self._conf.service_create_cpus_constraint:
            cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1'))
            kwopts['constraint'].append((CPUS_CONSTRAINT, '==', cpus))
        if self._conf.cpus:
            kwopts['cpu_limit'] = self._conf.cpus
            kwopts['cpu_reservation'] = self._conf.cpus
        if self._conf.memory:
            kwopts['mem_limit'] = self._conf.memory
            kwopts['mem_reservation'] = self._conf.memory
        self.set_kwopts_name(kwopts)
        args = '{kwopts} {image} {command}'.format(
            kwopts=self._stringify_kwopts(kwopts),
            image=image if image else '',
            command=command if command else '',
        ).strip()
        service_id = self._run_docker(subcommand='service create', args=args, verbose=True)
        return DockerService.from_id(self, service_id)

    @docker_json
    def service_inspect(self, service_id):
        return self._run_docker(subcommand='service inspect', args=service_id)[0]

    @docker_columns
    def service_ls(self, id=None, name=None):
        return self._run_docker(subcommand='service ls', args=self._filter_by_id_or_name(id, name))

    @docker_columns
    def service_ps(self, service_id):
        return self._run_docker(subcommand='service ps', args='--no-trunc {}'.format(service_id))

    def service_rm(self, service_ids):
        service_ids = ' '.join(service_ids)
        return self._run_docker(subcommand='service rm', args=service_ids).splitlines()

    @docker_json
    def node_inspect(self, node_id):
        return self._run_docker(subcommand='node inspect', args=node_id)[0]

    @docker_columns
    def node_ls(self, id=None, name=None):
        return self._run_docker(subcommand='node ls', args=self._filter_by_id_or_name(id, name))

    @docker_columns
    def node_ps(self, node_id):
        return self._run_docker(subcommand='node ps', args='--no-trunc {}'.format(node_id))

    def node_update(self, node_id, **kwopts):
        return self._run_docker(subcommand='node update', args='{kwopts} {node_id}'.format(
            kwopts=self._stringify_kwopts(kwopts),
            node_id=node_id
        ))

    @docker_json
    def task_inspect(self, task_id):
        return self._run_docker(subcommand="inspect", args=task_id)


class DockerSwarmAPIInterface(DockerSwarmInterface, DockerAPIInterface):

    container_type = 'docker_swarm'
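    # These *_option_map dicts drive building of docker-py spec objects from the
    # flat kwopts: each entry names the parameter of the corresponding
    # docker.types spec class ('param': 0 appears to denote the first positional
    # argument).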
    placement_option_map = {
        'constraint': {'param': 'constraints'},
    }
    service_mode_option_map = {
        'service_mode': {'param': 0, 'default': 'replicated'},
        'replicas': {'default': 1},
    }
    endpoint_spec_option_map = {
        'ports': {},
    }
    resources_option_map = {
        'cpus': {'params': ('cpu_limit', 'cpu_reservation'), 'map': lambda x: int(x * 1000000000)},
        'memory': {'params': ('mem_limit', 'mem_reservation')},
    }
    container_spec_option_map = {
        'image': {'param': 0},
        'command': {},
        'environment': {'param': 'env'},
        'labels': {},
    }
    restart_policy_option_map = {
        'restart_condition': {'param': 'condition', 'default': 'none'},
        'restart_delay': {'param': 'delay'},
        'restart_max_attempts': {'param': 'max_attempts'},
    }
    task_template_option_map = {
        '_container_spec': {'spec_class': docker.types.ContainerSpec, 'required': True},
        '_resources': {'spec_class': docker.types.Resources},
        '_restart_policy': {'spec_class': docker.types.RestartPolicy},
        '_placement': {'spec_class': docker.types.Placement},
    }
    node_spec_option_map = {
        'availability': {'param': 'Availability'},
        'name': {'param': 'Name'},
        'role': {'param': 'Role'},
        'labels': {'param': 'Labels'},
    }

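    # Leaving 'PublishedPort' as None lets the swarm pick the published port;
    # only the container-side TargetPort is fixed.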
    @staticmethod
    def create_random_port_spec(port):
        return {
            'Protocol': 'tcp',
            'PublishedPort': None,
            'TargetPort': port,
        }

    #
    # docker subcommands
    #

    def service_create(self, command, image=None, **kwopts):
        # TODO: some of this should probably move to run_in_container when the CLI interface is removed
        log.debug("Creating docker service with image '%s' for command: %s", image, command)
        # insert run kwopts from config
        for opt in self.conf_run_kwopts:
            if self._conf[opt]:
                kwopts[opt] = self._conf[opt]
        # image is part of the container spec; resolve it before it is used in constraints
        image = kwopts['image'] = self._get_image(image)
        # service constraints
        kwopts['constraint'] = kwopts.get('constraint', [])
        if self._conf.service_create_image_constraint:
            kwopts['constraint'].append(IMAGE_CONSTRAINT + '==' + image)
        if self._conf.service_create_cpus_constraint:
            cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1'))
            kwopts['constraint'].append(CPUS_CONSTRAINT + '==' + cpus)
        # ports
        if 'publish_port_random' in kwopts:
            kwopts['ports'] = [DockerSwarmAPIInterface.create_random_port_spec(kwopts.pop('publish_port_random'))]
        # create specs
        service_mode = self._create_docker_api_spec('service_mode', docker.types.ServiceMode, kwopts)
        endpoint_spec = self._create_docker_api_spec('endpoint_spec', docker.types.EndpointSpec, kwopts)
        task_template = self._create_docker_api_spec('task_template', docker.types.TaskTemplate, kwopts)
        self.set_kwopts_name(kwopts)
        log.debug("Docker service task template:\n%s", safe_dumps_formatted(task_template))
        log.debug("Docker service endpoint specification:\n%s", safe_dumps_formatted(endpoint_spec))
        log.debug("Docker service mode:\n%s", safe_dumps_formatted(service_mode))
        log.debug("Docker service creation parameters:\n%s", safe_dumps_formatted(kwopts))
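        # success_test/max_tries let the wrapped docker client confirm via
        # service_ls that the newly created service is actually visible before
        # returning, since the swarm may not report it immediately.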
        success_test = partial(self._first, self.service_ls, name=kwopts['name'])
        # this can raise exceptions, if necessary we could wrap them in a more generic "creation failed" exception class
        service = self._client.create_service(
            task_template,
            mode=service_mode,
            endpoint_spec=endpoint_spec,
            success_test=success_test,
            max_tries=5,
            **kwopts)
        service_id = service.get('ID')
        log.debug('Created service: %s (%s)', kwopts['name'], service_id)
        return DockerService.from_id(self, service_id)

    def service_inspect(self, service_id):
        return self._client.inspect_service(service_id)

    def service_ls(self, id=None, name=None):
        return self._client.services(filters=self._filter_by_id_or_name(id, name))

    # roughly `docker service ps`
    def service_ps(self, service_id):
        return self.task_ls(filters={'service': service_id})

    def service_rm(self, service_ids):
        r = []
        for service_id in service_ids:
            self._client.remove_service(service_id)
            r.append(service_id)
        return r

    def node_inspect(self, node_id):
        return self._client.inspect_node(node_id)

    def node_ls(self, id=None, name=None):
        return self._client.nodes(filters=self._filter_by_id_or_name(id, name))

    # roughly `docker node ps`
    def node_ps(self, node_id):
        return self.task_ls(filters={'node': node_id})

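    # When adding labels, merge label_add into the node's existing labels so the
    # rebuilt spec does not drop labels already set on the node.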
    def node_update(self, node_id, **kwopts):
        node = DockerNode.from_id(self, node_id)
        spec = node.inspect['Spec']
        if 'label_add' in kwopts:
            kwopts['labels'] = spec.get('Labels', {})
            kwopts['labels'].update(kwopts.pop('label_add'))
        spec.update(self._create_docker_api_spec('node_spec', dict, kwopts))
        return self._client.update_node(node.id, node.version, node_spec=spec)

    def task_inspect(self, task_id):
        return self._client.inspect_task(task_id)

    def task_ls(self, filters=None):
        return self._client.tasks(filters=filters)