Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/ruamel/yaml/composer.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26e78fe6e8c4 |
---|---|
1 # coding: utf-8 | |
2 | |
3 from __future__ import absolute_import, print_function | |
4 | |
5 import warnings | |
6 | |
7 from ruamel.yaml.error import MarkedYAMLError, ReusedAnchorWarning | |
8 from ruamel.yaml.compat import utf8, nprint, nprintf # NOQA | |
9 | |
10 from ruamel.yaml.events import ( | |
11 StreamStartEvent, | |
12 StreamEndEvent, | |
13 MappingStartEvent, | |
14 MappingEndEvent, | |
15 SequenceStartEvent, | |
16 SequenceEndEvent, | |
17 AliasEvent, | |
18 ScalarEvent, | |
19 ) | |
20 from ruamel.yaml.nodes import MappingNode, ScalarNode, SequenceNode | |
21 | |
22 if False: # MYPY | |
23 from typing import Any, Dict, Optional, List # NOQA | |
24 | |
25 __all__ = ['Composer', 'ComposerError'] | |
26 | |
27 | |
class ComposerError(MarkedYAMLError):
    """Error raised by the composer while turning parser events into nodes."""
30 | |
31 | |
class Composer(object):
    """Build a node graph from the events supplied by the loader's parser.

    Scalar, sequence and mapping events are turned into ``ScalarNode``,
    ``SequenceNode`` and ``MappingNode`` objects.  ``self.anchors`` maps the
    anchor names seen in the current document to the nodes composed under
    them; it is cleared once a document has been composed.
    """

    def __init__(self, loader=None):
        # type: (Any) -> None
        """Remember *loader* and register this composer on it.

        The composer is stored as ``loader._composer`` only when a loader is
        given and that attribute is missing or ``None``.
        """
        self.loader = loader
        if self.loader is not None and getattr(self.loader, '_composer', None) is None:
            self.loader._composer = self
        self.anchors = {}  # type: Dict[Any, Any]

    @property
    def parser(self):
        # type: () -> Any
        """Return the parser stored on the loader (``loader._parser``)."""
        if hasattr(self.loader, 'typ'):
            # Attribute access only for its side effect; presumably this lets
            # a loader that has ``typ`` create its parser lazily -- confirm
            # against the loader implementation.
            self.loader.parser
        return self.loader._parser

    @property
    def resolver(self):
        # type: () -> Any
        """Return the resolver stored on the loader (``loader._resolver``)."""
        if hasattr(self.loader, 'typ'):
            # Same pattern as ``parser``: touch the public attribute first,
            # then return the private one.
            self.loader.resolver
        return self.loader._resolver

    def check_node(self):
        # type: () -> Any
        """Return True when another document can be composed.

        A pending STREAM-START event is consumed first.
        """
        # Drop the STREAM-START event.
        if self.parser.check_event(StreamStartEvent):
            self.parser.get_event()

        # More documents are available unless the next event ends the stream.
        return not self.parser.check_event(StreamEndEvent)

    def get_node(self):
        # type: () -> Any
        """Return the root node of the next document, or None at stream end."""
        if not self.parser.check_event(StreamEndEvent):
            return self.compose_document()
        return None

    def get_single_node(self):
        # type: () -> Any
        """Compose the only document in the stream and return its root node.

        Returns None for an empty stream.  The STREAM-START and STREAM-END
        events are consumed.

        Raises:
            ComposerError: if a second document follows the first one.
        """
        # Drop the STREAM-START event.
        self.parser.get_event()

        # Compose a document if the stream is not empty.
        document = None  # type: Any
        if not self.parser.check_event(StreamEndEvent):
            document = self.compose_document()

        # Ensure that the stream contains no more documents.
        if not self.parser.check_event(StreamEndEvent):
            event = self.parser.get_event()
            raise ComposerError(
                'expected a single document in the stream',
                document.start_mark,
                'but found another document',
                event.start_mark,
            )

        # Drop the STREAM-END event.
        self.parser.get_event()

        return document

    def compose_document(self):
        # type: (Any) -> Any
        """Compose one document and return its root node.

        The surrounding DOCUMENT-START and DOCUMENT-END events are consumed
        and the anchor table is reset afterwards, so anchors do not carry
        over into the next document.
        """
        # Drop the DOCUMENT-START event.
        self.parser.get_event()

        # Compose the root node.
        node = self.compose_node(None, None)

        # Drop the DOCUMENT-END event.
        self.parser.get_event()

        self.anchors = {}
        return node

    def compose_node(self, parent, index):
        # type: (Any, Any) -> Any
        """Compose the node that starts at the next parser event.

        Args:
            parent: node that will contain the result (None for the root);
                passed on to ``resolver.descend_resolver``.
            index: position inside *parent* as supplied by the caller: the
                integer index for sequence items, None for mapping keys and
                the key node for mapping values.

        Returns:
            The composed node; for an alias, the node registered earlier
            under that anchor.

        Raises:
            ComposerError: if an alias names an undefined anchor, or if the
                next event does not start a scalar, sequence or mapping.
        """
        if self.parser.check_event(AliasEvent):
            event = self.parser.get_event()
            alias = event.anchor
            if alias not in self.anchors:
                raise ComposerError(
                    None, None, 'found undefined alias %r' % utf8(alias), event.start_mark
                )
            return self.anchors[alias]
        event = self.parser.peek_event()
        # Events that do not start a node may lack ``anchor``; treat that as
        # "no anchor" so they reach the explicit error below.
        anchor = getattr(event, 'anchor', None)
        if anchor is not None and anchor in self.anchors:
            # A reused anchor is only reported as a warning; the compose_*
            # methods below then overwrite the earlier entry.
            ws = (
                '\nfound duplicate anchor {!r}\nfirst occurrence {}\nsecond occurrence '
                '{}'.format((anchor), self.anchors[anchor].start_mark, event.start_mark)
            )
            warnings.warn(ws, ReusedAnchorWarning)
        self.resolver.descend_resolver(parent, index)
        node = None  # type: Any
        if self.parser.check_event(ScalarEvent):
            node = self.compose_scalar_node(anchor)
        elif self.parser.check_event(SequenceStartEvent):
            node = self.compose_sequence_node(anchor)
        elif self.parser.check_event(MappingStartEvent):
            node = self.compose_mapping_node(anchor)
        # Always undo descend_resolver, even when no branch matched.
        self.resolver.ascend_resolver()
        if node is None:
            # Previously this fell through to ``return node`` with the name
            # unbound (UnboundLocalError); report a marked error instead.
            raise ComposerError(
                None,
                None,
                'expected a scalar, sequence or mapping event, but found %s'
                % type(event).__name__,
                getattr(event, 'start_mark', None),
            )
        return node

    def compose_scalar_node(self, anchor):
        # type: (Any) -> Any
        """Consume a scalar event and return the matching ``ScalarNode``.

        A missing tag or the non-specific tag ``!`` is resolved through the
        resolver.  The node is recorded in ``self.anchors`` when *anchor* is
        not None.
        """
        event = self.parser.get_event()
        tag = event.tag
        if tag is None or tag == u'!':
            tag = self.resolver.resolve(ScalarNode, event.value, event.implicit)
        node = ScalarNode(
            tag,
            event.value,
            event.start_mark,
            event.end_mark,
            style=event.style,
            comment=event.comment,
            anchor=anchor,
        )
        if anchor is not None:
            self.anchors[anchor] = node
        return node

    def compose_sequence_node(self, anchor):
        # type: (Any) -> Any
        """Consume a whole sequence and return the matching ``SequenceNode``.

        Items are composed until the SEQUENCE-END event, which is consumed
        as well and supplies the node's end mark.
        """
        start_event = self.parser.get_event()
        tag = start_event.tag
        if tag is None or tag == u'!':
            tag = self.resolver.resolve(SequenceNode, None, start_event.implicit)
        node = SequenceNode(
            tag,
            [],
            start_event.start_mark,
            None,
            flow_style=start_event.flow_style,
            comment=start_event.comment,
            anchor=anchor,
        )
        # Register before composing the items so that an alias inside the
        # sequence can refer back to this node.
        if anchor is not None:
            self.anchors[anchor] = node
        index = 0
        while not self.parser.check_event(SequenceEndEvent):
            node.value.append(self.compose_node(node, index))
            index += 1
        end_event = self.parser.get_event()
        if node.flow_style is True and end_event.comment is not None:
            # For flow sequences the end event's comment replaces the node
            # comment; warn when that discards an existing one.
            if node.comment is not None:
                nprint(
                    'Warning: unexpected end_event commment in sequence '
                    'node {}'.format(node.flow_style)
                )
            node.comment = end_event.comment
        node.end_mark = end_event.end_mark
        self.check_end_doc_comment(end_event, node)
        return node

    def compose_mapping_node(self, anchor):
        # type: (Any) -> Any
        """Consume a whole mapping and return the matching ``MappingNode``.

        ``node.value`` is a list of ``(key_node, value_node)`` pairs in
        document order.  The MAPPING-END event is consumed as well and
        supplies the node's end mark.
        """
        start_event = self.parser.get_event()
        tag = start_event.tag
        if tag is None or tag == u'!':
            tag = self.resolver.resolve(MappingNode, None, start_event.implicit)
        node = MappingNode(
            tag,
            [],
            start_event.start_mark,
            None,
            flow_style=start_event.flow_style,
            comment=start_event.comment,
            anchor=anchor,
        )
        # Register before composing the entries so that an alias inside the
        # mapping can refer back to this node.
        if anchor is not None:
            self.anchors[anchor] = node
        while not self.parser.check_event(MappingEndEvent):
            # Duplicate keys are not rejected here: every pair is appended.
            item_key = self.compose_node(node, None)
            item_value = self.compose_node(node, item_key)
            node.value.append((item_key, item_value))
        end_event = self.parser.get_event()
        if node.flow_style is True and end_event.comment is not None:
            node.comment = end_event.comment
        node.end_mark = end_event.end_mark
        self.check_end_doc_comment(end_event, node)
        return node

    def check_end_doc_comment(self, end_event, node):
        # type: (Any, Any) -> None
        """Move the second comment slot of *end_event* onto *node*.

        Does nothing unless ``end_event.comment[1]`` is truthy.  Otherwise
        that value is appended to ``node.comment`` (created as
        ``[None, None]`` when absent, making it the third element) and
        cleared on the event.
        """
        if end_event.comment and end_event.comment[1]:
            # pre comments on an end_event, no following to move to
            if node.comment is None:
                node.comment = [None, None]
            # NOTE(review): ``node`` is a node, not an event, so this check
            # looks like it can never fail; ScalarNode may have been
            # intended -- confirm before changing.
            assert not isinstance(node, ScalarEvent)
            # this is a post comment on a collection node, add it at the end
            # of the comment list
            node.comment.append(end_event.comment[1])
            end_event.comment[1] = None