diff env/lib/python3.7/site-packages/ruamel/yaml/main.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/ruamel/yaml/main.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,1518 @@
+# coding: utf-8
+
+from __future__ import absolute_import, unicode_literals, print_function
+
+import sys
+import os
+import warnings
+import glob
+from importlib import import_module
+
+
+import ruamel.yaml
+from ruamel.yaml.error import UnsafeLoaderWarning, YAMLError  # NOQA
+
+from ruamel.yaml.tokens import *  # NOQA
+from ruamel.yaml.events import *  # NOQA
+from ruamel.yaml.nodes import *  # NOQA
+
+from ruamel.yaml.loader import BaseLoader, SafeLoader, Loader, RoundTripLoader  # NOQA
+from ruamel.yaml.dumper import BaseDumper, SafeDumper, Dumper, RoundTripDumper  # NOQA
+from ruamel.yaml.compat import StringIO, BytesIO, with_metaclass, PY3, nprint
+from ruamel.yaml.resolver import VersionedResolver, Resolver  # NOQA
+from ruamel.yaml.representer import (
+    BaseRepresenter,
+    SafeRepresenter,
+    Representer,
+    RoundTripRepresenter,
+)
+from ruamel.yaml.constructor import (
+    BaseConstructor,
+    SafeConstructor,
+    Constructor,
+    RoundTripConstructor,
+)
+from ruamel.yaml.loader import Loader as UnsafeLoader
+
+if False:  # MYPY
+    from typing import List, Set, Dict, Union, Any, Callable  # NOQA
+    from ruamel.yaml.compat import StreamType, StreamTextType, VersionType  # NOQA
+
+    if PY3:
+        from pathlib import Path
+    else:
+        Path = Any
+
+try:
+    from _ruamel_yaml import CParser, CEmitter  # type: ignore
+except:  # NOQA
+    CParser = CEmitter = None
+
+# import io
+
+enforce = object()
+
+
+# YAML is an acronym, i.e. spoken: rhymes with "camel". And thus a
+# subset of abbreviations, which should be all caps according to PEP8
+
+
+class YAML(object):
+    def __init__(
+        self, _kw=enforce, typ=None, pure=False, output=None, plug_ins=None  # input=None,
+    ):
+        # type: (Any, Optional[Text], Any, Any, Any) -> None
+        """
+        _kw: not used, forces keyword arguments in 2.7 (in 3 you can do (*, safe_load=..)
+        typ: 'rt'/None -> RoundTripLoader/RoundTripDumper,  (default)
+             'safe'    -> SafeLoader/SafeDumper,
+             'unsafe'  -> normal/unsafe Loader/Dumper
+             'base'    -> baseloader
+        pure: if True only use Python modules
+        input/output: needed to work as context manager
+        plug_ins: a list of plug-in files
+        """
+        if _kw is not enforce:
+            raise TypeError(
+                '{}.__init__() takes no positional argument but at least '
+                'one was given ({!r})'.format(self.__class__.__name__, _kw)
+            )
+
+        self.typ = 'rt' if typ is None else typ
+        self.pure = pure
+
+        # self._input = input
+        self._output = output
+        self._context_manager = None  # type: Any
+
+        self.plug_ins = []  # type: List[Any]
+        for pu in ([] if plug_ins is None else plug_ins) + self.official_plug_ins():
+            file_name = pu.replace(os.sep, '.')
+            self.plug_ins.append(import_module(file_name))
+        self.Resolver = ruamel.yaml.resolver.VersionedResolver  # type: Any
+        self.allow_unicode = True
+        self.Reader = None  # type: Any
+        self.Scanner = None  # type: Any
+        self.Serializer = None  # type: Any
+        self.default_flow_style = None  # type: Any
+        if self.typ == 'rt':
+            self.default_flow_style = False
+            # no optimized rt-dumper yet
+            self.Emitter = ruamel.yaml.emitter.Emitter  # type: Any
+            self.Serializer = ruamel.yaml.serializer.Serializer  # type: Any
+            self.Representer = ruamel.yaml.representer.RoundTripRepresenter  # type: Any
+            self.Scanner = ruamel.yaml.scanner.RoundTripScanner  # type: Any
+            # no optimized rt-parser yet
+            self.Parser = ruamel.yaml.parser.RoundTripParser  # type: Any
+            self.Composer = ruamel.yaml.composer.Composer  # type: Any
+            self.Constructor = ruamel.yaml.constructor.RoundTripConstructor  # type: Any
+        elif self.typ == 'safe':
+            self.Emitter = (
+                ruamel.yaml.emitter.Emitter if pure or CEmitter is None else CEmitter
+            )
+            self.Representer = ruamel.yaml.representer.SafeRepresenter
+            self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser
+            self.Composer = ruamel.yaml.composer.Composer
+            self.Constructor = ruamel.yaml.constructor.SafeConstructor
+        elif self.typ == 'base':
+            self.Emitter = ruamel.yaml.emitter.Emitter
+            self.Representer = ruamel.yaml.representer.BaseRepresenter
+            self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser
+            self.Composer = ruamel.yaml.composer.Composer
+            self.Constructor = ruamel.yaml.constructor.BaseConstructor
+        elif self.typ == 'unsafe':
+            self.Emitter = (
+                ruamel.yaml.emitter.Emitter if pure or CEmitter is None else CEmitter
+            )
+            self.Representer = ruamel.yaml.representer.Representer
+            self.Parser = ruamel.yaml.parser.Parser if pure or CParser is None else CParser
+            self.Composer = ruamel.yaml.composer.Composer
+            self.Constructor = ruamel.yaml.constructor.Constructor
+        else:
+            for module in self.plug_ins:
+                if getattr(module, 'typ', None) == self.typ:
+                    module.init_typ(self)
+                    break
+            else:
+                raise NotImplementedError(
+                    'typ "{}"not recognised (need to install plug-in?)'.format(self.typ)
+                )
+        self.stream = None
+        self.canonical = None
+        self.old_indent = None
+        self.width = None
+        self.line_break = None
+
+        self.map_indent = None
+        self.sequence_indent = None
+        self.sequence_dash_offset = 0
+        self.compact_seq_seq = None
+        self.compact_seq_map = None
+        self.sort_base_mapping_type_on_output = None  # default: sort
+
+        self.top_level_colon_align = None
+        self.prefix_colon = None
+        self.version = None
+        self.preserve_quotes = None
+        self.allow_duplicate_keys = False  # duplicate keys in map, set
+        self.encoding = 'utf-8'
+        self.explicit_start = None
+        self.explicit_end = None
+        self.tags = None
+        self.default_style = None
+        self.top_level_block_style_scalar_no_indent_error_1_1 = False
+        # [a, b: 1, c: {d: 2}]  vs. [a, {b: 1}, {c: {d: 2}}]
+        self.brace_single_entry_mapping_in_flow_sequence = False
+
+    @property
+    def reader(self):
+        # type: () -> Any
+        try:
+            return self._reader  # type: ignore
+        except AttributeError:
+            self._reader = self.Reader(None, loader=self)
+            return self._reader
+
+    @property
+    def scanner(self):
+        # type: () -> Any
+        try:
+            return self._scanner  # type: ignore
+        except AttributeError:
+            self._scanner = self.Scanner(loader=self)
+            return self._scanner
+
+    @property
+    def parser(self):
+        # type: () -> Any
+        attr = '_' + sys._getframe().f_code.co_name
+        if not hasattr(self, attr):
+            if self.Parser is not CParser:
+                setattr(self, attr, self.Parser(loader=self))
+            else:
+                if getattr(self, '_stream', None) is None:
+                    # wait for the stream
+                    return None
+                else:
+                    # if not hasattr(self._stream, 'read') and hasattr(self._stream, 'open'):
+                    #     # pathlib.Path() instance
+                    #     setattr(self, attr, CParser(self._stream))
+                    # else:
+                    setattr(self, attr, CParser(self._stream))
+                    # self._parser = self._composer = self
+                    # nprint('scanner', self.loader.scanner)
+
+        return getattr(self, attr)
+
+    @property
+    def composer(self):
+        # type: () -> Any
+        attr = '_' + sys._getframe().f_code.co_name
+        if not hasattr(self, attr):
+            setattr(self, attr, self.Composer(loader=self))
+        return getattr(self, attr)
+
+    @property
+    def constructor(self):
+        # type: () -> Any
+        attr = '_' + sys._getframe().f_code.co_name
+        if not hasattr(self, attr):
+            cnst = self.Constructor(preserve_quotes=self.preserve_quotes, loader=self)
+            cnst.allow_duplicate_keys = self.allow_duplicate_keys
+            setattr(self, attr, cnst)
+        return getattr(self, attr)
+
+    @property
+    def resolver(self):
+        # type: () -> Any
+        attr = '_' + sys._getframe().f_code.co_name
+        if not hasattr(self, attr):
+            setattr(self, attr, self.Resolver(version=self.version, loader=self))
+        return getattr(self, attr)
+
+    @property
+    def emitter(self):
+        # type: () -> Any
+        attr = '_' + sys._getframe().f_code.co_name
+        if not hasattr(self, attr):
+            if self.Emitter is not CEmitter:
+                _emitter = self.Emitter(
+                    None,
+                    canonical=self.canonical,
+                    indent=self.old_indent,
+                    width=self.width,
+                    allow_unicode=self.allow_unicode,
+                    line_break=self.line_break,
+                    prefix_colon=self.prefix_colon,
+                    brace_single_entry_mapping_in_flow_sequence=self.brace_single_entry_mapping_in_flow_sequence,  # NOQA
+                    dumper=self,
+                )
+                setattr(self, attr, _emitter)
+                if self.map_indent is not None:
+                    _emitter.best_map_indent = self.map_indent
+                if self.sequence_indent is not None:
+                    _emitter.best_sequence_indent = self.sequence_indent
+                if self.sequence_dash_offset is not None:
+                    _emitter.sequence_dash_offset = self.sequence_dash_offset
+                    # _emitter.block_seq_indent = self.sequence_dash_offset
+                if self.compact_seq_seq is not None:
+                    _emitter.compact_seq_seq = self.compact_seq_seq
+                if self.compact_seq_map is not None:
+                    _emitter.compact_seq_map = self.compact_seq_map
+            else:
+                if getattr(self, '_stream', None) is None:
+                    # wait for the stream
+                    return None
+                return None
+        return getattr(self, attr)
+
+    @property
+    def serializer(self):
+        # type: () -> Any
+        attr = '_' + sys._getframe().f_code.co_name
+        if not hasattr(self, attr):
+            setattr(
+                self,
+                attr,
+                self.Serializer(
+                    encoding=self.encoding,
+                    explicit_start=self.explicit_start,
+                    explicit_end=self.explicit_end,
+                    version=self.version,
+                    tags=self.tags,
+                    dumper=self,
+                ),
+            )
+        return getattr(self, attr)
+
+    @property
+    def representer(self):
+        # type: () -> Any
+        attr = '_' + sys._getframe().f_code.co_name
+        if not hasattr(self, attr):
+            repres = self.Representer(
+                default_style=self.default_style,
+                default_flow_style=self.default_flow_style,
+                dumper=self,
+            )
+            if self.sort_base_mapping_type_on_output is not None:
+                repres.sort_base_mapping_type_on_output = self.sort_base_mapping_type_on_output
+            setattr(self, attr, repres)
+        return getattr(self, attr)
+
+    # separate output resolver?
+
+    # def load(self, stream=None):
+    #     if self._context_manager:
+    #        if not self._input:
+    #             raise TypeError("Missing input stream while dumping from context manager")
+    #         for data in self._context_manager.load():
+    #             yield data
+    #         return
+    #     if stream is None:
+    #         raise TypeError("Need a stream argument when not loading from context manager")
+    #     return self.load_one(stream)
+
+    def load(self, stream):
+        # type: (Union[Path, StreamTextType]) -> Any
+        """
+        at this point you either have the non-pure Parser (which has its own reader and
+        scanner) or you have the pure Parser.
+        If the pure Parser is set, then set the Reader and Scanner, if not already set.
+        If either the Scanner or Reader are set, you cannot use the non-pure Parser,
+            so reset it to the pure parser and set the Reader resp. Scanner if necessary
+        """
+        if not hasattr(stream, 'read') and hasattr(stream, 'open'):
+            # pathlib.Path() instance
+            with stream.open('rb') as fp:  # type: ignore
+                return self.load(fp)
+        constructor, parser = self.get_constructor_parser(stream)
+        try:
+            return constructor.get_single_data()
+        finally:
+            parser.dispose()
+            try:
+                self._reader.reset_reader()
+            except AttributeError:
+                pass
+            try:
+                self._scanner.reset_scanner()
+            except AttributeError:
+                pass
+
+    def load_all(self, stream, _kw=enforce):  # , skip=None):
+        # type: (Union[Path, StreamTextType], Any) -> Any
+        if _kw is not enforce:
+            raise TypeError(
+                '{}.__init__() takes no positional argument but at least '
+                'one was given ({!r})'.format(self.__class__.__name__, _kw)
+            )
+        if not hasattr(stream, 'read') and hasattr(stream, 'open'):
+            # pathlib.Path() instance
+            with stream.open('r') as fp:  # type: ignore
+                for d in self.load_all(fp, _kw=enforce):
+                    yield d
+                return
+        # if skip is None:
+        #     skip = []
+        # elif isinstance(skip, int):
+        #     skip = [skip]
+        constructor, parser = self.get_constructor_parser(stream)
+        try:
+            while constructor.check_data():
+                yield constructor.get_data()
+        finally:
+            parser.dispose()
+            try:
+                self._reader.reset_reader()
+            except AttributeError:
+                pass
+            try:
+                self._scanner.reset_scanner()
+            except AttributeError:
+                pass
+
+    def get_constructor_parser(self, stream):
+        # type: (StreamTextType) -> Any
+        """
+        the old cyaml needs special setup, and therefore the stream
+        """
+        if self.Parser is not CParser:
+            if self.Reader is None:
+                self.Reader = ruamel.yaml.reader.Reader
+            if self.Scanner is None:
+                self.Scanner = ruamel.yaml.scanner.Scanner
+            self.reader.stream = stream
+        else:
+            if self.Reader is not None:
+                if self.Scanner is None:
+                    self.Scanner = ruamel.yaml.scanner.Scanner
+                self.Parser = ruamel.yaml.parser.Parser
+                self.reader.stream = stream
+            elif self.Scanner is not None:
+                if self.Reader is None:
+                    self.Reader = ruamel.yaml.reader.Reader
+                self.Parser = ruamel.yaml.parser.Parser
+                self.reader.stream = stream
+            else:
+                # combined C level reader>scanner>parser
+                # does some calls to the resolver, e.g. BaseResolver.descend_resolver
+                # if you just initialise the CParser, to much of resolver.py
+                # is actually used
+                rslvr = self.Resolver
+                # if rslvr is ruamel.yaml.resolver.VersionedResolver:
+                #     rslvr = ruamel.yaml.resolver.Resolver
+
+                class XLoader(self.Parser, self.Constructor, rslvr):  # type: ignore
+                    def __init__(selfx, stream, version=self.version, preserve_quotes=None):
+                        # type: (StreamTextType, Optional[VersionType], Optional[bool]) -> None  # NOQA
+                        CParser.__init__(selfx, stream)
+                        selfx._parser = selfx._composer = selfx
+                        self.Constructor.__init__(selfx, loader=selfx)
+                        selfx.allow_duplicate_keys = self.allow_duplicate_keys
+                        rslvr.__init__(selfx, version=version, loadumper=selfx)
+
+                self._stream = stream
+                loader = XLoader(stream)
+                return loader, loader
+        return self.constructor, self.parser
+
+    def dump(self, data, stream=None, _kw=enforce, transform=None):
+        # type: (Any, Union[Path, StreamType], Any, Any) -> Any
+        if self._context_manager:
+            if not self._output:
+                raise TypeError('Missing output stream while dumping from context manager')
+            if _kw is not enforce:
+                raise TypeError(
+                    '{}.dump() takes one positional argument but at least '
+                    'two were given ({!r})'.format(self.__class__.__name__, _kw)
+                )
+            if transform is not None:
+                raise TypeError(
+                    '{}.dump() in the context manager cannot have transform keyword '
+                    ''.format(self.__class__.__name__)
+                )
+            self._context_manager.dump(data)
+        else:  # old style
+            if stream is None:
+                raise TypeError('Need a stream argument when not dumping from context manager')
+            return self.dump_all([data], stream, _kw, transform=transform)
+
+    def dump_all(self, documents, stream, _kw=enforce, transform=None):
+        # type: (Any, Union[Path, StreamType], Any, Any) -> Any
+        if self._context_manager:
+            raise NotImplementedError
+        if _kw is not enforce:
+            raise TypeError(
+                '{}.dump(_all) takes two positional argument but at least '
+                'three were given ({!r})'.format(self.__class__.__name__, _kw)
+            )
+        self._output = stream
+        self._context_manager = YAMLContextManager(self, transform=transform)
+        for data in documents:
+            self._context_manager.dump(data)
+        self._context_manager.teardown_output()
+        self._output = None
+        self._context_manager = None
+
+    def Xdump_all(self, documents, stream, _kw=enforce, transform=None):
+        # type: (Any, Union[Path, StreamType], Any, Any) -> Any
+        """
+        Serialize a sequence of Python objects into a YAML stream.
+        """
+        if not hasattr(stream, 'write') and hasattr(stream, 'open'):
+            # pathlib.Path() instance
+            with stream.open('w') as fp:
+                return self.dump_all(documents, fp, _kw, transform=transform)
+        if _kw is not enforce:
+            raise TypeError(
+                '{}.dump(_all) takes two positional argument but at least '
+                'three were given ({!r})'.format(self.__class__.__name__, _kw)
+            )
+        # The stream should have the methods `write` and possibly `flush`.
+        if self.top_level_colon_align is True:
+            tlca = max([len(str(x)) for x in documents[0]])  # type: Any
+        else:
+            tlca = self.top_level_colon_align
+        if transform is not None:
+            fstream = stream
+            if self.encoding is None:
+                stream = StringIO()
+            else:
+                stream = BytesIO()
+        serializer, representer, emitter = self.get_serializer_representer_emitter(
+            stream, tlca
+        )
+        try:
+            self.serializer.open()
+            for data in documents:
+                try:
+                    self.representer.represent(data)
+                except AttributeError:
+                    # nprint(dir(dumper._representer))
+                    raise
+            self.serializer.close()
+        finally:
+            try:
+                self.emitter.dispose()
+            except AttributeError:
+                raise
+                # self.dumper.dispose()  # cyaml
+            delattr(self, '_serializer')
+            delattr(self, '_emitter')
+        if transform:
+            val = stream.getvalue()
+            if self.encoding:
+                val = val.decode(self.encoding)
+            if fstream is None:
+                transform(val)
+            else:
+                fstream.write(transform(val))
+        return None
+
+    def get_serializer_representer_emitter(self, stream, tlca):
+        # type: (StreamType, Any) -> Any
+        # we have only .Serializer to deal with (vs .Reader & .Scanner), much simpler
+        if self.Emitter is not CEmitter:
+            if self.Serializer is None:
+                self.Serializer = ruamel.yaml.serializer.Serializer
+            self.emitter.stream = stream
+            self.emitter.top_level_colon_align = tlca
+            return self.serializer, self.representer, self.emitter
+        if self.Serializer is not None:
+            # cannot set serializer with CEmitter
+            self.Emitter = ruamel.yaml.emitter.Emitter
+            self.emitter.stream = stream
+            self.emitter.top_level_colon_align = tlca
+            return self.serializer, self.representer, self.emitter
+        # C routines
+
+        rslvr = (
+            ruamel.yaml.resolver.BaseResolver
+            if self.typ == 'base'
+            else ruamel.yaml.resolver.Resolver
+        )
+
+        class XDumper(CEmitter, self.Representer, rslvr):  # type: ignore
+            def __init__(
+                selfx,
+                stream,
+                default_style=None,
+                default_flow_style=None,
+                canonical=None,
+                indent=None,
+                width=None,
+                allow_unicode=None,
+                line_break=None,
+                encoding=None,
+                explicit_start=None,
+                explicit_end=None,
+                version=None,
+                tags=None,
+                block_seq_indent=None,
+                top_level_colon_align=None,
+                prefix_colon=None,
+            ):
+                # type: (StreamType, Any, Any, Any, Optional[bool], Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Any, Any, Any, Any, Any) -> None   # NOQA
+                CEmitter.__init__(
+                    selfx,
+                    stream,
+                    canonical=canonical,
+                    indent=indent,
+                    width=width,
+                    encoding=encoding,
+                    allow_unicode=allow_unicode,
+                    line_break=line_break,
+                    explicit_start=explicit_start,
+                    explicit_end=explicit_end,
+                    version=version,
+                    tags=tags,
+                )
+                selfx._emitter = selfx._serializer = selfx._representer = selfx
+                self.Representer.__init__(
+                    selfx, default_style=default_style, default_flow_style=default_flow_style
+                )
+                rslvr.__init__(selfx)
+
+        self._stream = stream
+        dumper = XDumper(
+            stream,
+            default_style=self.default_style,
+            default_flow_style=self.default_flow_style,
+            canonical=self.canonical,
+            indent=self.old_indent,
+            width=self.width,
+            allow_unicode=self.allow_unicode,
+            line_break=self.line_break,
+            explicit_start=self.explicit_start,
+            explicit_end=self.explicit_end,
+            version=self.version,
+            tags=self.tags,
+        )
+        self._emitter = self._serializer = dumper
+        return dumper, dumper, dumper
+
+    # basic types
+    def map(self, **kw):
+        # type: (Any) -> Any
+        if self.typ == 'rt':
+            from ruamel.yaml.comments import CommentedMap
+
+            return CommentedMap(**kw)
+        else:
+            return dict(**kw)
+
+    def seq(self, *args):
+        # type: (Any) -> Any
+        if self.typ == 'rt':
+            from ruamel.yaml.comments import CommentedSeq
+
+            return CommentedSeq(*args)
+        else:
+            return list(*args)
+
+    # helpers
+    def official_plug_ins(self):
+        # type: () -> Any
+        bd = os.path.dirname(__file__)
+        gpbd = os.path.dirname(os.path.dirname(bd))
+        res = [x.replace(gpbd, "")[1:-3] for x in glob.glob(bd + '/*/__plug_in__.py')]
+        return res
+
+    def register_class(self, cls):
+        # type:(Any) -> Any
+        """
+        register a class for dumping loading
+        - if it has attribute yaml_tag use that to register, else use class name
+        - if it has methods to_yaml/from_yaml use those to dump/load else dump attributes
+          as mapping
+        """
+        tag = getattr(cls, 'yaml_tag', '!' + cls.__name__)
+        try:
+            self.representer.add_representer(cls, cls.to_yaml)
+        except AttributeError:
+
+            def t_y(representer, data):
+                # type: (Any, Any) -> Any
+                return representer.represent_yaml_object(
+                    tag, data, cls, flow_style=representer.default_flow_style
+                )
+
+            self.representer.add_representer(cls, t_y)
+        try:
+            self.constructor.add_constructor(tag, cls.from_yaml)
+        except AttributeError:
+
+            def f_y(constructor, node):
+                # type: (Any, Any) -> Any
+                return constructor.construct_yaml_object(node, cls)
+
+            self.constructor.add_constructor(tag, f_y)
+        return cls
+
+    def parse(self, stream):
+        # type: (StreamTextType) -> Any
+        """
+        Parse a YAML stream and produce parsing events.
+        """
+        _, parser = self.get_constructor_parser(stream)
+        try:
+            while parser.check_event():
+                yield parser.get_event()
+        finally:
+            parser.dispose()
+            try:
+                self._reader.reset_reader()
+            except AttributeError:
+                pass
+            try:
+                self._scanner.reset_scanner()
+            except AttributeError:
+                pass
+
+    # ### context manager
+
+    def __enter__(self):
+        # type: () -> Any
+        self._context_manager = YAMLContextManager(self)
+        return self
+
+    def __exit__(self, typ, value, traceback):
+        # type: (Any, Any, Any) -> None
+        if typ:
+            nprint('typ', typ)
+        self._context_manager.teardown_output()
+        # self._context_manager.teardown_input()
+        self._context_manager = None
+
+    # ### backwards compatibility
+    def _indent(self, mapping=None, sequence=None, offset=None):
+        # type: (Any, Any, Any) -> None
+        if mapping is not None:
+            self.map_indent = mapping
+        if sequence is not None:
+            self.sequence_indent = sequence
+        if offset is not None:
+            self.sequence_dash_offset = offset
+
+    @property
+    def indent(self):
+        # type: () -> Any
+        return self._indent
+
+    @indent.setter
+    def indent(self, val):
+        # type: (Any) -> None
+        self.old_indent = val
+
+    @property
+    def block_seq_indent(self):
+        # type: () -> Any
+        return self.sequence_dash_offset
+
+    @block_seq_indent.setter
+    def block_seq_indent(self, val):
+        # type: (Any) -> None
+        self.sequence_dash_offset = val
+
+    def compact(self, seq_seq=None, seq_map=None):
+        # type: (Any, Any) -> None
+        self.compact_seq_seq = seq_seq
+        self.compact_seq_map = seq_map
+
+
+class YAMLContextManager(object):
+    def __init__(self, yaml, transform=None):
+        # type: (Any, Any) -> None  # used to be: (Any, Optional[Callable]) -> None
+        self._yaml = yaml
+        self._output_inited = False
+        self._output_path = None
+        self._output = self._yaml._output
+        self._transform = transform
+
+        # self._input_inited = False
+        # self._input = input
+        # self._input_path = None
+        # self._transform = yaml.transform
+        # self._fstream = None
+
+        if not hasattr(self._output, 'write') and hasattr(self._output, 'open'):
+            # pathlib.Path() instance, open with the same mode
+            self._output_path = self._output
+            self._output = self._output_path.open('w')
+
+        # if not hasattr(self._stream, 'write') and hasattr(stream, 'open'):
+        # if not hasattr(self._input, 'read') and hasattr(self._input, 'open'):
+        #    # pathlib.Path() instance, open with the same mode
+        #    self._input_path = self._input
+        #    self._input = self._input_path.open('r')
+
+        if self._transform is not None:
+            self._fstream = self._output
+            if self._yaml.encoding is None:
+                self._output = StringIO()
+            else:
+                self._output = BytesIO()
+
+    def teardown_output(self):
+        # type: () -> None
+        if self._output_inited:
+            self._yaml.serializer.close()
+        else:
+            return
+        try:
+            self._yaml.emitter.dispose()
+        except AttributeError:
+            raise
+            # self.dumper.dispose()  # cyaml
+        try:
+            delattr(self._yaml, '_serializer')
+            delattr(self._yaml, '_emitter')
+        except AttributeError:
+            raise
+        if self._transform:
+            val = self._output.getvalue()
+            if self._yaml.encoding:
+                val = val.decode(self._yaml.encoding)
+            if self._fstream is None:
+                self._transform(val)
+            else:
+                self._fstream.write(self._transform(val))
+                self._fstream.flush()
+                self._output = self._fstream  # maybe not necessary
+        if self._output_path is not None:
+            self._output.close()
+
+    def init_output(self, first_data):
+        # type: (Any) -> None
+        if self._yaml.top_level_colon_align is True:
+            tlca = max([len(str(x)) for x in first_data])  # type: Any
+        else:
+            tlca = self._yaml.top_level_colon_align
+        self._yaml.get_serializer_representer_emitter(self._output, tlca)
+        self._yaml.serializer.open()
+        self._output_inited = True
+
+    def dump(self, data):
+        # type: (Any) -> None
+        if not self._output_inited:
+            self.init_output(data)
+        try:
+            self._yaml.representer.represent(data)
+        except AttributeError:
+            # nprint(dir(dumper._representer))
+            raise
+
+    # def teardown_input(self):
+    #     pass
+    #
+    # def init_input(self):
+    #     # set the constructor and parser on YAML() instance
+    #     self._yaml.get_constructor_parser(stream)
+    #
+    # def load(self):
+    #     if not self._input_inited:
+    #         self.init_input()
+    #     try:
+    #         while self._yaml.constructor.check_data():
+    #             yield self._yaml.constructor.get_data()
+    #     finally:
+    #         parser.dispose()
+    #         try:
+    #             self._reader.reset_reader()  # type: ignore
+    #         except AttributeError:
+    #             pass
+    #         try:
+    #             self._scanner.reset_scanner()  # type: ignore
+    #         except AttributeError:
+    #             pass
+
+
+def yaml_object(yml):
+    # type: (Any) -> Any
+    """ decorator for classes that needs to dump/load objects
+    The tag for such objects is taken from the class attribute yaml_tag (or the
+    class name in lowercase in case unavailable)
+    If methods to_yaml and/or from_yaml are available, these are called for dumping resp.
+    loading, default routines (dumping a mapping of the attributes) used otherwise.
+    """
+
+    def yo_deco(cls):
+        # type: (Any) -> Any
+        tag = getattr(cls, 'yaml_tag', '!' + cls.__name__)
+        try:
+            yml.representer.add_representer(cls, cls.to_yaml)
+        except AttributeError:
+
+            def t_y(representer, data):
+                # type: (Any, Any) -> Any
+                return representer.represent_yaml_object(
+                    tag, data, cls, flow_style=representer.default_flow_style
+                )
+
+            yml.representer.add_representer(cls, t_y)
+        try:
+            yml.constructor.add_constructor(tag, cls.from_yaml)
+        except AttributeError:
+
+            def f_y(constructor, node):
+                # type: (Any, Any) -> Any
+                return constructor.construct_yaml_object(node, cls)
+
+            yml.constructor.add_constructor(tag, f_y)
+        return cls
+
+    return yo_deco
+
+
+########################################################################################
+
+
+def scan(stream, Loader=Loader):
+    # type: (StreamTextType, Any) -> Any
+    """
+    Scan a YAML stream and produce scanning tokens.
+    """
+    loader = Loader(stream)
+    try:
+        while loader.scanner.check_token():
+            yield loader.scanner.get_token()
+    finally:
+        loader._parser.dispose()
+
+
+def parse(stream, Loader=Loader):
+    # type: (StreamTextType, Any) -> Any
+    """
+    Parse a YAML stream and produce parsing events.
+    """
+    loader = Loader(stream)
+    try:
+        while loader._parser.check_event():
+            yield loader._parser.get_event()
+    finally:
+        loader._parser.dispose()
+
+
+def compose(stream, Loader=Loader):
+    # type: (StreamTextType, Any) -> Any
+    """
+    Parse the first YAML document in a stream
+    and produce the corresponding representation tree.
+    """
+    loader = Loader(stream)
+    try:
+        return loader.get_single_node()
+    finally:
+        loader.dispose()
+
+
+def compose_all(stream, Loader=Loader):
+    # type: (StreamTextType, Any) -> Any
+    """
+    Parse all YAML documents in a stream
+    and produce corresponding representation trees.
+    """
+    loader = Loader(stream)
+    try:
+        while loader.check_node():
+            yield loader._composer.get_node()
+    finally:
+        loader._parser.dispose()
+
+
+def load(stream, Loader=None, version=None, preserve_quotes=None):
+    # type: (StreamTextType, Any, Optional[VersionType], Any) -> Any
+    """
+    Parse the first YAML document in a stream
+    and produce the corresponding Python object.
+    """
+    if Loader is None:
+        warnings.warn(UnsafeLoaderWarning.text, UnsafeLoaderWarning, stacklevel=2)
+        Loader = UnsafeLoader
+    loader = Loader(stream, version, preserve_quotes=preserve_quotes)
+    try:
+        return loader._constructor.get_single_data()
+    finally:
+        loader._parser.dispose()
+        try:
+            loader._reader.reset_reader()
+        except AttributeError:
+            pass
+        try:
+            loader._scanner.reset_scanner()
+        except AttributeError:
+            pass
+
+
+def load_all(stream, Loader=None, version=None, preserve_quotes=None):
+    # type: (Optional[StreamTextType], Any, Optional[VersionType], Optional[bool]) -> Any  # NOQA
+    """
+    Parse all YAML documents in a stream
+    and produce corresponding Python objects.
+    """
+    if Loader is None:
+        warnings.warn(UnsafeLoaderWarning.text, UnsafeLoaderWarning, stacklevel=2)
+        Loader = UnsafeLoader
+    loader = Loader(stream, version, preserve_quotes=preserve_quotes)
+    try:
+        while loader._constructor.check_data():
+            yield loader._constructor.get_data()
+    finally:
+        loader._parser.dispose()
+        try:
+            loader._reader.reset_reader()
+        except AttributeError:
+            pass
+        try:
+            loader._scanner.reset_scanner()
+        except AttributeError:
+            pass
+
+
+def safe_load(stream, version=None):
+    # type: (StreamTextType, Optional[VersionType]) -> Any
+    """
+    Parse the first YAML document in a stream
+    and produce the corresponding Python object.
+    Resolve only basic YAML tags.
+    """
+    return load(stream, SafeLoader, version)
+
+
+def safe_load_all(stream, version=None):
+    # type: (StreamTextType, Optional[VersionType]) -> Any
+    """
+    Parse all YAML documents in a stream
+    and produce corresponding Python objects.
+    Resolve only basic YAML tags.
+    """
+    return load_all(stream, SafeLoader, version)
+
+
+def round_trip_load(stream, version=None, preserve_quotes=None):
+    # type: (StreamTextType, Optional[VersionType], Optional[bool]) -> Any
+    """
+    Parse the first YAML document in a stream
+    and produce the corresponding Python object.
+    Resolve only basic YAML tags.
+    """
+    return load(stream, RoundTripLoader, version, preserve_quotes=preserve_quotes)
+
+
+def round_trip_load_all(stream, version=None, preserve_quotes=None):
+    # type: (StreamTextType, Optional[VersionType], Optional[bool]) -> Any
+    """
+    Parse all YAML documents in a stream
+    and produce corresponding Python objects.
+    Resolve only basic YAML tags.
+    """
+    return load_all(stream, RoundTripLoader, version, preserve_quotes=preserve_quotes)
+
+
+def emit(
+    events,
+    stream=None,
+    Dumper=Dumper,
+    canonical=None,
+    indent=None,
+    width=None,
+    allow_unicode=None,
+    line_break=None,
+):
+    # type: (Any, Optional[StreamType], Any, Optional[bool], Union[int, None], Optional[int], Optional[bool], Any) -> Any  # NOQA
+    """
+    Emit YAML parsing events into a stream.
+    If stream is None, return the produced string instead.
+    """
+    getvalue = None
+    if stream is None:
+        stream = StringIO()
+        getvalue = stream.getvalue
+    dumper = Dumper(
+        stream,
+        canonical=canonical,
+        indent=indent,
+        width=width,
+        allow_unicode=allow_unicode,
+        line_break=line_break,
+    )
+    try:
+        for event in events:
+            dumper.emit(event)
+    finally:
+        try:
+            dumper._emitter.dispose()
+        except AttributeError:
+            raise
+            dumper.dispose()  # cyaml
+    if getvalue is not None:
+        return getvalue()
+
+
+enc = None if PY3 else 'utf-8'
+
+
+def serialize_all(
+    nodes,
+    stream=None,
+    Dumper=Dumper,
+    canonical=None,
+    indent=None,
+    width=None,
+    allow_unicode=None,
+    line_break=None,
+    encoding=enc,
+    explicit_start=None,
+    explicit_end=None,
+    version=None,
+    tags=None,
+):
+    # type: (Any, Optional[StreamType], Any, Any, Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Optional[VersionType], Any) -> Any # NOQA
+    """
+    Serialize a sequence of representation trees into a YAML stream.
+    If stream is None, return the produced string instead.
+    """
+    getvalue = None
+    if stream is None:
+        if encoding is None:
+            stream = StringIO()
+        else:
+            stream = BytesIO()
+        getvalue = stream.getvalue
+    dumper = Dumper(
+        stream,
+        canonical=canonical,
+        indent=indent,
+        width=width,
+        allow_unicode=allow_unicode,
+        line_break=line_break,
+        encoding=encoding,
+        version=version,
+        tags=tags,
+        explicit_start=explicit_start,
+        explicit_end=explicit_end,
+    )
+    try:
+        dumper._serializer.open()
+        for node in nodes:
+            dumper.serialize(node)
+        dumper._serializer.close()
+    finally:
+        try:
+            dumper._emitter.dispose()
+        except AttributeError:
+            raise
+            dumper.dispose()  # cyaml
+    if getvalue is not None:
+        return getvalue()
+
+
+def serialize(node, stream=None, Dumper=Dumper, **kwds):
+    # type: (Any, Optional[StreamType], Any, Any) -> Any
+    """
+    Serialize a representation tree into a YAML stream.
+    If stream is None, return the produced string instead.
+    """
+    return serialize_all([node], stream, Dumper=Dumper, **kwds)
+
+
+def dump_all(
+    documents,
+    stream=None,
+    Dumper=Dumper,
+    default_style=None,
+    default_flow_style=None,
+    canonical=None,
+    indent=None,
+    width=None,
+    allow_unicode=None,
+    line_break=None,
+    encoding=enc,
+    explicit_start=None,
+    explicit_end=None,
+    version=None,
+    tags=None,
+    block_seq_indent=None,
+    top_level_colon_align=None,
+    prefix_colon=None,
+):
+    # type: (Any, Optional[StreamType], Any, Any, Any, Optional[bool], Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Any, Any, Any, Any, Any) -> Optional[str]   # NOQA
+    """
+    Serialize a sequence of Python objects into a YAML stream.
+    If stream is None, return the produced string instead.
+    """
+    getvalue = None
+    if top_level_colon_align is True:
+        top_level_colon_align = max([len(str(x)) for x in documents[0]])
+    if stream is None:
+        if encoding is None:
+            stream = StringIO()
+        else:
+            stream = BytesIO()
+        getvalue = stream.getvalue
+    dumper = Dumper(
+        stream,
+        default_style=default_style,
+        default_flow_style=default_flow_style,
+        canonical=canonical,
+        indent=indent,
+        width=width,
+        allow_unicode=allow_unicode,
+        line_break=line_break,
+        encoding=encoding,
+        explicit_start=explicit_start,
+        explicit_end=explicit_end,
+        version=version,
+        tags=tags,
+        block_seq_indent=block_seq_indent,
+        top_level_colon_align=top_level_colon_align,
+        prefix_colon=prefix_colon,
+    )
+    try:
+        dumper._serializer.open()
+        for data in documents:
+            try:
+                dumper._representer.represent(data)
+            except AttributeError:
+                # nprint(dir(dumper._representer))
+                raise
+        dumper._serializer.close()
+    finally:
+        try:
+            dumper._emitter.dispose()
+        except AttributeError:
+            raise
+            dumper.dispose()  # cyaml
+    if getvalue is not None:
+        return getvalue()
+    return None
+
+
+def dump(
+    data,
+    stream=None,
+    Dumper=Dumper,
+    default_style=None,
+    default_flow_style=None,
+    canonical=None,
+    indent=None,
+    width=None,
+    allow_unicode=None,
+    line_break=None,
+    encoding=enc,
+    explicit_start=None,
+    explicit_end=None,
+    version=None,
+    tags=None,
+    block_seq_indent=None,
+):
+    # type: (Any, Optional[StreamType], Any, Any, Any, Optional[bool], Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Optional[VersionType], Any, Any) -> Optional[str]   # NOQA
+    """
+    Serialize a Python object into a YAML stream.
+    If stream is None, return the produced string instead.
+
+    default_style ∈ None, '', '"', "'", '|', '>'
+
+    """
+    return dump_all(
+        [data],
+        stream,
+        Dumper=Dumper,
+        default_style=default_style,
+        default_flow_style=default_flow_style,
+        canonical=canonical,
+        indent=indent,
+        width=width,
+        allow_unicode=allow_unicode,
+        line_break=line_break,
+        encoding=encoding,
+        explicit_start=explicit_start,
+        explicit_end=explicit_end,
+        version=version,
+        tags=tags,
+        block_seq_indent=block_seq_indent,
+    )
+
+
+def safe_dump_all(documents, stream=None, **kwds):
+    # type: (Any, Optional[StreamType], Any) -> Optional[str]
+    """
+    Serialize a sequence of Python objects into a YAML stream.
+    Produce only basic YAML tags.
+    If stream is None, return the produced string instead.
+    """
+    return dump_all(documents, stream, Dumper=SafeDumper, **kwds)
+
+
+def safe_dump(data, stream=None, **kwds):
+    # type: (Any, Optional[StreamType], Any) -> Optional[str]
+    """
+    Serialize a Python object into a YAML stream.
+    Produce only basic YAML tags.
+    If stream is None, return the produced string instead.
+    """
+    return dump_all([data], stream, Dumper=SafeDumper, **kwds)
+
+
+def round_trip_dump(
+    data,
+    stream=None,
+    Dumper=RoundTripDumper,
+    default_style=None,
+    default_flow_style=None,
+    canonical=None,
+    indent=None,
+    width=None,
+    allow_unicode=None,
+    line_break=None,
+    encoding=enc,
+    explicit_start=None,
+    explicit_end=None,
+    version=None,
+    tags=None,
+    block_seq_indent=None,
+    top_level_colon_align=None,
+    prefix_colon=None,
+):
+    # type: (Any, Optional[StreamType], Any, Any, Any, Optional[bool], Optional[int], Optional[int], Optional[bool], Any, Any, Optional[bool], Optional[bool], Optional[VersionType], Any, Any, Any, Any) -> Optional[str]   # NOQA
+    allow_unicode = True if allow_unicode is None else allow_unicode
+    return dump_all(
+        [data],
+        stream,
+        Dumper=Dumper,
+        default_style=default_style,
+        default_flow_style=default_flow_style,
+        canonical=canonical,
+        indent=indent,
+        width=width,
+        allow_unicode=allow_unicode,
+        line_break=line_break,
+        encoding=encoding,
+        explicit_start=explicit_start,
+        explicit_end=explicit_end,
+        version=version,
+        tags=tags,
+        block_seq_indent=block_seq_indent,
+        top_level_colon_align=top_level_colon_align,
+        prefix_colon=prefix_colon,
+    )
+
+
+# Loader/Dumper are no longer composites, to get to the associated
+# Resolver()/Representer(), etc., you need to instantiate the class
+
+
+def add_implicit_resolver(
+    tag, regexp, first=None, Loader=None, Dumper=None, resolver=Resolver
+):
+    # type: (Any, Any, Any, Any, Any, Any) -> None
+    """
+    Add an implicit scalar detector.
+    If an implicit scalar value matches the given regexp,
+    the corresponding tag is assigned to the scalar.
+    first is a sequence of possible initial characters or None.
+    """
+    if Loader is None and Dumper is None:
+        resolver.add_implicit_resolver(tag, regexp, first)
+        return
+    if Loader:
+        if hasattr(Loader, 'add_implicit_resolver'):
+            Loader.add_implicit_resolver(tag, regexp, first)
+        elif issubclass(
+            Loader, (BaseLoader, SafeLoader, ruamel.yaml.loader.Loader, RoundTripLoader)
+        ):
+            Resolver.add_implicit_resolver(tag, regexp, first)
+        else:
+            raise NotImplementedError
+    if Dumper:
+        if hasattr(Dumper, 'add_implicit_resolver'):
+            Dumper.add_implicit_resolver(tag, regexp, first)
+        elif issubclass(
+            Dumper, (BaseDumper, SafeDumper, ruamel.yaml.dumper.Dumper, RoundTripDumper)
+        ):
+            Resolver.add_implicit_resolver(tag, regexp, first)
+        else:
+            raise NotImplementedError
+
+
+# this code currently not tested
+def add_path_resolver(tag, path, kind=None, Loader=None, Dumper=None, resolver=Resolver):
+    # type: (Any, Any, Any, Any, Any, Any) -> None
+    """
+    Add a path based resolver for the given tag.
+    A path is a list of keys that forms a path
+    to a node in the representation tree.
+    Keys can be string values, integers, or None.
+    """
+    if Loader is None and Dumper is None:
+        resolver.add_path_resolver(tag, path, kind)
+        return
+    if Loader:
+        if hasattr(Loader, 'add_path_resolver'):
+            Loader.add_path_resolver(tag, path, kind)
+        elif issubclass(
+            Loader, (BaseLoader, SafeLoader, ruamel.yaml.loader.Loader, RoundTripLoader)
+        ):
+            Resolver.add_path_resolver(tag, path, kind)
+        else:
+            raise NotImplementedError
+    if Dumper:
+        if hasattr(Dumper, 'add_path_resolver'):
+            Dumper.add_path_resolver(tag, path, kind)
+        elif issubclass(
+            Dumper, (BaseDumper, SafeDumper, ruamel.yaml.dumper.Dumper, RoundTripDumper)
+        ):
+            Resolver.add_path_resolver(tag, path, kind)
+        else:
+            raise NotImplementedError
+
+
+def add_constructor(tag, object_constructor, Loader=None, constructor=Constructor):
+    # type: (Any, Any, Any, Any) -> None
+    """
+    Add an object constructor for the given tag.
+    object_onstructor is a function that accepts a Loader instance
+    and a node object and produces the corresponding Python object.
+    """
+    if Loader is None:
+        constructor.add_constructor(tag, object_constructor)
+    else:
+        if hasattr(Loader, 'add_constructor'):
+            Loader.add_constructor(tag, object_constructor)
+            return
+        if issubclass(Loader, BaseLoader):
+            BaseConstructor.add_constructor(tag, object_constructor)
+        elif issubclass(Loader, SafeLoader):
+            SafeConstructor.add_constructor(tag, object_constructor)
+        elif issubclass(Loader, Loader):
+            Constructor.add_constructor(tag, object_constructor)
+        elif issubclass(Loader, RoundTripLoader):
+            RoundTripConstructor.add_constructor(tag, object_constructor)
+        else:
+            raise NotImplementedError
+
+
+def add_multi_constructor(tag_prefix, multi_constructor, Loader=None, constructor=Constructor):
+    # type: (Any, Any, Any, Any) -> None
+    """
+    Add a multi-constructor for the given tag prefix.
+    Multi-constructor is called for a node if its tag starts with tag_prefix.
+    Multi-constructor accepts a Loader instance, a tag suffix,
+    and a node object and produces the corresponding Python object.
+    """
+    if Loader is None:
+        constructor.add_multi_constructor(tag_prefix, multi_constructor)
+    else:
+        if False and hasattr(Loader, 'add_multi_constructor'):
+            Loader.add_multi_constructor(tag_prefix, constructor)
+            return
+        if issubclass(Loader, BaseLoader):
+            BaseConstructor.add_multi_constructor(tag_prefix, multi_constructor)
+        elif issubclass(Loader, SafeLoader):
+            SafeConstructor.add_multi_constructor(tag_prefix, multi_constructor)
+        elif issubclass(Loader, ruamel.yaml.loader.Loader):
+            Constructor.add_multi_constructor(tag_prefix, multi_constructor)
+        elif issubclass(Loader, RoundTripLoader):
+            RoundTripConstructor.add_multi_constructor(tag_prefix, multi_constructor)
+        else:
+            raise NotImplementedError
+
+
+def add_representer(data_type, object_representer, Dumper=None, representer=Representer):
+    # type: (Any, Any, Any, Any) -> None
+    """
+    Add a representer for the given type.
+    object_representer is a function accepting a Dumper instance
+    and an instance of the given data type
+    and producing the corresponding representation node.
+    """
+    if Dumper is None:
+        representer.add_representer(data_type, object_representer)
+    else:
+        if hasattr(Dumper, 'add_representer'):
+            Dumper.add_representer(data_type, object_representer)
+            return
+        if issubclass(Dumper, BaseDumper):
+            BaseRepresenter.add_representer(data_type, object_representer)
+        elif issubclass(Dumper, SafeDumper):
+            SafeRepresenter.add_representer(data_type, object_representer)
+        elif issubclass(Dumper, Dumper):
+            Representer.add_representer(data_type, object_representer)
+        elif issubclass(Dumper, RoundTripDumper):
+            RoundTripRepresenter.add_representer(data_type, object_representer)
+        else:
+            raise NotImplementedError
+
+
+# this code currently not tested
+def add_multi_representer(data_type, multi_representer, Dumper=None, representer=Representer):
+    # type: (Any, Any, Any, Any) -> None
+    """
+    Add a representer for the given type.
+    multi_representer is a function accepting a Dumper instance
+    and an instance of the given data type or subtype
+    and producing the corresponding representation node.
+    """
+    if Dumper is None:
+        representer.add_multi_representer(data_type, multi_representer)
+    else:
+        if hasattr(Dumper, 'add_multi_representer'):
+            Dumper.add_multi_representer(data_type, multi_representer)
+            return
+        if issubclass(Dumper, BaseDumper):
+            BaseRepresenter.add_multi_representer(data_type, multi_representer)
+        elif issubclass(Dumper, SafeDumper):
+            SafeRepresenter.add_multi_representer(data_type, multi_representer)
+        elif issubclass(Dumper, Dumper):
+            Representer.add_multi_representer(data_type, multi_representer)
+        elif issubclass(Dumper, RoundTripDumper):
+            RoundTripRepresenter.add_multi_representer(data_type, multi_representer)
+        else:
+            raise NotImplementedError
+
+
+class YAMLObjectMetaclass(type):
+    """
+    The metaclass for YAMLObject.
+    """
+
+    def __init__(cls, name, bases, kwds):
+        # type: (Any, Any, Any) -> None
+        super(YAMLObjectMetaclass, cls).__init__(name, bases, kwds)
+        if 'yaml_tag' in kwds and kwds['yaml_tag'] is not None:
+            cls.yaml_constructor.add_constructor(cls.yaml_tag, cls.from_yaml)  # type: ignore
+            cls.yaml_representer.add_representer(cls, cls.to_yaml)  # type: ignore
+
+
+class YAMLObject(with_metaclass(YAMLObjectMetaclass)):  # type: ignore
+    """
+    An object that can dump itself to a YAML stream
+    and load itself from a YAML stream.
+    """
+
+    __slots__ = ()  # no direct instantiation, so allow immutable subclasses
+
+    yaml_constructor = Constructor
+    yaml_representer = Representer
+
+    yaml_tag = None  # type: Any
+    yaml_flow_style = None  # type: Any
+
+    @classmethod
+    def from_yaml(cls, constructor, node):
+        # type: (Any, Any) -> Any
+        """
+        Convert a representation node to a Python object.
+        """
+        return constructor.construct_yaml_object(node, cls)
+
+    @classmethod
+    def to_yaml(cls, representer, data):
+        # type: (Any, Any) -> Any
+        """
+        Convert a Python object to a representation node.
+        """
+        return representer.represent_yaml_object(
+            cls.yaml_tag, data, cls, flow_style=cls.yaml_flow_style
+        )