comparison planemo/lib/python3.7/site-packages/galaxy/containers/docker.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d30785e31577
1 """
2 Interface to Docker
3 """
4 from __future__ import absolute_import
5
6 import logging
7 import os
8 from functools import partial
9 from itertools import cycle, repeat
10 from time import sleep
11
12 try:
13 import docker
14 except ImportError:
15 docker = None
16
17 try:
18 from requests.exceptions import ConnectionError, ReadTimeout
19 except ImportError:
20 ConnectionError = None
21 ReadTimeout = None
22 from six import string_types
23 from six.moves import shlex_quote
24
25 from galaxy.containers import ContainerInterface
26 from galaxy.containers.docker_decorators import (
27 docker_columns,
28 docker_json
29 )
30 from galaxy.containers.docker_model import (
31 DockerContainer,
32 DockerVolume
33 )
34 from galaxy.exceptions import (
35 ContainerCLIError,
36 ContainerImageNotFound,
37 ContainerNotFound
38 )
39 from galaxy.util.json import safe_dumps_formatted
40
41 log = logging.getLogger(__name__)
42
43
44 class DockerInterface(ContainerInterface):
45
46 container_class = DockerContainer
47 volume_class = DockerVolume
48 conf_defaults = {
49 'host': None,
50 'tls': False,
51 'force_tlsverify': False,
52 'auto_remove': True,
53 'image': None,
54 'cpus': None,
55 'memory': None,
56 }
57 # These values are inserted into kwopts for run commands
58 conf_run_kwopts = (
59 'cpus',
60 'memory',
61 )
62
63 def validate_config(self):
64 super(DockerInterface, self).validate_config()
65 self.__host_iter = None
66 if self._conf.host is None or isinstance(self._conf.host, string_types):
67 self.__host_iter = repeat(self._conf.host)
68 else:
69 self.__host_iter = cycle(self._conf.host)
70
71 @property
72 def _default_image(self):
73 assert self._conf.image is not None, "No default image for this docker interface"
74 return self._conf.image
75
76 def run_in_container(self, command, image=None, **kwopts):
77 for opt in self.conf_run_kwopts:
78 if self._conf[opt]:
79 kwopts[opt] = self._conf[opt]
80 self.set_kwopts_name(kwopts)
81 return self.run(command, image=image, **kwopts)
82
83 def image_repodigest(self, image):
84 """Get the digest image string for an image.
85
86 :type image: str
87 :param image: image id or image name and optionally, tag
88
89 :returns: digest string, having the format `<name>@<hash_alg>:<digest>`, e.g.:
90 `'bgruening/docker-jupyter-notebook@sha256:3ec0bc9abc9d511aa602ee4fff2534d80dd9b1564482de52cb5de36cce6debae'`
91 or, the original image name if the digest cannot be
92 determined (the image has not been pulled)
93 """
94 try:
95 inspect = self.image_inspect(image)
96 return inspect['RepoDigests'][0]
97 except ContainerImageNotFound:
98 return image
99
100 @property
101 def host(self):
102 return next(self.__host_iter)
103
104 @property
105 def host_iter(self):
106 return self.__host_iter
107
108
109 class DockerCLIInterface(DockerInterface):
110
111 container_type = 'docker_cli'
112 conf_defaults = {
113 'command_template': '{executable} {global_kwopts} {subcommand} {args}',
114 'executable': 'docker',
115 }
116 option_map = {
117 # `run` options
118 'environment': {'flag': '--env', 'type': 'list_of_kvpairs'},
119 'volumes': {'flag': '--volume', 'type': 'docker_volumes'},
120 'name': {'flag': '--name', 'type': 'string'},
121 'detach': {'flag': '--detach', 'type': 'boolean'},
122 'publish_all_ports': {'flag': '--publish-all', 'type': 'boolean'},
123 'publish_port_random': {'flag': '--publish', 'type': 'string'},
124 'auto_remove': {'flag': '--rm', 'type': 'boolean'},
125 'cpus': {'flag': '--cpus', 'type': 'string'},
126 'memory': {'flag': '--memory', 'type': 'string'},
127 }
128
129 def validate_config(self):
130 log.warning('The `docker_cli` interface is deprecated and will be removed in Galaxy 18.09, please use `docker`')
131 super(DockerCLIInterface, self).validate_config()
132 global_kwopts = []
133 if self._conf.host:
134 global_kwopts.append('--host')
135 global_kwopts.append(shlex_quote(self._conf.host))
136 if self._conf.force_tlsverify:
137 global_kwopts.append('--tlsverify')
138 self._docker_command = self._conf['command_template'].format(
139 executable=self._conf['executable'],
140 global_kwopts=' '.join(global_kwopts),
141 subcommand='{subcommand}',
142 args='{args}'
143 )
144
145 def _filter_by_id_or_name(self, id, name):
146 if id:
147 return '--filter id={}'.format(id)
148 elif name:
149 return '--filter name={}'.format(name)
150 return None
151
152 def _stringify_kwopt_docker_volumes(self, flag, val):
153 """The docker API will take a volumes argument in many formats, try to
154 deal with that for the command line
155 """
156 l = []
157 if isinstance(val, list):
158 # ['/host/vol']
159 l = val
160 else:
161 for hostvol, guestopts in val.items():
162 if isinstance(guestopts, string_types):
163 # {'/host/vol': '/container/vol'}
164 l.append('{}:{}'.format(hostvol, guestopts))
165 else:
166 # {'/host/vol': {'bind': '/container/vol'}}
167 # {'/host/vol': {'bind': '/container/vol', 'mode': 'rw'}}
168 mode = guestopts.get('mode', '')
169 l.append('{vol}:{bind}{mode}'.format(
170 vol=hostvol,
171 bind=guestopts['bind'],
172 mode=':' + mode if mode else ''
173 ))
174 return self._stringify_kwopt_list(flag, l)
175
176 def _run_docker(self, subcommand, args=None, verbose=False):
177 command = self._docker_command.format(subcommand=subcommand, args=args or '')
178 return self._run_command(command, verbose=verbose)
179
180 #
181 # docker subcommands
182 #
183
184 @docker_columns
185 def ps(self, id=None, name=None):
186 return self._run_docker(subcommand='ps', args=self._filter_by_id_or_name(id, name))
187
188 def run(self, command, image=None, **kwopts):
189 args = '{kwopts} {image} {command}'.format(
190 kwopts=self._stringify_kwopts(kwopts),
191 image=image or self._default_image,
192 command=command if command else ''
193 ).strip()
194 container_id = self._run_docker(subcommand='run', args=args, verbose=True)
195 return DockerContainer.from_id(self, container_id)
196
197 @docker_json
198 def inspect(self, container_id):
199 try:
200 return self._run_docker(subcommand='inspect', args=container_id)[0]
201 except (IndexError, ContainerCLIError) as exc:
202 msg = "Invalid container id: %s" % container_id
203 if exc.stdout == '[]' and exc.stderr == 'Error: no such object: {container_id}'.format(container_id=container_id):
204 log.warning(msg)
205 return []
206 else:
207 raise ContainerNotFound(msg, container_id=container_id)
208
209 @docker_json
210 def image_inspect(self, image):
211 try:
212 return self._run_docker(subcommand='image inspect', args=image)[0]
213 except (IndexError, ContainerCLIError) as exc:
214 msg = "%s not pulled, cannot get digest" % image
215 if exc.stdout == '[]' and exc.stderr == 'Error: no such image: {image}'.format(image=image):
216 log.warning(msg, image)
217 return []
218 else:
219 raise ContainerImageNotFound(msg, image=image)
220
221
222 class DockerAPIClient(object):
223 """Wraps a ``docker.APIClient`` to catch exceptions.
224 """
225
226 _exception_retry_time = 5
227 _default_max_tries = 10
228 _host_iter = None
229 _client = None
230 _client_args = ()
231 _client_kwargs = {}
232
233 @staticmethod
234 def _qualname(f):
235 if isinstance(f, partial):
236 f = f.func
237 try:
238 return getattr(f, '__qualname__', f.im_class.__name__ + '.' + f.__name__)
239 except AttributeError:
240 return f.__name__
241
242 @staticmethod
243 def _should_retry_request(response_code):
244 return response_code >= 500 or response_code in (404, 408, 409, 429)
245
246 @staticmethod
247 def _nonfatal_error(response_code):
248 return response_code in (404,)
249
250 @staticmethod
251 def _unwrapped_attr(attr):
252 return getattr(DockerAPIClient._client, attr)
253
254 @staticmethod
255 def _init_client():
256 kwargs = DockerAPIClient._client_kwargs.copy()
257 if DockerAPIClient._host_iter is not None and 'base_url' not in kwargs:
258 kwargs['base_url'] = next(DockerAPIClient._host_iter)
259 DockerAPIClient._client = docker.APIClient(*DockerAPIClient._client_args, **kwargs)
260 log.info('Initialized Docker API client for server: %s', kwargs.get('base_url', 'localhost'))
261
262 @staticmethod
263 def _default_client_handler(fname, *args, **kwargs):
264 success_test = kwargs.pop('success_test', None)
265 max_tries = kwargs.pop('max_tries', DockerAPIClient._default_max_tries)
266 for tries in range(1, max_tries + 1):
267 retry_time = DockerAPIClient._exception_retry_time
268 reinit = False
269 exc = None
270 # re-get the APIClient method every time as a different caller (such as the success test function) may have
271 # already reinitialized the client, and we always want to use the current client
272 f = DockerAPIClient._unwrapped_attr(fname)
273 qualname = DockerAPIClient._qualname(f)
274 try:
275 r = f(*args, **kwargs)
276 if tries > 1:
277 log.info('%s() succeeded on attempt %s', qualname, tries)
278 return r
279 except ConnectionError:
280 reinit = True
281 except docker.errors.APIError as exc:
282 if not DockerAPIClient._should_retry_request(exc.response.status_code):
283 raise
284 except ReadTimeout:
285 reinit = True
286 retry_time = 0
287 finally:
288 # this is inside the finally context so we can do a bare raise when we give up (so the real stack for
289 # the exception is raised)
290 if exc is not None:
291 log.warning("Caught exception on %s(): %s: %s",
292 DockerAPIClient._qualname(f), exc.__class__.__name__, exc)
293 if reinit:
294 log.warning("Reinitializing Docker API client due to connection-oriented failure")
295 DockerAPIClient._init_client()
296 f = DockerAPIClient._unwrapped_attr(fname)
297 qualname = DockerAPIClient._qualname(f)
298 r = None
299 if success_test is not None:
300 log.info("Testing if %s() succeeded despite the exception", qualname)
301 r = success_test()
302 if r:
303 log.warning("The request appears to have succeeded, will not retry. Response is: %s", str(r))
304 return r
305 elif tries >= max_tries:
306 log.error("Maximum number of attempts (%s) exceeded", max_tries)
307 if 'response' in exc and DockerAPIClient._nonfatal_error(exc.response.status_code):
308 return None
309 else:
310 raise
311 else:
312 log.error("Retrying %s() in %s seconds (attempt: %s of %s)", qualname, retry_time, tries,
313 max_tries)
314 sleep(retry_time)
315
316 def __init__(self, *args, **kwargs):
317 # Only initialize the host iterator once
318 host_iter = kwargs.pop('host_iter', None)
319 DockerAPIClient._host_iter = DockerAPIClient._host_iter or host_iter
320 DockerAPIClient._client_args = args
321 DockerAPIClient._client_kwargs = kwargs
322 DockerAPIClient._init_client()
323
324 def __getattr__(self, attr):
325 """Allow the calling of methods on this class as if it were a docker.APIClient instance.
326 """
327 cattr = DockerAPIClient._unwrapped_attr(attr)
328 if callable(cattr):
329 return partial(DockerAPIClient._default_client_handler, attr)
330 else:
331 return cattr
332
333
334 class DockerAPIInterface(DockerInterface):
335
336 container_type = 'docker'
337
338 # 'publish_port_random' and 'volumes' are special cases handled in _create_host_config()
339 host_config_option_map = {
340 'auto_remove': {},
341 'publish_all_ports': {},
342 'cpus': {'param': 'nano_cpus', 'map': lambda x: int(x * 1000000000)},
343 'memory': {'param': 'mem_limit'},
344 'binds': {},
345 'port_bindings': {},
346 }
347
348 def validate_config(self):
349 assert docker is not None, "Docker module could not be imported, DockerAPIInterface unavailable"
350 super(DockerAPIInterface, self).validate_config()
351 self.__client = None
352
353 @property
354 def _client(self):
355 # TODO: add cert options to containers conf
356 cert_path = os.environ.get('DOCKER_CERT_PATH') or None
357 if not cert_path:
358 cert_path = os.path.join(os.path.expanduser('~'), '.docker')
359 if self._conf.force_tlsverify or self._conf.tls:
360 tls_config = docker.tls.TLSConfig(
361 client_cert=(os.path.join(cert_path, 'cert.pem'),
362 os.path.join(cert_path, 'key.pem')),
363 ca_cert=os.path.join(cert_path, 'ca.pem'),
364 verify=self._conf.force_tlsverify,
365 )
366 else:
367 tls_config = False
368 if not self.__client:
369 self.__client = DockerAPIClient(
370 host_iter=self.host_iter,
371 tls=tls_config,
372 )
373 return self.__client
374
375 @staticmethod
376 def _first(f, *args, **kwargs):
377 try:
378 return f(*args, **kwargs)[0]
379 except IndexError:
380 return None
381
382 @staticmethod
383 def _filter_by_id_or_name(id, name):
384 if id:
385 return {'id': id}
386 elif name:
387 return {'name': name}
388 return None
389
390 @staticmethod
391 def _kwopt_to_param_names(map_spec, key):
392 """For a given containers lib method parameter name, return the matching docker-py parameter name(s).
393
394 See :meth:`_create_docker_api_spec`.
395 """
396 params = []
397 if 'param' not in map_spec and 'params' not in map_spec:
398 params.append(key)
399 elif 'param' in map_spec:
400 params.append(map_spec['param'])
401 params.extend(map_spec.get('params', ()))
402 return params
403
404 @staticmethod
405 def _kwopt_to_params(map_spec, key, value):
406 """For a given containers lib method parameter name and value, return the matching docker-py parameters with
407 values set (including transformation with an optional map function).
408
409 See :meth:`_create_docker_api_spec`.
410 """
411 params = {}
412 if 'map' in map_spec:
413 value = map_spec['map'](value)
414 for param in DockerAPIInterface._kwopt_to_param_names(map_spec, key):
415 params[param] = value
416 return params
417
418 def _create_docker_api_spec(self, option_map_name, spec_class, kwopts):
419 """Create docker-py objects used as arguments to docker-py methods.
420
421 This method modifies ``kwopts`` by removing options that match the spec.
422
423 An option map is a class-level variable with name ``<map_name>_option_map`` and is a dict with format:
424
425 .. code-block:: python
426
427 sample_option_map = {
428 'containers_lib_option_name': {
429 'param': docker_lib_positional_argument_int or 'docker_lib_keyword_argument_name',
430 'params': like 'param' but an iterable containing multiple docker lib params to set,
431 'default': default value,
432 'map': function with with to transform the value,
433 'required': True if this param is required, else False (default),
434 },
435 '_spec_param': {
436 'spec_class': class of param value,
437 }
438 }
439
440 All members of the mapping value are optional.
441
442 For example, a spec map for (some of) the possible values of the :class:`docker.types.TaskTemplate`, which is
443 used as the ``task_template`` argument to :meth:`docker.APIClient.create_service`, and the possible values of
444 the :class`:docker.types.ContainerSpec`, which is used as the ``container_spec`` argument to the
445 ``TaskTemplate`` would be:
446
447 .. code-block:: python
448
449 task_template_option_map = {
450 # TaskTemplate's 'container_spec' param is a ContainerSpec
451 '_container_spec': {
452 'spec_class': docker.types.ContainerSpec,
453 'required': True
454 }
455 }
456 container_spec_option_map = {
457 'image': {'param': 0}, # positional argument 0 to ContainerSpec()
458 'command': {}, # 'command' keyword argument to ContainerSpec()
459 'environment': { # 'env' keyword argument to ContainerSpec(), 'environment' keyword argument
460 'param': 'env' # to ContainerInterface.run_in_container()
461 },
462 }
463
464 Thus, calling ``DockerInterface.run_in_contaner('true', image='busybox', environment={'FOO': 'foo'}`` will
465 essentially do this (for example, if using Docker Swarm mode):
466
467 .. code-block:: python
468
469 container_spec = docker.types.ContainerSpec('busybox', command='true', env={'FOO': 'foo'})
470 task_template = docker.types.TaskTemplate(container_spec=container_spec)
471 docker.APIClient().create_service(task_template)
472
473 :param option_map_name: Name of option map class variable (``_option_map`` is automatically appended)
474 :type option_map_name: str
475 :param spec_class: docker-py specification class or callable returning an instance
476 :type spec_class: :class:`docker.types.Resources`, :class:`docker.types.ContainerSpec`, etc. or
477 callable
478 :param kwopts: Keyword options passed to calling method (e.g.
479 :meth:`DockerInterface.run_in_container`)
480 :type kwopts: dict
481 :returns: Instantiated ``spec_class`` object
482 :rtype: ``type(spec_class)``
483 """
484 def _kwopt_to_arg(map_spec, key, value, param=None):
485 # determines whether the given param is a positional or keyword argument in docker-py and adds it to the
486 # list of arguments
487 if isinstance(map_spec.get('param'), int):
488 spec_opts.append((map_spec.get('param'), value))
489 elif param is not None:
490 spec_kwopts[param] = value
491 else:
492 spec_kwopts.update(DockerAPIInterface._kwopt_to_params(map_spec, key, value))
493 # positional arguments
494 spec_opts = []
495 # keyword arguments
496 spec_kwopts = {}
497 # retrieve the option map for the docker-py object we're creating
498 option_map = getattr(self, option_map_name + '_option_map')
499 # set defaults
500 for key in filter(lambda k: option_map[k].get('default'), option_map.keys()):
501 map_spec = option_map[key]
502 _kwopt_to_arg(map_spec, key, map_spec['default'])
503 # don't allow kwopts that start with _, those are reserved for "child" object params
504 for kwopt in filter(lambda k: not k.startswith('_') and k in option_map, list(kwopts.keys())):
505 map_spec = option_map[kwopt]
506 _v = kwopts.pop(kwopt)
507 _kwopt_to_arg(map_spec, kwopt, _v)
508 # find any child objects that need to be created and recurse to create them
509 for _sub_k in filter(lambda k: k.startswith('_') and 'spec_class' in option_map[k], option_map.keys()):
510 map_spec = option_map[_sub_k]
511 param = _sub_k.lstrip('_')
512 _sub_v = self._create_docker_api_spec(param, map_spec['spec_class'], kwopts)
513 if _sub_v is not None or map_spec.get('required') or isinstance(map_spec.get('param'), int):
514 _kwopt_to_arg(map_spec, None, _sub_v, param=param)
515 # sort positional args and make into a flat tuple
516 if spec_opts:
517 spec_opts = sorted(spec_opts, key=lambda x: x[0])
518 spec_opts = [i[1] for i in spec_opts]
519 # create spec object
520 if spec_opts or spec_kwopts:
521 return spec_class(*spec_opts, **spec_kwopts)
522 else:
523 return None
524
525 def _volumes_to_native(self, volumes):
526 """Convert a list of volume definitions to the docker-py container creation method parameters.
527
528 :param volumes: List of volumes to translate
529 :type volumes: list of :class:`galaxy.containers.docker_model.DockerVolume`s
530 """
531 paths = []
532 binds = {}
533 for v in volumes:
534 path, bind = v.to_native()
535 paths.append(path)
536 binds.update(bind)
537 return (paths, binds)
538
539 def _create_host_config(self, kwopts):
540 """Build the host configuration parameter for docker-py container creation.
541
542 This method modifies ``kwopts`` by removing host config options and potentially setting the ``ports`` and
543 ``volumes`` keys.
544
545 :param kwopts: Keyword options passed to calling method (e.g. :method:`DockerInterface.run()`)
546 :type kwopts: dict
547 :returns: The return value of `docker.APIClient.create_host_config()`
548 :rtype: dict
549 """
550 if 'publish_port_random' in kwopts:
551 port = int(kwopts.pop('publish_port_random'))
552 kwopts['port_bindings'] = {port: None}
553 kwopts['ports'] = [port]
554 if 'volumes' in kwopts:
555 paths, binds = self._volumes_to_native(kwopts.pop('volumes'))
556 kwopts['binds'] = binds
557 kwopts['volumes'] = paths
558 return self._create_docker_api_spec('host_config', self._client.create_host_config, kwopts)
559
560 #
561 # docker subcommands
562 #
563
564 def ps(self, id=None, name=None, running=True):
565 return self._client.containers(all=not running, filters=self._filter_by_id_or_name(id, name))
566
567 def run(self, command, image=None, **kwopts):
568 image = image or self._default_image
569 command = command or None
570 log.debug("Creating docker container with image '%s' for command: %s", image, command)
571 host_config = self._create_host_config(kwopts)
572 log.debug("Docker container host configuration:\n%s", safe_dumps_formatted(host_config))
573 log.debug("Docker container creation parameters:\n%s", safe_dumps_formatted(kwopts))
574 success_test = partial(self._first, self.ps, name=kwopts['name'], running=False)
575 # this can raise exceptions, if necessary we could wrap them in a more generic "creation failed" exception class
576 container = self._client.create_container(
577 image,
578 command=command if command else None,
579 host_config=host_config,
580 success_test=success_test,
581 max_tries=5,
582 **kwopts
583 )
584 container_id = container.get('Id')
585 log.debug("Starting container: %s (%s)", kwopts['name'], str(container_id))
586 # start can safely be run more than once
587 self._client.start(container=container_id)
588 return DockerContainer.from_id(self, container_id)
589
590 def inspect(self, container_id):
591 try:
592 return self._client.inspect_container(container_id)
593 except docker.errors.NotFound:
594 raise ContainerNotFound("Invalid container id: %s" % container_id, container_id=container_id)
595
596 def image_inspect(self, image):
597 try:
598 return self._client.inspect_image(image)
599 except docker.errors.NotFound:
600 raise ContainerImageNotFound("%s not pulled, cannot get digest" % image, image=image)