comparison env/lib/python3.7/site-packages/ruamel/yaml/composer.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 # coding: utf-8
2
3 from __future__ import absolute_import, print_function
4
5 import warnings
6
7 from ruamel.yaml.error import MarkedYAMLError, ReusedAnchorWarning
8 from ruamel.yaml.compat import utf8, nprint, nprintf # NOQA
9
10 from ruamel.yaml.events import (
11 StreamStartEvent,
12 StreamEndEvent,
13 MappingStartEvent,
14 MappingEndEvent,
15 SequenceStartEvent,
16 SequenceEndEvent,
17 AliasEvent,
18 ScalarEvent,
19 )
20 from ruamel.yaml.nodes import MappingNode, ScalarNode, SequenceNode
21
22 if False: # MYPY
23 from typing import Any, Dict, Optional, List # NOQA
24
25 __all__ = ['Composer', 'ComposerError']
26
27
28 class ComposerError(MarkedYAMLError):
29 pass
30
31
32 class Composer(object):
33 def __init__(self, loader=None):
34 # type: (Any) -> None
35 self.loader = loader
36 if self.loader is not None and getattr(self.loader, '_composer', None) is None:
37 self.loader._composer = self
38 self.anchors = {} # type: Dict[Any, Any]
39
40 @property
41 def parser(self):
42 # type: () -> Any
43 if hasattr(self.loader, 'typ'):
44 self.loader.parser
45 return self.loader._parser
46
47 @property
48 def resolver(self):
49 # type: () -> Any
50 # assert self.loader._resolver is not None
51 if hasattr(self.loader, 'typ'):
52 self.loader.resolver
53 return self.loader._resolver
54
55 def check_node(self):
56 # type: () -> Any
57 # Drop the STREAM-START event.
58 if self.parser.check_event(StreamStartEvent):
59 self.parser.get_event()
60
61 # If there are more documents available?
62 return not self.parser.check_event(StreamEndEvent)
63
64 def get_node(self):
65 # type: () -> Any
66 # Get the root node of the next document.
67 if not self.parser.check_event(StreamEndEvent):
68 return self.compose_document()
69
70 def get_single_node(self):
71 # type: () -> Any
72 # Drop the STREAM-START event.
73 self.parser.get_event()
74
75 # Compose a document if the stream is not empty.
76 document = None # type: Any
77 if not self.parser.check_event(StreamEndEvent):
78 document = self.compose_document()
79
80 # Ensure that the stream contains no more documents.
81 if not self.parser.check_event(StreamEndEvent):
82 event = self.parser.get_event()
83 raise ComposerError(
84 'expected a single document in the stream',
85 document.start_mark,
86 'but found another document',
87 event.start_mark,
88 )
89
90 # Drop the STREAM-END event.
91 self.parser.get_event()
92
93 return document
94
95 def compose_document(self):
96 # type: (Any) -> Any
97 # Drop the DOCUMENT-START event.
98 self.parser.get_event()
99
100 # Compose the root node.
101 node = self.compose_node(None, None)
102
103 # Drop the DOCUMENT-END event.
104 self.parser.get_event()
105
106 self.anchors = {}
107 return node
108
109 def compose_node(self, parent, index):
110 # type: (Any, Any) -> Any
111 if self.parser.check_event(AliasEvent):
112 event = self.parser.get_event()
113 alias = event.anchor
114 if alias not in self.anchors:
115 raise ComposerError(
116 None, None, 'found undefined alias %r' % utf8(alias), event.start_mark
117 )
118 return self.anchors[alias]
119 event = self.parser.peek_event()
120 anchor = event.anchor
121 if anchor is not None: # have an anchor
122 if anchor in self.anchors:
123 # raise ComposerError(
124 # "found duplicate anchor %r; first occurrence"
125 # % utf8(anchor), self.anchors[anchor].start_mark,
126 # "second occurrence", event.start_mark)
127 ws = (
128 '\nfound duplicate anchor {!r}\nfirst occurrence {}\nsecond occurrence '
129 '{}'.format((anchor), self.anchors[anchor].start_mark, event.start_mark)
130 )
131 warnings.warn(ws, ReusedAnchorWarning)
132 self.resolver.descend_resolver(parent, index)
133 if self.parser.check_event(ScalarEvent):
134 node = self.compose_scalar_node(anchor)
135 elif self.parser.check_event(SequenceStartEvent):
136 node = self.compose_sequence_node(anchor)
137 elif self.parser.check_event(MappingStartEvent):
138 node = self.compose_mapping_node(anchor)
139 self.resolver.ascend_resolver()
140 return node
141
142 def compose_scalar_node(self, anchor):
143 # type: (Any) -> Any
144 event = self.parser.get_event()
145 tag = event.tag
146 if tag is None or tag == u'!':
147 tag = self.resolver.resolve(ScalarNode, event.value, event.implicit)
148 node = ScalarNode(
149 tag,
150 event.value,
151 event.start_mark,
152 event.end_mark,
153 style=event.style,
154 comment=event.comment,
155 anchor=anchor,
156 )
157 if anchor is not None:
158 self.anchors[anchor] = node
159 return node
160
161 def compose_sequence_node(self, anchor):
162 # type: (Any) -> Any
163 start_event = self.parser.get_event()
164 tag = start_event.tag
165 if tag is None or tag == u'!':
166 tag = self.resolver.resolve(SequenceNode, None, start_event.implicit)
167 node = SequenceNode(
168 tag,
169 [],
170 start_event.start_mark,
171 None,
172 flow_style=start_event.flow_style,
173 comment=start_event.comment,
174 anchor=anchor,
175 )
176 if anchor is not None:
177 self.anchors[anchor] = node
178 index = 0
179 while not self.parser.check_event(SequenceEndEvent):
180 node.value.append(self.compose_node(node, index))
181 index += 1
182 end_event = self.parser.get_event()
183 if node.flow_style is True and end_event.comment is not None:
184 if node.comment is not None:
185 nprint(
186 'Warning: unexpected end_event commment in sequence '
187 'node {}'.format(node.flow_style)
188 )
189 node.comment = end_event.comment
190 node.end_mark = end_event.end_mark
191 self.check_end_doc_comment(end_event, node)
192 return node
193
194 def compose_mapping_node(self, anchor):
195 # type: (Any) -> Any
196 start_event = self.parser.get_event()
197 tag = start_event.tag
198 if tag is None or tag == u'!':
199 tag = self.resolver.resolve(MappingNode, None, start_event.implicit)
200 node = MappingNode(
201 tag,
202 [],
203 start_event.start_mark,
204 None,
205 flow_style=start_event.flow_style,
206 comment=start_event.comment,
207 anchor=anchor,
208 )
209 if anchor is not None:
210 self.anchors[anchor] = node
211 while not self.parser.check_event(MappingEndEvent):
212 # key_event = self.parser.peek_event()
213 item_key = self.compose_node(node, None)
214 # if item_key in node.value:
215 # raise ComposerError("while composing a mapping",
216 # start_event.start_mark,
217 # "found duplicate key", key_event.start_mark)
218 item_value = self.compose_node(node, item_key)
219 # node.value[item_key] = item_value
220 node.value.append((item_key, item_value))
221 end_event = self.parser.get_event()
222 if node.flow_style is True and end_event.comment is not None:
223 node.comment = end_event.comment
224 node.end_mark = end_event.end_mark
225 self.check_end_doc_comment(end_event, node)
226 return node
227
228 def check_end_doc_comment(self, end_event, node):
229 # type: (Any, Any) -> None
230 if end_event.comment and end_event.comment[1]:
231 # pre comments on an end_event, no following to move to
232 if node.comment is None:
233 node.comment = [None, None]
234 assert not isinstance(node, ScalarEvent)
235 # this is a post comment on a mapping node, add as third element
236 # in the list
237 node.comment.append(end_event.comment[1])
238 end_event.comment[1] = None