Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/ruamel/yaml/reader.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 # coding: utf-8 | |
2 | |
3 from __future__ import absolute_import | |
4 | |
5 # This module contains abstractions for the input stream. You don't have to | |
6 # look any further; there is no pretty code here. | |
7 # | |
8 # We define two classes here. | |
9 # | |
10 # Mark(source, line, column) | |
11 # It's just a record and its only use is producing nice error messages. | |
12 # Parser does not use it for any other purposes. | |
13 # | |
14 # Reader(source, data) | |
15 # Reader determines the encoding of `data` and converts it to unicode. | |
16 # Reader provides the following methods and attributes: | |
17 # reader.peek(index=0) - return the character `index` positions ahead | |
18 # reader.forward(length=1) - move the current position to `length` | |
19 # characters. | |
20 # reader.index - the number of the current character. | |
21 # reader.line, reader.column - the line and the column of the current | |
22 # character. | |
23 | |
24 import codecs | |
25 | |
26 from ruamel.yaml.error import YAMLError, FileMark, StringMark, YAMLStreamError | |
27 from ruamel.yaml.compat import text_type, binary_type, PY3, UNICODE_SIZE | |
28 from ruamel.yaml.util import RegExp | |
29 | |
30 if False: # MYPY | |
31 from typing import Any, Dict, Optional, List, Union, Text, Tuple, Optional # NOQA | |
32 # from ruamel.yaml.compat import StreamTextType # NOQA | |
33 | |
34 __all__ = ['Reader', 'ReaderError'] | |
35 | |
36 | |
class ReaderError(YAMLError):
    """Error describing an undecodable byte or an unacceptable character.

    The instance records where the problem was found (`name`, `position`),
    the offending `character`, the `encoding` in use and a textual `reason`.
    """

    def __init__(self, name, position, character, encoding, reason):
        # type: (Any, Any, Any, Any, Any) -> None
        self.name, self.position = name, position
        self.character = character
        self.encoding, self.reason = encoding, reason

    def __str__(self):
        # type: () -> str
        # A non-bytes `character` is formatted directly as a code point.
        if not isinstance(self.character, binary_type):
            template = 'unacceptable character #x%04x: %s\n' ' in "%s", position %d'
            return template % (self.character, self.reason, self.name, self.position)
        # A bytes `character` is reported as a codec failure on that byte.
        template = "'%s' codec can't decode byte #x%02x: %s\n" ' in "%s", position %d'
        details = (self.encoding, ord(self.character), self.reason, self.name, self.position)
        return template % details
63 | |
64 | |
class Reader(object):
    """Buffered character source for the YAML scanner.

    Reader:
    - determines the data encoding and converts it to a unicode string,
    - checks if characters are in allowed range,
    - adds '\\0' to the end.

    Reader accepts
    - a `str` object (PY2) / a `bytes` object (PY3),
    - a `unicode` object (PY2) / a `str` object (PY3),
    - a file-like object with its `read` method returning `str`,
    - a file-like object with its `read` method returning `unicode`.

    Yeah, it's ugly and slow.
    """

    def __init__(self, stream, loader=None):
        # type: (Any, Any) -> None
        """Create a reader for `stream`, optionally registering it on `loader`.

        The reader is stored as `loader._reader` only when the loader does not
        already have one. Passing `stream=None` leaves the reader without
        input; assign `reader.stream` later.
        """
        self.loader = loader
        if self.loader is not None and getattr(self.loader, '_reader', None) is None:
            self.loader._reader = self
        self.reset_reader()
        self.stream = stream  # type: Any  # as .read is called

    def reset_reader(self):
        # type: () -> None
        """Reset all buffer, position and encoding state to initial values."""
        self.name = None  # type: Any
        self.stream_pointer = 0  # number of raw items read from the stream so far
        self.eof = True
        self.buffer = ""  # decoded text not yet dropped by update()
        self.pointer = 0  # offset of the current character within `buffer`
        self.raw_buffer = None  # type: Any
        self.raw_decode = None
        self.encoding = None  # type: Optional[Text]
        self.index = 0
        self.line = 0
        self.column = 0

    @property
    def stream(self):
        # type: () -> Any
        """The file-like input object, or None for string/bytes input.

        Raises YAMLStreamError when no input has been assigned yet.
        """
        try:
            return self._stream
        except AttributeError:
            raise YAMLStreamError('input stream needs to specified')

    @stream.setter
    def stream(self, val):
        # type: (Any) -> None
        """Attach the input and prime the buffer.

        `None` is ignored. Text is validated and buffered whole, bytes are
        decoded after encoding detection, and anything else must provide a
        `read()` method (YAMLStreamError is raised otherwise).
        """
        if val is None:
            return
        self._stream = None
        if isinstance(val, text_type):
            self.name = '<unicode string>'
            self.check_printable(val)
            self.buffer = val + u'\0'  # type: ignore
        elif isinstance(val, binary_type):
            self.name = '<byte string>'
            self.raw_buffer = val
            self.determine_encoding()
        else:
            if not hasattr(val, 'read'):
                raise YAMLStreamError('stream argument needs to have a read() method')
            self._stream = val
            self.name = getattr(self.stream, 'name', '<file>')
            self.eof = False
            self.raw_buffer = None
            self.determine_encoding()

    def peek(self, index=0):
        # type: (int) -> Text
        """Return the character `index` positions past the current one."""
        try:
            return self.buffer[self.pointer + index]
        except IndexError:
            # Not enough decoded text yet: refill, then retry once.
            self.update(index + 1)
            return self.buffer[self.pointer + index]

    def prefix(self, length=1):
        # type: (int) -> Any
        """Return up to `length` characters from the current position."""
        if self.pointer + length >= len(self.buffer):
            self.update(length)
        return self.buffer[self.pointer : self.pointer + length]

    def forward_1_1(self, length=1):
        # type: (int) -> None
        """Advance `length` characters, updating index, line and column.

        Line breaks here are '\\n', '\\x85', '\\u2028', '\\u2029' and a '\\r'
        that is not followed by '\\n'. A '\\uFEFF' does not advance the column.
        """
        # One extra character is needed for the '\r' look-ahead below.
        if self.pointer + length + 1 >= len(self.buffer):
            self.update(length + 1)
        while length != 0:
            ch = self.buffer[self.pointer]
            self.pointer += 1
            self.index += 1
            if ch in u'\n\x85\u2028\u2029' or (
                ch == u'\r' and self.buffer[self.pointer] != u'\n'
            ):
                self.line += 1
                self.column = 0
            elif ch != u'\uFEFF':
                self.column += 1
            length -= 1

    def forward(self, length=1):
        # type: (int) -> None
        """Advance `length` characters, updating index, line and column.

        Line breaks here are '\\n' and a '\\r' that is not followed by '\\n'.
        A '\\uFEFF' does not advance the column.
        """
        # One extra character is needed for the '\r' look-ahead below.
        if self.pointer + length + 1 >= len(self.buffer):
            self.update(length + 1)
        while length != 0:
            ch = self.buffer[self.pointer]
            self.pointer += 1
            self.index += 1
            if ch == u'\n' or (ch == u'\r' and self.buffer[self.pointer] != u'\n'):
                self.line += 1
                self.column = 0
            elif ch != u'\uFEFF':
                self.column += 1
            length -= 1

    def get_mark(self):
        # type: () -> Any
        """Return a mark for the current position.

        A StringMark (which also carries the buffer and pointer) is built for
        string/bytes input, a FileMark for file-like input.
        """
        if self.stream is None:
            return StringMark(
                self.name, self.index, self.line, self.column, self.buffer, self.pointer
            )
        else:
            return FileMark(self.name, self.index, self.line, self.column)

    def determine_encoding(self):
        # type: () -> None
        """Choose the decoder from a leading BOM, then fill the buffer.

        Bytes input starting with a UTF-16 BOM is decoded as utf-16-le or
        utf-16-be; all other bytes input is decoded as utf-8. Non-bytes raw
        data is left undecoded.
        """
        # At least two raw items are needed to recognise a UTF-16 BOM.
        while not self.eof and (self.raw_buffer is None or len(self.raw_buffer) < 2):
            self.update_raw()
        if isinstance(self.raw_buffer, binary_type):
            if self.raw_buffer.startswith(codecs.BOM_UTF16_LE):
                self.raw_decode = codecs.utf_16_le_decode  # type: ignore
                self.encoding = 'utf-16-le'
            elif self.raw_buffer.startswith(codecs.BOM_UTF16_BE):
                self.raw_decode = codecs.utf_16_be_decode  # type: ignore
                self.encoding = 'utf-16-be'
            else:
                self.raw_decode = codecs.utf_8_decode  # type: ignore
                self.encoding = 'utf-8'
        self.update(1)

    # Pattern matching any single character that is NOT acceptable; the
    # supplementary planes are only listed when UNICODE_SIZE is not 2.
    if UNICODE_SIZE == 2:
        NON_PRINTABLE = RegExp(
            u'[^\x09\x0A\x0D\x20-\x7E\x85' u'\xA0-\uD7FF' u'\uE000-\uFFFD' u']'
        )
    else:
        NON_PRINTABLE = RegExp(
            u'[^\x09\x0A\x0D\x20-\x7E\x85'
            u'\xA0-\uD7FF'
            u'\uE000-\uFFFD'
            u'\U00010000-\U0010FFFF'
            u']'
        )

    # Bytes accepted by the ASCII-only check: tab, LF, CR and 0x20-0x7E.
    _printable_ascii = ('\x09\x0A\x0D' + "".join(map(chr, range(0x20, 0x7F)))).encode('ascii')

    @classmethod
    def _get_non_printable_ascii(cls, data):
        # type: (Text) -> Optional[Tuple[int, Text]]
        """Return (offset, char) of the first non-printable in pure-ASCII `data`.

        Returns None when every character is acceptable. Raises
        UnicodeEncodeError when `data` is not pure ASCII.
        """
        ascii_bytes = data.encode('ascii')
        # Deleting every printable byte leaves only the offenders behind.
        non_printables = ascii_bytes.translate(None, cls._printable_ascii)  # type: ignore
        if not non_printables:
            return None
        non_printable = non_printables[:1]
        return ascii_bytes.index(non_printable), non_printable.decode('ascii')

    @classmethod
    def _get_non_printable_regex(cls, data):
        # type: (Text) -> Optional[Tuple[int, Text]]
        """Return (offset, char) of the first NON_PRINTABLE match, or None."""
        match = cls.NON_PRINTABLE.search(data)
        if match is None:
            return None
        return match.start(), match.group()

    @classmethod
    def _get_non_printable(cls, data):
        # type: (Text) -> Optional[Tuple[int, Text]]
        """Locate the first unacceptable character in `data`, or return None.

        The ASCII check is tried first; the regex is used only when `data`
        cannot be encoded as ASCII.
        """
        try:
            return cls._get_non_printable_ascii(data)  # type: ignore
        except UnicodeEncodeError:
            return cls._get_non_printable_regex(data)

    def check_printable(self, data):
        # type: (Any) -> None
        """Raise ReaderError if `data` contains an unacceptable character."""
        non_printable_match = self._get_non_printable(data)
        if non_printable_match is not None:
            start, character = non_printable_match
            # `data` is about to be appended after the unread part of the buffer.
            position = self.index + (len(self.buffer) - self.pointer) + start
            raise ReaderError(
                self.name,
                position,
                ord(character),
                'unicode',
                'special characters are not allowed',
            )

    def update(self, length):
        # type: (int) -> None
        """Make at least `length` characters available from the current position.

        Already consumed text is dropped from the buffer. Raw data is read,
        decoded and validated until enough characters are buffered or the
        input is exhausted, in which case a terminating '\\0' is appended.
        Raises ReaderError on a decode failure or an unacceptable character.
        """
        if self.raw_buffer is None:
            # Nothing left to decode (text input, or end of input already seen).
            return
        self.buffer = self.buffer[self.pointer :]
        self.pointer = 0
        while len(self.buffer) < length:
            if not self.eof:
                self.update_raw()
            if self.raw_decode is not None:
                try:
                    # The final flag is only set once the whole input was read.
                    data, converted = self.raw_decode(self.raw_buffer, 'strict', self.eof)
                except UnicodeDecodeError as exc:
                    # NOTE(review): on PY3 indexing bytes yields an int, so
                    # ReaderError.__str__ takes its "unacceptable character"
                    # branch for decode errors -- confirm whether intended.
                    if PY3:
                        character = self.raw_buffer[exc.start]
                    else:
                        character = exc.object[exc.start]
                    if self.stream is not None:
                        position = self.stream_pointer - len(self.raw_buffer) + exc.start
                    else:
                        position = exc.start
                    raise ReaderError(self.name, position, character, exc.encoding, exc.reason)
            else:
                # The stream already yields text; no decoding needed.
                data = self.raw_buffer
                converted = len(data)
            self.check_printable(data)
            self.buffer += data
            self.raw_buffer = self.raw_buffer[converted:]
            if self.eof:
                self.buffer += '\0'
                self.raw_buffer = None
                break

    def update_raw(self, size=None):
        # type: (Optional[int]) -> None
        """Read up to `size` more raw items from the stream.

        `size` defaults to 4096 on PY3 and 1024 otherwise. An empty read
        marks end of input.
        """
        if size is None:
            size = 4096 if PY3 else 1024
        data = self.stream.read(size)
        if self.raw_buffer is None:
            self.raw_buffer = data
        else:
            self.raw_buffer += data
        self.stream_pointer += len(data)
        if not data:
            self.eof = True
305 | |
306 | |
307 # try: | |
308 # import psyco | |
309 # psyco.bind(Reader) | |
310 # except ImportError: | |
311 # pass |