comparison env/lib/python3.7/site-packages/ruamel/yaml/comments.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 # coding: utf-8
2
3 from __future__ import absolute_import, print_function
4
5 """
6 stuff to deal with comments and formatting on dict/list/ordereddict/set
7 these are not really related, formatting could be factored out as
8 a separate base
9 """
10
11 import sys
12 import copy
13
14
15 from ruamel.yaml.compat import ordereddict, PY2, string_types, MutableSliceableSequence
16 from ruamel.yaml.scalarstring import ScalarString
17 from ruamel.yaml.anchor import Anchor
18
19 if PY2:
20 from collections import MutableSet, Sized, Set, Mapping
21 else:
22 from collections.abc import MutableSet, Sized, Set, Mapping
23
24 if False: # MYPY
25 from typing import Any, Dict, Optional, List, Union, Optional, Iterator # NOQA
26
27 # fmt: off
28 __all__ = ['CommentedSeq', 'CommentedKeySeq',
29 'CommentedMap', 'CommentedOrderedMap',
30 'CommentedSet', 'comment_attrib', 'merge_attrib']
31 # fmt: on
32
33 comment_attrib = '_yaml_comment'
34 format_attrib = '_yaml_format'
35 line_col_attrib = '_yaml_line_col'
36 merge_attrib = '_yaml_merge'
37 tag_attrib = '_yaml_tag'
38
39
40 class Comment(object):
41 # sys.getsize tested the Comment objects, __slots__ makes them bigger
42 # and adding self.end did not matter
43 __slots__ = 'comment', '_items', '_end', '_start'
44 attrib = comment_attrib
45
46 def __init__(self):
47 # type: () -> None
48 self.comment = None # [post, [pre]]
49 # map key (mapping/omap/dict) or index (sequence/list) to a list of
50 # dict: post_key, pre_key, post_value, pre_value
51 # list: pre item, post item
52 self._items = {} # type: Dict[Any, Any]
53 # self._start = [] # should not put these on first item
54 self._end = [] # type: List[Any] # end of document comments
55
56 def __str__(self):
57 # type: () -> str
58 if bool(self._end):
59 end = ',\n end=' + str(self._end)
60 else:
61 end = ""
62 return 'Comment(comment={0},\n items={1}{2})'.format(self.comment, self._items, end)
63
64 @property
65 def items(self):
66 # type: () -> Any
67 return self._items
68
69 @property
70 def end(self):
71 # type: () -> Any
72 return self._end
73
74 @end.setter
75 def end(self, value):
76 # type: (Any) -> None
77 self._end = value
78
79 @property
80 def start(self):
81 # type: () -> Any
82 return self._start
83
84 @start.setter
85 def start(self, value):
86 # type: (Any) -> None
87 self._start = value
88
89
90 # to distinguish key from None
91 def NoComment():
92 # type: () -> None
93 pass
94
95
96 class Format(object):
97 __slots__ = ('_flow_style',)
98 attrib = format_attrib
99
100 def __init__(self):
101 # type: () -> None
102 self._flow_style = None # type: Any
103
104 def set_flow_style(self):
105 # type: () -> None
106 self._flow_style = True
107
108 def set_block_style(self):
109 # type: () -> None
110 self._flow_style = False
111
112 def flow_style(self, default=None):
113 # type: (Optional[Any]) -> Any
114 """if default (the flow_style) is None, the flow style tacked on to
115 the object explicitly will be taken. If that is None as well the
116 default flow style rules the format down the line, or the type
117 of the constituent values (simple -> flow, map/list -> block)"""
118 if self._flow_style is None:
119 return default
120 return self._flow_style
121
122
123 class LineCol(object):
124 attrib = line_col_attrib
125
126 def __init__(self):
127 # type: () -> None
128 self.line = None
129 self.col = None
130 self.data = None # type: Optional[Dict[Any, Any]]
131
132 def add_kv_line_col(self, key, data):
133 # type: (Any, Any) -> None
134 if self.data is None:
135 self.data = {}
136 self.data[key] = data
137
138 def key(self, k):
139 # type: (Any) -> Any
140 return self._kv(k, 0, 1)
141
142 def value(self, k):
143 # type: (Any) -> Any
144 return self._kv(k, 2, 3)
145
146 def _kv(self, k, x0, x1):
147 # type: (Any, Any, Any) -> Any
148 if self.data is None:
149 return None
150 data = self.data[k]
151 return data[x0], data[x1]
152
153 def item(self, idx):
154 # type: (Any) -> Any
155 if self.data is None:
156 return None
157 return self.data[idx][0], self.data[idx][1]
158
159 def add_idx_line_col(self, key, data):
160 # type: (Any, Any) -> None
161 if self.data is None:
162 self.data = {} # type: Dict[Any, Any]
163 self.data[key] = data
164
165
166 class Tag(object):
167 """store tag information for roundtripping"""
168
169 __slots__ = ('value',)
170 attrib = tag_attrib
171
172 def __init__(self):
173 # type: () -> None
174 self.value = None
175
176 def __repr__(self):
177 # type: () -> Any
178 return '{0.__class__.__name__}({0.value!r})'.format(self)
179
180
181 class CommentedBase(object):
182 @property
183 def ca(self):
184 # type: () -> Any
185 if not hasattr(self, Comment.attrib):
186 setattr(self, Comment.attrib, Comment())
187 return getattr(self, Comment.attrib)
188
189 def yaml_end_comment_extend(self, comment, clear=False):
190 # type: (Any, bool) -> None
191 if comment is None:
192 return
193 if clear or self.ca.end is None:
194 self.ca.end = []
195 self.ca.end.extend(comment)
196
197 def yaml_key_comment_extend(self, key, comment, clear=False):
198 # type: (Any, Any, bool) -> None
199 r = self.ca._items.setdefault(key, [None, None, None, None])
200 if clear or r[1] is None:
201 if comment[1] is not None:
202 assert isinstance(comment[1], list)
203 r[1] = comment[1]
204 else:
205 r[1].extend(comment[0])
206 r[0] = comment[0]
207
208 def yaml_value_comment_extend(self, key, comment, clear=False):
209 # type: (Any, Any, bool) -> None
210 r = self.ca._items.setdefault(key, [None, None, None, None])
211 if clear or r[3] is None:
212 if comment[1] is not None:
213 assert isinstance(comment[1], list)
214 r[3] = comment[1]
215 else:
216 r[3].extend(comment[0])
217 r[2] = comment[0]
218
219 def yaml_set_start_comment(self, comment, indent=0):
220 # type: (Any, Any) -> None
221 """overwrites any preceding comment lines on an object
222 expects comment to be without `#` and possible have multiple lines
223 """
224 from .error import CommentMark
225 from .tokens import CommentToken
226
227 pre_comments = self._yaml_get_pre_comment()
228 if comment[-1] == '\n':
229 comment = comment[:-1] # strip final newline if there
230 start_mark = CommentMark(indent)
231 for com in comment.split('\n'):
232 pre_comments.append(CommentToken('# ' + com + '\n', start_mark, None))
233
234 def yaml_set_comment_before_after_key(
235 self, key, before=None, indent=0, after=None, after_indent=None
236 ):
237 # type: (Any, Any, Any, Any, Any) -> None
238 """
239 expects comment (before/after) to be without `#` and possible have multiple lines
240 """
241 from ruamel.yaml.error import CommentMark
242 from ruamel.yaml.tokens import CommentToken
243
244 def comment_token(s, mark):
245 # type: (Any, Any) -> Any
246 # handle empty lines as having no comment
247 return CommentToken(('# ' if s else "") + s + '\n', mark, None)
248
249 if after_indent is None:
250 after_indent = indent + 2
251 if before and (len(before) > 1) and before[-1] == '\n':
252 before = before[:-1] # strip final newline if there
253 if after and after[-1] == '\n':
254 after = after[:-1] # strip final newline if there
255 start_mark = CommentMark(indent)
256 c = self.ca.items.setdefault(key, [None, [], None, None])
257 if before == '\n':
258 c[1].append(comment_token("", start_mark))
259 elif before:
260 for com in before.split('\n'):
261 c[1].append(comment_token(com, start_mark))
262 if after:
263 start_mark = CommentMark(after_indent)
264 if c[3] is None:
265 c[3] = []
266 for com in after.split('\n'):
267 c[3].append(comment_token(com, start_mark)) # type: ignore
268
269 @property
270 def fa(self):
271 # type: () -> Any
272 """format attribute
273
274 set_flow_style()/set_block_style()"""
275 if not hasattr(self, Format.attrib):
276 setattr(self, Format.attrib, Format())
277 return getattr(self, Format.attrib)
278
279 def yaml_add_eol_comment(self, comment, key=NoComment, column=None):
280 # type: (Any, Optional[Any], Optional[Any]) -> None
281 """
282 there is a problem as eol comments should start with ' #'
283 (but at the beginning of the line the space doesn't have to be before
284 the #. The column index is for the # mark
285 """
286 from .tokens import CommentToken
287 from .error import CommentMark
288
289 if column is None:
290 try:
291 column = self._yaml_get_column(key)
292 except AttributeError:
293 column = 0
294 if comment[0] != '#':
295 comment = '# ' + comment
296 if column is None:
297 if comment[0] == '#':
298 comment = ' ' + comment
299 column = 0
300 start_mark = CommentMark(column)
301 ct = [CommentToken(comment, start_mark, None), None]
302 self._yaml_add_eol_comment(ct, key=key)
303
304 @property
305 def lc(self):
306 # type: () -> Any
307 if not hasattr(self, LineCol.attrib):
308 setattr(self, LineCol.attrib, LineCol())
309 return getattr(self, LineCol.attrib)
310
311 def _yaml_set_line_col(self, line, col):
312 # type: (Any, Any) -> None
313 self.lc.line = line
314 self.lc.col = col
315
316 def _yaml_set_kv_line_col(self, key, data):
317 # type: (Any, Any) -> None
318 self.lc.add_kv_line_col(key, data)
319
320 def _yaml_set_idx_line_col(self, key, data):
321 # type: (Any, Any) -> None
322 self.lc.add_idx_line_col(key, data)
323
324 @property
325 def anchor(self):
326 # type: () -> Any
327 if not hasattr(self, Anchor.attrib):
328 setattr(self, Anchor.attrib, Anchor())
329 return getattr(self, Anchor.attrib)
330
331 def yaml_anchor(self):
332 # type: () -> Any
333 if not hasattr(self, Anchor.attrib):
334 return None
335 return self.anchor
336
337 def yaml_set_anchor(self, value, always_dump=False):
338 # type: (Any, bool) -> None
339 self.anchor.value = value
340 self.anchor.always_dump = always_dump
341
342 @property
343 def tag(self):
344 # type: () -> Any
345 if not hasattr(self, Tag.attrib):
346 setattr(self, Tag.attrib, Tag())
347 return getattr(self, Tag.attrib)
348
349 def yaml_set_tag(self, value):
350 # type: (Any) -> None
351 self.tag.value = value
352
353 def copy_attributes(self, t, memo=None):
354 # type: (Any, bool) -> None
355 # fmt: off
356 for a in [Comment.attrib, Format.attrib, LineCol.attrib, Anchor.attrib,
357 Tag.attrib, merge_attrib]:
358 if hasattr(self, a):
359 if memo is not None:
360 setattr(t, a, copy.deepcopy(getattr(self, a, memo)))
361 else:
362 setattr(t, a, getattr(self, a))
363 # fmt: on
364
365 def _yaml_add_eol_comment(self, comment, key):
366 # type: (Any, Any) -> None
367 raise NotImplementedError
368
369 def _yaml_get_pre_comment(self):
370 # type: () -> Any
371 raise NotImplementedError
372
373 def _yaml_get_column(self, key):
374 # type: (Any) -> Any
375 raise NotImplementedError
376
377
378 class CommentedSeq(MutableSliceableSequence, list, CommentedBase): # type: ignore
379 __slots__ = (Comment.attrib, '_lst')
380
381 def __init__(self, *args, **kw):
382 # type: (Any, Any) -> None
383 list.__init__(self, *args, **kw)
384
385 def __getsingleitem__(self, idx):
386 # type: (Any) -> Any
387 return list.__getitem__(self, idx)
388
389 def __setsingleitem__(self, idx, value):
390 # type: (Any, Any) -> None
391 # try to preserve the scalarstring type if setting an existing key to a new value
392 if idx < len(self):
393 if (
394 isinstance(value, string_types)
395 and not isinstance(value, ScalarString)
396 and isinstance(self[idx], ScalarString)
397 ):
398 value = type(self[idx])(value)
399 list.__setitem__(self, idx, value)
400
401 def __delsingleitem__(self, idx=None):
402 # type: (Any) -> Any
403 list.__delitem__(self, idx)
404 self.ca.items.pop(idx, None) # might not be there -> default value
405 for list_index in sorted(self.ca.items):
406 if list_index < idx:
407 continue
408 self.ca.items[list_index - 1] = self.ca.items.pop(list_index)
409
410 def __len__(self):
411 # type: () -> int
412 return list.__len__(self)
413
414 def insert(self, idx, val):
415 # type: (Any, Any) -> None
416 """the comments after the insertion have to move forward"""
417 list.insert(self, idx, val)
418 for list_index in sorted(self.ca.items, reverse=True):
419 if list_index < idx:
420 break
421 self.ca.items[list_index + 1] = self.ca.items.pop(list_index)
422
423 def extend(self, val):
424 # type: (Any) -> None
425 list.extend(self, val)
426
427 def __eq__(self, other):
428 # type: (Any) -> bool
429 return list.__eq__(self, other)
430
431 def _yaml_add_comment(self, comment, key=NoComment):
432 # type: (Any, Optional[Any]) -> None
433 if key is not NoComment:
434 self.yaml_key_comment_extend(key, comment)
435 else:
436 self.ca.comment = comment
437
438 def _yaml_add_eol_comment(self, comment, key):
439 # type: (Any, Any) -> None
440 self._yaml_add_comment(comment, key=key)
441
442 def _yaml_get_columnX(self, key):
443 # type: (Any) -> Any
444 return self.ca.items[key][0].start_mark.column
445
446 def _yaml_get_column(self, key):
447 # type: (Any) -> Any
448 column = None
449 sel_idx = None
450 pre, post = key - 1, key + 1
451 if pre in self.ca.items:
452 sel_idx = pre
453 elif post in self.ca.items:
454 sel_idx = post
455 else:
456 # self.ca.items is not ordered
457 for row_idx, _k1 in enumerate(self):
458 if row_idx >= key:
459 break
460 if row_idx not in self.ca.items:
461 continue
462 sel_idx = row_idx
463 if sel_idx is not None:
464 column = self._yaml_get_columnX(sel_idx)
465 return column
466
467 def _yaml_get_pre_comment(self):
468 # type: () -> Any
469 pre_comments = [] # type: List[Any]
470 if self.ca.comment is None:
471 self.ca.comment = [None, pre_comments]
472 else:
473 self.ca.comment[1] = pre_comments
474 return pre_comments
475
476 def __deepcopy__(self, memo):
477 # type: (Any) -> Any
478 res = self.__class__()
479 memo[id(self)] = res
480 for k in self:
481 res.append(copy.deepcopy(k, memo))
482 self.copy_attributes(res, memo=memo)
483 return res
484
485 def __add__(self, other):
486 # type: (Any) -> Any
487 return list.__add__(self, other)
488
489 def sort(self, key=None, reverse=False): # type: ignore
490 # type: (Any, bool) -> None
491 if key is None:
492 tmp_lst = sorted(zip(self, range(len(self))), reverse=reverse)
493 list.__init__(self, [x[0] for x in tmp_lst])
494 else:
495 tmp_lst = sorted(
496 zip(map(key, list.__iter__(self)), range(len(self))), reverse=reverse
497 )
498 list.__init__(self, [list.__getitem__(self, x[1]) for x in tmp_lst])
499 itm = self.ca.items
500 self.ca._items = {}
501 for idx, x in enumerate(tmp_lst):
502 old_index = x[1]
503 if old_index in itm:
504 self.ca.items[idx] = itm[old_index]
505
506 def __repr__(self):
507 # type: () -> Any
508 return list.__repr__(self)
509
510
511 class CommentedKeySeq(tuple, CommentedBase): # type: ignore
512 """This primarily exists to be able to roundtrip keys that are sequences"""
513
514 def _yaml_add_comment(self, comment, key=NoComment):
515 # type: (Any, Optional[Any]) -> None
516 if key is not NoComment:
517 self.yaml_key_comment_extend(key, comment)
518 else:
519 self.ca.comment = comment
520
521 def _yaml_add_eol_comment(self, comment, key):
522 # type: (Any, Any) -> None
523 self._yaml_add_comment(comment, key=key)
524
525 def _yaml_get_columnX(self, key):
526 # type: (Any) -> Any
527 return self.ca.items[key][0].start_mark.column
528
529 def _yaml_get_column(self, key):
530 # type: (Any) -> Any
531 column = None
532 sel_idx = None
533 pre, post = key - 1, key + 1
534 if pre in self.ca.items:
535 sel_idx = pre
536 elif post in self.ca.items:
537 sel_idx = post
538 else:
539 # self.ca.items is not ordered
540 for row_idx, _k1 in enumerate(self):
541 if row_idx >= key:
542 break
543 if row_idx not in self.ca.items:
544 continue
545 sel_idx = row_idx
546 if sel_idx is not None:
547 column = self._yaml_get_columnX(sel_idx)
548 return column
549
550 def _yaml_get_pre_comment(self):
551 # type: () -> Any
552 pre_comments = [] # type: List[Any]
553 if self.ca.comment is None:
554 self.ca.comment = [None, pre_comments]
555 else:
556 self.ca.comment[1] = pre_comments
557 return pre_comments
558
559
560 class CommentedMapView(Sized):
561 __slots__ = ('_mapping',)
562
563 def __init__(self, mapping):
564 # type: (Any) -> None
565 self._mapping = mapping
566
567 def __len__(self):
568 # type: () -> int
569 count = len(self._mapping)
570 return count
571
572
573 class CommentedMapKeysView(CommentedMapView, Set): # type: ignore
574 __slots__ = ()
575
576 @classmethod
577 def _from_iterable(self, it):
578 # type: (Any) -> Any
579 return set(it)
580
581 def __contains__(self, key):
582 # type: (Any) -> Any
583 return key in self._mapping
584
585 def __iter__(self):
586 # type: () -> Any # yield from self._mapping # not in py27, pypy
587 # for x in self._mapping._keys():
588 for x in self._mapping:
589 yield x
590
591
592 class CommentedMapItemsView(CommentedMapView, Set): # type: ignore
593 __slots__ = ()
594
595 @classmethod
596 def _from_iterable(self, it):
597 # type: (Any) -> Any
598 return set(it)
599
600 def __contains__(self, item):
601 # type: (Any) -> Any
602 key, value = item
603 try:
604 v = self._mapping[key]
605 except KeyError:
606 return False
607 else:
608 return v == value
609
610 def __iter__(self):
611 # type: () -> Any
612 for key in self._mapping._keys():
613 yield (key, self._mapping[key])
614
615
616 class CommentedMapValuesView(CommentedMapView):
617 __slots__ = ()
618
619 def __contains__(self, value):
620 # type: (Any) -> Any
621 for key in self._mapping:
622 if value == self._mapping[key]:
623 return True
624 return False
625
626 def __iter__(self):
627 # type: () -> Any
628 for key in self._mapping._keys():
629 yield self._mapping[key]
630
631
632 class CommentedMap(ordereddict, CommentedBase):
633 __slots__ = (Comment.attrib, '_ok', '_ref')
634
635 def __init__(self, *args, **kw):
636 # type: (Any, Any) -> None
637 self._ok = set() # type: MutableSet[Any] # own keys
638 self._ref = [] # type: List[CommentedMap]
639 ordereddict.__init__(self, *args, **kw)
640
641 def _yaml_add_comment(self, comment, key=NoComment, value=NoComment):
642 # type: (Any, Optional[Any], Optional[Any]) -> None
643 """values is set to key to indicate a value attachment of comment"""
644 if key is not NoComment:
645 self.yaml_key_comment_extend(key, comment)
646 return
647 if value is not NoComment:
648 self.yaml_value_comment_extend(value, comment)
649 else:
650 self.ca.comment = comment
651
652 def _yaml_add_eol_comment(self, comment, key):
653 # type: (Any, Any) -> None
654 """add on the value line, with value specified by the key"""
655 self._yaml_add_comment(comment, value=key)
656
657 def _yaml_get_columnX(self, key):
658 # type: (Any) -> Any
659 return self.ca.items[key][2].start_mark.column
660
661 def _yaml_get_column(self, key):
662 # type: (Any) -> Any
663 column = None
664 sel_idx = None
665 pre, post, last = None, None, None
666 for x in self:
667 if pre is not None and x != key:
668 post = x
669 break
670 if x == key:
671 pre = last
672 last = x
673 if pre in self.ca.items:
674 sel_idx = pre
675 elif post in self.ca.items:
676 sel_idx = post
677 else:
678 # self.ca.items is not ordered
679 for k1 in self:
680 if k1 >= key:
681 break
682 if k1 not in self.ca.items:
683 continue
684 sel_idx = k1
685 if sel_idx is not None:
686 column = self._yaml_get_columnX(sel_idx)
687 return column
688
689 def _yaml_get_pre_comment(self):
690 # type: () -> Any
691 pre_comments = [] # type: List[Any]
692 if self.ca.comment is None:
693 self.ca.comment = [None, pre_comments]
694 else:
695 self.ca.comment[1] = pre_comments
696 return pre_comments
697
698 def update(self, vals): # type: ignore
699 # type: (Any) -> None
700 try:
701 ordereddict.update(self, vals)
702 except TypeError:
703 # probably a dict that is used
704 for x in vals:
705 self[x] = vals[x]
706 try:
707 self._ok.update(vals.keys()) # type: ignore
708 except AttributeError:
709 # assume a list/tuple of two element lists/tuples
710 for x in vals:
711 self._ok.add(x[0])
712
713 def insert(self, pos, key, value, comment=None):
714 # type: (Any, Any, Any, Optional[Any]) -> None
715 """insert key value into given position
716 attach comment if provided
717 """
718 ordereddict.insert(self, pos, key, value)
719 self._ok.add(key)
720 if comment is not None:
721 self.yaml_add_eol_comment(comment, key=key)
722
723 def mlget(self, key, default=None, list_ok=False):
724 # type: (Any, Any, Any) -> Any
725 """multi-level get that expects dicts within dicts"""
726 if not isinstance(key, list):
727 return self.get(key, default)
728 # assume that the key is a list of recursively accessible dicts
729
730 def get_one_level(key_list, level, d):
731 # type: (Any, Any, Any) -> Any
732 if not list_ok:
733 assert isinstance(d, dict)
734 if level >= len(key_list):
735 if level > len(key_list):
736 raise IndexError
737 return d[key_list[level - 1]]
738 return get_one_level(key_list, level + 1, d[key_list[level - 1]])
739
740 try:
741 return get_one_level(key, 1, self)
742 except KeyError:
743 return default
744 except (TypeError, IndexError):
745 if not list_ok:
746 raise
747 return default
748
749 def __getitem__(self, key):
750 # type: (Any) -> Any
751 try:
752 return ordereddict.__getitem__(self, key)
753 except KeyError:
754 for merged in getattr(self, merge_attrib, []):
755 if key in merged[1]:
756 return merged[1][key]
757 raise
758
759 def __setitem__(self, key, value):
760 # type: (Any, Any) -> None
761 # try to preserve the scalarstring type if setting an existing key to a new value
762 if key in self:
763 if (
764 isinstance(value, string_types)
765 and not isinstance(value, ScalarString)
766 and isinstance(self[key], ScalarString)
767 ):
768 value = type(self[key])(value)
769 ordereddict.__setitem__(self, key, value)
770 self._ok.add(key)
771
772 def _unmerged_contains(self, key):
773 # type: (Any) -> Any
774 if key in self._ok:
775 return True
776 return None
777
778 def __contains__(self, key):
779 # type: (Any) -> bool
780 return bool(ordereddict.__contains__(self, key))
781
782 def get(self, key, default=None):
783 # type: (Any, Any) -> Any
784 try:
785 return self.__getitem__(key)
786 except: # NOQA
787 return default
788
789 def __repr__(self):
790 # type: () -> Any
791 return ordereddict.__repr__(self).replace('CommentedMap', 'ordereddict')
792
793 def non_merged_items(self):
794 # type: () -> Any
795 for x in ordereddict.__iter__(self):
796 if x in self._ok:
797 yield x, ordereddict.__getitem__(self, x)
798
799 def __delitem__(self, key):
800 # type: (Any) -> None
801 # for merged in getattr(self, merge_attrib, []):
802 # if key in merged[1]:
803 # value = merged[1][key]
804 # break
805 # else:
806 # # not found in merged in stuff
807 # ordereddict.__delitem__(self, key)
808 # for referer in self._ref:
809 # referer.update_key_value(key)
810 # return
811 #
812 # ordereddict.__setitem__(self, key, value) # merge might have different value
813 # self._ok.discard(key)
814 self._ok.discard(key)
815 ordereddict.__delitem__(self, key)
816 for referer in self._ref:
817 referer.update_key_value(key)
818
819 def __iter__(self):
820 # type: () -> Any
821 for x in ordereddict.__iter__(self):
822 yield x
823
824 def _keys(self):
825 # type: () -> Any
826 for x in ordereddict.__iter__(self):
827 yield x
828
829 def __len__(self):
830 # type: () -> int
831 return ordereddict.__len__(self)
832
833 def __eq__(self, other):
834 # type: (Any) -> bool
835 return bool(dict(self) == other)
836
837 if PY2:
838
839 def keys(self):
840 # type: () -> Any
841 return list(self._keys())
842
843 def iterkeys(self):
844 # type: () -> Any
845 return self._keys()
846
847 def viewkeys(self):
848 # type: () -> Any
849 return CommentedMapKeysView(self)
850
851 else:
852
853 def keys(self):
854 # type: () -> Any
855 return CommentedMapKeysView(self)
856
857 if PY2:
858
859 def _values(self):
860 # type: () -> Any
861 for x in ordereddict.__iter__(self):
862 yield ordereddict.__getitem__(self, x)
863
864 def values(self):
865 # type: () -> Any
866 return list(self._values())
867
868 def itervalues(self):
869 # type: () -> Any
870 return self._values()
871
872 def viewvalues(self):
873 # type: () -> Any
874 return CommentedMapValuesView(self)
875
876 else:
877
878 def values(self):
879 # type: () -> Any
880 return CommentedMapValuesView(self)
881
882 def _items(self):
883 # type: () -> Any
884 for x in ordereddict.__iter__(self):
885 yield x, ordereddict.__getitem__(self, x)
886
887 if PY2:
888
889 def items(self):
890 # type: () -> Any
891 return list(self._items())
892
893 def iteritems(self):
894 # type: () -> Any
895 return self._items()
896
897 def viewitems(self):
898 # type: () -> Any
899 return CommentedMapItemsView(self)
900
901 else:
902
903 def items(self):
904 # type: () -> Any
905 return CommentedMapItemsView(self)
906
907 @property
908 def merge(self):
909 # type: () -> Any
910 if not hasattr(self, merge_attrib):
911 setattr(self, merge_attrib, [])
912 return getattr(self, merge_attrib)
913
914 def copy(self):
915 # type: () -> Any
916 x = type(self)() # update doesn't work
917 for k, v in self._items():
918 x[k] = v
919 self.copy_attributes(x)
920 return x
921
922 def add_referent(self, cm):
923 # type: (Any) -> None
924 if cm not in self._ref:
925 self._ref.append(cm)
926
927 def add_yaml_merge(self, value):
928 # type: (Any) -> None
929 for v in value:
930 v[1].add_referent(self)
931 for k, v in v[1].items():
932 if ordereddict.__contains__(self, k):
933 continue
934 ordereddict.__setitem__(self, k, v)
935 self.merge.extend(value)
936
937 def update_key_value(self, key):
938 # type: (Any) -> None
939 if key in self._ok:
940 return
941 for v in self.merge:
942 if key in v[1]:
943 ordereddict.__setitem__(self, key, v[1][key])
944 return
945 ordereddict.__delitem__(self, key)
946
947 def __deepcopy__(self, memo):
948 # type: (Any) -> Any
949 res = self.__class__()
950 memo[id(self)] = res
951 for k in self:
952 res[k] = copy.deepcopy(self[k], memo)
953 self.copy_attributes(res, memo=memo)
954 return res
955
956
957 # based on brownie mappings
958 @classmethod # type: ignore
959 def raise_immutable(cls, *args, **kwargs):
960 # type: (Any, *Any, **Any) -> None
961 raise TypeError('{} objects are immutable'.format(cls.__name__))
962
963
964 class CommentedKeyMap(CommentedBase, Mapping): # type: ignore
965 __slots__ = Comment.attrib, '_od'
966 """This primarily exists to be able to roundtrip keys that are mappings"""
967
968 def __init__(self, *args, **kw):
969 # type: (Any, Any) -> None
970 if hasattr(self, '_od'):
971 raise_immutable(self)
972 try:
973 self._od = ordereddict(*args, **kw)
974 except TypeError:
975 if PY2:
976 self._od = ordereddict(args[0].items())
977 else:
978 raise
979
980 __delitem__ = __setitem__ = clear = pop = popitem = setdefault = update = raise_immutable
981
982 # need to implement __getitem__, __iter__ and __len__
983 def __getitem__(self, index):
984 # type: (Any) -> Any
985 return self._od[index]
986
987 def __iter__(self):
988 # type: () -> Iterator[Any]
989 for x in self._od.__iter__():
990 yield x
991
992 def __len__(self):
993 # type: () -> int
994 return len(self._od)
995
996 def __hash__(self):
997 # type: () -> Any
998 return hash(tuple(self.items()))
999
1000 def __repr__(self):
1001 # type: () -> Any
1002 if not hasattr(self, merge_attrib):
1003 return self._od.__repr__()
1004 return 'ordereddict(' + repr(list(self._od.items())) + ')'
1005
1006 @classmethod
1007 def fromkeys(keys, v=None):
1008 # type: (Any, Any) -> Any
1009 return CommentedKeyMap(dict.fromkeys(keys, v))
1010
1011 def _yaml_add_comment(self, comment, key=NoComment):
1012 # type: (Any, Optional[Any]) -> None
1013 if key is not NoComment:
1014 self.yaml_key_comment_extend(key, comment)
1015 else:
1016 self.ca.comment = comment
1017
1018 def _yaml_add_eol_comment(self, comment, key):
1019 # type: (Any, Any) -> None
1020 self._yaml_add_comment(comment, key=key)
1021
1022 def _yaml_get_columnX(self, key):
1023 # type: (Any) -> Any
1024 return self.ca.items[key][0].start_mark.column
1025
1026 def _yaml_get_column(self, key):
1027 # type: (Any) -> Any
1028 column = None
1029 sel_idx = None
1030 pre, post = key - 1, key + 1
1031 if pre in self.ca.items:
1032 sel_idx = pre
1033 elif post in self.ca.items:
1034 sel_idx = post
1035 else:
1036 # self.ca.items is not ordered
1037 for row_idx, _k1 in enumerate(self):
1038 if row_idx >= key:
1039 break
1040 if row_idx not in self.ca.items:
1041 continue
1042 sel_idx = row_idx
1043 if sel_idx is not None:
1044 column = self._yaml_get_columnX(sel_idx)
1045 return column
1046
1047 def _yaml_get_pre_comment(self):
1048 # type: () -> Any
1049 pre_comments = [] # type: List[Any]
1050 if self.ca.comment is None:
1051 self.ca.comment = [None, pre_comments]
1052 else:
1053 self.ca.comment[1] = pre_comments
1054 return pre_comments
1055
1056
1057 class CommentedOrderedMap(CommentedMap):
1058 __slots__ = (Comment.attrib,)
1059
1060
1061 class CommentedSet(MutableSet, CommentedBase): # type: ignore # NOQA
1062 __slots__ = Comment.attrib, 'odict'
1063
1064 def __init__(self, values=None):
1065 # type: (Any) -> None
1066 self.odict = ordereddict()
1067 MutableSet.__init__(self)
1068 if values is not None:
1069 self |= values # type: ignore
1070
1071 def _yaml_add_comment(self, comment, key=NoComment, value=NoComment):
1072 # type: (Any, Optional[Any], Optional[Any]) -> None
1073 """values is set to key to indicate a value attachment of comment"""
1074 if key is not NoComment:
1075 self.yaml_key_comment_extend(key, comment)
1076 return
1077 if value is not NoComment:
1078 self.yaml_value_comment_extend(value, comment)
1079 else:
1080 self.ca.comment = comment
1081
1082 def _yaml_add_eol_comment(self, comment, key):
1083 # type: (Any, Any) -> None
1084 """add on the value line, with value specified by the key"""
1085 self._yaml_add_comment(comment, value=key)
1086
1087 def add(self, value):
1088 # type: (Any) -> None
1089 """Add an element."""
1090 self.odict[value] = None
1091
1092 def discard(self, value):
1093 # type: (Any) -> None
1094 """Remove an element. Do not raise an exception if absent."""
1095 del self.odict[value]
1096
1097 def __contains__(self, x):
1098 # type: (Any) -> Any
1099 return x in self.odict
1100
1101 def __iter__(self):
1102 # type: () -> Any
1103 for x in self.odict:
1104 yield x
1105
1106 def __len__(self):
1107 # type: () -> int
1108 return len(self.odict)
1109
1110 def __repr__(self):
1111 # type: () -> str
1112 return 'set({0!r})'.format(self.odict.keys())
1113
1114
1115 class TaggedScalar(CommentedBase):
1116 # the value and style attributes are set during roundtrip construction
1117 def __init__(self):
1118 # type: () -> None
1119 self.value = None
1120 self.style = None
1121
1122 def __str__(self):
1123 # type: () -> Any
1124 return self.value
1125
1126
1127 def dump_comments(d, name="", sep='.', out=sys.stdout):
1128 # type: (Any, str, str, Any) -> None
1129 """
1130 recursively dump comments, all but the toplevel preceded by the path
1131 in dotted form x.0.a
1132 """
1133 if isinstance(d, dict) and hasattr(d, 'ca'):
1134 if name:
1135 sys.stdout.write('{}\n'.format(name))
1136 out.write('{}\n'.format(d.ca)) # type: ignore
1137 for k in d:
1138 dump_comments(d[k], name=(name + sep + k) if name else k, sep=sep, out=out)
1139 elif isinstance(d, list) and hasattr(d, 'ca'):
1140 if name:
1141 sys.stdout.write('{}\n'.format(name))
1142 out.write('{}\n'.format(d.ca)) # type: ignore
1143 for idx, k in enumerate(d):
1144 dump_comments(
1145 k, name=(name + sep + str(idx)) if name else str(idx), sep=sep, out=out
1146 )