comparison env/lib/python3.9/site-packages/ruamel/yaml/comments.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # coding: utf-8
2
3 from __future__ import absolute_import, print_function
4
5 """
6 stuff to deal with comments and formatting on dict/list/ordereddict/set
7 these are not really related, formatting could be factored out as
8 a separate base
9 """
10
11 import sys
12 import copy
13
14
15 from ruamel.yaml.compat import ordereddict # type: ignore
16 from ruamel.yaml.compat import PY2, string_types, MutableSliceableSequence
17 from ruamel.yaml.scalarstring import ScalarString
18 from ruamel.yaml.anchor import Anchor
19
20 if PY2:
21 from collections import MutableSet, Sized, Set, Mapping
22 else:
23 from collections.abc import MutableSet, Sized, Set, Mapping
24
25 if False: # MYPY
26 from typing import Any, Dict, Optional, List, Union, Optional, Iterator # NOQA
27
28 # fmt: off
29 __all__ = ['CommentedSeq', 'CommentedKeySeq',
30 'CommentedMap', 'CommentedOrderedMap',
31 'CommentedSet', 'comment_attrib', 'merge_attrib']
32 # fmt: on
33
34 comment_attrib = '_yaml_comment'
35 format_attrib = '_yaml_format'
36 line_col_attrib = '_yaml_line_col'
37 merge_attrib = '_yaml_merge'
38 tag_attrib = '_yaml_tag'
39
40
41 class Comment(object):
42 # sys.getsize tested the Comment objects, __slots__ makes them bigger
43 # and adding self.end did not matter
44 __slots__ = 'comment', '_items', '_end', '_start'
45 attrib = comment_attrib
46
47 def __init__(self):
48 # type: () -> None
49 self.comment = None # [post, [pre]]
50 # map key (mapping/omap/dict) or index (sequence/list) to a list of
51 # dict: post_key, pre_key, post_value, pre_value
52 # list: pre item, post item
53 self._items = {} # type: Dict[Any, Any]
54 # self._start = [] # should not put these on first item
55 self._end = [] # type: List[Any] # end of document comments
56
57 def __str__(self):
58 # type: () -> str
59 if bool(self._end):
60 end = ',\n end=' + str(self._end)
61 else:
62 end = ""
63 return 'Comment(comment={0},\n items={1}{2})'.format(self.comment, self._items, end)
64
65 @property
66 def items(self):
67 # type: () -> Any
68 return self._items
69
70 @property
71 def end(self):
72 # type: () -> Any
73 return self._end
74
75 @end.setter
76 def end(self, value):
77 # type: (Any) -> None
78 self._end = value
79
80 @property
81 def start(self):
82 # type: () -> Any
83 return self._start
84
85 @start.setter
86 def start(self, value):
87 # type: (Any) -> None
88 self._start = value
89
90
91 # to distinguish key from None
92 def NoComment():
93 # type: () -> None
94 pass
95
96
97 class Format(object):
98 __slots__ = ('_flow_style',)
99 attrib = format_attrib
100
101 def __init__(self):
102 # type: () -> None
103 self._flow_style = None # type: Any
104
105 def set_flow_style(self):
106 # type: () -> None
107 self._flow_style = True
108
109 def set_block_style(self):
110 # type: () -> None
111 self._flow_style = False
112
113 def flow_style(self, default=None):
114 # type: (Optional[Any]) -> Any
115 """if default (the flow_style) is None, the flow style tacked on to
116 the object explicitly will be taken. If that is None as well the
117 default flow style rules the format down the line, or the type
118 of the constituent values (simple -> flow, map/list -> block)"""
119 if self._flow_style is None:
120 return default
121 return self._flow_style
122
123
124 class LineCol(object):
125 attrib = line_col_attrib
126
127 def __init__(self):
128 # type: () -> None
129 self.line = None
130 self.col = None
131 self.data = None # type: Optional[Dict[Any, Any]]
132
133 def add_kv_line_col(self, key, data):
134 # type: (Any, Any) -> None
135 if self.data is None:
136 self.data = {}
137 self.data[key] = data
138
139 def key(self, k):
140 # type: (Any) -> Any
141 return self._kv(k, 0, 1)
142
143 def value(self, k):
144 # type: (Any) -> Any
145 return self._kv(k, 2, 3)
146
147 def _kv(self, k, x0, x1):
148 # type: (Any, Any, Any) -> Any
149 if self.data is None:
150 return None
151 data = self.data[k]
152 return data[x0], data[x1]
153
154 def item(self, idx):
155 # type: (Any) -> Any
156 if self.data is None:
157 return None
158 return self.data[idx][0], self.data[idx][1]
159
160 def add_idx_line_col(self, key, data):
161 # type: (Any, Any) -> None
162 if self.data is None:
163 self.data = {}
164 self.data[key] = data
165
166
167 class Tag(object):
168 """store tag information for roundtripping"""
169
170 __slots__ = ('value',)
171 attrib = tag_attrib
172
173 def __init__(self):
174 # type: () -> None
175 self.value = None
176
177 def __repr__(self):
178 # type: () -> Any
179 return '{0.__class__.__name__}({0.value!r})'.format(self)
180
181
182 class CommentedBase(object):
183 @property
184 def ca(self):
185 # type: () -> Any
186 if not hasattr(self, Comment.attrib):
187 setattr(self, Comment.attrib, Comment())
188 return getattr(self, Comment.attrib)
189
190 def yaml_end_comment_extend(self, comment, clear=False):
191 # type: (Any, bool) -> None
192 if comment is None:
193 return
194 if clear or self.ca.end is None:
195 self.ca.end = []
196 self.ca.end.extend(comment)
197
198 def yaml_key_comment_extend(self, key, comment, clear=False):
199 # type: (Any, Any, bool) -> None
200 r = self.ca._items.setdefault(key, [None, None, None, None])
201 if clear or r[1] is None:
202 if comment[1] is not None:
203 assert isinstance(comment[1], list)
204 r[1] = comment[1]
205 else:
206 r[1].extend(comment[0])
207 r[0] = comment[0]
208
209 def yaml_value_comment_extend(self, key, comment, clear=False):
210 # type: (Any, Any, bool) -> None
211 r = self.ca._items.setdefault(key, [None, None, None, None])
212 if clear or r[3] is None:
213 if comment[1] is not None:
214 assert isinstance(comment[1], list)
215 r[3] = comment[1]
216 else:
217 r[3].extend(comment[0])
218 r[2] = comment[0]
219
220 def yaml_set_start_comment(self, comment, indent=0):
221 # type: (Any, Any) -> None
222 """overwrites any preceding comment lines on an object
223 expects comment to be without `#` and possible have multiple lines
224 """
225 from .error import CommentMark
226 from .tokens import CommentToken
227
228 pre_comments = self._yaml_get_pre_comment()
229 if comment[-1] == '\n':
230 comment = comment[:-1] # strip final newline if there
231 start_mark = CommentMark(indent)
232 for com in comment.split('\n'):
233 pre_comments.append(CommentToken('# ' + com + '\n', start_mark, None))
234
235 def yaml_set_comment_before_after_key(
236 self, key, before=None, indent=0, after=None, after_indent=None
237 ):
238 # type: (Any, Any, Any, Any, Any) -> None
239 """
240 expects comment (before/after) to be without `#` and possible have multiple lines
241 """
242 from ruamel.yaml.error import CommentMark
243 from ruamel.yaml.tokens import CommentToken
244
245 def comment_token(s, mark):
246 # type: (Any, Any) -> Any
247 # handle empty lines as having no comment
248 return CommentToken(('# ' if s else "") + s + '\n', mark, None)
249
250 if after_indent is None:
251 after_indent = indent + 2
252 if before and (len(before) > 1) and before[-1] == '\n':
253 before = before[:-1] # strip final newline if there
254 if after and after[-1] == '\n':
255 after = after[:-1] # strip final newline if there
256 start_mark = CommentMark(indent)
257 c = self.ca.items.setdefault(key, [None, [], None, None])
258 if before == '\n':
259 c[1].append(comment_token("", start_mark))
260 elif before:
261 for com in before.split('\n'):
262 c[1].append(comment_token(com, start_mark))
263 if after:
264 start_mark = CommentMark(after_indent)
265 if c[3] is None:
266 c[3] = []
267 for com in after.split('\n'):
268 c[3].append(comment_token(com, start_mark)) # type: ignore
269
270 @property
271 def fa(self):
272 # type: () -> Any
273 """format attribute
274
275 set_flow_style()/set_block_style()"""
276 if not hasattr(self, Format.attrib):
277 setattr(self, Format.attrib, Format())
278 return getattr(self, Format.attrib)
279
280 def yaml_add_eol_comment(self, comment, key=NoComment, column=None):
281 # type: (Any, Optional[Any], Optional[Any]) -> None
282 """
283 there is a problem as eol comments should start with ' #'
284 (but at the beginning of the line the space doesn't have to be before
285 the #. The column index is for the # mark
286 """
287 from .tokens import CommentToken
288 from .error import CommentMark
289
290 if column is None:
291 try:
292 column = self._yaml_get_column(key)
293 except AttributeError:
294 column = 0
295 if comment[0] != '#':
296 comment = '# ' + comment
297 if column is None:
298 if comment[0] == '#':
299 comment = ' ' + comment
300 column = 0
301 start_mark = CommentMark(column)
302 ct = [CommentToken(comment, start_mark, None), None]
303 self._yaml_add_eol_comment(ct, key=key)
304
305 @property
306 def lc(self):
307 # type: () -> Any
308 if not hasattr(self, LineCol.attrib):
309 setattr(self, LineCol.attrib, LineCol())
310 return getattr(self, LineCol.attrib)
311
312 def _yaml_set_line_col(self, line, col):
313 # type: (Any, Any) -> None
314 self.lc.line = line
315 self.lc.col = col
316
317 def _yaml_set_kv_line_col(self, key, data):
318 # type: (Any, Any) -> None
319 self.lc.add_kv_line_col(key, data)
320
321 def _yaml_set_idx_line_col(self, key, data):
322 # type: (Any, Any) -> None
323 self.lc.add_idx_line_col(key, data)
324
325 @property
326 def anchor(self):
327 # type: () -> Any
328 if not hasattr(self, Anchor.attrib):
329 setattr(self, Anchor.attrib, Anchor())
330 return getattr(self, Anchor.attrib)
331
332 def yaml_anchor(self):
333 # type: () -> Any
334 if not hasattr(self, Anchor.attrib):
335 return None
336 return self.anchor
337
338 def yaml_set_anchor(self, value, always_dump=False):
339 # type: (Any, bool) -> None
340 self.anchor.value = value
341 self.anchor.always_dump = always_dump
342
343 @property
344 def tag(self):
345 # type: () -> Any
346 if not hasattr(self, Tag.attrib):
347 setattr(self, Tag.attrib, Tag())
348 return getattr(self, Tag.attrib)
349
350 def yaml_set_tag(self, value):
351 # type: (Any) -> None
352 self.tag.value = value
353
354 def copy_attributes(self, t, memo=None):
355 # type: (Any, Any) -> None
356 # fmt: off
357 for a in [Comment.attrib, Format.attrib, LineCol.attrib, Anchor.attrib,
358 Tag.attrib, merge_attrib]:
359 if hasattr(self, a):
360 if memo is not None:
361 setattr(t, a, copy.deepcopy(getattr(self, a, memo)))
362 else:
363 setattr(t, a, getattr(self, a))
364 # fmt: on
365
366 def _yaml_add_eol_comment(self, comment, key):
367 # type: (Any, Any) -> None
368 raise NotImplementedError
369
370 def _yaml_get_pre_comment(self):
371 # type: () -> Any
372 raise NotImplementedError
373
374 def _yaml_get_column(self, key):
375 # type: (Any) -> Any
376 raise NotImplementedError
377
378
379 class CommentedSeq(MutableSliceableSequence, list, CommentedBase): # type: ignore
380 __slots__ = (Comment.attrib, '_lst')
381
382 def __init__(self, *args, **kw):
383 # type: (Any, Any) -> None
384 list.__init__(self, *args, **kw)
385
386 def __getsingleitem__(self, idx):
387 # type: (Any) -> Any
388 return list.__getitem__(self, idx)
389
390 def __setsingleitem__(self, idx, value):
391 # type: (Any, Any) -> None
392 # try to preserve the scalarstring type if setting an existing key to a new value
393 if idx < len(self):
394 if (
395 isinstance(value, string_types)
396 and not isinstance(value, ScalarString)
397 and isinstance(self[idx], ScalarString)
398 ):
399 value = type(self[idx])(value)
400 list.__setitem__(self, idx, value)
401
402 def __delsingleitem__(self, idx=None):
403 # type: (Any) -> Any
404 list.__delitem__(self, idx)
405 self.ca.items.pop(idx, None) # might not be there -> default value
406 for list_index in sorted(self.ca.items):
407 if list_index < idx:
408 continue
409 self.ca.items[list_index - 1] = self.ca.items.pop(list_index)
410
411 def __len__(self):
412 # type: () -> int
413 return list.__len__(self)
414
415 def insert(self, idx, val):
416 # type: (Any, Any) -> None
417 """the comments after the insertion have to move forward"""
418 list.insert(self, idx, val)
419 for list_index in sorted(self.ca.items, reverse=True):
420 if list_index < idx:
421 break
422 self.ca.items[list_index + 1] = self.ca.items.pop(list_index)
423
424 def extend(self, val):
425 # type: (Any) -> None
426 list.extend(self, val)
427
428 def __eq__(self, other):
429 # type: (Any) -> bool
430 return list.__eq__(self, other)
431
432 def _yaml_add_comment(self, comment, key=NoComment):
433 # type: (Any, Optional[Any]) -> None
434 if key is not NoComment:
435 self.yaml_key_comment_extend(key, comment)
436 else:
437 self.ca.comment = comment
438
439 def _yaml_add_eol_comment(self, comment, key):
440 # type: (Any, Any) -> None
441 self._yaml_add_comment(comment, key=key)
442
443 def _yaml_get_columnX(self, key):
444 # type: (Any) -> Any
445 return self.ca.items[key][0].start_mark.column
446
447 def _yaml_get_column(self, key):
448 # type: (Any) -> Any
449 column = None
450 sel_idx = None
451 pre, post = key - 1, key + 1
452 if pre in self.ca.items:
453 sel_idx = pre
454 elif post in self.ca.items:
455 sel_idx = post
456 else:
457 # self.ca.items is not ordered
458 for row_idx, _k1 in enumerate(self):
459 if row_idx >= key:
460 break
461 if row_idx not in self.ca.items:
462 continue
463 sel_idx = row_idx
464 if sel_idx is not None:
465 column = self._yaml_get_columnX(sel_idx)
466 return column
467
468 def _yaml_get_pre_comment(self):
469 # type: () -> Any
470 pre_comments = [] # type: List[Any]
471 if self.ca.comment is None:
472 self.ca.comment = [None, pre_comments]
473 else:
474 self.ca.comment[1] = pre_comments
475 return pre_comments
476
477 def __deepcopy__(self, memo):
478 # type: (Any) -> Any
479 res = self.__class__()
480 memo[id(self)] = res
481 for k in self:
482 res.append(copy.deepcopy(k, memo))
483 self.copy_attributes(res, memo=memo)
484 return res
485
486 def __add__(self, other):
487 # type: (Any) -> Any
488 return list.__add__(self, other)
489
490 def sort(self, key=None, reverse=False): # type: ignore
491 # type: (Any, bool) -> None
492 if key is None:
493 tmp_lst = sorted(zip(self, range(len(self))), reverse=reverse)
494 list.__init__(self, [x[0] for x in tmp_lst])
495 else:
496 tmp_lst = sorted(
497 zip(map(key, list.__iter__(self)), range(len(self))), reverse=reverse
498 )
499 list.__init__(self, [list.__getitem__(self, x[1]) for x in tmp_lst])
500 itm = self.ca.items
501 self.ca._items = {}
502 for idx, x in enumerate(tmp_lst):
503 old_index = x[1]
504 if old_index in itm:
505 self.ca.items[idx] = itm[old_index]
506
507 def __repr__(self):
508 # type: () -> Any
509 return list.__repr__(self)
510
511
512 class CommentedKeySeq(tuple, CommentedBase): # type: ignore
513 """This primarily exists to be able to roundtrip keys that are sequences"""
514
515 def _yaml_add_comment(self, comment, key=NoComment):
516 # type: (Any, Optional[Any]) -> None
517 if key is not NoComment:
518 self.yaml_key_comment_extend(key, comment)
519 else:
520 self.ca.comment = comment
521
522 def _yaml_add_eol_comment(self, comment, key):
523 # type: (Any, Any) -> None
524 self._yaml_add_comment(comment, key=key)
525
526 def _yaml_get_columnX(self, key):
527 # type: (Any) -> Any
528 return self.ca.items[key][0].start_mark.column
529
530 def _yaml_get_column(self, key):
531 # type: (Any) -> Any
532 column = None
533 sel_idx = None
534 pre, post = key - 1, key + 1
535 if pre in self.ca.items:
536 sel_idx = pre
537 elif post in self.ca.items:
538 sel_idx = post
539 else:
540 # self.ca.items is not ordered
541 for row_idx, _k1 in enumerate(self):
542 if row_idx >= key:
543 break
544 if row_idx not in self.ca.items:
545 continue
546 sel_idx = row_idx
547 if sel_idx is not None:
548 column = self._yaml_get_columnX(sel_idx)
549 return column
550
551 def _yaml_get_pre_comment(self):
552 # type: () -> Any
553 pre_comments = [] # type: List[Any]
554 if self.ca.comment is None:
555 self.ca.comment = [None, pre_comments]
556 else:
557 self.ca.comment[1] = pre_comments
558 return pre_comments
559
560
561 class CommentedMapView(Sized):
562 __slots__ = ('_mapping',)
563
564 def __init__(self, mapping):
565 # type: (Any) -> None
566 self._mapping = mapping
567
568 def __len__(self):
569 # type: () -> int
570 count = len(self._mapping)
571 return count
572
573
574 class CommentedMapKeysView(CommentedMapView, Set): # type: ignore
575 __slots__ = ()
576
577 @classmethod
578 def _from_iterable(self, it):
579 # type: (Any) -> Any
580 return set(it)
581
582 def __contains__(self, key):
583 # type: (Any) -> Any
584 return key in self._mapping
585
586 def __iter__(self):
587 # type: () -> Any # yield from self._mapping # not in py27, pypy
588 # for x in self._mapping._keys():
589 for x in self._mapping:
590 yield x
591
592
593 class CommentedMapItemsView(CommentedMapView, Set): # type: ignore
594 __slots__ = ()
595
596 @classmethod
597 def _from_iterable(self, it):
598 # type: (Any) -> Any
599 return set(it)
600
601 def __contains__(self, item):
602 # type: (Any) -> Any
603 key, value = item
604 try:
605 v = self._mapping[key]
606 except KeyError:
607 return False
608 else:
609 return v == value
610
611 def __iter__(self):
612 # type: () -> Any
613 for key in self._mapping._keys():
614 yield (key, self._mapping[key])
615
616
617 class CommentedMapValuesView(CommentedMapView):
618 __slots__ = ()
619
620 def __contains__(self, value):
621 # type: (Any) -> Any
622 for key in self._mapping:
623 if value == self._mapping[key]:
624 return True
625 return False
626
627 def __iter__(self):
628 # type: () -> Any
629 for key in self._mapping._keys():
630 yield self._mapping[key]
631
632
633 class CommentedMap(ordereddict, CommentedBase): # type: ignore
634 __slots__ = (Comment.attrib, '_ok', '_ref')
635
636 def __init__(self, *args, **kw):
637 # type: (Any, Any) -> None
638 self._ok = set() # type: MutableSet[Any] # own keys
639 self._ref = [] # type: List[CommentedMap]
640 ordereddict.__init__(self, *args, **kw)
641
642 def _yaml_add_comment(self, comment, key=NoComment, value=NoComment):
643 # type: (Any, Optional[Any], Optional[Any]) -> None
644 """values is set to key to indicate a value attachment of comment"""
645 if key is not NoComment:
646 self.yaml_key_comment_extend(key, comment)
647 return
648 if value is not NoComment:
649 self.yaml_value_comment_extend(value, comment)
650 else:
651 self.ca.comment = comment
652
653 def _yaml_add_eol_comment(self, comment, key):
654 # type: (Any, Any) -> None
655 """add on the value line, with value specified by the key"""
656 self._yaml_add_comment(comment, value=key)
657
658 def _yaml_get_columnX(self, key):
659 # type: (Any) -> Any
660 return self.ca.items[key][2].start_mark.column
661
662 def _yaml_get_column(self, key):
663 # type: (Any) -> Any
664 column = None
665 sel_idx = None
666 pre, post, last = None, None, None
667 for x in self:
668 if pre is not None and x != key:
669 post = x
670 break
671 if x == key:
672 pre = last
673 last = x
674 if pre in self.ca.items:
675 sel_idx = pre
676 elif post in self.ca.items:
677 sel_idx = post
678 else:
679 # self.ca.items is not ordered
680 for k1 in self:
681 if k1 >= key:
682 break
683 if k1 not in self.ca.items:
684 continue
685 sel_idx = k1
686 if sel_idx is not None:
687 column = self._yaml_get_columnX(sel_idx)
688 return column
689
690 def _yaml_get_pre_comment(self):
691 # type: () -> Any
692 pre_comments = [] # type: List[Any]
693 if self.ca.comment is None:
694 self.ca.comment = [None, pre_comments]
695 else:
696 self.ca.comment[1] = pre_comments
697 return pre_comments
698
699 def update(self, vals):
700 # type: (Any) -> None
701 try:
702 ordereddict.update(self, vals)
703 except TypeError:
704 # probably a dict that is used
705 for x in vals:
706 self[x] = vals[x]
707 try:
708 self._ok.update(vals.keys()) # type: ignore
709 except AttributeError:
710 # assume a list/tuple of two element lists/tuples
711 for x in vals:
712 self._ok.add(x[0])
713
714 def insert(self, pos, key, value, comment=None):
715 # type: (Any, Any, Any, Optional[Any]) -> None
716 """insert key value into given position
717 attach comment if provided
718 """
719 ordereddict.insert(self, pos, key, value)
720 self._ok.add(key)
721 if comment is not None:
722 self.yaml_add_eol_comment(comment, key=key)
723
724 def mlget(self, key, default=None, list_ok=False):
725 # type: (Any, Any, Any) -> Any
726 """multi-level get that expects dicts within dicts"""
727 if not isinstance(key, list):
728 return self.get(key, default)
729 # assume that the key is a list of recursively accessible dicts
730
731 def get_one_level(key_list, level, d):
732 # type: (Any, Any, Any) -> Any
733 if not list_ok:
734 assert isinstance(d, dict)
735 if level >= len(key_list):
736 if level > len(key_list):
737 raise IndexError
738 return d[key_list[level - 1]]
739 return get_one_level(key_list, level + 1, d[key_list[level - 1]])
740
741 try:
742 return get_one_level(key, 1, self)
743 except KeyError:
744 return default
745 except (TypeError, IndexError):
746 if not list_ok:
747 raise
748 return default
749
750 def __getitem__(self, key):
751 # type: (Any) -> Any
752 try:
753 return ordereddict.__getitem__(self, key)
754 except KeyError:
755 for merged in getattr(self, merge_attrib, []):
756 if key in merged[1]:
757 return merged[1][key]
758 raise
759
760 def __setitem__(self, key, value):
761 # type: (Any, Any) -> None
762 # try to preserve the scalarstring type if setting an existing key to a new value
763 if key in self:
764 if (
765 isinstance(value, string_types)
766 and not isinstance(value, ScalarString)
767 and isinstance(self[key], ScalarString)
768 ):
769 value = type(self[key])(value)
770 ordereddict.__setitem__(self, key, value)
771 self._ok.add(key)
772
773 def _unmerged_contains(self, key):
774 # type: (Any) -> Any
775 if key in self._ok:
776 return True
777 return None
778
779 def __contains__(self, key):
780 # type: (Any) -> bool
781 return bool(ordereddict.__contains__(self, key))
782
783 def get(self, key, default=None):
784 # type: (Any, Any) -> Any
785 try:
786 return self.__getitem__(key)
787 except: # NOQA
788 return default
789
790 def __repr__(self):
791 # type: () -> Any
792 return ordereddict.__repr__(self).replace('CommentedMap', 'ordereddict')
793
794 def non_merged_items(self):
795 # type: () -> Any
796 for x in ordereddict.__iter__(self):
797 if x in self._ok:
798 yield x, ordereddict.__getitem__(self, x)
799
800 def __delitem__(self, key):
801 # type: (Any) -> None
802 # for merged in getattr(self, merge_attrib, []):
803 # if key in merged[1]:
804 # value = merged[1][key]
805 # break
806 # else:
807 # # not found in merged in stuff
808 # ordereddict.__delitem__(self, key)
809 # for referer in self._ref:
810 # referer.update_key_value(key)
811 # return
812 #
813 # ordereddict.__setitem__(self, key, value) # merge might have different value
814 # self._ok.discard(key)
815 self._ok.discard(key)
816 ordereddict.__delitem__(self, key)
817 for referer in self._ref:
818 referer.update_key_value(key)
819
820 def __iter__(self):
821 # type: () -> Any
822 for x in ordereddict.__iter__(self):
823 yield x
824
825 def _keys(self):
826 # type: () -> Any
827 for x in ordereddict.__iter__(self):
828 yield x
829
830 def __len__(self):
831 # type: () -> int
832 return int(ordereddict.__len__(self))
833
834 def __eq__(self, other):
835 # type: (Any) -> bool
836 return bool(dict(self) == other)
837
838 if PY2:
839
840 def keys(self):
841 # type: () -> Any
842 return list(self._keys())
843
844 def iterkeys(self):
845 # type: () -> Any
846 return self._keys()
847
848 def viewkeys(self):
849 # type: () -> Any
850 return CommentedMapKeysView(self)
851
852 else:
853
854 def keys(self):
855 # type: () -> Any
856 return CommentedMapKeysView(self)
857
858 if PY2:
859
860 def _values(self):
861 # type: () -> Any
862 for x in ordereddict.__iter__(self):
863 yield ordereddict.__getitem__(self, x)
864
865 def values(self):
866 # type: () -> Any
867 return list(self._values())
868
869 def itervalues(self):
870 # type: () -> Any
871 return self._values()
872
873 def viewvalues(self):
874 # type: () -> Any
875 return CommentedMapValuesView(self)
876
877 else:
878
879 def values(self):
880 # type: () -> Any
881 return CommentedMapValuesView(self)
882
883 def _items(self):
884 # type: () -> Any
885 for x in ordereddict.__iter__(self):
886 yield x, ordereddict.__getitem__(self, x)
887
888 if PY2:
889
890 def items(self):
891 # type: () -> Any
892 return list(self._items())
893
894 def iteritems(self):
895 # type: () -> Any
896 return self._items()
897
898 def viewitems(self):
899 # type: () -> Any
900 return CommentedMapItemsView(self)
901
902 else:
903
904 def items(self):
905 # type: () -> Any
906 return CommentedMapItemsView(self)
907
908 @property
909 def merge(self):
910 # type: () -> Any
911 if not hasattr(self, merge_attrib):
912 setattr(self, merge_attrib, [])
913 return getattr(self, merge_attrib)
914
915 def copy(self):
916 # type: () -> Any
917 x = type(self)() # update doesn't work
918 for k, v in self._items():
919 x[k] = v
920 self.copy_attributes(x)
921 return x
922
923 def add_referent(self, cm):
924 # type: (Any) -> None
925 if cm not in self._ref:
926 self._ref.append(cm)
927
928 def add_yaml_merge(self, value):
929 # type: (Any) -> None
930 for v in value:
931 v[1].add_referent(self)
932 for k, v in v[1].items():
933 if ordereddict.__contains__(self, k):
934 continue
935 ordereddict.__setitem__(self, k, v)
936 self.merge.extend(value)
937
938 def update_key_value(self, key):
939 # type: (Any) -> None
940 if key in self._ok:
941 return
942 for v in self.merge:
943 if key in v[1]:
944 ordereddict.__setitem__(self, key, v[1][key])
945 return
946 ordereddict.__delitem__(self, key)
947
948 def __deepcopy__(self, memo):
949 # type: (Any) -> Any
950 res = self.__class__()
951 memo[id(self)] = res
952 for k in self:
953 res[k] = copy.deepcopy(self[k], memo)
954 self.copy_attributes(res, memo=memo)
955 return res
956
957
958 # based on brownie mappings
959 @classmethod # type: ignore
960 def raise_immutable(cls, *args, **kwargs):
961 # type: (Any, *Any, **Any) -> None
962 raise TypeError('{} objects are immutable'.format(cls.__name__))
963
964
965 class CommentedKeyMap(CommentedBase, Mapping): # type: ignore
966 __slots__ = Comment.attrib, '_od'
967 """This primarily exists to be able to roundtrip keys that are mappings"""
968
969 def __init__(self, *args, **kw):
970 # type: (Any, Any) -> None
971 if hasattr(self, '_od'):
972 raise_immutable(self)
973 try:
974 self._od = ordereddict(*args, **kw)
975 except TypeError:
976 if PY2:
977 self._od = ordereddict(args[0].items())
978 else:
979 raise
980
981 __delitem__ = __setitem__ = clear = pop = popitem = setdefault = update = raise_immutable
982
983 # need to implement __getitem__, __iter__ and __len__
984 def __getitem__(self, index):
985 # type: (Any) -> Any
986 return self._od[index]
987
988 def __iter__(self):
989 # type: () -> Iterator[Any]
990 for x in self._od.__iter__():
991 yield x
992
993 def __len__(self):
994 # type: () -> int
995 return len(self._od)
996
997 def __hash__(self):
998 # type: () -> Any
999 return hash(tuple(self.items()))
1000
1001 def __repr__(self):
1002 # type: () -> Any
1003 if not hasattr(self, merge_attrib):
1004 return self._od.__repr__()
1005 return 'ordereddict(' + repr(list(self._od.items())) + ')'
1006
1007 @classmethod
1008 def fromkeys(keys, v=None):
1009 # type: (Any, Any) -> Any
1010 return CommentedKeyMap(dict.fromkeys(keys, v))
1011
1012 def _yaml_add_comment(self, comment, key=NoComment):
1013 # type: (Any, Optional[Any]) -> None
1014 if key is not NoComment:
1015 self.yaml_key_comment_extend(key, comment)
1016 else:
1017 self.ca.comment = comment
1018
1019 def _yaml_add_eol_comment(self, comment, key):
1020 # type: (Any, Any) -> None
1021 self._yaml_add_comment(comment, key=key)
1022
1023 def _yaml_get_columnX(self, key):
1024 # type: (Any) -> Any
1025 return self.ca.items[key][0].start_mark.column
1026
1027 def _yaml_get_column(self, key):
1028 # type: (Any) -> Any
1029 column = None
1030 sel_idx = None
1031 pre, post = key - 1, key + 1
1032 if pre in self.ca.items:
1033 sel_idx = pre
1034 elif post in self.ca.items:
1035 sel_idx = post
1036 else:
1037 # self.ca.items is not ordered
1038 for row_idx, _k1 in enumerate(self):
1039 if row_idx >= key:
1040 break
1041 if row_idx not in self.ca.items:
1042 continue
1043 sel_idx = row_idx
1044 if sel_idx is not None:
1045 column = self._yaml_get_columnX(sel_idx)
1046 return column
1047
1048 def _yaml_get_pre_comment(self):
1049 # type: () -> Any
1050 pre_comments = [] # type: List[Any]
1051 if self.ca.comment is None:
1052 self.ca.comment = [None, pre_comments]
1053 else:
1054 self.ca.comment[1] = pre_comments
1055 return pre_comments
1056
1057
1058 class CommentedOrderedMap(CommentedMap):
1059 __slots__ = (Comment.attrib,)
1060
1061
1062 class CommentedSet(MutableSet, CommentedBase): # type: ignore # NOQA
1063 __slots__ = Comment.attrib, 'odict'
1064
1065 def __init__(self, values=None):
1066 # type: (Any) -> None
1067 self.odict = ordereddict()
1068 MutableSet.__init__(self)
1069 if values is not None:
1070 self |= values # type: ignore
1071
1072 def _yaml_add_comment(self, comment, key=NoComment, value=NoComment):
1073 # type: (Any, Optional[Any], Optional[Any]) -> None
1074 """values is set to key to indicate a value attachment of comment"""
1075 if key is not NoComment:
1076 self.yaml_key_comment_extend(key, comment)
1077 return
1078 if value is not NoComment:
1079 self.yaml_value_comment_extend(value, comment)
1080 else:
1081 self.ca.comment = comment
1082
1083 def _yaml_add_eol_comment(self, comment, key):
1084 # type: (Any, Any) -> None
1085 """add on the value line, with value specified by the key"""
1086 self._yaml_add_comment(comment, value=key)
1087
1088 def add(self, value):
1089 # type: (Any) -> None
1090 """Add an element."""
1091 self.odict[value] = None
1092
1093 def discard(self, value):
1094 # type: (Any) -> None
1095 """Remove an element. Do not raise an exception if absent."""
1096 del self.odict[value]
1097
1098 def __contains__(self, x):
1099 # type: (Any) -> Any
1100 return x in self.odict
1101
1102 def __iter__(self):
1103 # type: () -> Any
1104 for x in self.odict:
1105 yield x
1106
1107 def __len__(self):
1108 # type: () -> int
1109 return len(self.odict)
1110
1111 def __repr__(self):
1112 # type: () -> str
1113 return 'set({0!r})'.format(self.odict.keys())
1114
1115
1116 class TaggedScalar(CommentedBase):
1117 # the value and style attributes are set during roundtrip construction
1118 def __init__(self):
1119 # type: () -> None
1120 self.value = None
1121 self.style = None
1122
1123 def __str__(self):
1124 # type: () -> Any
1125 return self.value
1126
1127
1128 def dump_comments(d, name="", sep='.', out=sys.stdout):
1129 # type: (Any, str, str, Any) -> None
1130 """
1131 recursively dump comments, all but the toplevel preceded by the path
1132 in dotted form x.0.a
1133 """
1134 if isinstance(d, dict) and hasattr(d, 'ca'):
1135 if name:
1136 sys.stdout.write('{}\n'.format(name))
1137 out.write('{}\n'.format(d.ca)) # type: ignore
1138 for k in d:
1139 dump_comments(d[k], name=(name + sep + k) if name else k, sep=sep, out=out)
1140 elif isinstance(d, list) and hasattr(d, 'ca'):
1141 if name:
1142 sys.stdout.write('{}\n'.format(name))
1143 out.write('{}\n'.format(d.ca)) # type: ignore
1144 for idx, k in enumerate(d):
1145 dump_comments(
1146 k, name=(name + sep + str(idx)) if name else str(idx), sep=sep, out=out
1147 )