Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/ruamel/yaml/main.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author | shellac |
---|---|
date | Thu, 14 May 2020 14:56:58 -0400 |
parents | 26e78fe6e8c4 |
children |
comparison
equal
deleted
inserted
replaced
1:75ca89e9b81c | 2:6af9afd405e9 |
---|---|
1 # coding: utf-8 | |
2 | |
3 from __future__ import absolute_import, unicode_literals, print_function | |
4 | |
5 import sys | |
6 import os | |
7 import warnings | |
8 import glob | |
9 from importlib import import_module | |
10 | |
11 | |
12 import ruamel.yaml | |
13 from ruamel.yaml.error import UnsafeLoaderWarning, YAMLError # NOQA | |
14 | |
15 from ruamel.yaml.tokens import * # NOQA | |
16 from ruamel.yaml.events import * # NOQA | |
17 from ruamel.yaml.nodes import * # NOQA | |
18 | |
19 from ruamel.yaml.loader import BaseLoader, SafeLoader, Loader, RoundTripLoader # NOQA | |
20 from ruamel.yaml.dumper import BaseDumper, SafeDumper, Dumper, RoundTripDumper # NOQA | |
21 from ruamel.yaml.compat import StringIO, BytesIO, with_metaclass, PY3, nprint | |
22 from ruamel.yaml.resolver import VersionedResolver, Resolver # NOQA | |
23 from ruamel.yaml.representer import ( | |
24 BaseRepresenter, | |
25 SafeRepresenter, | |
26 Representer, | |
27 RoundTripRepresenter, | |
28 ) | |
29 from ruamel.yaml.constructor import ( | |
30 BaseConstructor, | |
31 SafeConstructor, | |
32 Constructor, | |
33 RoundTripConstructor, | |
34 ) | |
35 from ruamel.yaml.loader import Loader as UnsafeLoader | |
36 | |
37 if False: # MYPY | |
38 from typing import List, Set, Dict, Union, Any, Callable # NOQA | |
39 from ruamel.yaml.compat import StreamType, StreamTextType, VersionType # NOQA | |
40 | |
41 if PY3: | |
42 from pathlib import Path | |
43 else: | |
44 Path = Any | |
45 | |
46 try: | |
47 from _ruamel_yaml import CParser, CEmitter # type: ignore | |
48 except: # NOQA | |
49 CParser = CEmitter = None | |
50 | |
51 # import io | |
52 | |
53 enforce = object() | |
54 | |
55 | |
56 # YAML is an acronym, i.e. spoken: rhymes with "camel". And thus a | |
57 # subset of abbreviations, which should be all caps according to PEP8 | |
58 | |
59 | |
60 class YAML(object): | |
def __init__(
    self, _kw=enforce, typ=None, pure=False, output=None, plug_ins=None  # input=None,
):
    # type: (Any, Optional[Text], Any, Any, Any) -> None
    """
    Select the loader/dumper component classes for `typ` and reset every
    formatting option to its default.

    _kw: not used, forces keyword arguments in 2.7 (in 3 you can do (*, safe_load=..)
    typ: 'rt'/None -> RoundTripLoader/RoundTripDumper,  (default)
         'safe'    -> SafeLoader/SafeDumper,
         'unsafe'  -> normal/unsafe Loader/Dumper
         'base'    -> baseloader
         any other value is handed to the first plug-in whose `typ` matches
    pure: if True only use Python modules
    input/output: needed to work as context manager
    plug_ins: a list of plug-in files

    Raises TypeError when a positional argument is passed and
    NotImplementedError when no plug-in handles an unknown `typ`.
    """
    if _kw is not enforce:
        raise TypeError(
            '{}.__init__() takes no positional argument but at least '
            'one was given ({!r})'.format(self.__class__.__name__, _kw)
        )

    self.typ = typ if typ is not None else 'rt'
    self.pure = pure

    # self._input = input
    self._output = output
    self._context_manager = None  # type: Any

    # import the explicitly requested plug-ins first, then the bundled ones
    self.plug_ins = []  # type: List[Any]
    requested = plug_ins if plug_ins is not None else []
    for plug_in_path in requested + self.official_plug_ins():
        module_name = plug_in_path.replace(os.sep, '.')
        self.plug_ins.append(import_module(module_name))

    self.Resolver = ruamel.yaml.resolver.VersionedResolver  # type: Any
    self.allow_unicode = True
    self.Reader = None  # type: Any
    self.Scanner = None  # type: Any
    self.Serializer = None  # type: Any
    self.default_flow_style = None  # type: Any

    # the C classes are only eligible when `pure` is false and the
    # _ruamel_yaml extension could be imported
    use_c_parser = not pure and CParser is not None
    use_c_emitter = not pure and CEmitter is not None

    if self.typ == 'rt':
        self.default_flow_style = False
        # no optimized rt-dumper yet
        self.Emitter = ruamel.yaml.emitter.Emitter  # type: Any
        self.Serializer = ruamel.yaml.serializer.Serializer  # type: Any
        self.Representer = ruamel.yaml.representer.RoundTripRepresenter  # type: Any
        self.Scanner = ruamel.yaml.scanner.RoundTripScanner  # type: Any
        # no optimized rt-parser yet
        self.Parser = ruamel.yaml.parser.RoundTripParser  # type: Any
        self.Composer = ruamel.yaml.composer.Composer  # type: Any
        self.Constructor = ruamel.yaml.constructor.RoundTripConstructor  # type: Any
    elif self.typ == 'safe':
        self.Emitter = CEmitter if use_c_emitter else ruamel.yaml.emitter.Emitter
        self.Representer = ruamel.yaml.representer.SafeRepresenter
        self.Parser = CParser if use_c_parser else ruamel.yaml.parser.Parser
        self.Composer = ruamel.yaml.composer.Composer
        self.Constructor = ruamel.yaml.constructor.SafeConstructor
    elif self.typ == 'base':
        # 'base' always emits with the Python Emitter, only parsing may use C
        self.Emitter = ruamel.yaml.emitter.Emitter
        self.Representer = ruamel.yaml.representer.BaseRepresenter
        self.Parser = CParser if use_c_parser else ruamel.yaml.parser.Parser
        self.Composer = ruamel.yaml.composer.Composer
        self.Constructor = ruamel.yaml.constructor.BaseConstructor
    elif self.typ == 'unsafe':
        self.Emitter = CEmitter if use_c_emitter else ruamel.yaml.emitter.Emitter
        self.Representer = ruamel.yaml.representer.Representer
        self.Parser = CParser if use_c_parser else ruamel.yaml.parser.Parser
        self.Composer = ruamel.yaml.composer.Composer
        self.Constructor = ruamel.yaml.constructor.Constructor
    else:
        # unknown typ: let the first plug-in that declares it configure us
        handler = None
        for module in self.plug_ins:
            if getattr(module, 'typ', None) == self.typ:
                handler = module
                break
        if handler is None:
            raise NotImplementedError(
                'typ "{}"not recognised (need to install plug-in?)'.format(self.typ)
            )
        handler.init_typ(self)

    # options handed to the Emitter when it is created
    self.stream = None
    self.canonical = None
    self.old_indent = None
    self.width = None
    self.line_break = None

    self.map_indent = None
    self.sequence_indent = None
    self.sequence_dash_offset = 0
    self.compact_seq_seq = None
    self.compact_seq_map = None
    self.sort_base_mapping_type_on_output = None  # default: sort

    self.top_level_colon_align = None
    self.prefix_colon = None
    self.version = None
    self.preserve_quotes = None
    self.allow_duplicate_keys = False  # duplicate keys in map, set
    self.encoding = 'utf-8'
    self.explicit_start = None
    self.explicit_end = None
    self.tags = None
    self.default_style = None
    self.top_level_block_style_scalar_no_indent_error_1_1 = False
    # [a, b: 1, c: {d: 2}]  vs. [a, {b: 1}, {c: {d: 2}}]
    self.brace_single_entry_mapping_in_flow_sequence = False
166 | |
@property
def reader(self):
    # type: () -> Any
    """Return the cached Reader, creating it (without a stream) on first access."""
    if not hasattr(self, '_reader'):
        # built with stream=None; callers assign `.stream` afterwards
        self._reader = self.Reader(None, loader=self)
    return self._reader  # type: ignore
175 | |
@property
def scanner(self):
    # type: () -> Any
    """Return the cached Scanner, creating it as `_scanner` on first access."""
    if not hasattr(self, '_scanner'):
        self._scanner = self.Scanner(loader=self)
    return self._scanner  # type: ignore
184 | |
@property
def parser(self):
    # type: () -> Any
    """
    Return the cached parser, creating it on first access.

    A pure-Python parser class is instantiated with `loader=self`. The C parser
    needs the input stream at construction time, so None is returned (and
    nothing is cached) until `_stream` has been set.
    """
    if hasattr(self, '_parser'):
        return self._parser
    if self.Parser is CParser:
        if getattr(self, '_stream', None) is None:
            # wait for the stream
            return None
        built = CParser(self._stream)
    else:
        built = self.Parser(loader=self)
    self._parser = built
    return built
206 | |
@property
def composer(self):
    # type: () -> Any
    """Return the cached Composer, creating it as `_composer` on first access."""
    try:
        return self._composer
    except AttributeError:
        self._composer = self.Composer(loader=self)
    return self._composer
214 | |
@property
def constructor(self):
    # type: () -> Any
    """
    Return the cached Constructor, creating it on first access.

    The new instance receives `preserve_quotes` as keyword argument and gets
    `allow_duplicate_keys` copied onto it from this YAML instance.
    """
    if hasattr(self, '_constructor'):
        return self._constructor
    built = self.Constructor(preserve_quotes=self.preserve_quotes, loader=self)
    built.allow_duplicate_keys = self.allow_duplicate_keys
    self._constructor = built
    return built
224 | |
@property
def resolver(self):
    # type: () -> Any
    """Return the cached Resolver, created for `self.version` on first access."""
    try:
        return self._resolver
    except AttributeError:
        self._resolver = self.Resolver(version=self.version, loader=self)
    return self._resolver
232 | |
@property
def emitter(self):
    # type: () -> Any
    """
    Return the cached Emitter, creating it on first access.

    Only a non-C Emitter class is instantiated here (without a stream); the
    indentation and compact-sequence options that are not None are copied onto
    it. When the C emitter is selected and nothing is cached yet, None is
    returned.
    """
    if hasattr(self, '_emitter'):
        return self._emitter
    if self.Emitter is CEmitter:
        # the C emitter is not created by this property
        return None
    built = self.Emitter(
        None,
        canonical=self.canonical,
        indent=self.old_indent,
        width=self.width,
        allow_unicode=self.allow_unicode,
        line_break=self.line_break,
        prefix_colon=self.prefix_colon,
        brace_single_entry_mapping_in_flow_sequence=self.brace_single_entry_mapping_in_flow_sequence,  # NOQA
        dumper=self,
    )
    self._emitter = built
    # (emitter attribute, configured value); None means "keep the emitter default"
    overrides = (
        ('best_map_indent', self.map_indent),
        ('best_sequence_indent', self.sequence_indent),
        ('sequence_dash_offset', self.sequence_dash_offset),
        # _emitter.block_seq_indent = self.sequence_dash_offset
        ('compact_seq_seq', self.compact_seq_seq),
        ('compact_seq_map', self.compact_seq_map),
    )
    for name, value in overrides:
        if value is not None:
            setattr(built, name, value)
    return built
268 | |
@property
def serializer(self):
    # type: () -> Any
    """
    Return the cached Serializer, creating it on first access from the
    encoding, explicit_start, explicit_end, version and tags settings.
    """
    if hasattr(self, '_serializer'):
        return self._serializer
    built = self.Serializer(
        encoding=self.encoding,
        explicit_start=self.explicit_start,
        explicit_end=self.explicit_end,
        version=self.version,
        tags=self.tags,
        dumper=self,
    )
    self._serializer = built
    return built
287 | |
@property
def representer(self):
    # type: () -> Any
    """
    Return the cached Representer, creating it on first access.

    `sort_base_mapping_type_on_output` is only copied onto the new instance
    when it has been set to something other than None.
    """
    if hasattr(self, '_representer'):
        return self._representer
    built = self.Representer(
        default_style=self.default_style,
        default_flow_style=self.default_flow_style,
        dumper=self,
    )
    sort_setting = self.sort_base_mapping_type_on_output
    if sort_setting is not None:
        built.sort_base_mapping_type_on_output = sort_setting
    self._representer = built
    return built
302 | |
303 # separate output resolver? | |
304 | |
305 # def load(self, stream=None): | |
306 # if self._context_manager: | |
307 # if not self._input: | |
308 # raise TypeError("Missing input stream while dumping from context manager") | |
309 # for data in self._context_manager.load(): | |
310 # yield data | |
311 # return | |
312 # if stream is None: | |
313 # raise TypeError("Need a stream argument when not loading from context manager") | |
314 # return self.load_one(stream) | |
315 | |
def load(self, stream):
    # type: (Union[Path, StreamTextType]) -> Any
    """
    Load a single document from `stream` and return the constructed data.

    at this point you either have the non-pure Parser (which has its own reader and
    scanner) or you have the pure Parser.
    If the pure Parser is set, then set the Reader and Scanner, if not already set.
    If either the Scanner or Reader are set, you cannot use the non-pure Parser,
    so reset it to the pure parser and set the Reader resp. Scanner if necessary

    An object without `read` but with `open` (e.g. a pathlib.Path) is opened
    in binary mode and loaded from the resulting file object.
    """
    if not hasattr(stream, 'read') and hasattr(stream, 'open'):
        # pathlib.Path() instance
        with stream.open('rb') as handle:  # type: ignore
            return self.load(handle)
    constructor, parser = self.get_constructor_parser(stream)
    try:
        return constructor.get_single_data()
    finally:
        parser.dispose()
        # reset reader and scanner when they exist; a missing attribute is ignored
        for holder, resetter in (('_reader', 'reset_reader'), ('_scanner', 'reset_scanner')):
            try:
                getattr(getattr(self, holder), resetter)()
            except AttributeError:
                pass
342 | |
def load_all(self, stream, _kw=enforce):  # , skip=None):
    # type: (Union[Path, StreamTextType], Any) -> Any
    """
    Generator yielding the data of every document in `stream`.

    stream: something with `read`, or an object with only `open`
        (e.g. a pathlib.Path) which is opened in text mode 'r'
    _kw: not used, forces keyword arguments; must be left at its default

    Raises TypeError when a second positional argument is given. Because this
    is a generator, that error only surfaces once iteration starts.
    """
    if _kw is not enforce:
        # the message used to be a copy of the one in __init__()
        raise TypeError(
            '{}.load_all() takes one positional argument but at least '
            'two were given ({!r})'.format(self.__class__.__name__, _kw)
        )
    if not hasattr(stream, 'read') and hasattr(stream, 'open'):
        # pathlib.Path() instance
        # NOTE(review): load() opens a Path with 'rb', this uses 'r' -- confirm
        # whether the difference is intentional
        with stream.open('r') as fp:  # type: ignore
            for d in self.load_all(fp, _kw=enforce):
                yield d
            return
    # if skip is None:
    #     skip = []
    # elif isinstance(skip, int):
    #     skip = [skip]
    constructor, parser = self.get_constructor_parser(stream)
    try:
        while constructor.check_data():
            yield constructor.get_data()
    finally:
        parser.dispose()
        # reset reader and scanner when they exist; a missing attribute is ignored
        try:
            self._reader.reset_reader()
        except AttributeError:
            pass
        try:
            self._scanner.reset_scanner()
        except AttributeError:
            pass
374 | |
def get_constructor_parser(self, stream):
    # type: (StreamTextType) -> Any
    """
    Return the (constructor, parser) pair to use for loading from `stream`.

    the old cyaml needs special setup, and therefore the stream

    Three situations are handled:
    - the Parser class is not the C parser: Reader and Scanner default to the
      pure-Python classes and the stream is attached to the reader
    - the C parser is selected but a Reader or Scanner class has been set:
      the Parser is switched to the pure-Python one (see above)
    - otherwise an ad-hoc XLoader class combining the C parser, the
      Constructor and the Resolver is built; the same XLoader instance is
      returned as both constructor and parser
    """
    if self.Parser is not CParser:
        if self.Reader is None:
            self.Reader = ruamel.yaml.reader.Reader
        if self.Scanner is None:
            self.Scanner = ruamel.yaml.scanner.Scanner
        self.reader.stream = stream
    else:
        if self.Reader is not None:
            if self.Scanner is None:
                self.Scanner = ruamel.yaml.scanner.Scanner
            # this permanently replaces the C parser on this instance
            self.Parser = ruamel.yaml.parser.Parser
            self.reader.stream = stream
        elif self.Scanner is not None:
            if self.Reader is None:
                self.Reader = ruamel.yaml.reader.Reader
            # this permanently replaces the C parser on this instance
            self.Parser = ruamel.yaml.parser.Parser
            self.reader.stream = stream
        else:
            # combined C level reader>scanner>parser
            # does some calls to the resolver, e.g. BaseResolver.descend_resolver
            # if you just initialise the CParser, too much of resolver.py
            # is actually used
            rslvr = self.Resolver
            # if rslvr is ruamel.yaml.resolver.VersionedResolver:
            #     rslvr = ruamel.yaml.resolver.Resolver

            # `selfx` is the XLoader instance, `self` stays the enclosing YAML instance
            class XLoader(self.Parser, self.Constructor, rslvr):  # type: ignore
                # NOTE(review): preserve_quotes is accepted but not used below -- confirm
                def __init__(selfx, stream, version=self.version, preserve_quotes=None):
                    # type: (StreamTextType, Optional[VersionType], Optional[bool]) -> None  # NOQA
                    CParser.__init__(selfx, stream)
                    # the loader acts as its own parser and composer
                    selfx._parser = selfx._composer = selfx
                    self.Constructor.__init__(selfx, loader=selfx)
                    selfx.allow_duplicate_keys = self.allow_duplicate_keys
                    rslvr.__init__(selfx, version=version, loadumper=selfx)

            self._stream = stream
            loader = XLoader(stream)
            return loader, loader
    return self.constructor, self.parser
419 | |
def dump(self, data, stream=None, _kw=enforce, transform=None):
    # type: (Any, Union[Path, StreamType], Any, Any) -> Any
    """
    Dump one document.

    Outside a `with` block (old style) `stream` is required and the call is
    forwarded to dump_all() with a one-element document list. Inside a `with`
    block the document goes to the active context manager; `stream` is not
    looked at and `transform` must be None.

    Raises TypeError for a missing stream/output, an extra positional
    argument, or a transform given while the context manager is active.
    """
    if not self._context_manager:
        # old style
        if stream is None:
            raise TypeError('Need a stream argument when not dumping from context manager')
        return self.dump_all([data], stream, _kw, transform=transform)

    if not self._output:
        raise TypeError('Missing output stream while dumping from context manager')
    if _kw is not enforce:
        raise TypeError(
            '{}.dump() takes one positional argument but at least '
            'two were given ({!r})'.format(self.__class__.__name__, _kw)
        )
    if transform is not None:
        raise TypeError(
            '{}.dump() in the context manager cannot have transform keyword '
            ''.format(self.__class__.__name__)
        )
    self._context_manager.dump(data)
440 | |
def dump_all(self, documents, stream, _kw=enforce, transform=None):
    # type: (Any, Union[Path, StreamType], Any, Any) -> Any
    """
    Dump every item of `documents` as a YAML document to `stream`.

    documents: iterable of data to dump
    stream: output handed to a temporary YAMLContextManager
    _kw: not used, forces keyword arguments; must be left at its default
    transform: optional callable handed to the YAMLContextManager

    Raises NotImplementedError when called while a context manager is active
    and TypeError when a third positional argument is given.

    The temporary state (`_output`, `_context_manager`) is reset even when
    dumping fails. Previously an exception left `_context_manager` set, so
    later dump() calls took the context-manager branch and dump_all() raised
    NotImplementedError.
    """
    if self._context_manager:
        raise NotImplementedError
    if _kw is not enforce:
        raise TypeError(
            '{}.dump(_all) takes two positional argument but at least '
            'three were given ({!r})'.format(self.__class__.__name__, _kw)
        )
    # must be set first: YAMLContextManager reads the output from `self._output`
    self._output = stream
    completed = False
    try:
        self._context_manager = YAMLContextManager(self, transform=transform)
        for data in documents:
            self._context_manager.dump(data)
        self._context_manager.teardown_output()
        completed = True
    finally:
        if not completed:
            # a successful teardown_output() removes these cached objects;
            # drop them here too so the next dump starts with fresh ones
            for cached in ('_serializer', '_emitter'):
                self.__dict__.pop(cached, None)
        self._output = None
        self._context_manager = None
457 | |
def Xdump_all(self, documents, stream, _kw=enforce, transform=None):
    # type: (Any, Union[Path, StreamType], Any, Any) -> Any
    """
    Serialize a sequence of Python objects into a YAML stream.

    An object without `write` but with `open` (e.g. a pathlib.Path) is opened
    with mode 'w' and handed to dump_all(). With a `transform`, the output is
    first collected in an in-memory buffer and the transformed text is written
    to the original stream afterwards.
    """
    if not hasattr(stream, 'write') and hasattr(stream, 'open'):
        # pathlib.Path() instance
        with stream.open('w') as fp:
            return self.dump_all(documents, fp, _kw, transform=transform)
    if _kw is not enforce:
        raise TypeError(
            '{}.dump(_all) takes two positional argument but at least '
            'three were given ({!r})'.format(self.__class__.__name__, _kw)
        )
    # The stream should have the methods `write` and possibly `flush`.
    tlca = self.top_level_colon_align  # type: Any
    if tlca is True:
        # True means: align on the longest key of the first document
        tlca = max([len(str(x)) for x in documents[0]])
    if transform is not None:
        fstream = stream
        stream = StringIO() if self.encoding is None else BytesIO()
    # called for its side effects: it attaches the stream to the emitter
    self.get_serializer_representer_emitter(stream, tlca)
    try:
        self.serializer.open()
        for data in documents:
            self.representer.represent(data)
        self.serializer.close()
    finally:
        self.emitter.dispose()
        # self.dumper.dispose()  # cyaml
    delattr(self, '_serializer')
    delattr(self, '_emitter')
    if transform:
        val = stream.getvalue()
        if self.encoding:
            val = val.decode(self.encoding)
        if fstream is None:
            transform(val)
        else:
            fstream.write(transform(val))
    return None
512 | |
def get_serializer_representer_emitter(self, stream, tlca):
    # type: (StreamType, Any) -> Any
    """
    Return the (serializer, representer, emitter) triple for dumping to `stream`.

    stream: output the emitter writes to
    tlca: value stored as the emitter's `top_level_colon_align`
          (Python emitter only; the C branch does not use it)

    With a non-C Emitter class, or when a Serializer class has been set (which
    forces the Python Emitter), the cached components of this instance are
    returned. Otherwise an ad-hoc XDumper class combining the C emitter, the
    Representer and a Resolver is built, and the same XDumper instance is
    returned in all three positions.
    """
    # we have only .Serializer to deal with (vs .Reader & .Scanner), much simpler
    if self.Emitter is not CEmitter:
        if self.Serializer is None:
            self.Serializer = ruamel.yaml.serializer.Serializer
        self.emitter.stream = stream
        self.emitter.top_level_colon_align = tlca
        return self.serializer, self.representer, self.emitter
    if self.Serializer is not None:
        # cannot set serializer with CEmitter
        # this permanently replaces the C emitter on this instance
        self.Emitter = ruamel.yaml.emitter.Emitter
        self.emitter.stream = stream
        self.emitter.top_level_colon_align = tlca
        return self.serializer, self.representer, self.emitter
    # C routines

    rslvr = (
        ruamel.yaml.resolver.BaseResolver
        if self.typ == 'base'
        else ruamel.yaml.resolver.Resolver
    )

    # `selfx` is the XDumper instance, `self` stays the enclosing YAML instance
    class XDumper(CEmitter, self.Representer, rslvr):  # type: ignore
        def __init__(
            selfx,
            stream,
            default_style=None,
            default_flow_style=None,
            canonical=None,
            indent=None,
            width=None,
            allow_unicode=None,
            line_break=None,
            encoding=None,
            explicit_start=None,
            explicit_end=None,
            version=None,
            tags=None,
            block_seq_indent=None,
            top_level_colon_align=None,
            prefix_colon=None,
        ):
            # type: (StreamType, Any, Any, Any, Optional[bool], Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Any, Any, Any, Any, Any) -> None  # NOQA
            # block_seq_indent, top_level_colon_align and prefix_colon are
            # accepted but not forwarded to CEmitter
            CEmitter.__init__(
                selfx,
                stream,
                canonical=canonical,
                indent=indent,
                width=width,
                encoding=encoding,
                allow_unicode=allow_unicode,
                line_break=line_break,
                explicit_start=explicit_start,
                explicit_end=explicit_end,
                version=version,
                tags=tags,
            )
            # the dumper acts as its own emitter, serializer and representer
            selfx._emitter = selfx._serializer = selfx._representer = selfx
            self.Representer.__init__(
                selfx, default_style=default_style, default_flow_style=default_flow_style
            )
            rslvr.__init__(selfx)

    self._stream = stream
    # NOTE(review): self.encoding is not passed, so XDumper gets encoding=None -- confirm
    dumper = XDumper(
        stream,
        default_style=self.default_style,
        default_flow_style=self.default_flow_style,
        canonical=self.canonical,
        indent=self.old_indent,
        width=self.width,
        allow_unicode=self.allow_unicode,
        line_break=self.line_break,
        explicit_start=self.explicit_start,
        explicit_end=self.explicit_end,
        version=self.version,
        tags=self.tags,
    )
    # cache the dumper so the `emitter` and `serializer` properties return it
    self._emitter = self._serializer = dumper
    return dumper, dumper, dumper
594 | |
595 # basic types | |
def map(self, **kw):
    # type: (Any) -> Any
    """Return a new mapping built from `kw`: CommentedMap for typ 'rt', else dict."""
    if self.typ != 'rt':
        return dict(**kw)

    from ruamel.yaml.comments import CommentedMap

    return CommentedMap(**kw)
604 | |
def seq(self, *args):
    # type: (Any) -> Any
    """
    Return a new sequence: CommentedSeq for typ 'rt', else list.
    The positional arguments are passed on unchanged to that constructor.
    """
    if self.typ != 'rt':
        return list(*args)

    from ruamel.yaml.comments import CommentedSeq

    return CommentedSeq(*args)
613 | |
614 # helpers | |
def official_plug_ins(self):
    # type: () -> Any
    """
    Return the `__plug_in__.py` files found one directory below this file's
    directory, as paths relative to its grandparent directory and without
    the '.py' extension.
    """
    here = os.path.dirname(__file__)
    grand_parent = os.path.dirname(os.path.dirname(here))
    found = []
    for path in glob.glob(here + '/*/__plug_in__.py'):
        # strip the grandparent prefix, the separator after it and '.py'
        found.append(path.replace(grand_parent, "")[1:-3])
    return found
621 | |
def register_class(self, cls):
    # type:(Any) -> Any
    """
    Register `cls` with this instance's representer and constructor and
    return `cls`.

    - the tag is the class attribute yaml_tag, else '!' followed by the class name
    - to_yaml/from_yaml are used for dumping/loading when the class has them;
      otherwise the representer's represent_yaml_object resp. the constructor's
      construct_yaml_object is used
    """
    tag = getattr(cls, 'yaml_tag', '!' + cls.__name__)

    def dump_fallback(representer, data):
        # type: (Any, Any) -> Any
        return representer.represent_yaml_object(
            tag, data, cls, flow_style=representer.default_flow_style
        )

    def load_fallback(constructor, node):
        # type: (Any, Any) -> Any
        return constructor.construct_yaml_object(node, cls)

    try:
        self.representer.add_representer(cls, cls.to_yaml)
    except AttributeError:
        self.representer.add_representer(cls, dump_fallback)
    try:
        self.constructor.add_constructor(tag, cls.from_yaml)
    except AttributeError:
        self.constructor.add_constructor(tag, load_fallback)
    return cls
652 | |
def parse(self, stream):
    # type: (StreamTextType) -> Any
    """
    Parse a YAML stream and produce parsing events.

    This is a generator; the parser is disposed of, and the reader and
    scanner are reset when they exist, once it finishes or is closed.
    """
    _constructor, parser = self.get_constructor_parser(stream)
    try:
        while True:
            if not parser.check_event():
                break
            yield parser.get_event()
    finally:
        parser.dispose()
        for holder, resetter in (('_reader', 'reset_reader'), ('_scanner', 'reset_scanner')):
            try:
                getattr(getattr(self, holder), resetter)()
            except AttributeError:
                pass
672 | |
673 # ### context manager | |
674 | |
def __enter__(self):
    # type: () -> Any
    """Enter context-manager mode: attach a fresh YAMLContextManager and return self."""
    manager = YAMLContextManager(self)
    self._context_manager = manager
    return self
679 | |
680 def __exit__(self, typ, value, traceback): | |
681 # type: (Any, Any, Any) -> None | |
682 if typ: | |
683 nprint('typ', typ) | |
684 self._context_manager.teardown_output() | |
685 # self._context_manager.teardown_input() | |
686 self._context_manager = None | |
687 | |
688 # ### backwards compatibility | |
689 def _indent(self, mapping=None, sequence=None, offset=None): | |
690 # type: (Any, Any, Any) -> None | |
691 if mapping is not None: | |
692 self.map_indent = mapping | |
693 if sequence is not None: | |
694 self.sequence_indent = sequence | |
695 if offset is not None: | |
696 self.sequence_dash_offset = offset | |
697 | |
@property
def indent(self):
    # type: () -> Any
    """
    Reading `indent` gives the `_indent` method, so that
    `yaml.indent(mapping=.., sequence=.., offset=..)` can be called.
    """
    setter_method = self._indent
    return setter_method

@indent.setter
def indent(self, val):
    # type: (Any) -> None
    """Assigning to `indent` stores the value as `old_indent`."""
    self.old_indent = val
707 | |
@property
def block_seq_indent(self):
    # type: () -> Any
    """Alias that reads `sequence_dash_offset`."""
    offset = self.sequence_dash_offset
    return offset

@block_seq_indent.setter
def block_seq_indent(self, val):
    # type: (Any) -> None
    """Alias that writes `sequence_dash_offset`."""
    self.sequence_dash_offset = val
717 | |
def compact(self, seq_seq=None, seq_map=None):
    # type: (Any, Any) -> None
    """Store both compact settings; an omitted argument resets that setting to None."""
    self.compact_seq_seq, self.compact_seq_map = seq_seq, seq_map
722 | |
723 | |
class YAMLContextManager(object):
    """
    Output-side state for dumping one or more documents with a YAML instance.

    The serializer is opened lazily on the first dump() and closed again in
    teardown_output(). An output that has `open` but no `write` (e.g. a
    pathlib.Path) is opened here and closed by teardown_output().
    """

    def __init__(self, yaml, transform=None):
        # type: (Any, Any) -> None  # used to be: (Any, Optional[Callable]) -> None
        """
        yaml: the YAML instance; its `_output` attribute is the destination
        transform: optional callable applied to the complete dumped text
            before it is written to the destination
        """
        self._yaml = yaml
        self._output_inited = False
        self._output_path = None
        self._output = self._yaml._output
        self._transform = transform
        self._fstream = None  # final destination when a transform buffers the output
        self._path_handle = None  # file object opened from a Path; closed in teardown

        # input-side handling (_input, _input_path, ...) is not implemented

        if not hasattr(self._output, 'write') and hasattr(self._output, 'open'):
            # pathlib.Path() instance, open with the same mode
            self._output_path = self._output
            self._output = self._path_handle = self._output_path.open('w')

        if self._transform is not None:
            # dump into a buffer first, teardown_output() transforms and forwards it
            self._fstream = self._output
            if self._yaml.encoding is None:
                self._output = StringIO()
            else:
                self._output = BytesIO()

    def _close_path_output(self):
        # type: () -> None
        """Close the file that __init__ opened from a Path, if there is one."""
        if self._path_handle is not None:
            self._path_handle.close()

    def teardown_output(self):
        # type: () -> None
        """
        Finish the output: close the serializer, dispose of the emitter, drop
        both cached objects from the YAML instance, apply the transform and
        close a file opened from a Path.

        When nothing was dumped only the Path-opened file is closed. This
        used to be skipped, which leaked the open file.
        """
        if not self._output_inited:
            self._close_path_output()
            return
        self._yaml.serializer.close()
        self._yaml.emitter.dispose()
        # self.dumper.dispose()  # cyaml
        # force new serializer/emitter objects for the next dump
        delattr(self._yaml, '_serializer')
        delattr(self._yaml, '_emitter')
        if self._transform:
            val = self._output.getvalue()
            if self._yaml.encoding:
                val = val.decode(self._yaml.encoding)
            if self._fstream is None:
                self._transform(val)
            else:
                self._fstream.write(self._transform(val))
                self._fstream.flush()
                self._output = self._fstream  # maybe not necessary
        self._close_path_output()

    def init_output(self, first_data):
        # type: (Any) -> None
        """
        Prepare serializer/representer/emitter for the output and open the
        serializer.

        first_data: the first document; with top_level_colon_align set to
            True the length of its longest key (as str) is the alignment
        """
        if self._yaml.top_level_colon_align is True:
            # `or [0]`: an empty first document has no keys to measure and
            # max() of an empty list would raise ValueError
            tlca = max([len(str(x)) for x in first_data] or [0])  # type: Any
        else:
            tlca = self._yaml.top_level_colon_align
        self._yaml.get_serializer_representer_emitter(self._output, tlca)
        self._yaml.serializer.open()
        self._output_inited = True

    def dump(self, data):
        # type: (Any) -> None
        """Represent `data` as the next document, initialising the output on first use."""
        if not self._output_inited:
            self.init_output(data)
        self._yaml.representer.represent(data)
805 | |
806 # def teardown_input(self): | |
807 # pass | |
808 # | |
809 # def init_input(self): | |
810 # # set the constructor and parser on YAML() instance | |
811 # self._yaml.get_constructor_parser(stream) | |
812 # | |
813 # def load(self): | |
814 # if not self._input_inited: | |
815 # self.init_input() | |
816 # try: | |
817 # while self._yaml.constructor.check_data(): | |
818 # yield self._yaml.constructor.get_data() | |
819 # finally: | |
820 # parser.dispose() | |
821 # try: | |
822 # self._reader.reset_reader() # type: ignore | |
823 # except AttributeError: | |
824 # pass | |
825 # try: | |
826 # self._scanner.reset_scanner() # type: ignore | |
827 # except AttributeError: | |
828 # pass | |
829 | |
830 | |
def yaml_object(yml):
    # type: (Any) -> Any
    """ decorator for classes that needs to dump/load objects
    The tag for such objects is taken from the class attribute yaml_tag (or is
    '!' followed by the class name in case unavailable)
    If methods to_yaml and/or from_yaml are available, these are called for dumping resp.
    loading, default routines (dumping a mapping of the attributes) used otherwise.

    yml: object whose `representer` and `constructor` the class is registered with
    """

    def yo_deco(cls):
        # type: (Any) -> Any
        tag = getattr(cls, 'yaml_tag', '!' + cls.__name__)

        def dump_fallback(representer, data):
            # type: (Any, Any) -> Any
            return representer.represent_yaml_object(
                tag, data, cls, flow_style=representer.default_flow_style
            )

        def load_fallback(constructor, node):
            # type: (Any, Any) -> Any
            return constructor.construct_yaml_object(node, cls)

        try:
            yml.representer.add_representer(cls, cls.to_yaml)
        except AttributeError:
            yml.representer.add_representer(cls, dump_fallback)
        try:
            yml.constructor.add_constructor(tag, cls.from_yaml)
        except AttributeError:
            yml.constructor.add_constructor(tag, load_fallback)
        return cls

    return yo_deco
866 | |
867 | |
868 ######################################################################################## | |
869 | |
870 | |
def scan(stream, Loader=Loader):
    # type: (StreamTextType, Any) -> Any
    """
    Scan a YAML stream and produce scanning tokens.

    Generator; the loader's parser is disposed of when it finishes or is closed.
    """
    loader = Loader(stream)
    try:
        while True:
            if not loader.scanner.check_token():
                break
            yield loader.scanner.get_token()
    finally:
        loader._parser.dispose()
882 | |
883 | |
def parse(stream, Loader=Loader):
    # type: (StreamTextType, Any) -> Any
    """
    Parse a YAML stream and produce parsing events.

    Generator; the loader's parser is disposed of when it finishes or is closed.
    """
    loader = Loader(stream)
    try:
        while True:
            if not loader._parser.check_event():
                break
            yield loader._parser.get_event()
    finally:
        loader._parser.dispose()
895 | |
896 | |
def compose(stream, Loader=Loader):
    # type: (StreamTextType, Any) -> Any
    """
    Build the representation tree of the first YAML document in a stream.

    Returns whatever the loader's ``get_single_node()`` produces; the loader
    is disposed before returning, also when composing fails.
    """
    yaml_loader = Loader(stream)
    try:
        root_node = yaml_loader.get_single_node()
    finally:
        yaml_loader.dispose()
    return root_node
908 | |
909 | |
def compose_all(stream, Loader=Loader):
    # type: (StreamTextType, Any) -> Any
    """
    Lazily yield the representation tree of every YAML document in a stream.

    The loader's parser is disposed once the generator is exhausted or closed.
    """
    yaml_loader = Loader(stream)
    try:
        while True:
            if not yaml_loader.check_node():
                break
            yield yaml_loader._composer.get_node()
    finally:
        yaml_loader._parser.dispose()
922 | |
923 | |
def load(stream, Loader=None, version=None, preserve_quotes=None):
    # type: (StreamTextType, Any, Optional[VersionType], Any) -> Any
    """
    Construct the Python object for the first YAML document in a stream.

    When no Loader is given, an UnsafeLoaderWarning is issued and the unsafe
    loader is used. After construction (successful or not) the parser is
    disposed and the reader and scanner are reset when they support it.
    """
    if Loader is None:
        warnings.warn(UnsafeLoaderWarning.text, UnsafeLoaderWarning, stacklevel=2)
        Loader = UnsafeLoader
    yaml_loader = Loader(stream, version, preserve_quotes=preserve_quotes)
    try:
        return yaml_loader._constructor.get_single_data()
    finally:
        yaml_loader._parser.dispose()
        # resetting is best effort: loaders lacking these components or
        # methods raise AttributeError, which is deliberately ignored
        for component, reset_name in (('_reader', 'reset_reader'), ('_scanner', 'reset_scanner')):
            try:
                getattr(getattr(yaml_loader, component), reset_name)()
            except AttributeError:
                pass
946 | |
947 | |
def load_all(stream, Loader=None, version=None, preserve_quotes=None):
    # type: (Optional[StreamTextType], Any, Optional[VersionType], Optional[bool]) -> Any  # NOQA
    """
    Lazily yield the Python object for every YAML document in a stream.

    When no Loader is given, an UnsafeLoaderWarning is issued and the unsafe
    loader is used. Once the generator is exhausted or closed the parser is
    disposed and the reader and scanner are reset when they support it.
    """
    if Loader is None:
        warnings.warn(UnsafeLoaderWarning.text, UnsafeLoaderWarning, stacklevel=2)
        Loader = UnsafeLoader
    yaml_loader = Loader(stream, version, preserve_quotes=preserve_quotes)
    try:
        while True:
            if not yaml_loader._constructor.check_data():
                break
            yield yaml_loader._constructor.get_data()
    finally:
        yaml_loader._parser.dispose()
        # resetting is best effort: loaders lacking these components or
        # methods raise AttributeError, which is deliberately ignored
        for component, reset_name in (('_reader', 'reset_reader'), ('_scanner', 'reset_scanner')):
            try:
                getattr(getattr(yaml_loader, component), reset_name)()
            except AttributeError:
                pass
971 | |
972 | |
def safe_load(stream, version=None):
    # type: (StreamTextType, Optional[VersionType]) -> Any
    """
    Construct the Python object for the first YAML document in a stream,
    resolving only basic YAML tags.

    Delegates to load() with SafeLoader.
    """
    return load(stream, Loader=SafeLoader, version=version)
981 | |
982 | |
def safe_load_all(stream, version=None):
    # type: (StreamTextType, Optional[VersionType]) -> Any
    """
    Produce the Python objects for all YAML documents in a stream,
    resolving only basic YAML tags.

    Delegates to load_all() with SafeLoader.
    """
    return load_all(stream, Loader=SafeLoader, version=version)
991 | |
992 | |
def round_trip_load(stream, version=None, preserve_quotes=None):
    # type: (StreamTextType, Optional[VersionType], Optional[bool]) -> Any
    """
    Construct the Python object for the first YAML document in a stream,
    resolving only basic YAML tags.

    Delegates to load() with RoundTripLoader, forwarding preserve_quotes.
    """
    return load(
        stream, Loader=RoundTripLoader, version=version, preserve_quotes=preserve_quotes
    )
1001 | |
1002 | |
def round_trip_load_all(stream, version=None, preserve_quotes=None):
    # type: (StreamTextType, Optional[VersionType], Optional[bool]) -> Any
    """
    Produce the Python objects for all YAML documents in a stream,
    resolving only basic YAML tags.

    Delegates to load_all() with RoundTripLoader, forwarding preserve_quotes.
    """
    return load_all(
        stream, Loader=RoundTripLoader, version=version, preserve_quotes=preserve_quotes
    )
1011 | |
1012 | |
def emit(
    events,
    stream=None,
    Dumper=Dumper,
    canonical=None,
    indent=None,
    width=None,
    allow_unicode=None,
    line_break=None,
):
    # type: (Any, Optional[StreamType], Any, Optional[bool], Union[int, None], Optional[int], Optional[bool], Any) -> Any  # NOQA
    """
    Write YAML parsing events to a stream.

    When stream is None the events are emitted into an in-memory StringIO and
    the resulting string is returned; otherwise nothing is returned.
    """
    capture_output = stream is None
    if capture_output:
        stream = StringIO()
    emitter_options = dict(
        canonical=canonical,
        indent=indent,
        width=width,
        allow_unicode=allow_unicode,
        line_break=line_break,
    )
    dumper = Dumper(stream, **emitter_options)
    try:
        for event in events:
            dumper.emit(event)
    finally:
        # an AttributeError raised here (e.g. a dumper without `_emitter`)
        # propagates to the caller unchanged
        dumper._emitter.dispose()
    if capture_output:
        return stream.getvalue()
1051 | |
1052 | |
# default `encoding` for the dump/serialize functions below: None on Python 3
# (output collected in a StringIO), 'utf-8' otherwise (output collected in a BytesIO)
enc = 'utf-8' if not PY3 else None
1054 | |
1055 | |
def serialize_all(
    nodes,
    stream=None,
    Dumper=Dumper,
    canonical=None,
    indent=None,
    width=None,
    allow_unicode=None,
    line_break=None,
    encoding=enc,
    explicit_start=None,
    explicit_end=None,
    version=None,
    tags=None,
):
    # type: (Any, Optional[StreamType], Any, Any, Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Optional[VersionType], Any) -> Any  # NOQA
    """
    Write a sequence of representation trees to a YAML stream.

    When stream is None the output is collected in memory (a StringIO if
    encoding is None, a BytesIO otherwise) and its value is returned;
    otherwise nothing is returned.
    """
    capture_output = stream is None
    if capture_output:
        stream = StringIO() if encoding is None else BytesIO()
    dumper_options = dict(
        canonical=canonical,
        indent=indent,
        width=width,
        allow_unicode=allow_unicode,
        line_break=line_break,
        encoding=encoding,
        version=version,
        tags=tags,
        explicit_start=explicit_start,
        explicit_end=explicit_end,
    )
    dumper = Dumper(stream, **dumper_options)
    try:
        dumper._serializer.open()
        for node in nodes:
            dumper.serialize(node)
        dumper._serializer.close()
    finally:
        # an AttributeError raised here (e.g. a dumper without `_emitter`)
        # propagates to the caller unchanged
        dumper._emitter.dispose()
    if capture_output:
        return stream.getvalue()
1109 | |
1110 | |
def serialize(node, stream=None, Dumper=Dumper, **kwds):
    # type: (Any, Optional[StreamType], Any, Any) -> Any
    """
    Write a single representation tree to a YAML stream.

    Wraps the node in a one-element list and delegates to serialize_all(),
    so the produced string is returned when stream is None.
    """
    single_node = [node]
    return serialize_all(single_node, stream, Dumper=Dumper, **kwds)
1118 | |
1119 | |
def dump_all(
    documents,
    stream=None,
    Dumper=Dumper,
    default_style=None,
    default_flow_style=None,
    canonical=None,
    indent=None,
    width=None,
    allow_unicode=None,
    line_break=None,
    encoding=enc,
    explicit_start=None,
    explicit_end=None,
    version=None,
    tags=None,
    block_seq_indent=None,
    top_level_colon_align=None,
    prefix_colon=None,
):
    # type: (Any, Optional[StreamType], Any, Any, Any, Optional[bool], Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Any, Any, Any, Any, Any) -> Optional[str]  # NOQA
    """
    Write a sequence of Python objects to a YAML stream.

    When stream is None the output is collected in memory (a StringIO if
    encoding is None, a BytesIO otherwise) and its value is returned;
    otherwise None is returned.
    """
    if top_level_colon_align is True:
        # align on the longest str() of the items of the first document
        top_level_colon_align = max(len(str(key)) for key in documents[0])
    capture_output = stream is None
    if capture_output:
        stream = StringIO() if encoding is None else BytesIO()
    dumper_options = dict(
        default_style=default_style,
        default_flow_style=default_flow_style,
        canonical=canonical,
        indent=indent,
        width=width,
        allow_unicode=allow_unicode,
        line_break=line_break,
        encoding=encoding,
        explicit_start=explicit_start,
        explicit_end=explicit_end,
        version=version,
        tags=tags,
        block_seq_indent=block_seq_indent,
        top_level_colon_align=top_level_colon_align,
        prefix_colon=prefix_colon,
    )
    dumper = Dumper(stream, **dumper_options)
    try:
        dumper._serializer.open()
        for document in documents:
            dumper._representer.represent(document)
        dumper._serializer.close()
    finally:
        # an AttributeError raised here (e.g. a dumper without `_emitter`)
        # propagates to the caller unchanged
        dumper._emitter.dispose()
    if not capture_output:
        return None
    return stream.getvalue()
1190 | |
1191 | |
def dump(
    data,
    stream=None,
    Dumper=Dumper,
    default_style=None,
    default_flow_style=None,
    canonical=None,
    indent=None,
    width=None,
    allow_unicode=None,
    line_break=None,
    encoding=enc,
    explicit_start=None,
    explicit_end=None,
    version=None,
    tags=None,
    block_seq_indent=None,
):
    # type: (Any, Optional[StreamType], Any, Any, Any, Optional[bool], Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Optional[VersionType], Any, Any) -> Optional[str]  # NOQA
    """
    Write a single Python object to a YAML stream.

    Wraps the object in a one-element list and delegates to dump_all(), so
    the produced string is returned when stream is None.

    default_style ∈ None, '', '"', "'", '|', '>'

    """
    forwarded = dict(
        Dumper=Dumper,
        default_style=default_style,
        default_flow_style=default_flow_style,
        canonical=canonical,
        indent=indent,
        width=width,
        allow_unicode=allow_unicode,
        line_break=line_break,
        encoding=encoding,
        explicit_start=explicit_start,
        explicit_end=explicit_end,
        version=version,
        tags=tags,
        block_seq_indent=block_seq_indent,
    )
    return dump_all([data], stream, **forwarded)
1236 | |
1237 | |
def safe_dump_all(documents, stream=None, **kwds):
    # type: (Any, Optional[StreamType], Any) -> Optional[str]
    """
    Write a sequence of Python objects to a YAML stream, producing only
    basic YAML tags.

    Delegates to dump_all() with SafeDumper; the produced string is returned
    when stream is None.
    """
    output = dump_all(documents, stream, Dumper=SafeDumper, **kwds)
    return output
1246 | |
1247 | |
def safe_dump(data, stream=None, **kwds):
    # type: (Any, Optional[StreamType], Any) -> Optional[str]
    """
    Write a single Python object to a YAML stream, producing only basic
    YAML tags.

    Wraps the object in a one-element list and delegates to dump_all() with
    SafeDumper; the produced string is returned when stream is None.
    """
    single_document = [data]
    return dump_all(single_document, stream, Dumper=SafeDumper, **kwds)
1256 | |
1257 | |
def round_trip_dump(
    data,
    stream=None,
    Dumper=RoundTripDumper,
    default_style=None,
    default_flow_style=None,
    canonical=None,
    indent=None,
    width=None,
    allow_unicode=None,
    line_break=None,
    encoding=enc,
    explicit_start=None,
    explicit_end=None,
    version=None,
    tags=None,
    block_seq_indent=None,
    top_level_colon_align=None,
    prefix_colon=None,
):
    # type: (Any, Optional[StreamType], Any, Any, Any, Optional[bool], Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Optional[VersionType], Any, Any, Any, Any) -> Optional[str]  # NOQA
    """
    Write a single Python object to a YAML stream, by default with
    RoundTripDumper.

    Delegates to dump_all() with the object wrapped in a one-element list.
    allow_unicode defaults to True here when left as None.
    """
    if allow_unicode is None:
        allow_unicode = True
    forwarded = dict(
        Dumper=Dumper,
        default_style=default_style,
        default_flow_style=default_flow_style,
        canonical=canonical,
        indent=indent,
        width=width,
        allow_unicode=allow_unicode,
        line_break=line_break,
        encoding=encoding,
        explicit_start=explicit_start,
        explicit_end=explicit_end,
        version=version,
        tags=tags,
        block_seq_indent=block_seq_indent,
        top_level_colon_align=top_level_colon_align,
        prefix_colon=prefix_colon,
    )
    return dump_all([data], stream, **forwarded)
1300 | |
1301 | |
1302 # Loader/Dumper are no longer composites, to get to the associated | |
1303 # Resolver()/Representer(), etc., you need to instantiate the class | |
1304 | |
1305 | |
def add_implicit_resolver(
    tag, regexp, first=None, Loader=None, Dumper=None, resolver=Resolver
):
    # type: (Any, Any, Any, Any, Any, Any) -> None
    """
    Register an implicit scalar detector.

    A scalar whose value matches regexp is assigned the given tag.
    first is a sequence of possible initial characters or None.

    With neither Loader nor Dumper given, the detector is registered on
    resolver. Otherwise each class given is asked to register it itself when
    it has an add_implicit_resolver attribute; the known loader/dumper
    classes fall back to Resolver, anything else raises NotImplementedError.
    """
    if Loader is None and Dumper is None:
        resolver.add_implicit_resolver(tag, regexp, first)
        return
    # the Loader/Dumper parameters shadow the module level classes, hence
    # the fully qualified names
    known_loaders = (BaseLoader, SafeLoader, ruamel.yaml.loader.Loader, RoundTripLoader)
    known_dumpers = (BaseDumper, SafeDumper, ruamel.yaml.dumper.Dumper, RoundTripDumper)
    for target, known_classes in ((Loader, known_loaders), (Dumper, known_dumpers)):
        if not target:
            continue
        if hasattr(target, 'add_implicit_resolver'):
            target.add_implicit_resolver(tag, regexp, first)
        elif issubclass(target, known_classes):
            Resolver.add_implicit_resolver(tag, regexp, first)
        else:
            raise NotImplementedError
1337 | |
1338 | |
1339 # this code currently not tested | |
def add_path_resolver(tag, path, kind=None, Loader=None, Dumper=None, resolver=Resolver):
    # type: (Any, Any, Any, Any, Any, Any) -> None
    """
    Register a path based resolver for the given tag.

    A path is a list of keys that forms a path to a node in the
    representation tree. Keys can be string values, integers, or None.

    With neither Loader nor Dumper given, the path is registered on resolver.
    Otherwise each class given is asked to register it itself when it has an
    add_path_resolver attribute; the known loader/dumper classes fall back to
    Resolver, anything else raises NotImplementedError.
    """
    if Loader is None and Dumper is None:
        resolver.add_path_resolver(tag, path, kind)
        return
    # the Loader/Dumper parameters shadow the module level classes, hence
    # the fully qualified names
    known_loaders = (BaseLoader, SafeLoader, ruamel.yaml.loader.Loader, RoundTripLoader)
    known_dumpers = (BaseDumper, SafeDumper, ruamel.yaml.dumper.Dumper, RoundTripDumper)
    for target, known_classes in ((Loader, known_loaders), (Dumper, known_dumpers)):
        if not target:
            continue
        if hasattr(target, 'add_path_resolver'):
            target.add_path_resolver(tag, path, kind)
        elif issubclass(target, known_classes):
            Resolver.add_path_resolver(tag, path, kind)
        else:
            raise NotImplementedError
1369 | |
1370 | |
def add_constructor(tag, object_constructor, Loader=None, constructor=Constructor):
    # type: (Any, Any, Any, Any) -> None
    """
    Add an object constructor for the given tag.

    object_constructor is a function that accepts a Loader instance
    and a node object and produces the corresponding Python object.

    Without a Loader the function is registered on constructor. With a
    Loader, the Loader registers it itself when it has an add_constructor
    attribute; otherwise the constructor class matching the Loader's base
    class is used and NotImplementedError is raised for unknown classes.
    """
    if Loader is None:
        constructor.add_constructor(tag, object_constructor)
        return
    if hasattr(Loader, 'add_constructor'):
        Loader.add_constructor(tag, object_constructor)
        return
    if issubclass(Loader, BaseLoader):
        BaseConstructor.add_constructor(tag, object_constructor)
    elif issubclass(Loader, SafeLoader):
        SafeConstructor.add_constructor(tag, object_constructor)
    # the Loader parameter shadows the module level Loader class, so the
    # fully qualified name is required; comparing the parameter with itself
    # is always true and would make the branches below unreachable
    elif issubclass(Loader, ruamel.yaml.loader.Loader):
        Constructor.add_constructor(tag, object_constructor)
    elif issubclass(Loader, RoundTripLoader):
        RoundTripConstructor.add_constructor(tag, object_constructor)
    else:
        raise NotImplementedError
1394 | |
1395 | |
def add_multi_constructor(tag_prefix, multi_constructor, Loader=None, constructor=Constructor):
    # type: (Any, Any, Any, Any) -> None
    """
    Add a multi-constructor for the given tag prefix.

    Multi-constructor is called for a node if its tag starts with tag_prefix.
    Multi-constructor accepts a Loader instance, a tag suffix,
    and a node object and produces the corresponding Python object.

    Without a Loader the function is registered on constructor. With a
    Loader, the constructor class matching the Loader's base class is used
    and NotImplementedError is raised for unknown classes.
    """
    if Loader is None:
        constructor.add_multi_constructor(tag_prefix, multi_constructor)
        return
    # NOTE: registration is never delegated to Loader.add_multi_constructor;
    # that path was permanently disabled (`if False and ...`) and passed the
    # wrong argument, so the unreachable branch has been removed
    if issubclass(Loader, BaseLoader):
        BaseConstructor.add_multi_constructor(tag_prefix, multi_constructor)
    elif issubclass(Loader, SafeLoader):
        SafeConstructor.add_multi_constructor(tag_prefix, multi_constructor)
    elif issubclass(Loader, ruamel.yaml.loader.Loader):
        Constructor.add_multi_constructor(tag_prefix, multi_constructor)
    elif issubclass(Loader, RoundTripLoader):
        RoundTripConstructor.add_multi_constructor(tag_prefix, multi_constructor)
    else:
        raise NotImplementedError
1420 | |
1421 | |
def add_representer(data_type, object_representer, Dumper=None, representer=Representer):
    # type: (Any, Any, Any, Any) -> None
    """
    Add a representer for the given type.

    object_representer is a function accepting a Dumper instance
    and an instance of the given data type
    and producing the corresponding representation node.

    Without a Dumper the function is registered on representer. With a
    Dumper, the Dumper registers it itself when it has an add_representer
    attribute; otherwise the representer class matching the Dumper's base
    class is used and NotImplementedError is raised for unknown classes.
    """
    if Dumper is None:
        representer.add_representer(data_type, object_representer)
        return
    if hasattr(Dumper, 'add_representer'):
        Dumper.add_representer(data_type, object_representer)
        return
    if issubclass(Dumper, BaseDumper):
        BaseRepresenter.add_representer(data_type, object_representer)
    elif issubclass(Dumper, SafeDumper):
        SafeRepresenter.add_representer(data_type, object_representer)
    # the Dumper parameter shadows the module level Dumper class, so the
    # fully qualified name is required; comparing the parameter with itself
    # is always true and would make the branches below unreachable
    elif issubclass(Dumper, ruamel.yaml.dumper.Dumper):
        Representer.add_representer(data_type, object_representer)
    elif issubclass(Dumper, RoundTripDumper):
        RoundTripRepresenter.add_representer(data_type, object_representer)
    else:
        raise NotImplementedError
1446 | |
1447 | |
1448 # this code currently not tested | |
def add_multi_representer(data_type, multi_representer, Dumper=None, representer=Representer):
    # type: (Any, Any, Any, Any) -> None
    """
    Add a multi-representer for the given type.

    multi_representer is a function accepting a Dumper instance
    and an instance of the given data type or subtype
    and producing the corresponding representation node.

    Without a Dumper the function is registered on representer. With a
    Dumper, the Dumper registers it itself when it has an
    add_multi_representer attribute; otherwise the representer class matching
    the Dumper's base class is used and NotImplementedError is raised for
    unknown classes.
    """
    if Dumper is None:
        representer.add_multi_representer(data_type, multi_representer)
        return
    if hasattr(Dumper, 'add_multi_representer'):
        Dumper.add_multi_representer(data_type, multi_representer)
        return
    if issubclass(Dumper, BaseDumper):
        BaseRepresenter.add_multi_representer(data_type, multi_representer)
    elif issubclass(Dumper, SafeDumper):
        SafeRepresenter.add_multi_representer(data_type, multi_representer)
    # the Dumper parameter shadows the module level Dumper class, so the
    # fully qualified name is required; comparing the parameter with itself
    # is always true and would make the branches below unreachable
    elif issubclass(Dumper, ruamel.yaml.dumper.Dumper):
        Representer.add_multi_representer(data_type, multi_representer)
    elif issubclass(Dumper, RoundTripDumper):
        RoundTripRepresenter.add_multi_representer(data_type, multi_representer)
    else:
        raise NotImplementedError
1473 | |
1474 | |
class YAMLObjectMetaclass(type):
    """
    Metaclass used by YAMLObject.

    Every class created with it whose own body sets a non-None yaml_tag is
    registered with the class's yaml_constructor (under that tag) and
    yaml_representer.
    """

    def __init__(cls, name, bases, kwds):
        # type: (Any, Any, Any) -> None
        super(YAMLObjectMetaclass, cls).__init__(name, bases, kwds)
        # only a tag set in the class body itself triggers registration,
        # an inherited yaml_tag does not
        if kwds.get('yaml_tag') is None:
            return
        cls.yaml_constructor.add_constructor(cls.yaml_tag, cls.from_yaml)  # type: ignore
        cls.yaml_representer.add_representer(cls, cls.to_yaml)  # type: ignore
1486 | |
1487 | |
class YAMLObject(with_metaclass(YAMLObjectMetaclass)):  # type: ignore
    """
    Base class for objects that can dump themselves to a YAML stream
    and load themselves from a YAML stream.
    """

    __slots__ = ()  # no direct instantiation, so allow immutable subclasses

    # classes on which YAMLObjectMetaclass registers from_yaml / to_yaml
    yaml_constructor = Constructor
    yaml_representer = Representer

    # tag and flow style handed to represent_yaml_object() by to_yaml()
    yaml_tag = None  # type: Any
    yaml_flow_style = None  # type: Any

    @classmethod
    def from_yaml(cls, constructor, node):
        # type: (Any, Any) -> Any
        """
        Build a Python object of this class from a representation node.
        """
        instance = constructor.construct_yaml_object(node, cls)
        return instance

    @classmethod
    def to_yaml(cls, representer, data):
        # type: (Any, Any) -> Any
        """
        Build a representation node for a Python object of this class.
        """
        node = representer.represent_yaml_object(
            cls.yaml_tag, data, cls, flow_style=cls.yaml_flow_style
        )
        return node