comparison env/lib/python3.7/site-packages/galaxy/containers/docker_model.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 """
2 Model objects for docker objects
3 """
4 from __future__ import absolute_import
5
6 import logging
7
8 try:
9 import docker
10 except ImportError:
11 from galaxy.util.bunch import Bunch
12 docker = Bunch(errors=Bunch(NotFound=None))
13 from six.moves import shlex_quote
14
15 from galaxy.containers import (
16 Container,
17 ContainerPort,
18 ContainerVolume
19 )
20 from galaxy.util import (
21 pretty_print_time_interval,
22 unicodify,
23 )
24
25
26 CPUS_LABEL = '_galaxy_cpus'
27 IMAGE_LABEL = '_galaxy_image'
28 CPUS_CONSTRAINT = 'node.labels.' + CPUS_LABEL
29 IMAGE_CONSTRAINT = 'node.labels.' + IMAGE_LABEL
30
31 log = logging.getLogger(__name__)
32
33
34 class DockerAttributeContainer(object):
35
36 def __init__(self, members=None):
37 if members is None:
38 members = set()
39 self._members = members
40
41 def __eq__(self, other):
42 return self.members == other.members
43
44 def __ne__(self, other):
45 return not self.__eq__(other)
46
47 def __hash__(self):
48 return hash(tuple(sorted([repr(x) for x in self._members])))
49
50 def __str__(self):
51 return ', '.join([str(x) for x in self._members]) or 'None'
52
53 def __iter__(self):
54 return iter(self._members)
55
56 def __getitem__(self, name):
57 for member in self._members:
58 if member.name == name:
59 return member
60 else:
61 raise KeyError(name)
62
63 def __contains__(self, item):
64 return item in self._members
65
66 @property
67 def members(self):
68 return frozenset(self._members)
69
70 def hash(self):
71 return hex(self.__hash__())[2:]
72
73 def get(self, name, default):
74 try:
75 return self[name]
76 except KeyError:
77 return default
78
79
80 class DockerVolume(ContainerVolume):
81 @classmethod
82 def from_str(cls, as_str):
83 """Construct an instance from a string as would be passed to `docker run --volume`.
84
85 A string in the format ``<host_path>:<mode>`` is supported for legacy purposes even though it is not valid
86 Docker volume syntax.
87 """
88 if not as_str:
89 raise ValueError("Failed to parse Docker volume from %s" % as_str)
90 parts = as_str.split(":", 2)
91 kwds = dict(host_path=parts[0])
92 if len(parts) == 1:
93 # auto-generated volume
94 kwds["path"] = kwds["host_path"]
95 elif len(parts) == 2:
96 # /host_path:mode is not (or is no longer?) valid Docker volume syntax
97 if parts[1] in DockerVolume.valid_modes:
98 kwds["mode"] = parts[1]
99 kwds["path"] = kwds["host_path"]
100 else:
101 kwds["path"] = parts[1]
102 elif len(parts) == 3:
103 kwds["path"] = parts[1]
104 kwds["mode"] = parts[2]
105 return cls(**kwds)
106
107 def __str__(self):
108 volume_str = ":".join(filter(lambda x: x is not None, (self.host_path, self.path, self.mode)))
109 if "$" not in volume_str:
110 volume_for_cmd_line = shlex_quote(volume_str)
111 else:
112 # e.g. $_GALAXY_JOB_TMP_DIR:$_GALAXY_JOB_TMP_DIR:rw so don't single quote.
113 volume_for_cmd_line = '"%s"' % volume_str
114 return volume_for_cmd_line
115
116 def to_native(self):
117 host_path = self.host_path or self.path
118 return (self.path, {host_path: {'bind': self.path, 'mode': self.mode}})
119
120
121 class DockerContainer(Container):
122
123 def __init__(self, interface, id, name=None, inspect=None):
124 super(DockerContainer, self).__init__(interface, id, name=name)
125 self._inspect = inspect
126
127 @classmethod
128 def from_id(cls, interface, id):
129 inspect = interface.inspect(id)
130 return cls(interface, id, name=inspect['Name'], inspect=inspect)
131
132 @property
133 def ports(self):
134 # {
135 # "NetworkSettings" : {
136 # "Ports" : {
137 # "3306/tcp" : [
138 # {
139 # "HostIp" : "127.0.0.1",
140 # "HostPort" : "3306"
141 # }
142 # ]
143 rval = []
144 try:
145 port_mappings = self.inspect['NetworkSettings']['Ports']
146 except KeyError:
147 log.warning("Failed to get ports for container %s from `docker inspect` output at "
148 "['NetworkSettings']['Ports']: %s: %s", self.id, exc_info=True)
149 return None
150 for port_name in port_mappings:
151 for binding in port_mappings[port_name]:
152 rval.append(ContainerPort(
153 int(port_name.split('/')[0]),
154 port_name.split('/')[1],
155 self.address,
156 int(binding['HostPort']),
157 ))
158 return rval
159
160 @property
161 def address(self):
162 if self._interface.host and self._interface.host.startswith('tcp://'):
163 return self._interface.host.replace('tcp://', '').split(':', 1)[0]
164 else:
165 return 'localhost'
166
167 def is_ready(self):
168 return self.inspect['State']['Running']
169
170 def __eq__(self, other):
171 return self._id == other.id
172
173 def __ne__(self, other):
174 return not self.__eq__(other)
175
176 def __hash__(self):
177 return hash(self._id)
178
179 @property
180 def inspect(self):
181 if not self._inspect:
182 self._inspect = self._interface.inspect(self._id)
183 return self._inspect
184
185
186 class DockerService(Container):
187
188 def __init__(self, interface, id, name=None, image=None, inspect=None):
189 super(DockerService, self).__init__(interface, id, name=name)
190 self._image = image
191 self._inspect = inspect
192 self._env = {}
193 self._tasks = []
194 if inspect:
195 self._name = name or inspect['Spec']['Name']
196 self._image = image or inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']
197
198 @classmethod
199 def from_cli(cls, interface, s, task_list):
200 service = cls(interface, s['ID'], name=s['NAME'], image=s['IMAGE'])
201 for task_dict in task_list:
202 if task_dict['NAME'].strip().startswith(r'\_'):
203 continue # historical task
204 service.task_add(DockerTask.from_cli(interface, task_dict, service=service))
205 return service
206
207 @classmethod
208 def from_id(cls, interface, id):
209 inspect = interface.service_inspect(id)
210 service = cls(interface, id, inspect=inspect)
211 for task in interface.service_tasks(service):
212 service.task_add(task)
213 return service
214
215 @property
216 def ports(self):
217 # {
218 # "Endpoint": {
219 # "Ports": [
220 # {
221 # "Protocol": "tcp",
222 # "TargetPort": 8888,
223 # "PublishedPort": 30000,
224 # "PublishMode": "ingress"
225 # }
226 # ]
227 rval = []
228 try:
229 port_mappings = self.inspect['Endpoint']['Ports']
230 except (IndexError, KeyError):
231 log.warning("Failed to get ports for container %s from `docker service inspect` output at "
232 "['Endpoint']['Ports']: %s: %s", self.id, exc_info=True)
233 return None
234 for binding in port_mappings:
235 rval.append(ContainerPort(
236 binding['TargetPort'],
237 binding['Protocol'],
238 self.address, # use the routing mesh
239 binding['PublishedPort']
240 ))
241 return rval
242
243 @property
244 def address(self):
245 if self._interface.host and self._interface.host.startswith('tcp://'):
246 return self._interface.host.replace('tcp://', '').split(':', 1)[0]
247 else:
248 return 'localhost'
249
250 def is_ready(self):
251 return self.in_state('Running', 'Running')
252
253 def __eq__(self, other):
254 return self._id == other.id
255
256 def __ne__(self, other):
257 return not self.__eq__(other)
258
259 def __hash__(self):
260 return hash(self._id)
261
262 def task_add(self, task):
263 self._tasks.append(task)
264
265 @property
266 def inspect(self):
267 if not self._inspect:
268 self._inspect = self._interface.service_inspect(self._id)
269 return self._inspect
270
271 @property
272 def state(self):
273 """If one of this service's tasks desired state is running, return that task state, otherwise, return the state
274 of a non-running task.
275
276 This is imperfect because it doesn't attempt to provide useful information for replicas > 1 tasks, but it suits
277 our purposes for now.
278 """
279 state = None
280 for task in self.tasks:
281 state = task.state
282 if task.desired_state == 'running':
283 break
284 return state
285
286 @property
287 def env(self):
288 if not self._env:
289 try:
290 for env_str in self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Env']:
291 try:
292 self._env.update([env_str.split('=', 1)])
293 except ValueError:
294 self._env[env_str] = None
295 except KeyError as exc:
296 log.debug('Cannot retrieve container environment: KeyError: %s', unicodify(exc))
297 return self._env
298
299 @property
300 def terminal(self):
301 """Same caveats as :meth:`state`.
302 """
303 for task in self.tasks:
304 if task.desired_state == 'running':
305 return False
306 return True
307
308 @property
309 def node(self):
310 """Same caveats as :meth:`state`.
311 """
312 for task in self.tasks:
313 if task.node is not None:
314 return task.node
315 return None
316
317 @property
318 def image(self):
319 if self._image is None:
320 self._image = self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']
321 return self._image
322
323 @property
324 def cpus(self):
325 try:
326 cpus = self.inspect['Spec']['TaskTemplate']['Resources']['Limits']['NanoCPUs'] / 1000000000.0
327 if cpus == int(cpus):
328 cpus = int(cpus)
329 return cpus
330 except KeyError:
331 return 0
332
333 @property
334 def constraints(self):
335 constraints = self.inspect['Spec']['TaskTemplate']['Placement'].get('Constraints', [])
336 return DockerServiceConstraints.from_constraint_string_list(constraints)
337
338 @property
339 def tasks(self):
340 """A list of *all* tasks, including terminal ones.
341 """
342 if not self._tasks:
343 self._tasks = []
344 for task in self._interface.service_tasks(self):
345 self.task_add(task)
346 return self._tasks
347
348 @property
349 def task_count(self):
350 """A count of *all* tasks, including terminal ones.
351 """
352 return len(self.tasks)
353
354 def in_state(self, desired, current, tasks='any'):
355 """Indicate if one of this service's tasks matches the desired state.
356 """
357 for task in self.tasks:
358 if task.in_state(desired, current):
359 if tasks == 'any':
360 # at least 1 task in desired state
361 return True
362 elif tasks == 'all':
363 # at least 1 task not in desired state
364 return False
365 else:
366 return False if tasks == 'any' else True
367
368 def constraint_add(self, name, op, value):
369 self._interface.service_constraint_add(self.id, name, op, value)
370
371 def set_cpus(self):
372 self.constraint_add(CPUS_LABEL, '==', self.cpus)
373
374 def set_image(self):
375 self.constraint_add(IMAGE_LABEL, '==', self.image)
376
377
378 class DockerServiceConstraint(object):
379
380 def __init__(self, name=None, op=None, value=None):
381 self._name = name
382 self._op = op
383 self._value = value
384
385 def __eq__(self, other):
386 return self._name == other._name and \
387 self._op == other._op and \
388 self._value == other._value
389
390 def __ne__(self, other):
391 return not self.__eq__(other)
392
393 def __hash__(self):
394 return hash((self._name, self._op, self._value))
395
396 def __repr__(self):
397 return '%s(%s%s%s)' % (self.__class__.__name__, self._name, self._op, self._value)
398
399 def __str__(self):
400 return '%s%s%s' % (self._name, self._op, self._value)
401
402 @staticmethod
403 def split_constraint_string(constraint_str):
404 constraint = (constraint_str, '', '')
405 for op in '==', '!=':
406 t = constraint_str.partition(op)
407 if len(t[0]) < len(constraint[0]):
408 constraint = t
409 if constraint[0] == constraint_str:
410 raise Exception('Unable to parse constraint string: %s' % constraint_str)
411 return [x.strip() for x in constraint]
412
413 @classmethod
414 def from_str(cls, constraint_str):
415 name, op, value = DockerServiceConstraint.split_constraint_string(constraint_str)
416 return cls(name=name, op=op, value=value)
417
418 @property
419 def name(self):
420 return self._name
421
422 @property
423 def op(self):
424 return self._op
425
426 @property
427 def value(self):
428 return self._value
429
430 @property
431 def label(self):
432 return DockerNodeLabel(
433 name=self.name.replace('node.labels.', ''),
434 value=self.value
435 )
436
437
438 class DockerServiceConstraints(DockerAttributeContainer):
439
440 member_class = DockerServiceConstraint
441
442 @classmethod
443 def from_constraint_string_list(cls, inspect):
444 members = []
445 for member_str in inspect:
446 members.append(cls.member_class.from_str(member_str))
447 return cls(members=members)
448
449 @property
450 def labels(self):
451 return DockerNodeLabels(members=[x.label for x in self.members])
452
453
454 class DockerNode(object):
455
456 def __init__(self, interface, id=None, name=None, status=None,
457 availability=None, manager=False, inspect=None):
458 self._interface = interface
459 self._id = id
460 self._name = name
461 self._status = status
462 self._availability = availability
463 self._manager = manager
464 self._inspect = inspect
465 if inspect:
466 self._name = name or inspect['Description']['Hostname']
467 self._status = status or inspect['Status']['State']
468 self._availability = inspect['Spec']['Availability']
469 self._manager = manager or inspect['Spec']['Role'] == 'manager'
470 self._tasks = []
471
472 @classmethod
473 def from_cli(cls, interface, n, task_list):
474 node = cls(interface, id=n['ID'], name=n['HOSTNAME'], status=n['STATUS'],
475 availability=n['AVAILABILITY'], manager=True if n['MANAGER STATUS'] else False)
476 for task_dict in task_list:
477 node.task_add(DockerTask.from_cli(interface, task_dict, node=node))
478 return node
479
480 @classmethod
481 def from_id(cls, interface, id):
482 inspect = interface.node_inspect(id)
483 node = cls(interface, id, inspect=inspect)
484 for task in interface.node_tasks(node):
485 node.task_add(task)
486 return node
487
488 def task_add(self, task):
489 self._tasks.append(task)
490
491 @property
492 def id(self):
493 return self._id
494
495 @property
496 def name(self):
497 return self._name
498
499 @property
500 def version(self):
501 # this changes on update so don't cache
502 return self._interface.node_inspect(self._id or self._name)['Version']['Index']
503
504 @property
505 def inspect(self):
506 if not self._inspect:
507 self._inspect = self._interface.node_inspect(self._id or self._name)
508 return self._inspect
509
510 @property
511 def state(self):
512 return ('%s-%s' % (self._status, self._availability)).lower()
513
514 @property
515 def cpus(self):
516 return self.inspect['Description']['Resources']['NanoCPUs'] / 1000000000
517
518 @property
519 def labels(self):
520 labels = self.inspect['Spec'].get('Labels', {}) or {}
521 return DockerNodeLabels.from_label_dictionary(labels)
522
523 def label_add(self, label, value):
524 self._interface.node_update(self.id, label_add={label: value})
525
526 @property
527 def labels_as_constraints(self):
528 constraints_strings = [x.constraint_string for x in self.labels]
529 return DockerServiceConstraints.from_constraint_string_list(constraints_strings)
530
531 def set_labels_for_constraints(self, constraints):
532 for label in self._constraints_to_label_args(constraints):
533 if label not in self.labels:
534 log.info("setting node '%s' label '%s' to '%s'", self.name, label.name, label.value)
535 self.label_add(label.name, label.value)
536
537 def _constraints_to_label_args(self, constraints):
538 constraints = filter(lambda x: x.name.startswith('node.labels.') and x.op == '==', constraints)
539 labels = map(lambda x: DockerNodeLabel(name=x.name.replace('node.labels.', '', 1), value=x.value), constraints)
540 return labels
541
542 @property
543 def tasks(self):
544 """A list of *all* tasks, including terminal ones.
545 """
546 if not self._tasks:
547 self._tasks = []
548 for task in self._interface.node_tasks(self):
549 self.task_add(task)
550 return self._tasks
551
552 @property
553 def non_terminal_tasks(self):
554 r = []
555 for task in self.tasks:
556 # ensure the task has a service - it is possible for "phantom" tasks to exist (service is removed, no
557 # container is running, but the task still shows up in the node's task list)
558 if not task.terminal and task.service is not None:
559 r.append(task)
560 return r
561
562 @property
563 def task_count(self):
564 """A count of *all* tasks, including terminal ones.
565 """
566 return len(self.tasks)
567
568 def in_state(self, status, availability):
569 return self._status.lower() == status.lower() and self._availability.lower() == availability.lower()
570
571 def is_ok(self):
572 return self.in_state('Ready', 'Active')
573
574 def is_managed(self):
575 return not self._manager
576
577 def destroyable(self):
578 return not self._manager and self.is_ok() and self.task_count == 0
579
580 def drain(self):
581 self._interface.node_update(self.id, availability='drain')
582
583
584 class DockerNodeLabel(object):
585
586 def __init__(self, name=None, value=None):
587 self._name = name
588 self._value = value
589
590 def __eq__(self, other):
591 return self._name == other._name and \
592 self._value == other._value
593
594 def __ne__(self, other):
595 return not self.__eq__(other)
596
597 def __hash__(self):
598 return hash((self._name, self._value))
599
600 def __repr__(self):
601 return '%s(%s: %s)' % (self.__class__.__name__, self._name, self._value)
602
603 def __str__(self):
604 return '%s: %s' % (self._name, self._value)
605
606 @property
607 def name(self):
608 return self._name
609
610 @property
611 def value(self):
612 return self._value
613
614 @property
615 def constraint_string(self):
616 return 'node.labels.{name}=={value}'.format(name=self.name, value=self.value)
617
618 @property
619 def constraint(self):
620 return DockerServiceConstraint(
621 name='node.labels.{name}'.format(name=self.name),
622 op='==',
623 value=self.value
624 )
625
626
627 class DockerNodeLabels(DockerAttributeContainer):
628
629 member_class = DockerNodeLabel
630
631 @classmethod
632 def from_label_dictionary(cls, inspect):
633 members = []
634 for k, v in inspect.items():
635 members.append(cls.member_class(name=k, value=v))
636 return cls(members=members)
637
638 @property
639 def constraints(self):
640 return DockerServiceConstraints(members=[x.constraint for x in self.members])
641
642
643 class DockerTask(object):
644
645 # these are the possible *current* state terminal states
646 terminal_states = (
647 'shutdown', # this is normally only a desired state but I've seen a task with it as current as well
648 'complete',
649 'failed',
650 'rejected',
651 'orphaned',
652 )
653
654 def __init__(self, interface, id=None, name=None, image=None, desired_state=None,
655 state=None, error=None, ports=None, service=None, node=None):
656 self._interface = interface
657 self._id = id
658 self._name = name
659 self._image = image
660 self._desired_state = desired_state
661 self._state = state
662 self._error = error
663 self._ports = ports
664 self._service = service
665 self._node = node
666 self._inspect = None
667
668 @classmethod
669 def from_cli(cls, interface, t, service=None, node=None):
670 state = t['CURRENT STATE'].split()[0]
671 return cls(interface, id=t['ID'], name=t['NAME'], image=t['IMAGE'],
672 desired_state=t['DESIRED STATE'], state=state, error=t['ERROR'],
673 ports=t['PORTS'], service=service, node=node)
674
675 @classmethod
676 def from_api(cls, interface, t, service=None, node=None):
677 service = service or interface.service(id=t.get('ServiceID'))
678 node = node or interface.node(id=t.get('NodeID'))
679 if service:
680 name = service.name + '.' + str(t['Slot'])
681 else:
682 name = t['ID']
683 image = t['Spec']['ContainerSpec']['Image'].split('@', 1)[0], # remove pin
684 return cls(interface, id=t['ID'], name=name, image=image, desired_state=t['DesiredState'],
685 state=t['Status']['State'], ports=t['Status']['PortStatus'], error=t['Status']['Message'],
686 service=service, node=node)
687
688 @property
689 def id(self):
690 return self._id
691
692 @property
693 def name(self):
694 return self._name
695
696 @property
697 def inspect(self):
698 if not self._inspect:
699 try:
700 self._inspect = self._interface.task_inspect(self._id)
701 except docker.errors.NotFound:
702 # This shouldn't be possible, appears to be some kind of Swarm bug (the node claims to have a task that
703 # does not actually exist anymore, nor does its service exist).
704 log.error('Task could not be inspected because Docker claims it does not exist: %s (%s)',
705 self.name, self.id)
706 return None
707 return self._inspect
708
709 @property
710 def slot(self):
711 try:
712 return self.inspect['Slot']
713 except TypeError:
714 return None
715
716 @property
717 def node(self):
718 if not self._node:
719 try:
720 self._node = self._interface.node(id=self.inspect['NodeID'])
721 except TypeError:
722 return None
723 return self._node
724
725 @property
726 def service(self):
727 if not self._service:
728 try:
729 self._service = self._interface.service(id=self.inspect['ServiceID'])
730 except TypeError:
731 return None
732 return self._service
733
734 @property
735 def cpus(self):
736 try:
737 cpus = self.inspect['Spec']['Resources']['Reservations']['NanoCPUs'] / 1000000000.0
738 if cpus == int(cpus):
739 cpus = int(cpus)
740 return cpus
741 except TypeError:
742 return None
743 except KeyError:
744 return 0
745
746 @property
747 def state(self):
748 return ('%s-%s' % (self._desired_state, self._state)).lower()
749
750 @property
751 def current_state(self):
752 try:
753 return self._state.lower()
754 except TypeError:
755 log.warning("Current state of %s (%s) is not a string: %s", self.name, self.id, str(self._state))
756 return None
757
758 @property
759 def current_state_time(self):
760 # Docker API returns a stamp w/ higher second precision than Python takes
761 try:
762 stamp = self.inspect['Status']['Timestamp']
763 except TypeError:
764 return None
765 return pretty_print_time_interval(time=stamp[:stamp.index('.') + 7], precise=True, utc=stamp[-1] == 'Z')
766
767 @property
768 def desired_state(self):
769 try:
770 return self._desired_state.lower()
771 except TypeError:
772 log.warning("Desired state of %s (%s) is not a string: %s", self.name, self.id, str(self._desired_state))
773 return None
774
775 @property
776 def terminal(self):
777 return self.desired_state == 'shutdown' and self.current_state in self.terminal_states
778
779 def in_state(self, desired, current):
780 return self.desired_state == desired.lower() and self.current_state == current.lower()