Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/ruamel/yaml/comments.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 # coding: utf-8 | |
2 | |
3 from __future__ import absolute_import, print_function | |
4 | |
5 """ | |
6 stuff to deal with comments and formatting on dict/list/ordereddict/set | |
7 these are not really related, formatting could be factored out as | |
8 a separate base | |
9 """ | |
10 | |
11 import sys | |
12 import copy | |
13 | |
14 | |
15 from ruamel.yaml.compat import ordereddict # type: ignore | |
16 from ruamel.yaml.compat import PY2, string_types, MutableSliceableSequence | |
17 from ruamel.yaml.scalarstring import ScalarString | |
18 from ruamel.yaml.anchor import Anchor | |
19 | |
20 if PY2: | |
21 from collections import MutableSet, Sized, Set, Mapping | |
22 else: | |
23 from collections.abc import MutableSet, Sized, Set, Mapping | |
24 | |
25 if False: # MYPY | |
26 from typing import Any, Dict, Optional, List, Union, Optional, Iterator # NOQA | |
27 | |
# fmt: off
# names exported by a star-import of this module
__all__ = ['CommentedSeq', 'CommentedKeySeq',
           'CommentedMap', 'CommentedOrderedMap',
           'CommentedSet', 'comment_attrib', 'merge_attrib']
# fmt: on

# Names of the instance attributes under which the helper objects of this
# module are stored on a commented container.  The helper classes expose the
# matching name as their `attrib` class attribute.
comment_attrib = '_yaml_comment'  # used as Comment.attrib
format_attrib = '_yaml_format'  # used as Format.attrib
line_col_attrib = '_yaml_line_col'  # used as LineCol.attrib
merge_attrib = '_yaml_merge'  # attribute holding the merge information of a mapping
tag_attrib = '_yaml_tag'  # used as Tag.attrib
39 | |
40 | |
class Comment(object):
    """Holds the comment data attached to one object.

    `comment` is the comment on the object itself, `_items` the comments per
    key/index and `_end` the comments at the end.  `_start` has a slot but is
    only assigned through the `start` setter.
    """

    # sys.getsize tested the Comment objects, __slots__ makes them bigger
    # and adding self.end did not matter
    __slots__ = 'comment', '_items', '_end', '_start'
    attrib = comment_attrib

    def __init__(self):
        # type: () -> None
        # end of document comments
        self._end = []  # type: List[Any]
        # self._start = [] # should not put these on first item
        # map key (mapping/omap/dict) or index (sequence/list) to a list of
        #    dict: post_key, pre_key, post_value, pre_value
        #    list: pre item, post item
        self._items = {}  # type: Dict[Any, Any]
        # [post, [pre]]
        self.comment = None

    def __str__(self):
        # type: () -> str
        """readable dump; the end comments are only shown when there are any"""
        end = (',\n end=' + str(self._end)) if self._end else ""
        return 'Comment(comment={0},\n items={1}{2})'.format(self.comment, self._items, end)

    @property
    def items(self):
        # type: () -> Any
        """the per key/index comment dictionary (read-only property)"""
        return self._items

    @property
    def end(self):
        # type: () -> Any
        """the end comments"""
        return self._end

    @end.setter
    def end(self, value):
        # type: (Any) -> None
        self._end = value

    @property
    def start(self):
        # type: () -> Any
        """the start comments; not set by __init__, only by the setter"""
        return self._start

    @start.setter
    def start(self, value):
        # type: (Any) -> None
        self._start = value
89 | |
90 | |
# to distinguish key from None
def NoComment():
    # type: () -> None
    """Marker object: used as default for key/value parameters and tested
    with `is`, so that None remains usable as a real key.  Calling it does
    nothing and returns None."""
    return None
95 | |
96 | |
class Format(object):
    """Stores the flow/block style flag that was explicitly set on an object
    (None as long as nothing has been set)."""

    __slots__ = ('_flow_style',)
    attrib = format_attrib

    def __init__(self):
        # type: () -> None
        self._flow_style = None  # type: Any

    def set_flow_style(self):
        # type: () -> None
        """mark the object as flow style"""
        self._flow_style = True

    def set_block_style(self):
        # type: () -> None
        """mark the object as block style"""
        self._flow_style = False

    def flow_style(self, default=None):
        # type: (Optional[Any]) -> Any
        """if default (the flow_style) is None, the flow style tacked on to
        the object explicitly will be taken. If that is None as well the
        default flow style rules the format down the line, or the type
        of the constituent values (simple -> flow, map/list -> block)"""
        explicit = self._flow_style
        return default if explicit is None else explicit
122 | |
123 | |
class LineCol(object):
    """Line/column information of an object (`line`, `col`) and, in `data`,
    of its keys/values or items.  `data` stays None until the first entry
    is added."""

    attrib = line_col_attrib

    def __init__(self):
        # type: () -> None
        self.line = None
        self.col = None
        self.data = None  # type: Optional[Dict[Any, Any]]

    def add_kv_line_col(self, key, data):
        # type: (Any, Any) -> None
        """store `data` for `key`, creating the dictionary on first use"""
        store = self.data
        if store is None:
            store = self.data = {}
        store[key] = data

    def key(self, k):
        # type: (Any) -> Any
        """elements 0 and 1 of the data stored for `k`"""
        return self._kv(k, 0, 1)

    def value(self, k):
        # type: (Any) -> Any
        """elements 2 and 3 of the data stored for `k`"""
        return self._kv(k, 2, 3)

    def _kv(self, k, x0, x1):
        # type: (Any, Any, Any) -> Any
        """return the pair at positions x0, x1 of the data stored for `k`,
        or None when nothing has been stored at all"""
        if self.data is None:
            return None
        entry = self.data[k]
        return entry[x0], entry[x1]

    def item(self, idx):
        # type: (Any) -> Any
        """return the first two elements stored for index `idx`,
        or None when nothing has been stored at all"""
        if self.data is None:
            return None
        entry = self.data[idx]
        return entry[0], entry[1]

    def add_idx_line_col(self, key, data):
        # type: (Any, Any) -> None
        """store `data` for index `key`, creating the dictionary on first use"""
        store = self.data
        if store is None:
            store = self.data = {}
        store[key] = data
165 | |
166 | |
class Tag(object):
    """store tag information for roundtripping"""

    __slots__ = ('value',)
    attrib = tag_attrib

    def __init__(self):
        # type: () -> None
        # no tag known until one is assigned
        self.value = None

    def __repr__(self):
        # type: () -> Any
        """class name followed by the repr of the value in parentheses"""
        return '{}({!r})'.format(self.__class__.__name__, self.value)
180 | |
181 | |
class CommentedBase(object):
    """Shared base of the commented containers in this module.

    The helper objects (Comment, Format, LineCol, Anchor, Tag) are stored on
    the instance under the name given by the `attrib` attribute of their
    class and are created on first access of the matching property.
    Subclasses have to provide `_yaml_add_eol_comment`,
    `_yaml_get_pre_comment` and `_yaml_get_column`; the versions defined
    here raise NotImplementedError.
    """

    @property
    def ca(self):
        # type: () -> Any
        """comment attribute: the Comment instance, created when missing"""
        if not hasattr(self, Comment.attrib):
            setattr(self, Comment.attrib, Comment())
        return getattr(self, Comment.attrib)

    def yaml_end_comment_extend(self, comment, clear=False):
        # type: (Any, bool) -> None
        """extend the end comments with the elements of `comment`

        Nothing happens if `comment` is None. The existing end comments are
        dropped first if `clear` is true (or if they are None).
        """
        if comment is None:
            return
        if clear or self.ca.end is None:
            self.ca.end = []
        self.ca.end.extend(comment)

    def yaml_key_comment_extend(self, key, comment, clear=False):
        # type: (Any, Any, bool) -> None
        """update slots 0 and 1 of the comment entry for `key`

        `comment` is indexed with 0 and 1. Slot 1 is replaced by comment[1]
        when `clear` is true or nothing is there yet, otherwise it is
        extended with comment[0]. Slot 0 is always set to comment[0].
        """
        r = self.ca._items.setdefault(key, [None, None, None, None])
        if clear or r[1] is None:
            if comment[1] is not None:
                assert isinstance(comment[1], list)
            r[1] = comment[1]
        else:
            r[1].extend(comment[0])
        r[0] = comment[0]

    def yaml_value_comment_extend(self, key, comment, clear=False):
        # type: (Any, Any, bool) -> None
        """update slots 2 and 3 of the comment entry for `key`

        Same rules as yaml_key_comment_extend, applied to slot 3 (replace or
        extend) and slot 2 (always set to comment[0]).
        """
        r = self.ca._items.setdefault(key, [None, None, None, None])
        if clear or r[3] is None:
            if comment[1] is not None:
                assert isinstance(comment[1], list)
            r[3] = comment[1]
        else:
            r[3].extend(comment[0])
        r[2] = comment[0]

    def yaml_set_start_comment(self, comment, indent=0):
        # type: (Any, Any) -> None
        """overwrites any preceding comment lines on an object
        expects comment to be without `#` and possible have multiple lines
        """
        from .error import CommentMark
        from .tokens import CommentToken

        pre_comments = self._yaml_get_pre_comment()
        # endswith() instead of comment[-1]: an empty string must not raise
        if comment.endswith('\n'):
            comment = comment[:-1]  # strip final newline if there
        start_mark = CommentMark(indent)
        for com in comment.split('\n'):
            pre_comments.append(CommentToken('# ' + com + '\n', start_mark, None))

    def yaml_set_comment_before_after_key(
        self, key, before=None, indent=0, after=None, after_indent=None
    ):
        # type: (Any, Any, Any, Any, Any) -> None
        """
        expects comment (before/after) to be without `#` and possible have multiple lines

        `before` lines are appended to slot 1 of the entry for `key` using
        `indent`, `after` lines to slot 3 using `after_indent` (defaults to
        indent + 2). A `before` consisting of only a newline adds one empty
        line.
        """
        from ruamel.yaml.error import CommentMark
        from ruamel.yaml.tokens import CommentToken

        def comment_token(s, mark):
            # type: (Any, Any) -> Any
            # handle empty lines as having no comment
            return CommentToken(('# ' if s else "") + s + '\n', mark, None)

        if after_indent is None:
            after_indent = indent + 2
        # the length test keeps a lone '\n' intact for the check further down
        if before and (len(before) > 1) and before[-1] == '\n':
            before = before[:-1]  # strip final newline if there
        if after and after[-1] == '\n':
            after = after[:-1]  # strip final newline if there
        start_mark = CommentMark(indent)
        c = self.ca.items.setdefault(key, [None, [], None, None])
        if before == '\n':
            c[1].append(comment_token("", start_mark))
        elif before:
            for com in before.split('\n'):
                c[1].append(comment_token(com, start_mark))
        if after:
            start_mark = CommentMark(after_indent)
            if c[3] is None:
                c[3] = []
            for com in after.split('\n'):
                c[3].append(comment_token(com, start_mark))  # type: ignore

    @property
    def fa(self):
        # type: () -> Any
        """format attribute

        set_flow_style()/set_block_style()"""
        if not hasattr(self, Format.attrib):
            setattr(self, Format.attrib, Format())
        return getattr(self, Format.attrib)

    def yaml_add_eol_comment(self, comment, key=NoComment, column=None):
        # type: (Any, Optional[Any], Optional[Any]) -> None
        """
        there is a problem as eol comments should start with ' #'
        (but at the beginning of the line the space doesn't have to be before
        the #. The column index is for the # mark
        """
        from .tokens import CommentToken
        from .error import CommentMark

        if column is None:
            try:
                column = self._yaml_get_column(key)
            except AttributeError:
                column = 0
        # startswith() instead of comment[0]: an empty string must not raise
        if not comment.startswith('#'):
            comment = '# ' + comment
        if column is None:
            # no column could be derived from neighbouring comments
            if comment[0] == '#':
                comment = ' ' + comment
                column = 0
        start_mark = CommentMark(column)
        ct = [CommentToken(comment, start_mark, None), None]
        self._yaml_add_eol_comment(ct, key=key)

    @property
    def lc(self):
        # type: () -> Any
        """line/column attribute: the LineCol instance, created when missing"""
        if not hasattr(self, LineCol.attrib):
            setattr(self, LineCol.attrib, LineCol())
        return getattr(self, LineCol.attrib)

    def _yaml_set_line_col(self, line, col):
        # type: (Any, Any) -> None
        self.lc.line = line
        self.lc.col = col

    def _yaml_set_kv_line_col(self, key, data):
        # type: (Any, Any) -> None
        self.lc.add_kv_line_col(key, data)

    def _yaml_set_idx_line_col(self, key, data):
        # type: (Any, Any) -> None
        self.lc.add_idx_line_col(key, data)

    @property
    def anchor(self):
        # type: () -> Any
        """the Anchor instance, created when missing"""
        if not hasattr(self, Anchor.attrib):
            setattr(self, Anchor.attrib, Anchor())
        return getattr(self, Anchor.attrib)

    def yaml_anchor(self):
        # type: () -> Any
        """return the Anchor instance, or None (without creating one) if
        there is none"""
        if not hasattr(self, Anchor.attrib):
            return None
        return self.anchor

    def yaml_set_anchor(self, value, always_dump=False):
        # type: (Any, bool) -> None
        self.anchor.value = value
        self.anchor.always_dump = always_dump

    @property
    def tag(self):
        # type: () -> Any
        """the Tag instance, created when missing"""
        if not hasattr(self, Tag.attrib):
            setattr(self, Tag.attrib, Tag())
        return getattr(self, Tag.attrib)

    def yaml_set_tag(self, value):
        # type: (Any) -> None
        self.tag.value = value

    def copy_attributes(self, t, memo=None):
        # type: (Any, Any) -> None
        """copy the helper attributes that are set on self over to `t`

        With a `memo` (as handed to __deepcopy__) the attributes are deep
        copied using that memo, otherwise `t` shares the objects with self.
        """
        # fmt: off
        for a in [Comment.attrib, Format.attrib, LineCol.attrib, Anchor.attrib,
                  Tag.attrib, merge_attrib]:
            if hasattr(self, a):
                if memo is not None:
                    # memo is an argument of deepcopy(), not the default of
                    # getattr(); it keeps objects already copied shared
                    setattr(t, a, copy.deepcopy(getattr(self, a), memo))
                else:
                    setattr(t, a, getattr(self, a))
        # fmt: on

    def _yaml_add_eol_comment(self, comment, key):
        # type: (Any, Any) -> None
        raise NotImplementedError

    def _yaml_get_pre_comment(self):
        # type: () -> Any
        raise NotImplementedError

    def _yaml_get_column(self, key):
        # type: (Any) -> Any
        raise NotImplementedError
377 | |
378 | |
class CommentedSeq(MutableSliceableSequence, list, CommentedBase):  # type: ignore
    """list with comment information; insert, delete and sort re-key the
    per-index comment entries so they stay with their items"""

    __slots__ = (Comment.attrib, '_lst')

    def __init__(self, *args, **kw):
        # type: (Any, Any) -> None
        list.__init__(self, *args, **kw)

    def __getsingleitem__(self, idx):
        # type: (Any) -> Any
        return list.__getitem__(self, idx)

    def __setsingleitem__(self, idx, value):
        # type: (Any, Any) -> None
        # try to preserve the scalarstring type if setting an existing key to a new value
        if (
            idx < len(self)
            and isinstance(value, string_types)
            and not isinstance(value, ScalarString)
            and isinstance(self[idx], ScalarString)
        ):
            value = type(self[idx])(value)
        list.__setitem__(self, idx, value)

    def __delsingleitem__(self, idx=None):
        # type: (Any) -> Any
        """remove the item and move the comments of later items one down"""
        list.__delitem__(self, idx)
        comments = self.ca.items
        comments.pop(idx, None)  # might not be there -> default value
        for old_index in sorted(comments):
            if old_index >= idx:
                comments[old_index - 1] = comments.pop(old_index)

    def __len__(self):
        # type: () -> int
        return list.__len__(self)

    def insert(self, idx, val):
        # type: (Any, Any) -> None
        """the comments after the insertion have to move forward"""
        list.insert(self, idx, val)
        comments = self.ca.items
        # highest index first, so that no entry is overwritten
        for old_index in sorted(comments, reverse=True):
            if old_index < idx:
                break
            comments[old_index + 1] = comments.pop(old_index)

    def extend(self, val):
        # type: (Any) -> None
        list.extend(self, val)

    def __eq__(self, other):
        # type: (Any) -> bool
        return list.__eq__(self, other)

    def _yaml_add_comment(self, comment, key=NoComment):
        # type: (Any, Optional[Any]) -> None
        """attach to the item `key`, or to the sequence itself without key"""
        if key is NoComment:
            self.ca.comment = comment
        else:
            self.yaml_key_comment_extend(key, comment)

    def _yaml_add_eol_comment(self, comment, key):
        # type: (Any, Any) -> None
        self._yaml_add_comment(comment, key=key)

    def _yaml_get_columnX(self, key):
        # type: (Any) -> Any
        return self.ca.items[key][0].start_mark.column

    def _yaml_get_column(self, key):
        # type: (Any) -> Any
        """column of the comment on a nearby item: the previous one, else the
        next one, else the last earlier item that has a comment entry;
        None if there is no such item"""
        comments = self.ca.items
        for neighbour in (key - 1, key + 1):
            if neighbour in comments:
                return self._yaml_get_columnX(neighbour)
        # self.ca.items is not ordered
        chosen = None
        for row_idx, _k1 in enumerate(self):
            if row_idx >= key:
                break
            if row_idx in comments:
                chosen = row_idx
        if chosen is None:
            return None
        return self._yaml_get_columnX(chosen)

    def _yaml_get_pre_comment(self):
        # type: () -> Any
        """install a fresh, empty list of pre comments and return it"""
        pre_comments = []  # type: List[Any]
        ca = self.ca
        if ca.comment is None:
            ca.comment = [None, pre_comments]
        else:
            ca.comment[1] = pre_comments
        return pre_comments

    def __deepcopy__(self, memo):
        # type: (Any) -> Any
        duplicate = self.__class__()
        # register before descending, so that self-references resolve
        memo[id(self)] = duplicate
        for element in self:
            duplicate.append(copy.deepcopy(element, memo))
        self.copy_attributes(duplicate, memo=memo)
        return duplicate

    def __add__(self, other):
        # type: (Any) -> Any
        return list.__add__(self, other)

    def sort(self, key=None, reverse=False):  # type: ignore
        # type: (Any, bool) -> None
        """sort in place and carry the comment entries over to the new indices"""
        positions = range(len(self))
        if key is None:
            order = sorted(zip(self, positions), reverse=reverse)
            list.__init__(self, [pair[0] for pair in order])
        else:
            sort_keys = map(key, list.__iter__(self))
            order = sorted(zip(sort_keys, positions), reverse=reverse)
            list.__init__(self, [list.__getitem__(self, pair[1]) for pair in order])
        previous = self.ca.items
        self.ca._items = {}
        for new_index, pair in enumerate(order):
            # pair[1] is the index the item had before sorting
            if pair[1] in previous:
                self.ca.items[new_index] = previous[pair[1]]

    def __repr__(self):
        # type: () -> Any
        return list.__repr__(self)
510 | |
511 | |
class CommentedKeySeq(tuple, CommentedBase):  # type: ignore
    """This primarily exists to be able to roundtrip keys that are sequences"""

    def _yaml_add_comment(self, comment, key=NoComment):
        # type: (Any, Optional[Any]) -> None
        """attach to the item `key`, or to the sequence itself without key"""
        if key is NoComment:
            self.ca.comment = comment
        else:
            self.yaml_key_comment_extend(key, comment)

    def _yaml_add_eol_comment(self, comment, key):
        # type: (Any, Any) -> None
        self._yaml_add_comment(comment, key=key)

    def _yaml_get_columnX(self, key):
        # type: (Any) -> Any
        return self.ca.items[key][0].start_mark.column

    def _yaml_get_column(self, key):
        # type: (Any) -> Any
        """column of the comment on a nearby item: the previous one, else the
        next one, else the last earlier item that has a comment entry;
        None if there is no such item"""
        comments = self.ca.items
        for neighbour in (key - 1, key + 1):
            if neighbour in comments:
                return self._yaml_get_columnX(neighbour)
        # self.ca.items is not ordered
        chosen = None
        for row_idx, _k1 in enumerate(self):
            if row_idx >= key:
                break
            if row_idx in comments:
                chosen = row_idx
        if chosen is None:
            return None
        return self._yaml_get_columnX(chosen)

    def _yaml_get_pre_comment(self):
        # type: () -> Any
        """install a fresh, empty list of pre comments and return it"""
        pre_comments = []  # type: List[Any]
        ca = self.ca
        if ca.comment is None:
            ca.comment = [None, pre_comments]
        else:
            ca.comment[1] = pre_comments
        return pre_comments
559 | |
560 | |
class CommentedMapView(Sized):
    """Base of the views: keeps a reference to the mapping and reports its size"""

    __slots__ = ('_mapping',)

    def __init__(self, mapping):
        # type: (Any) -> None
        self._mapping = mapping

    def __len__(self):
        # type: () -> int
        return len(self._mapping)
572 | |
573 | |
class CommentedMapKeysView(CommentedMapView, Set):  # type: ignore
    """Set-like view on the keys of the mapping"""

    __slots__ = ()

    @classmethod
    def _from_iterable(self, it):
        # type: (Any) -> Any
        # results of set operations are plain sets
        return set(it)

    def __contains__(self, key):
        # type: (Any) -> Any
        return key in self._mapping

    def __iter__(self):
        # type: () -> Any  # yield from self._mapping  # not in py27, pypy
        # for x in self._mapping._keys():
        for key in self._mapping:
            yield key
591 | |
592 | |
class CommentedMapItemsView(CommentedMapView, Set):  # type: ignore
    """Set-like view on the (key, value) pairs of the mapping"""

    __slots__ = ()

    @classmethod
    def _from_iterable(self, it):
        # type: (Any) -> Any
        # results of set operations are plain sets
        return set(it)

    def __contains__(self, item):
        # type: (Any) -> Any
        """true if the key of the pair is in the mapping with an equal value"""
        key, value = item
        try:
            found = self._mapping[key]
        except KeyError:
            return False
        return found == value

    def __iter__(self):
        # type: () -> Any
        mapping = self._mapping
        for key in mapping._keys():
            yield (key, mapping[key])
615 | |
616 | |
class CommentedMapValuesView(CommentedMapView):
    """View on the values of the mapping"""

    __slots__ = ()

    def __contains__(self, value):
        # type: (Any) -> Any
        """linear search for a value that compares equal"""
        mapping = self._mapping
        return any(value == mapping[key] for key in mapping)

    def __iter__(self):
        # type: () -> Any
        mapping = self._mapping
        for key in mapping._keys():
            yield mapping[key]
631 | |
632 | |
class CommentedMap(ordereddict, CommentedBase):  # type: ignore
    """ordereddict with comment information and merge bookkeeping.

    `_ok` is the set of "own keys": keys set through __setitem__, update()
    or insert(), as opposed to keys copied in by add_yaml_merge().
    `_ref` lists the mappings registered through add_referent(); they get
    update_key_value(key) called when a key is deleted here.
    """

    __slots__ = (Comment.attrib, '_ok', '_ref')

    def __init__(self, *args, **kw):
        # type: (Any, Any) -> None
        self._ok = set()  # type: MutableSet[Any]  #  own keys
        self._ref = []  # type: List[CommentedMap]
        ordereddict.__init__(self, *args, **kw)

    def _yaml_add_comment(self, comment, key=NoComment, value=NoComment):
        # type: (Any, Optional[Any], Optional[Any]) -> None
        """values is set to key to indicate a value attachment of comment"""
        if key is not NoComment:
            self.yaml_key_comment_extend(key, comment)
            return
        if value is not NoComment:
            self.yaml_value_comment_extend(value, comment)
        else:
            self.ca.comment = comment

    def _yaml_add_eol_comment(self, comment, key):
        # type: (Any, Any) -> None
        """add on the value line, with value specified by the key"""
        self._yaml_add_comment(comment, value=key)

    def _yaml_get_columnX(self, key):
        # type: (Any) -> Any
        return self.ca.items[key][2].start_mark.column

    def _yaml_get_column(self, key):
        # type: (Any) -> Any
        """column of the value comment on a nearby key: the key before
        `key`, else the key after it, else the last key that has a comment
        entry found before a key comparing >= `key`; None if there is none"""
        column = None
        sel_idx = None
        pre, post, last = None, None, None
        # find the keys directly before (pre) and after (post) `key`
        for x in self:
            if pre is not None and x != key:
                post = x
                break
            if x == key:
                pre = last
            last = x
        if pre in self.ca.items:
            sel_idx = pre
        elif post in self.ca.items:
            sel_idx = post
        else:
            # self.ca.items is not ordered
            for k1 in self:
                if k1 >= key:
                    break
                if k1 not in self.ca.items:
                    continue
                sel_idx = k1
        if sel_idx is not None:
            column = self._yaml_get_columnX(sel_idx)
        return column

    def _yaml_get_pre_comment(self):
        # type: () -> Any
        """install a fresh, empty list of pre comments and return it"""
        pre_comments = []  # type: List[Any]
        if self.ca.comment is None:
            self.ca.comment = [None, pre_comments]
        else:
            self.ca.comment[1] = pre_comments
        return pre_comments

    def update(self, vals):
        # type: (Any) -> None
        """update from a mapping or from an iterable of pairs; the keys
        involved are recorded as own keys"""
        try:
            ordereddict.update(self, vals)
        except TypeError:
            # probably a dict that is used
            for x in vals:
                self[x] = vals[x]
        try:
            self._ok.update(vals.keys())  # type: ignore
        except AttributeError:
            # assume a list/tuple of two element lists/tuples
            for x in vals:
                self._ok.add(x[0])

    def insert(self, pos, key, value, comment=None):
        # type: (Any, Any, Any, Optional[Any]) -> None
        """insert key value into given position
        attach comment if provided
        """
        ordereddict.insert(self, pos, key, value)
        self._ok.add(key)
        if comment is not None:
            self.yaml_add_eol_comment(comment, key=key)

    def mlget(self, key, default=None, list_ok=False):
        # type: (Any, Any, Any) -> Any
        """multi-level get that expects dicts within dicts

        A `key` that is not a list is handed to get(). A missing key yields
        `default`. TypeError/IndexError yield `default` only if `list_ok` is
        true, otherwise they propagate.
        """
        if not isinstance(key, list):
            return self.get(key, default)
        # assume that the key is a list of recursively accessible dicts

        def get_one_level(key_list, level, d):
            # type: (Any, Any, Any) -> Any
            # `level` is 1-based: key_list[level - 1] is looked up in d
            if not list_ok:
                assert isinstance(d, dict)
            if level >= len(key_list):
                if level > len(key_list):
                    raise IndexError
                return d[key_list[level - 1]]
            return get_one_level(key_list, level + 1, d[key_list[level - 1]])

        try:
            return get_one_level(key, 1, self)
        except KeyError:
            return default
        except (TypeError, IndexError):
            if not list_ok:
                raise
            return default

    def __getitem__(self, key):
        # type: (Any) -> Any
        """look in the mapping itself first, then in the merged mappings;
        the original KeyError is re-raised if the key is in neither"""
        try:
            return ordereddict.__getitem__(self, key)
        except KeyError:
            for merged in getattr(self, merge_attrib, []):
                if key in merged[1]:
                    return merged[1][key]
            raise

    def __setitem__(self, key, value):
        # type: (Any, Any) -> None
        # try to preserve the scalarstring type if setting an existing key to a new value
        if key in self:
            if (
                isinstance(value, string_types)
                and not isinstance(value, ScalarString)
                and isinstance(self[key], ScalarString)
            ):
                value = type(self[key])(value)
        ordereddict.__setitem__(self, key, value)
        self._ok.add(key)

    def _unmerged_contains(self, key):
        # type: (Any) -> Any
        """True if key is an own key, None (not False) otherwise"""
        if key in self._ok:
            return True
        return None

    def __contains__(self, key):
        # type: (Any) -> bool
        return bool(ordereddict.__contains__(self, key))

    def get(self, key, default=None):
        # type: (Any, Any) -> Any
        """return self[key], or `default` if the lookup raises"""
        try:
            return self.__getitem__(key)
        except Exception:
            # any lookup error (e.g. KeyError) gives the default, but
            # KeyboardInterrupt/SystemExit are no longer swallowed
            return default

    def __repr__(self):
        # type: () -> Any
        return ordereddict.__repr__(self).replace('CommentedMap', 'ordereddict')

    def non_merged_items(self):
        # type: () -> Any
        """yield the (key, value) pairs of the own keys only"""
        for x in ordereddict.__iter__(self):
            if x in self._ok:
                yield x, ordereddict.__getitem__(self, x)

    def __delitem__(self, key):
        # type: (Any) -> None
        # for merged in getattr(self, merge_attrib, []):
        #     if key in merged[1]:
        #         value = merged[1][key]
        #         break
        # else:
        #     # not found in merged in stuff
        #     ordereddict.__delitem__(self, key)
        #    for referer in self._ref:
        #        referer.update_key_value(key)
        #    return
        #
        # ordereddict.__setitem__(self, key, value)  # merge might have different value
        # self._ok.discard(key)
        self._ok.discard(key)
        ordereddict.__delitem__(self, key)
        # let the registered mappings re-resolve this key
        for referer in self._ref:
            referer.update_key_value(key)

    def __iter__(self):
        # type: () -> Any
        for x in ordereddict.__iter__(self):
            yield x

    def _keys(self):
        # type: () -> Any
        for x in ordereddict.__iter__(self):
            yield x

    def __len__(self):
        # type: () -> int
        return int(ordereddict.__len__(self))

    def __eq__(self, other):
        # type: (Any) -> bool
        # compared as a plain dict, so the order of the keys is not considered
        return bool(dict(self) == other)

    if PY2:

        def keys(self):
            # type: () -> Any
            return list(self._keys())

        def iterkeys(self):
            # type: () -> Any
            return self._keys()

        def viewkeys(self):
            # type: () -> Any
            return CommentedMapKeysView(self)

    else:

        def keys(self):
            # type: () -> Any
            return CommentedMapKeysView(self)

    if PY2:

        def _values(self):
            # type: () -> Any
            for x in ordereddict.__iter__(self):
                yield ordereddict.__getitem__(self, x)

        def values(self):
            # type: () -> Any
            return list(self._values())

        def itervalues(self):
            # type: () -> Any
            return self._values()

        def viewvalues(self):
            # type: () -> Any
            return CommentedMapValuesView(self)

    else:

        def values(self):
            # type: () -> Any
            return CommentedMapValuesView(self)

    def _items(self):
        # type: () -> Any
        for x in ordereddict.__iter__(self):
            yield x, ordereddict.__getitem__(self, x)

    if PY2:

        def items(self):
            # type: () -> Any
            return list(self._items())

        def iteritems(self):
            # type: () -> Any
            return self._items()

        def viewitems(self):
            # type: () -> Any
            return CommentedMapItemsView(self)

    else:

        def items(self):
            # type: () -> Any
            return CommentedMapItemsView(self)

    @property
    def merge(self):
        # type: () -> Any
        """the list stored under merge_attrib, created when missing"""
        if not hasattr(self, merge_attrib):
            setattr(self, merge_attrib, [])
        return getattr(self, merge_attrib)

    def copy(self):
        # type: () -> Any
        """shallow copy; the comment/format/... attributes are shared"""
        x = type(self)()  # update doesn't work
        for k, v in self._items():
            x[k] = v
        self.copy_attributes(x)
        return x

    def add_referent(self, cm):
        # type: (Any) -> None
        """register `cm` (once) to be notified of key deletions"""
        if cm not in self._ref:
            self._ref.append(cm)

    def add_yaml_merge(self, value):
        # type: (Any) -> None
        """merge in the mappings found at index 1 of each element of `value`

        Keys not yet present are copied in, without becoming own keys, and
        self is registered as referent of each merged mapping.
        """
        for v in value:
            v[1].add_referent(self)
            for k, merged_value in v[1].items():
                if ordereddict.__contains__(self, k):
                    continue
                ordereddict.__setitem__(self, k, merged_value)
        self.merge.extend(value)

    def update_key_value(self, key):
        # type: (Any) -> None
        """re-resolve `key` after a change in a merged mapping

        Own keys are left alone. Otherwise the value is taken from the first
        merged mapping that has the key, or the key is removed.
        """
        if key in self._ok:
            return
        for v in self.merge:
            if key in v[1]:
                ordereddict.__setitem__(self, key, v[1][key])
                return
        ordereddict.__delitem__(self, key)

    def __deepcopy__(self, memo):
        # type: (Any) -> Any
        res = self.__class__()
        # register before descending, so that self-references resolve
        memo[id(self)] = res
        for k in self:
            res[k] = copy.deepcopy(self[k], memo)
        self.copy_attributes(res, memo=memo)
        return res
956 | |
957 | |
# based on brownie mappings
@classmethod  # type: ignore
def raise_immutable(cls, *args, **kwargs):
    # type: (Any, *Any, **Any) -> None
    """Stand-in for mutating methods: always raises TypeError naming the class.

    Decorated as classmethod at module level, so that it can be assigned to
    method names in a class body.
    """
    message = '{} objects are immutable'.format(cls.__name__)
    raise TypeError(message)
963 | |
964 | |
class CommentedKeyMap(CommentedBase, Mapping):  # type: ignore
    """This primarily exists to be able to roundtrip keys that are mappings

    The data is kept in the ordereddict `_od`. The mutating mapping methods
    are replaced by raise_immutable and the instance is hashable.
    """

    __slots__ = Comment.attrib, '_od'

    def __init__(self, *args, **kw):
        # type: (Any, Any) -> None
        if hasattr(self, '_od'):
            # Re-initialisation would mutate the object. The module level
            # raise_immutable is a classmethod object and cannot be called
            # directly, so the error is raised here.
            raise TypeError('{} objects are immutable'.format(type(self).__name__))
        try:
            self._od = ordereddict(*args, **kw)
        except TypeError:
            if PY2:
                self._od = ordereddict(args[0].items())
            else:
                raise

    __delitem__ = __setitem__ = clear = pop = popitem = setdefault = update = raise_immutable

    # need to implement __getitem__, __iter__ and __len__
    def __getitem__(self, index):
        # type: (Any) -> Any
        return self._od[index]

    def __iter__(self):
        # type: () -> Iterator[Any]
        for x in self._od.__iter__():
            yield x

    def __len__(self):
        # type: () -> int
        return len(self._od)

    def __hash__(self):
        # type: () -> Any
        # hash of the (key, value) pairs, so all values have to be hashable
        return hash(tuple(self.items()))

    def __repr__(self):
        # type: () -> Any
        if not hasattr(self, merge_attrib):
            return self._od.__repr__()
        return 'ordereddict(' + repr(list(self._od.items())) + ')'

    @classmethod
    def fromkeys(cls, keys, v=None):
        # type: (Any, Any) -> Any
        """create an instance with every key of `keys` mapped to `v`"""
        # a classmethod receives the class as its first argument; without
        # the `cls` parameter the class itself ended up in `keys`
        return cls(dict.fromkeys(keys, v))

    def _yaml_add_comment(self, comment, key=NoComment):
        # type: (Any, Optional[Any]) -> None
        """attach to `key`, or to the mapping itself without key"""
        if key is not NoComment:
            self.yaml_key_comment_extend(key, comment)
        else:
            self.ca.comment = comment

    def _yaml_add_eol_comment(self, comment, key):
        # type: (Any, Any) -> None
        self._yaml_add_comment(comment, key=key)

    def _yaml_get_columnX(self, key):
        # type: (Any) -> Any
        return self.ca.items[key][0].start_mark.column

    def _yaml_get_column(self, key):
        # type: (Any) -> Any
        """column of the comment on a nearby entry, None if there is none

        NOTE(review): `key` is used arithmetically (key - 1, key + 1) and
        compared with positions, so this only works for integer keys. It
        looks like it was taken over from the sequence classes; confirm.
        """
        column = None
        sel_idx = None
        pre, post = key - 1, key + 1
        if pre in self.ca.items:
            sel_idx = pre
        elif post in self.ca.items:
            sel_idx = post
        else:
            # self.ca.items is not ordered
            for row_idx, _k1 in enumerate(self):
                if row_idx >= key:
                    break
                if row_idx not in self.ca.items:
                    continue
                sel_idx = row_idx
        if sel_idx is not None:
            column = self._yaml_get_columnX(sel_idx)
        return column

    def _yaml_get_pre_comment(self):
        # type: () -> Any
        """install a fresh, empty list of pre comments and return it"""
        pre_comments = []  # type: List[Any]
        if self.ca.comment is None:
            self.ca.comment = [None, pre_comments]
        else:
            self.ca.comment[1] = pre_comments
        return pre_comments
1056 | |
1057 | |
class CommentedOrderedMap(CommentedMap):
    """CommentedMap subclass that adds no behaviour of its own.

    It only declares the comment attribute as a slot.
    NOTE(review): presumably a distinct type so ordered maps can be told
    apart from plain mappings elsewhere -- confirm against its users.
    """

    __slots__ = Comment.attrib,
1060 | |
1061 | |
class CommentedSet(MutableSet, CommentedBase):  # type: ignore  # NOQA
    """Mutable set that keeps its elements as the keys of an ordereddict.

    The elements are stored in ``self.odict`` with ``None`` as value;
    iteration, membership and length are all answered by that mapping.
    """

    __slots__ = Comment.attrib, 'odict'

    def __init__(self, values=None):
        # type: (Any) -> None
        """Create the set, optionally filled from the iterable ``values``."""
        self.odict = ordereddict()
        MutableSet.__init__(self)
        if values is not None:
            # MutableSet.__ior__ adds every element through self.add()
            self |= values  # type: ignore

    def _yaml_add_comment(self, comment, key=NoComment, value=NoComment):
        # type: (Any, Optional[Any], Optional[Any]) -> None
        """values is set to key to indicate a value attachment of comment"""
        if key is not NoComment:
            self.yaml_key_comment_extend(key, comment)
            return
        if value is not NoComment:
            self.yaml_value_comment_extend(value, comment)
        else:
            self.ca.comment = comment

    def _yaml_add_eol_comment(self, comment, key):
        # type: (Any, Any) -> None
        """add on the value line, with value specified by the key"""
        self._yaml_add_comment(comment, value=key)

    def add(self, value):
        # type: (Any) -> None
        """Add an element."""
        self.odict[value] = None

    def discard(self, value):
        # type: (Any) -> None
        """Remove an element.  Do not raise an exception if absent."""
        # pop() with a default, rather than del, so that a missing element
        # is ignored as the MutableSet contract requires; mixin methods
        # such as __isub__ call discard() for elements that may be absent
        self.odict.pop(value, None)

    def __contains__(self, x):
        # type: (Any) -> Any
        return x in self.odict

    def __iter__(self):
        # type: () -> Any
        for x in self.odict:
            yield x

    def __len__(self):
        # type: () -> int
        return len(self.odict)

    def __repr__(self):
        # type: () -> str
        return 'set({0!r})'.format(self.odict.keys())
1114 | |
1115 | |
class TaggedScalar(CommentedBase):
    """Scalar holder with a ``value`` and a ``style`` attribute.

    Both attributes start out as None; the value and style attributes
    are set during roundtrip construction.
    """

    def __init__(self):
        # type: () -> None
        self.value = self.style = None

    def __str__(self):
        # type: () -> Any
        # hands back the stored value unchanged
        text = self.value
        return text
1126 | |
1127 | |
def dump_comments(d, name="", sep='.', out=sys.stdout):
    # type: (Any, str, str, Any) -> None
    """
    recursively dump comments, all but the toplevel preceded by the path
    in dotted form x.0.a

    Only dict and list instances that have a ``ca`` attribute are dumped;
    anything else is silently skipped.  For each dumped node the path (if
    any) and then ``d.ca`` are written to ``out``, each on its own line.

    d:    the (nested) data structure to walk
    name: path of ``d`` within the toplevel object, empty for the toplevel
    sep:  separator placed between path components
    out:  object with a ``write`` method receiving all of the output
    """
    if not hasattr(d, 'ca'):
        return
    # str on PY3, str and unicode on PY2
    text_types = (str, type(u''))
    if isinstance(d, dict):
        # mapping keys need not be strings (e.g. integer keys), convert
        # those so that they can be joined into the path
        children = [(k if isinstance(k, text_types) else str(k), d[k]) for k in d]
    elif isinstance(d, list):
        children = [(str(idx), item) for idx, item in enumerate(d)]
    else:
        return
    if name:
        # the path goes to the same stream as the comment, not to sys.stdout
        out.write('{}\n'.format(name))
    out.write('{}\n'.format(d.ca))  # type: ignore
    for label, child in children:
        dump_comments(child, name=(name + sep + label) if name else label, sep=sep, out=out)