comparison env/lib/python3.9/site-packages/ruamel/yaml/main.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # coding: utf-8
2
3 from __future__ import absolute_import, unicode_literals, print_function
4
5 import sys
6 import os
7 import warnings
8 import glob
9 from importlib import import_module
10
11
12 import ruamel.yaml
13 from ruamel.yaml.error import UnsafeLoaderWarning, YAMLError # NOQA
14
15 from ruamel.yaml.tokens import * # NOQA
16 from ruamel.yaml.events import * # NOQA
17 from ruamel.yaml.nodes import * # NOQA
18
19 from ruamel.yaml.loader import BaseLoader, SafeLoader, Loader, RoundTripLoader # NOQA
20 from ruamel.yaml.dumper import BaseDumper, SafeDumper, Dumper, RoundTripDumper # NOQA
21 from ruamel.yaml.compat import StringIO, BytesIO, with_metaclass, PY3, nprint
22 from ruamel.yaml.resolver import VersionedResolver, Resolver # NOQA
23 from ruamel.yaml.representer import (
24 BaseRepresenter,
25 SafeRepresenter,
26 Representer,
27 RoundTripRepresenter,
28 )
29 from ruamel.yaml.constructor import (
30 BaseConstructor,
31 SafeConstructor,
32 Constructor,
33 RoundTripConstructor,
34 )
35 from ruamel.yaml.loader import Loader as UnsafeLoader
36
37 if False: # MYPY
38 from typing import List, Set, Dict, Union, Any, Callable, Optional, Text # NOQA
39 from ruamel.yaml.compat import StreamType, StreamTextType, VersionType # NOQA
40
41 if PY3:
42 from pathlib import Path
43 else:
44 Path = Any
45
46 try:
47 from _ruamel_yaml import CParser, CEmitter # type: ignore
48 except: # NOQA
49 CParser = CEmitter = None
50
51 # import io
52
# Sentinel used as the default of the `_kw` parameters in this module: if a
# method receives anything else in that slot, an argument was passed
# positionally where only keywords are accepted, and a TypeError is raised.
enforce = object()
54
55
56 # YAML is an acronym, i.e. spoken: rhymes with "camel". And thus a
57 # subset of abbreviations, which should be all caps according to PEP8
58
59
class YAML(object):
    """
    Front end that bundles the loading and dumping components behind one object.

    __init__ selects the component *classes* (Reader, Scanner, Parser, Composer,
    Constructor, Resolver, Emitter, Serializer, Representer) based on `typ`.
    The properties with the corresponding lower-case names create the
    instances lazily and cache them in `_<name>` attributes.
    """

    def __init__(
        self, _kw=enforce, typ=None, pure=False, output=None, plug_ins=None  # input=None,
    ):
        # type: (Any, Optional[Text], Any, Any, Any) -> None
        """
        _kw: not used, forces keyword arguments in 2.7 (in 3 you can do (*, safe_load=..)
        typ: 'rt'/None -> RoundTripLoader/RoundTripDumper,  (default)
             'safe'    -> SafeLoader/SafeDumper,
             'unsafe'  -> normal/unsafe Loader/Dumper
             'base'    -> baseloader
             a single string or a list of strings; anything else has to be
             provided by a plug-in, otherwise NotImplementedError is raised
        pure: if True only use Python modules
        input/output: needed to work as context manager
        plug_ins: a list of plug-in files

        Raises TypeError if anything is passed positionally.
        """
        if _kw is not enforce:
            raise TypeError(
                '{}.__init__() takes no positional argument but at least '
                'one was given ({!r})'.format(self.__class__.__name__, _kw)
            )

        self.typ = ['rt'] if typ is None else (typ if isinstance(typ, list) else [typ])
        self.pure = pure

        # self._input = input
        self._output = output
        self._context_manager = None  # type: Any

        self.plug_ins = []  # type: List[Any]
        for pu in ([] if plug_ins is None else plug_ins) + self.official_plug_ins():
            # plug-ins are given as file paths, import_module needs dotted names
            file_name = pu.replace(os.sep, '.')
            self.plug_ins.append(import_module(file_name))
        self.Resolver = ruamel.yaml.resolver.VersionedResolver  # type: Any
        self.allow_unicode = True
        self.Reader = None  # type: Any
        self.Scanner = None  # type: Any
        self.Serializer = None  # type: Any
        self.default_flow_style = None  # type: Any
        # typ_found stays 0 only if no built-in typ matched; a plug-in can
        # still claim the typ further down
        typ_found = 1
        setup_rt = False
        if 'rt' in self.typ:
            setup_rt = True
        elif 'safe' in self.typ:
            self.Emitter = (
                ruamel.yaml.emitter.Emitter if pure or CEmitter is None else CEmitter
            )
            self.Representer = ruamel.yaml.representer.SafeRepresenter
            self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser
            self.Composer = ruamel.yaml.composer.Composer
            self.Constructor = ruamel.yaml.constructor.SafeConstructor
        elif 'base' in self.typ:
            self.Emitter = ruamel.yaml.emitter.Emitter
            self.Representer = ruamel.yaml.representer.BaseRepresenter
            self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser
            self.Composer = ruamel.yaml.composer.Composer
            self.Constructor = ruamel.yaml.constructor.BaseConstructor
        elif 'unsafe' in self.typ:
            self.Emitter = (
                ruamel.yaml.emitter.Emitter if pure or CEmitter is None else CEmitter
            )
            self.Representer = ruamel.yaml.representer.Representer
            self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser
            self.Composer = ruamel.yaml.composer.Composer
            self.Constructor = ruamel.yaml.constructor.Constructor
        else:
            # unknown typ: start from the round-trip setup, a plug-in may adjust it
            setup_rt = True
            typ_found = 0
        if setup_rt:
            self.default_flow_style = False
            # no optimized rt-dumper yet
            self.Emitter = ruamel.yaml.emitter.Emitter  # type: Any
            self.Serializer = ruamel.yaml.serializer.Serializer
            self.Representer = ruamel.yaml.representer.RoundTripRepresenter  # type: Any
            self.Scanner = ruamel.yaml.scanner.RoundTripScanner
            # no optimized rt-parser yet
            self.Parser = ruamel.yaml.parser.RoundTripParser  # type: Any
            self.Composer = ruamel.yaml.composer.Composer  # type: Any
            self.Constructor = ruamel.yaml.constructor.RoundTripConstructor  # type: Any
        del setup_rt
        self.stream = None
        self.canonical = None
        self.old_indent = None
        self.width = None
        self.line_break = None

        self.map_indent = None
        self.sequence_indent = None
        self.sequence_dash_offset = 0
        self.compact_seq_seq = None
        self.compact_seq_map = None
        self.sort_base_mapping_type_on_output = None  # default: sort

        self.top_level_colon_align = None
        self.prefix_colon = None
        self.version = None
        self.preserve_quotes = None
        self.allow_duplicate_keys = False  # duplicate keys in map, set
        self.encoding = 'utf-8'
        self.explicit_start = None
        self.explicit_end = None
        self.tags = None
        self.default_style = None
        self.top_level_block_style_scalar_no_indent_error_1_1 = False
        # directives end indicator with single scalar document
        self.scalar_after_indicator = None
        # [a, b: 1, c: {d: 2}] vs. [a, {b: 1}, {c: {d: 2}}]
        self.brace_single_entry_mapping_in_flow_sequence = False
        # only the first plug-in that handles one of the requested typs is used
        for module in self.plug_ins:
            if getattr(module, 'typ', None) in self.typ:
                typ_found += 1
                module.init_typ(self)
                break
        if typ_found == 0:
            raise NotImplementedError(
                'typ "{}" not recognised (need to install plug-in?)'.format(self.typ)
            )

    @property
    def reader(self):
        # type: () -> Any
        """Reader instance, created without a stream on first access and cached."""
        try:
            return self._reader  # type: ignore
        except AttributeError:
            self._reader = self.Reader(None, loader=self)
            return self._reader

    @property
    def scanner(self):
        # type: () -> Any
        """Scanner instance, created on first access and cached."""
        try:
            return self._scanner  # type: ignore
        except AttributeError:
            self._scanner = self.Scanner(loader=self)
            return self._scanner

    @property
    def parser(self):
        # type: () -> Any
        """
        Parser instance, created on first access and cached.

        Returns None when the CParser is selected and no stream has been set
        yet, as the CParser can only be created with a stream.
        """
        if not hasattr(self, '_parser'):
            if self.Parser is not CParser:
                self._parser = self.Parser(loader=self)
            else:
                if getattr(self, '_stream', None) is None:
                    # wait for the stream
                    return None
                self._parser = CParser(self._stream)
        return self._parser

    @property
    def composer(self):
        # type: () -> Any
        """Composer instance, created on first access and cached."""
        if not hasattr(self, '_composer'):
            self._composer = self.Composer(loader=self)
        return self._composer

    @property
    def constructor(self):
        # type: () -> Any
        """
        Constructor instance, created on first access and cached.

        preserve_quotes and allow_duplicate_keys are copied at creation time.
        """
        if not hasattr(self, '_constructor'):
            cnst = self.Constructor(preserve_quotes=self.preserve_quotes, loader=self)
            cnst.allow_duplicate_keys = self.allow_duplicate_keys
            self._constructor = cnst
        return self._constructor

    @property
    def resolver(self):
        # type: () -> Any
        """Resolver instance for self.version, created on first access and cached."""
        if not hasattr(self, '_resolver'):
            self._resolver = self.Resolver(version=self.version, loader=self)
        return self._resolver

    @property
    def emitter(self):
        # type: () -> Any
        """
        Emitter instance, created on first access and cached.

        The formatting attributes (indents, dash offset, compact settings) are
        copied onto the emitter when it is created.  Returns None when the
        CEmitter is selected: that one is set up with its stream in
        get_serializer_representer_emitter().
        """
        if not hasattr(self, '_emitter'):
            if self.Emitter is CEmitter:
                return None
            _emitter = self.Emitter(
                None,
                canonical=self.canonical,
                indent=self.old_indent,
                width=self.width,
                allow_unicode=self.allow_unicode,
                line_break=self.line_break,
                prefix_colon=self.prefix_colon,
                brace_single_entry_mapping_in_flow_sequence=self.brace_single_entry_mapping_in_flow_sequence,  # NOQA
                dumper=self,
            )
            self._emitter = _emitter
            if self.map_indent is not None:
                _emitter.best_map_indent = self.map_indent
            if self.sequence_indent is not None:
                _emitter.best_sequence_indent = self.sequence_indent
            if self.sequence_dash_offset is not None:
                _emitter.sequence_dash_offset = self.sequence_dash_offset
                # _emitter.block_seq_indent = self.sequence_dash_offset
            if self.compact_seq_seq is not None:
                _emitter.compact_seq_seq = self.compact_seq_seq
            if self.compact_seq_map is not None:
                _emitter.compact_seq_map = self.compact_seq_map
        return self._emitter

    @property
    def serializer(self):
        # type: () -> Any
        """Serializer instance, created on first access and cached."""
        if not hasattr(self, '_serializer'):
            self._serializer = self.Serializer(
                encoding=self.encoding,
                explicit_start=self.explicit_start,
                explicit_end=self.explicit_end,
                version=self.version,
                tags=self.tags,
                dumper=self,
            )
        return self._serializer

    @property
    def representer(self):
        # type: () -> Any
        """Representer instance, created on first access and cached."""
        if not hasattr(self, '_representer'):
            repres = self.Representer(
                default_style=self.default_style,
                default_flow_style=self.default_flow_style,
                dumper=self,
            )
            if self.sort_base_mapping_type_on_output is not None:
                repres.sort_base_mapping_type_on_output = self.sort_base_mapping_type_on_output
            self._representer = repres
        return self._representer

    # separate output resolver?

    def _release_loader(self, parser):
        # type: (Any) -> None
        """Dispose of `parser` and reset the cached reader/scanner, if there are any."""
        parser.dispose()
        for component, reset in (('_reader', 'reset_reader'), ('_scanner', 'reset_scanner')):
            try:
                getattr(getattr(self, component), reset)()
            except AttributeError:
                # component not created (e.g. the C parser does its own reading)
                pass

    def load(self, stream):
        # type: (Union[Path, StreamTextType]) -> Any
        """
        Load a single document from `stream` and return the constructed data.

        `stream` is anything with a `read` attribute, or an object with an
        `open` method (e.g. pathlib.Path), which is then opened in binary mode.

        at this point you either have the non-pure Parser (which has its own reader and
        scanner) or you have the pure Parser.
        If the pure Parser is set, then set the Reader and Scanner, if not already set.
        If either the Scanner or Reader are set, you cannot use the non-pure Parser,
            so reset it to the pure parser and set the Reader resp. Scanner if necessary
        """
        if not hasattr(stream, 'read') and hasattr(stream, 'open'):
            # pathlib.Path() instance
            with stream.open('rb') as fp:
                return self.load(fp)
        constructor, parser = self.get_constructor_parser(stream)
        try:
            return constructor.get_single_data()
        finally:
            self._release_loader(parser)

    def load_all(self, stream, _kw=enforce):  # , skip=None):
        # type: (Union[Path, StreamTextType], Any) -> Any
        """
        Generator yielding the data of each document in `stream`.

        Raises TypeError (on first iteration) if more than `stream` is passed
        positionally.
        """
        if _kw is not enforce:
            raise TypeError(
                '{}.load_all() takes one positional argument but at least '
                'two were given ({!r})'.format(self.__class__.__name__, _kw)
            )
        if not hasattr(stream, 'read') and hasattr(stream, 'open'):
            # pathlib.Path() instance
            # NOTE(review): load() opens such paths with 'rb', here text mode is used
            with stream.open('r') as fp:
                for d in self.load_all(fp, _kw=enforce):
                    yield d
                return
        constructor, parser = self.get_constructor_parser(stream)
        try:
            while constructor.check_data():
                yield constructor.get_data()
        finally:
            self._release_loader(parser)

    def get_constructor_parser(self, stream):
        # type: (StreamTextType) -> Any
        """
        Return the (constructor, parser) pair to use for loading from `stream`.

        the old cyaml needs special setup, and therefore the stream
        """
        if self.Parser is not CParser:
            if self.Reader is None:
                self.Reader = ruamel.yaml.reader.Reader
            if self.Scanner is None:
                self.Scanner = ruamel.yaml.scanner.Scanner
            self.reader.stream = stream
        else:
            # an explicitly set Reader or Scanner cannot be combined with the
            # CParser, so fall back to the pure Python parser in that case
            if self.Reader is not None:
                if self.Scanner is None:
                    self.Scanner = ruamel.yaml.scanner.Scanner
                self.Parser = ruamel.yaml.parser.Parser
                self.reader.stream = stream
            elif self.Scanner is not None:
                if self.Reader is None:
                    self.Reader = ruamel.yaml.reader.Reader
                self.Parser = ruamel.yaml.parser.Parser
                self.reader.stream = stream
            else:
                # combined C level reader>scanner>parser
                # does some calls to the resolver, e.g. BaseResolver.descend_resolver
                # if you just initialise the CParser, too much of resolver.py
                # is actually used
                rslvr = self.Resolver
                # if rslvr is ruamel.yaml.resolver.VersionedResolver:
                #     rslvr = ruamel.yaml.resolver.Resolver

                class XLoader(self.Parser, self.Constructor, rslvr):  # type: ignore
                    def __init__(selfx, stream, version=self.version, preserve_quotes=None):
                        # type: (StreamTextType, Optional[VersionType], Optional[bool]) -> None  # NOQA
                        CParser.__init__(selfx, stream)
                        selfx._parser = selfx._composer = selfx
                        self.Constructor.__init__(selfx, loader=selfx)
                        selfx.allow_duplicate_keys = self.allow_duplicate_keys
                        rslvr.__init__(selfx, version=version, loadumper=selfx)

                self._stream = stream
                loader = XLoader(stream)
                # the one object is both constructor and parser
                return loader, loader
        return self.constructor, self.parser

    def dump(self, data, stream=None, _kw=enforce, transform=None):
        # type: (Any, Union[Path, StreamType], Any, Any) -> Any
        """
        Dump `data` as one document.

        Inside a `with` block the document goes to the `output` given to
        __init__ and `transform` is not allowed; otherwise `stream` is
        required and the call is forwarded to dump_all().

        Raises TypeError on a missing stream/output, on extra positional
        arguments, or on `transform` inside a `with` block.
        """
        if self._context_manager:
            if not self._output:
                raise TypeError('Missing output stream while dumping from context manager')
            if _kw is not enforce:
                raise TypeError(
                    '{}.dump() takes one positional argument but at least '
                    'two were given ({!r})'.format(self.__class__.__name__, _kw)
                )
            if transform is not None:
                raise TypeError(
                    '{}.dump() in the context manager cannot have transform keyword '
                    ''.format(self.__class__.__name__)
                )
            self._context_manager.dump(data)
        else:  # old style
            if stream is None:
                raise TypeError('Need a stream argument when not dumping from context manager')
            return self.dump_all([data], stream, _kw, transform=transform)

    def dump_all(self, documents, stream, _kw=enforce, transform=None):
        # type: (Any, Union[Path, StreamType], Any, Any) -> Any
        """
        Dump every item of `documents` as a document to `stream`.

        If `transform` is given, the output is collected first and the result
        of calling `transform` on it is what gets written to `stream`.

        Raises NotImplementedError inside a `with` block and TypeError on
        extra positional arguments.
        """
        if self._context_manager:
            raise NotImplementedError
        if _kw is not enforce:
            raise TypeError(
                '{}.dump(_all) takes two positional argument but at least '
                'three were given ({!r})'.format(self.__class__.__name__, _kw)
            )
        self._output = stream
        completed = False
        try:
            self._context_manager = YAMLContextManager(self, transform=transform)
            for data in documents:
                self._context_manager.dump(data)
            self._context_manager.teardown_output()
            completed = True
        finally:
            if not completed:
                # a failed dump must not leave a half-used serializer/emitter
                # behind, the next dump has to start with fresh ones
                for stale in ('_serializer', '_emitter'):
                    vars(self).pop(stale, None)
            # always leave "context manager mode", otherwise a later dump()
            # would silently continue writing to this stream
            self._output = None
            self._context_manager = None

    def Xdump_all(self, documents, stream, _kw=enforce, transform=None):
        # type: (Any, Union[Path, StreamType], Any, Any) -> Any
        """
        Serialize a sequence of Python objects into a YAML stream.

        Older implementation of dump_all() that does not go through
        YAMLContextManager.
        """
        if not hasattr(stream, 'write') and hasattr(stream, 'open'):
            # pathlib.Path() instance
            with stream.open('w') as fp:
                return self.dump_all(documents, fp, _kw, transform=transform)
        if _kw is not enforce:
            raise TypeError(
                '{}.dump(_all) takes two positional argument but at least '
                'three were given ({!r})'.format(self.__class__.__name__, _kw)
            )
        # The stream should have the methods `write` and possibly `flush`.
        if self.top_level_colon_align is True:
            tlca = max([len(str(x)) for x in documents[0]])  # type: Any
        else:
            tlca = self.top_level_colon_align
        if transform is not None:
            # collect the output in memory, the transformed result goes to fstream
            fstream = stream
            if self.encoding is None:
                stream = StringIO()
            else:
                stream = BytesIO()
        self.get_serializer_representer_emitter(stream, tlca)
        try:
            self.serializer.open()
            for data in documents:
                self.representer.represent(data)
            self.serializer.close()
        finally:
            self.emitter.dispose()
            # self.dumper.dispose()  # cyaml
            delattr(self, '_serializer')
            delattr(self, '_emitter')
        if transform:
            val = stream.getvalue()
            if self.encoding:
                val = val.decode(self.encoding)
            if fstream is None:
                transform(val)
            else:
                fstream.write(transform(val))
        return None

    def get_serializer_representer_emitter(self, stream, tlca):
        # type: (StreamType, Any) -> Any
        """
        Return the (serializer, representer, emitter) triple writing to `stream`.

        `tlca` is the top level colon alignment handed to the emitter.  With the
        CEmitter (and no Serializer set) a single combined object is built and
        returned three times.
        """
        # we have only .Serializer to deal with (vs .Reader & .Scanner), much simpler
        if self.Emitter is not CEmitter:
            if self.Serializer is None:
                self.Serializer = ruamel.yaml.serializer.Serializer
            self.emitter.stream = stream
            self.emitter.top_level_colon_align = tlca
            if self.scalar_after_indicator is not None:
                self.emitter.scalar_after_indicator = self.scalar_after_indicator
            return self.serializer, self.representer, self.emitter
        if self.Serializer is not None:
            # cannot set serializer with CEmitter
            self.Emitter = ruamel.yaml.emitter.Emitter
            self.emitter.stream = stream
            self.emitter.top_level_colon_align = tlca
            if self.scalar_after_indicator is not None:
                self.emitter.scalar_after_indicator = self.scalar_after_indicator
            return self.serializer, self.representer, self.emitter
        # C routines

        rslvr = (
            ruamel.yaml.resolver.BaseResolver
            if 'base' in self.typ
            else ruamel.yaml.resolver.Resolver
        )

        class XDumper(CEmitter, self.Representer, rslvr):  # type: ignore
            def __init__(
                selfx,
                stream,
                default_style=None,
                default_flow_style=None,
                canonical=None,
                indent=None,
                width=None,
                allow_unicode=None,
                line_break=None,
                encoding=None,
                explicit_start=None,
                explicit_end=None,
                version=None,
                tags=None,
                block_seq_indent=None,
                top_level_colon_align=None,
                prefix_colon=None,
            ):
                # type: (StreamType, Any, Any, Any, Optional[bool], Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Any, Any, Any, Any, Any) -> None   # NOQA
                CEmitter.__init__(
                    selfx,
                    stream,
                    canonical=canonical,
                    indent=indent,
                    width=width,
                    encoding=encoding,
                    allow_unicode=allow_unicode,
                    line_break=line_break,
                    explicit_start=explicit_start,
                    explicit_end=explicit_end,
                    version=version,
                    tags=tags,
                )
                selfx._emitter = selfx._serializer = selfx._representer = selfx
                self.Representer.__init__(
                    selfx, default_style=default_style, default_flow_style=default_flow_style
                )
                rslvr.__init__(selfx)

        self._stream = stream
        dumper = XDumper(
            stream,
            default_style=self.default_style,
            default_flow_style=self.default_flow_style,
            canonical=self.canonical,
            indent=self.old_indent,
            width=self.width,
            allow_unicode=self.allow_unicode,
            line_break=self.line_break,
            explicit_start=self.explicit_start,
            explicit_end=self.explicit_end,
            version=self.version,
            tags=self.tags,
        )
        self._emitter = self._serializer = dumper
        return dumper, dumper, dumper

    # basic types
    def map(self, **kw):
        # type: (Any) -> Any
        """Return a CommentedMap for typ 'rt', a dict otherwise, built from `kw`."""
        if 'rt' in self.typ:
            from ruamel.yaml.comments import CommentedMap

            return CommentedMap(**kw)
        else:
            return dict(**kw)

    def seq(self, *args):
        # type: (Any) -> Any
        """Return a CommentedSeq for typ 'rt', a list otherwise, built from `args`."""
        if 'rt' in self.typ:
            from ruamel.yaml.comments import CommentedSeq

            return CommentedSeq(*args)
        else:
            return list(*args)

    # helpers
    def official_plug_ins(self):
        # type: () -> Any
        """
        Return the `__plug_in__.py` files found one directory below this
        module's directory, as paths relative to the grandparent of that
        directory and without the '.py' extension.
        """
        bd = os.path.dirname(__file__)
        gpbd = os.path.dirname(os.path.dirname(bd))
        # [1:-3] strips the leading path separator and the trailing '.py'
        res = [x.replace(gpbd, "")[1:-3] for x in glob.glob(bd + '/*/__plug_in__.py')]
        return res

    def register_class(self, cls):
        # type:(Any) -> Any
        """
        register a class for dumping loading
        - if it has attribute yaml_tag use that to register, else use class name
        - if it has methods to_yaml/from_yaml use those to dump/load else dump attributes
          as mapping

        Returns `cls`, so this can be used as a decorator.
        """
        tag = getattr(cls, 'yaml_tag', '!' + cls.__name__)
        try:
            self.representer.add_representer(cls, cls.to_yaml)
        except AttributeError:

            def t_y(representer, data):
                # type: (Any, Any) -> Any
                return representer.represent_yaml_object(
                    tag, data, cls, flow_style=representer.default_flow_style
                )

            self.representer.add_representer(cls, t_y)
        try:
            self.constructor.add_constructor(tag, cls.from_yaml)
        except AttributeError:

            def f_y(constructor, node):
                # type: (Any, Any) -> Any
                return constructor.construct_yaml_object(node, cls)

            self.constructor.add_constructor(tag, f_y)
        return cls

    def parse(self, stream):
        # type: (StreamTextType) -> Any
        """
        Parse a YAML stream and produce parsing events.
        """
        _, parser = self.get_constructor_parser(stream)
        try:
            while parser.check_event():
                yield parser.get_event()
        finally:
            self._release_loader(parser)

    # ### context manager

    def __enter__(self):
        # type: () -> Any
        """Switch to context manager mode: dump() then writes to the `output` of __init__."""
        self._context_manager = YAMLContextManager(self)
        return self

    def __exit__(self, typ, value, traceback):
        # type: (Any, Any, Any) -> None
        """Finish the output and leave context manager mode."""
        if typ:
            nprint('typ', typ)
        self._context_manager.teardown_output()
        # self._context_manager.teardown_input()
        self._context_manager = None

    # ### backwards compatibility
    def _indent(self, mapping=None, sequence=None, offset=None):
        # type: (Any, Any, Any) -> None
        """Set the mapping indent, sequence indent and dash offset that are not None."""
        if mapping is not None:
            self.map_indent = mapping
        if sequence is not None:
            self.sequence_indent = sequence
        if offset is not None:
            self.sequence_dash_offset = offset

    @property
    def indent(self):
        # type: () -> Any
        """
        Reading gives the _indent method, so `yaml.indent(mapping=.., ..)` can be
        called; assigning (`yaml.indent = n`) sets old_indent.
        """
        return self._indent

    @indent.setter
    def indent(self, val):
        # type: (Any) -> None
        self.old_indent = val

    @property
    def block_seq_indent(self):
        # type: () -> Any
        """Alias for sequence_dash_offset."""
        return self.sequence_dash_offset

    @block_seq_indent.setter
    def block_seq_indent(self, val):
        # type: (Any) -> None
        self.sequence_dash_offset = val

    def compact(self, seq_seq=None, seq_map=None):
        # type: (Any, Any) -> None
        """Set compact_seq_seq and compact_seq_map (both, also when None)."""
        self.compact_seq_seq = seq_seq
        self.compact_seq_map = seq_map
736
737
class YAMLContextManager(object):
    """
    Output state for a series of dump() calls on one YAML instance.

    The serializer is opened lazily on the first dump() and closed in
    teardown_output().  With a `transform`, the output is collected in memory
    and the transformed result is written to the real stream on teardown.
    """

    def __init__(self, yaml, transform=None):
        # type: (Any, Any) -> None  # used to be: (Any, Optional[Callable]) -> None
        """
        yaml: the YAML instance to dump with; its `_output` is the destination
        transform: optional callable applied to the complete output
        """
        self._yaml = yaml
        self._output_inited = False
        self._output_path = None
        self._output = self._yaml._output
        self._transform = transform

        if not hasattr(self._output, 'write') and hasattr(self._output, 'open'):
            # pathlib.Path() instance: remember it, teardown has to close the file
            self._output_path = self._output
            self._output = self._output_path.open('w')

        if self._transform is not None:
            # buffer everything, the real stream only gets the transformed result
            self._fstream = self._output
            self._output = StringIO() if self._yaml.encoding is None else BytesIO()

    def teardown_output(self):
        # type: () -> None
        """
        Close the serializer, dispose of the emitter and drop both from the
        YAML instance; then apply the transform (if any) and close the file
        if it was opened here.  Does nothing if nothing was dumped.
        """
        if not self._output_inited:
            return
        yml = self._yaml
        yml.serializer.close()
        yml.emitter.dispose()
        # self.dumper.dispose()  # cyaml
        delattr(yml, '_serializer')
        delattr(yml, '_emitter')
        if self._transform:
            collected = self._output.getvalue()
            if yml.encoding:
                collected = collected.decode(yml.encoding)
            if self._fstream is None:
                self._transform(collected)
            else:
                self._fstream.write(self._transform(collected))
                self._fstream.flush()
                self._output = self._fstream  # maybe not necessary
        if self._output_path is not None:
            self._output.close()

    def init_output(self, first_data):
        # type: (Any) -> None
        """
        Set up serializer/representer/emitter for the output and open the
        serializer.  If top_level_colon_align is True, the alignment is the
        length of the longest str() of the items of `first_data`.
        """
        tlca = self._yaml.top_level_colon_align  # type: Any
        if tlca is True:
            tlca = max(len(str(item)) for item in first_data)
        self._yaml.get_serializer_representer_emitter(self._output, tlca)
        self._yaml.serializer.open()
        self._output_inited = True

    def dump(self, data):
        # type: (Any) -> None
        """Represent `data` as the next document, initialising the output if needed."""
        if not self._output_inited:
            self.init_output(data)
        self._yaml.representer.represent(data)

    # loading through the context manager (teardown_input / init_input / load)
    # is not implemented
843
844
def yaml_object(yml):
    # type: (Any) -> Any
    """ decorator factory for classes that need to be dumped/loaded by `yml`

    The tag for such a class is its attribute yaml_tag, or '!' followed by
    the class name if there is no such attribute.
    If the class has to_yaml and/or from_yaml, these are registered for dumping
    resp. loading; otherwise generic routines are registered that call
    represent_yaml_object resp. construct_yaml_object for the class.
    The decorated class itself is returned unchanged.
    """

    def register(cls):
        # type: (Any) -> Any
        tag = getattr(cls, 'yaml_tag', '!' + cls.__name__)

        def dump_as_object(representer, data):
            # type: (Any, Any) -> Any
            return representer.represent_yaml_object(
                tag, data, cls, flow_style=representer.default_flow_style
            )

        def load_as_object(constructor, node):
            # type: (Any, Any) -> Any
            return constructor.construct_yaml_object(node, cls)

        # an AttributeError (e.g. the class has no to_yaml) selects the fallback
        try:
            yml.representer.add_representer(cls, cls.to_yaml)
        except AttributeError:
            yml.representer.add_representer(cls, dump_as_object)
        try:
            yml.constructor.add_constructor(tag, cls.from_yaml)
        except AttributeError:
            yml.constructor.add_constructor(tag, load_as_object)
        return cls

    return register
880
881
882 ########################################################################################
883
884
def scan(stream, Loader=Loader):
    # type: (StreamTextType, Any) -> Any
    """
    Generator yielding the scanning tokens of a YAML stream.

    The parser of the loader is disposed of when the generator finishes
    or is closed.
    """
    loader = Loader(stream)
    try:
        while True:
            if not loader.scanner.check_token():
                break
            yield loader.scanner.get_token()
    finally:
        loader._parser.dispose()
896
897
def parse(stream, Loader=Loader):
    # type: (StreamTextType, Any) -> Any
    """
    Generator yielding the parsing events of a YAML stream.

    The parser of the loader is disposed of when the generator finishes
    or is closed.
    """
    loader = Loader(stream)
    try:
        while True:
            if not loader._parser.check_event():
                break
            yield loader._parser.get_event()
    finally:
        loader._parser.dispose()
909
910
def compose(stream, Loader=Loader):
    # type: (StreamTextType, Any) -> Any
    """
    Return the representation tree (node) for the single document
    in a YAML stream; the loader is disposed of afterwards.
    """
    loader = Loader(stream)
    try:
        node = loader.get_single_node()
    finally:
        loader.dispose()
    return node
922
923
def compose_all(stream, Loader=Loader):
    # type: (StreamTextType, Any) -> Any
    """
    Generator yielding the representation tree (node) of each document
    in a YAML stream.

    The parser of the loader is disposed of when the generator finishes
    or is closed.
    """
    loader = Loader(stream)
    try:
        while True:
            if not loader.check_node():
                break
            yield loader._composer.get_node()
    finally:
        loader._parser.dispose()
936
937
def load(stream, Loader=None, version=None, preserve_quotes=None):
    # type: (StreamTextType, Any, Optional[VersionType], Any) -> Any
    """
    Return the Python object constructed from the single document in a
    YAML stream.

    Without an explicit Loader an UnsafeLoaderWarning is issued and the
    UnsafeLoader is used.
    """
    if Loader is None:
        warnings.warn(UnsafeLoaderWarning.text, UnsafeLoaderWarning, stacklevel=2)
        Loader = UnsafeLoader
    loader = Loader(stream, version, preserve_quotes=preserve_quotes)
    try:
        return loader._constructor.get_single_data()
    finally:
        loader._parser.dispose()
        # reader and scanner are reset only if the loader has them
        for component, reset in (('_reader', 'reset_reader'), ('_scanner', 'reset_scanner')):
            try:
                getattr(getattr(loader, component), reset)()
            except AttributeError:
                pass
960
961
def load_all(stream, Loader=None, version=None, preserve_quotes=None):
    # type: (Optional[StreamTextType], Any, Optional[VersionType], Optional[bool]) -> Any  # NOQA
    """
    Generator yielding the Python object constructed from each document
    in a YAML stream.

    Without an explicit Loader an UnsafeLoaderWarning is issued and the
    UnsafeLoader is used.
    """
    if Loader is None:
        warnings.warn(UnsafeLoaderWarning.text, UnsafeLoaderWarning, stacklevel=2)
        Loader = UnsafeLoader
    loader = Loader(stream, version, preserve_quotes=preserve_quotes)
    try:
        while True:
            if not loader._constructor.check_data():
                break
            yield loader._constructor.get_data()
    finally:
        loader._parser.dispose()
        # reader and scanner are reset only if the loader has them
        for component, reset in (('_reader', 'reset_reader'), ('_scanner', 'reset_scanner')):
            try:
                getattr(getattr(loader, component), reset)()
            except AttributeError:
                pass
985
986
def safe_load(stream, version=None):
    # type: (StreamTextType, Optional[VersionType]) -> Any
    """
    Return the Python object for the single document in a YAML stream,
    as produced by load() with the SafeLoader.
    """
    return load(stream, Loader=SafeLoader, version=version)
995
996
def safe_load_all(stream, version=None):
    # type: (StreamTextType, Optional[VersionType]) -> Any
    """
    Return the generator over the Python objects for all documents in a
    YAML stream, as produced by load_all() with the SafeLoader.
    """
    return load_all(stream, Loader=SafeLoader, version=version)
1005
1006
def round_trip_load(stream, version=None, preserve_quotes=None):
    # type: (StreamTextType, Optional[VersionType], Optional[bool]) -> Any
    """
    Return the Python object for the single document in a YAML stream,
    as produced by load() with the RoundTripLoader.
    """
    return load(
        stream, Loader=RoundTripLoader, version=version, preserve_quotes=preserve_quotes
    )
1015
1016
def round_trip_load_all(stream, version=None, preserve_quotes=None):
    # type: (StreamTextType, Optional[VersionType], Optional[bool]) -> Any
    """
    Return the generator over the Python objects for all documents in a
    YAML stream, as produced by load_all() with the RoundTripLoader.
    """
    return load_all(
        stream, Loader=RoundTripLoader, version=version, preserve_quotes=preserve_quotes
    )
1025
1026
def emit(
    events,
    stream=None,
    Dumper=Dumper,
    canonical=None,
    indent=None,
    width=None,
    allow_unicode=None,
    line_break=None,
):
    # type: (Any, Optional[StreamType], Any, Optional[bool], Union[int, None], Optional[int], Optional[bool], Any) -> Any  # NOQA
    """
    Write a sequence of YAML parsing events to `stream`.

    When `stream` is None the output is collected in an in-memory StringIO
    and returned as a string; otherwise nothing is returned.
    """
    in_memory = stream is None
    if in_memory:
        stream = StringIO()
    emitter_options = dict(
        canonical=canonical,
        indent=indent,
        width=width,
        allow_unicode=allow_unicode,
        line_break=line_break,
    )
    dumper = Dumper(stream, **emitter_options)
    try:
        for event in events:
            dumper.emit(event)
    finally:
        try:
            dumper._emitter.dispose()
        except AttributeError:
            raise
            # NOTE(review): unreachable after the re-raise; looks like a
            # disabled fallback for the C-based dumper -- confirm intent.
            dumper.dispose()  # cyaml
    if in_memory:
        return stream.getvalue()
1065
1066
# Default `encoding` for the dump/serialize helpers below:
# no encoding when PY3 is true, 'utf-8' otherwise.
enc = 'utf-8' if not PY3 else None
1068
1069
def serialize_all(
    nodes,
    stream=None,
    Dumper=Dumper,
    canonical=None,
    indent=None,
    width=None,
    allow_unicode=None,
    line_break=None,
    encoding=enc,
    explicit_start=None,
    explicit_end=None,
    version=None,
    tags=None,
):
    # type: (Any, Optional[StreamType], Any, Any, Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Optional[VersionType], Any) -> Any  # NOQA
    """
    Write a sequence of representation trees (nodes) to `stream` as YAML.

    When `stream` is None the output is collected in memory (StringIO if
    `encoding` is None, BytesIO otherwise) and returned; otherwise nothing
    is returned.
    """
    in_memory = stream is None
    if in_memory:
        stream = StringIO() if encoding is None else BytesIO()
    dumper_options = dict(
        canonical=canonical,
        indent=indent,
        width=width,
        allow_unicode=allow_unicode,
        line_break=line_break,
        encoding=encoding,
        version=version,
        tags=tags,
        explicit_start=explicit_start,
        explicit_end=explicit_end,
    )
    dumper = Dumper(stream, **dumper_options)
    try:
        serializer = dumper._serializer
        serializer.open()
        for node in nodes:
            dumper.serialize(node)
        dumper._serializer.close()
    finally:
        try:
            dumper._emitter.dispose()
        except AttributeError:
            raise
            # NOTE(review): unreachable after the re-raise; looks like a
            # disabled fallback for the C-based dumper -- confirm intent.
            dumper.dispose()  # cyaml
    if in_memory:
        return stream.getvalue()
1123
1124
def serialize(node, stream=None, Dumper=Dumper, **kwds):
    # type: (Any, Optional[StreamType], Any, Any) -> Any
    """
    Write a single representation tree to `stream` as YAML.

    Thin wrapper that passes a one-element list to serialize_all(); the
    result of serialize_all() is returned unchanged.
    """
    single_node = [node]
    return serialize_all(single_node, stream, Dumper=Dumper, **kwds)
1132
1133
def dump_all(
    documents,
    stream=None,
    Dumper=Dumper,
    default_style=None,
    default_flow_style=None,
    canonical=None,
    indent=None,
    width=None,
    allow_unicode=None,
    line_break=None,
    encoding=enc,
    explicit_start=None,
    explicit_end=None,
    version=None,
    tags=None,
    block_seq_indent=None,
    top_level_colon_align=None,
    prefix_colon=None,
):
    # type: (Any, Optional[StreamType], Any, Any, Any, Optional[bool], Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Any, Any, Any, Any, Any) -> Optional[str]  # NOQA
    """
    Write a sequence of Python objects to `stream` as YAML documents.

    When `stream` is None the output is collected in memory (StringIO if
    `encoding` is None, BytesIO otherwise) and returned; otherwise None
    is returned.

    Passing `top_level_colon_align=True` replaces it by the length of the
    longest str() of the items obtained by iterating over documents[0].
    """
    if top_level_colon_align is True:
        # requires `documents` to be indexable and documents[0] non-empty
        key_widths = [len(str(key)) for key in documents[0]]
        top_level_colon_align = max(key_widths)
    in_memory = stream is None
    if in_memory:
        stream = StringIO() if encoding is None else BytesIO()
    dumper_options = dict(
        default_style=default_style,
        default_flow_style=default_flow_style,
        canonical=canonical,
        indent=indent,
        width=width,
        allow_unicode=allow_unicode,
        line_break=line_break,
        encoding=encoding,
        explicit_start=explicit_start,
        explicit_end=explicit_end,
        version=version,
        tags=tags,
        block_seq_indent=block_seq_indent,
        top_level_colon_align=top_level_colon_align,
        prefix_colon=prefix_colon,
    )
    dumper = Dumper(stream, **dumper_options)
    try:
        dumper._serializer.open()
        for document in documents:
            try:
                dumper._representer.represent(document)
            except AttributeError:
                # nprint(dir(dumper._representer))
                raise
        dumper._serializer.close()
    finally:
        try:
            dumper._emitter.dispose()
        except AttributeError:
            raise
            # NOTE(review): unreachable after the re-raise; looks like a
            # disabled fallback for the C-based dumper -- confirm intent.
            dumper.dispose()  # cyaml
    if in_memory:
        return stream.getvalue()
    return None
1204
1205
def dump(
    data,
    stream=None,
    Dumper=Dumper,
    default_style=None,
    default_flow_style=None,
    canonical=None,
    indent=None,
    width=None,
    allow_unicode=None,
    line_break=None,
    encoding=enc,
    explicit_start=None,
    explicit_end=None,
    version=None,
    tags=None,
    block_seq_indent=None,
):
    # type: (Any, Optional[StreamType], Any, Any, Any, Optional[bool], Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Optional[VersionType], Any, Any) -> Optional[str]  # NOQA
    """
    Write a single Python object to `stream` as a YAML document.

    Thin wrapper that passes a one-element list and all options to
    dump_all(); the result of dump_all() is returned unchanged.

    default_style ∈ None, '', '"', "'", '|', '>'

    """
    forwarded = dict(
        Dumper=Dumper,
        default_style=default_style,
        default_flow_style=default_flow_style,
        canonical=canonical,
        indent=indent,
        width=width,
        allow_unicode=allow_unicode,
        line_break=line_break,
        encoding=encoding,
        explicit_start=explicit_start,
        explicit_end=explicit_end,
        version=version,
        tags=tags,
        block_seq_indent=block_seq_indent,
    )
    return dump_all([data], stream, **forwarded)
1250
1251
def safe_dump_all(documents, stream=None, **kwds):
    # type: (Any, Optional[StreamType], Any) -> Optional[str]
    """
    Write a sequence of Python objects to `stream` as YAML documents,
    using SafeDumper so that only basic YAML tags are produced.

    Delegates to dump_all() and returns its result; extra keyword
    arguments are forwarded as-is.
    """
    output = dump_all(documents, stream, Dumper=SafeDumper, **kwds)
    return output
1260
1261
def safe_dump(data, stream=None, **kwds):
    # type: (Any, Optional[StreamType], Any) -> Optional[str]
    """
    Write a single Python object to `stream` as a YAML document,
    using SafeDumper so that only basic YAML tags are produced.

    Delegates to dump_all() with a one-element list and returns its
    result; extra keyword arguments are forwarded as-is.
    """
    single_document = [data]
    return dump_all(single_document, stream, Dumper=SafeDumper, **kwds)
1270
1271
def round_trip_dump(
    data,
    stream=None,
    Dumper=RoundTripDumper,
    default_style=None,
    default_flow_style=None,
    canonical=None,
    indent=None,
    width=None,
    allow_unicode=None,
    line_break=None,
    encoding=enc,
    explicit_start=None,
    explicit_end=None,
    version=None,
    tags=None,
    block_seq_indent=None,
    top_level_colon_align=None,
    prefix_colon=None,
):
    # type: (Any, Optional[StreamType], Any, Any, Any, Optional[bool], Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Optional[VersionType], Any, Any, Any, Any) -> Optional[str]  # NOQA
    """
    Write a single Python object to `stream` as a YAML document, by
    default with RoundTripDumper.

    Differs from dump() in that `allow_unicode` defaults to True when it
    is left as None, and that `top_level_colon_align` and `prefix_colon`
    are also forwarded to dump_all().
    """
    if allow_unicode is None:
        allow_unicode = True
    forwarded = dict(
        Dumper=Dumper,
        default_style=default_style,
        default_flow_style=default_flow_style,
        canonical=canonical,
        indent=indent,
        width=width,
        allow_unicode=allow_unicode,
        line_break=line_break,
        encoding=encoding,
        explicit_start=explicit_start,
        explicit_end=explicit_end,
        version=version,
        tags=tags,
        block_seq_indent=block_seq_indent,
        top_level_colon_align=top_level_colon_align,
        prefix_colon=prefix_colon,
    )
    return dump_all([data], stream, **forwarded)
1314
1315
# Loader/Dumper are no longer composites; to get to the associated
# Resolver()/Representer(), etc., you need to instantiate the class.
1318
1319
def add_implicit_resolver(
    tag, regexp, first=None, Loader=None, Dumper=None, resolver=Resolver
):
    # type: (Any, Any, Any, Any, Any, Any) -> None
    """
    Register an implicit scalar detector: a scalar whose value matches
    `regexp` gets `tag` assigned.
    `first` is a sequence of possible initial characters or None.

    With neither Loader nor Dumper given, registration goes to `resolver`.
    Otherwise each given class is handled in turn (Loader first): its own
    add_implicit_resolver is used if present, else Resolver is used for
    the known loader/dumper classes, else NotImplementedError is raised.
    """
    if Loader is None and Dumper is None:
        resolver.add_implicit_resolver(tag, regexp, first)
        return
    known_loaders = (BaseLoader, SafeLoader, ruamel.yaml.loader.Loader, RoundTripLoader)
    known_dumpers = (BaseDumper, SafeDumper, ruamel.yaml.dumper.Dumper, RoundTripDumper)
    for target, known_classes in ((Loader, known_loaders), (Dumper, known_dumpers)):
        if not target:
            continue
        if hasattr(target, 'add_implicit_resolver'):
            target.add_implicit_resolver(tag, regexp, first)
        elif issubclass(target, known_classes):
            Resolver.add_implicit_resolver(tag, regexp, first)
        else:
            raise NotImplementedError
1351
1352
1353 # this code currently not tested
def add_path_resolver(tag, path, kind=None, Loader=None, Dumper=None, resolver=Resolver):
    # type: (Any, Any, Any, Any, Any, Any) -> None
    """
    Register a path based resolver for `tag`.
    A path is a list of keys leading to a node in the representation
    tree; keys can be string values, integers, or None.

    With neither Loader nor Dumper given, registration goes to `resolver`.
    Otherwise each given class is handled in turn (Loader first): its own
    add_path_resolver is used if present, else Resolver is used for the
    known loader/dumper classes, else NotImplementedError is raised.
    """
    if Loader is None and Dumper is None:
        resolver.add_path_resolver(tag, path, kind)
        return
    known_loaders = (BaseLoader, SafeLoader, ruamel.yaml.loader.Loader, RoundTripLoader)
    known_dumpers = (BaseDumper, SafeDumper, ruamel.yaml.dumper.Dumper, RoundTripDumper)
    for target, known_classes in ((Loader, known_loaders), (Dumper, known_dumpers)):
        if not target:
            continue
        if hasattr(target, 'add_path_resolver'):
            target.add_path_resolver(tag, path, kind)
        elif issubclass(target, known_classes):
            Resolver.add_path_resolver(tag, path, kind)
        else:
            raise NotImplementedError
1383
1384
def add_constructor(tag, object_constructor, Loader=None, constructor=Constructor):
    # type: (Any, Any, Any, Any) -> None
    """
    Add an object constructor for the given tag.
    object_constructor is a function that accepts a Loader instance
    and a node object and produces the corresponding Python object.

    With Loader None, registration goes to `constructor`.  Otherwise the
    Loader's own add_constructor is used if it has one; failing that, the
    constructor class matching the Loader's base class is used, and
    NotImplementedError is raised for an unrecognised Loader.
    """
    if Loader is None:
        constructor.add_constructor(tag, object_constructor)
        return
    if hasattr(Loader, 'add_constructor'):
        Loader.add_constructor(tag, object_constructor)
        return
    if issubclass(Loader, BaseLoader):
        BaseConstructor.add_constructor(tag, object_constructor)
    elif issubclass(Loader, SafeLoader):
        SafeConstructor.add_constructor(tag, object_constructor)
    # The parameter `Loader` shadows the imported class of the same name, so
    # the class must be referenced through its module; comparing the
    # parameter with itself would always be true.
    elif issubclass(Loader, ruamel.yaml.loader.Loader):
        Constructor.add_constructor(tag, object_constructor)
    elif issubclass(Loader, RoundTripLoader):
        RoundTripConstructor.add_constructor(tag, object_constructor)
    else:
        raise NotImplementedError
1408
1409
def add_multi_constructor(tag_prefix, multi_constructor, Loader=None, constructor=Constructor):
    # type: (Any, Any, Any, Any) -> None
    """
    Add a multi-constructor for the given tag prefix.
    Multi-constructor is called for a node if its tag starts with tag_prefix.
    Multi-constructor accepts a Loader instance, a tag suffix,
    and a node object and produces the corresponding Python object.

    With Loader None, registration goes to `constructor`.  Otherwise the
    constructor class matching the Loader's base class is used, and
    NotImplementedError is raised for an unrecognised Loader.
    """
    if Loader is None:
        constructor.add_multi_constructor(tag_prefix, multi_constructor)
        return
    # NOTE: unlike add_constructor, a Loader-level add_multi_constructor is
    # not consulted here.  The former shortcut was permanently disabled
    # (`if False and ...`) and passed the wrong callable, so it was removed.
    if issubclass(Loader, BaseLoader):
        BaseConstructor.add_multi_constructor(tag_prefix, multi_constructor)
    elif issubclass(Loader, SafeLoader):
        SafeConstructor.add_multi_constructor(tag_prefix, multi_constructor)
    elif issubclass(Loader, ruamel.yaml.loader.Loader):
        Constructor.add_multi_constructor(tag_prefix, multi_constructor)
    elif issubclass(Loader, RoundTripLoader):
        RoundTripConstructor.add_multi_constructor(tag_prefix, multi_constructor)
    else:
        raise NotImplementedError
1434
1435
def add_representer(data_type, object_representer, Dumper=None, representer=Representer):
    # type: (Any, Any, Any, Any) -> None
    """
    Add a representer for the given type.
    object_representer is a function accepting a Dumper instance
    and an instance of the given data type
    and producing the corresponding representation node.

    With Dumper None, registration goes to `representer`.  Otherwise the
    Dumper's own add_representer is used if it has one; failing that, the
    representer class matching the Dumper's base class is used, and
    NotImplementedError is raised for an unrecognised Dumper.
    """
    if Dumper is None:
        representer.add_representer(data_type, object_representer)
        return
    if hasattr(Dumper, 'add_representer'):
        Dumper.add_representer(data_type, object_representer)
        return
    if issubclass(Dumper, BaseDumper):
        BaseRepresenter.add_representer(data_type, object_representer)
    elif issubclass(Dumper, SafeDumper):
        SafeRepresenter.add_representer(data_type, object_representer)
    # The parameter `Dumper` shadows the imported class of the same name, so
    # the class must be referenced through its module; comparing the
    # parameter with itself would always be true.
    elif issubclass(Dumper, ruamel.yaml.dumper.Dumper):
        Representer.add_representer(data_type, object_representer)
    elif issubclass(Dumper, RoundTripDumper):
        RoundTripRepresenter.add_representer(data_type, object_representer)
    else:
        raise NotImplementedError
1460
1461
1462 # this code currently not tested
def add_multi_representer(data_type, multi_representer, Dumper=None, representer=Representer):
    # type: (Any, Any, Any, Any) -> None
    """
    Add a representer for the given type.
    multi_representer is a function accepting a Dumper instance
    and an instance of the given data type or subtype
    and producing the corresponding representation node.

    With Dumper None, registration goes to `representer`.  Otherwise the
    Dumper's own add_multi_representer is used if it has one; failing
    that, the representer class matching the Dumper's base class is used,
    and NotImplementedError is raised for an unrecognised Dumper.
    """
    if Dumper is None:
        representer.add_multi_representer(data_type, multi_representer)
        return
    if hasattr(Dumper, 'add_multi_representer'):
        Dumper.add_multi_representer(data_type, multi_representer)
        return
    if issubclass(Dumper, BaseDumper):
        BaseRepresenter.add_multi_representer(data_type, multi_representer)
    elif issubclass(Dumper, SafeDumper):
        SafeRepresenter.add_multi_representer(data_type, multi_representer)
    # The parameter `Dumper` shadows the imported class of the same name, so
    # the class must be referenced through its module; comparing the
    # parameter with itself would always be true.
    elif issubclass(Dumper, ruamel.yaml.dumper.Dumper):
        Representer.add_multi_representer(data_type, multi_representer)
    elif issubclass(Dumper, RoundTripDumper):
        RoundTripRepresenter.add_multi_representer(data_type, multi_representer)
    else:
        raise NotImplementedError
1487
1488
class YAMLObjectMetaclass(type):
    """
    Metaclass used by YAMLObject.

    Each class created through it whose own body sets a non-None
    `yaml_tag` is registered with its `yaml_constructor` (for that tag)
    and its `yaml_representer` (for the class itself).
    """

    def __init__(cls, name, bases, kwds):
        # type: (Any, Any, Any) -> None
        super(YAMLObjectMetaclass, cls).__init__(name, bases, kwds)
        # Only a tag declared in this class body counts; an inherited tag
        # does not trigger a new registration.
        if kwds.get('yaml_tag') is None:
            return
        cls.yaml_constructor.add_constructor(cls.yaml_tag, cls.from_yaml)  # type: ignore
        cls.yaml_representer.add_representer(cls, cls.to_yaml)  # type: ignore
1500
1501
class YAMLObject(with_metaclass(YAMLObjectMetaclass)):  # type: ignore
    """
    Base class for objects that can load themselves from, and dump
    themselves to, a YAML stream.
    """

    __slots__ = ()  # no direct instantiation, so allow immutable subclasses

    # classes on which from_yaml/to_yaml get registered by the metaclass
    yaml_constructor = Constructor
    yaml_representer = Representer

    yaml_tag = None  # type: Any
    yaml_flow_style = None  # type: Any

    @classmethod
    def from_yaml(cls, constructor, node):
        # type: (Any, Any) -> Any
        """
        Build a Python object of this class from a representation node,
        delegating to constructor.construct_yaml_object().
        """
        instance = constructor.construct_yaml_object(node, cls)
        return instance

    @classmethod
    def to_yaml(cls, representer, data):
        # type: (Any, Any) -> Any
        """
        Build a representation node from a Python object, delegating to
        representer.represent_yaml_object() with this class's tag and
        flow style.
        """
        represent = representer.represent_yaml_object
        return represent(cls.yaml_tag, data, cls, flow_style=cls.yaml_flow_style)