Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/ruamel/yaml/compat.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 # coding: utf-8 | |
2 | |
3 from __future__ import print_function | |
4 | |
5 # partially from package six by Benjamin Peterson | |
6 | |
7 import sys | |
8 import os | |
9 import types | |
10 import traceback | |
11 from abc import abstractmethod | |
12 | |
13 | |
14 # fmt: off | |
15 if False: # MYPY | |
16 from typing import Any, Dict, Optional, List, Union, BinaryIO, IO, Text, Tuple # NOQA | |
17 from typing import Optional # NOQA | |
18 # fmt: on | |
19 | |
20 _DEFAULT_YAML_VERSION = (1, 2) | |
21 | |
try:
    from ruamel.ordereddict import ordereddict
except Exception:  # NOQA
    try:
        from collections import OrderedDict
    except ImportError:
        from ordereddict import OrderedDict  # type: ignore
    # to get the right name import ... as ordereddict doesn't do that

    class ordereddict(OrderedDict):  # type: ignore
        """Fallback used when ``ruamel.ordereddict`` cannot be imported.

        Subclasses ``OrderedDict`` and adds a positional ``insert`` method
        when the base class does not already provide one.
        """

        if not hasattr(OrderedDict, 'insert'):

            def insert(self, pos, key, value):
                # type: (int, Any, Any) -> None
                """Place ``key: value`` in front of the entry currently at ``pos``.

                A negative ``pos`` counts from the end and is clamped at 0
                (the same convention as ``list.insert``). If ``pos`` is at or
                past the end, the item is simply assigned: a new key is
                appended, an existing key is updated where it is.
                Otherwise the mapping is rebuilt in order so that ``key``
                ends up at ``pos`` carrying the new ``value``, even if the
                key was already present.
                """
                size = len(self)
                if pos < 0:
                    # without this a negative pos never matched any index and
                    # the new item was silently dropped
                    pos = max(0, size + pos)
                if pos >= size:
                    self[key] = value
                    return
                snapshot = list(self.items())
                # delete one by one so that __delitem__ overrides in
                # subclasses are still invoked
                for old_key, _ in snapshot:
                    del self[old_key]
                for index, (old_key, old_value) in enumerate(snapshot):
                    if index == pos:
                        self[key] = value
                    if old_key != key:
                        # skip the stale entry, it would overwrite the new value
                        self[old_key] = old_value
48 | |
PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3


if not PY3:
    if False:
        unicode = str

    def utf8(s):
        # type: (unicode) -> str
        """Return ``s`` encoded as UTF-8."""
        return s.encode('utf-8')

    def to_str(s):
        # type: (str) -> str
        """Return ``str(s)``."""
        return str(s)

    def to_unicode(s):
        # type: (str) -> unicode
        """Return ``unicode(s)``."""
        return unicode(s)  # NOQA


else:

    def utf8(s):
        # type: (str) -> str
        """Return ``s`` unchanged."""
        return s

    def to_str(s):
        # type: (str) -> str
        """Return ``s`` unchanged."""
        return s

    def to_unicode(s):
        # type: (str) -> str
        """Return ``s`` unchanged."""
        return s
84 | |
# Names that differ between Python 2 and Python 3, bound once here so the
# rest of the package can use a single spelling.
if PY3:
    import io

    string_types = text_type = str
    integer_types = int
    class_types = type
    binary_type = bytes

    MAXSIZE = sys.maxsize
    unichr = chr

    StringIO, BytesIO = io.StringIO, io.BytesIO
    # have unlimited precision
    no_limit_int = int
    from collections.abc import Hashable, MutableSequence, MutableMapping, Mapping  # NOQA

else:
    from StringIO import StringIO as _StringIO
    import cStringIO

    string_types = basestring  # NOQA
    integer_types = (int, long)  # NOQA
    class_types = (type, types.ClassType)
    text_type = unicode  # NOQA
    binary_type = str

    # to allow importing
    unichr = unichr

    StringIO = _StringIO
    BytesIO = cStringIO.StringIO
    # have unlimited precision
    no_limit_int = long  # NOQA not available on Python 3
    from collections import Hashable, MutableSequence, MutableMapping, Mapping  # NOQA
120 | |
if False:  # MYPY
    # Aliases that only exist for the type checker; never executed.
    # StreamType = Union[BinaryIO, IO[str], IO[unicode], StringIO]
    # StreamType = Union[BinaryIO, IO[str], StringIO]  # type: ignore
    StreamType = Any

    StreamTextType = StreamType  # Union[Text, StreamType]
    VersionType = Union[List[int], str, Tuple[int, int]]

# name of the module holding the builtins on the running interpreter
builtins_module = 'builtins' if PY3 else '__builtin__'

# 4 when sys.maxunicode exceeds 65535, otherwise 2
UNICODE_SIZE = 2 if sys.maxunicode <= 65535 else 4
135 | |
136 | |
def with_metaclass(meta, *bases):
    # type: (Any, Any) -> Any
    """Return a class named 'NewBase' built by ``meta`` with the given bases.

    The class body is empty; the result is meant to be used as a base class.
    """
    class_name = 'NewBase'
    namespace = {}  # type: Dict[Any, Any]
    return meta(class_name, bases, namespace)
141 | |
142 | |
# bit flags that can be combined in the debug value
DBG_TOKEN = 1
DBG_EVENT = 2
DBG_NODE = 4


# stays None unless RUAMELDEBUG is present in the environment
_debug = None  # type: Optional[int]
if 'RUAMELDEBUG' in os.environ:
    _debugx = os.environ.get('RUAMELDEBUG')
    _debug = 0 if _debugx is None else int(_debugx)


if bool(_debug):

    class ObjectCounter(object):
        """Tally how many times each key has been passed to the instance."""

        def __init__(self):
            # type: () -> None
            self.map = {}  # type: Dict[Any, Any]

        def __call__(self, k):
            # type: (Any) -> None
            """Increase the tally for ``k`` by one."""
            seen_so_far = self.map.get(k, 0)
            self.map[k] = seen_so_far + 1

        def dump(self):
            # type: () -> None
            """Write every 'key -> count' pair to stdout, in sorted key order."""
            write = sys.stdout.write
            for k in sorted(self.map):
                write('{} -> {}'.format(k, self.map[k]))

    object_counter = ObjectCounter()
174 | |
175 | |
# used from yaml util when testing
def dbg(val=None):
    # type: (Any) -> Any
    """Return the module debug value, or that value masked with ``val``.

    When the module-level ``_debug`` is still None it is first initialised
    from the YAMLDEBUG environment variable (0 when the variable is unset).
    """
    global _debug
    if _debug is None:
        # set to true or false
        env_value = os.environ.get('YAMLDEBUG')
        _debug = int(env_value) if env_value is not None else 0
    return _debug if val is None else _debug & val
190 | |
191 | |
class Nprint(object):
    """Debug printer that only produces output when the module ``_debug`` is truthy.

    Output goes to ``sys.stdout``, or is appended to ``file_name`` when one
    is given. An optional print budget (see ``set_max_print``) terminates the
    process once that many calls have produced output.
    """

    def __init__(self, file_name=None):
        # type: (Any) -> None
        self._max_print = None  # type: Any
        self._count = None  # type: Any
        self._file_name = file_name

    def __call__(self, *args, **kw):
        # type: (Any, Any) -> None
        """Print ``args`` (with ``print`` keyword arguments ``kw``) if debugging is on.

        The ``file`` keyword is always overridden with this printer's target.
        Calls ``sys.exit(0)`` after dumping a stack trace when the print
        budget is used up.
        """
        if not bool(_debug):
            return
        to_file = self._file_name is not None
        out = open(self._file_name, 'a') if to_file else sys.stdout
        try:
            dbgprint = print  # to fool checking for print statements by dv utility
            kw1 = kw.copy()
            kw1['file'] = out
            dbgprint(*args, **kw1)
            out.flush()
            if self._max_print is not None:
                if self._count is None:
                    # first print after set_max_print: start the countdown
                    self._count = self._max_print
                self._count -= 1
                if self._count == 0:
                    dbgprint('forced exit\n')
                    traceback.print_stack()
                    out.flush()
                    sys.exit(0)
        finally:
            # close the log file on every path, including errors and the
            # forced exit above; never close sys.stdout
            if to_file:
                out.close()

    def set_max_print(self, i):
        # type: (int) -> None
        """Set the print budget to ``i`` and reset the running countdown."""
        self._max_print = i
        self._count = None


nprint = Nprint()
nprintf = Nprint('/var/tmp/ruamel.yaml.log')
229 | |
230 # char checkers following production rules | |
231 | |
232 | |
def check_namespace_char(ch):
    # type: (Any) -> bool
    """Return True when ``ch`` lies in one of the accepted character ranges.

    Accepted are '!' through '~', U+00A0 through U+D7FF, U+E000 through
    U+FFFD except the byte order mark U+FEFF, and U+10000 through U+10FFFF.
    """
    is_printable_ascii = u'\x21' <= ch <= u'\x7E'  # ! to ~
    is_low_plane = u'\xA0' <= ch <= u'\uD7FF'
    # excl. byte order mark
    is_high_bmp = (u'\uE000' <= ch <= u'\uFFFD') and ch != u'\uFEFF'
    is_supplementary = u'\U00010000' <= ch <= u'\U0010FFFF'
    return is_printable_ascii or is_low_plane or is_high_bmp or is_supplementary
244 | |
245 | |
def check_anchorname_char(ch):
    # type: (Any) -> bool
    """Return False for any of ``,[]{}``; otherwise defer to check_namespace_char."""
    flow_indicators = u',[]{}'
    return ch not in flow_indicators and check_namespace_char(ch)
251 | |
252 | |
def version_tnf(t1, t2=None):
    # type: (Any, Any) -> Any
    """
    Compare the ruamel.yaml version_info against one or two thresholds.

    Returns True when version_info < t1, None when t2 is given and
    version_info < t2, and False in every other case.
    """
    from ruamel.yaml import version_info  # NOQA

    if version_info < t1:
        return True
    if t2 is None or not version_info < t2:
        return False
    return None
265 | |
266 | |
class MutableSliceableSequence(MutableSequence):  # type: ignore
    """MutableSequence base that implements slice handling for subclasses.

    Subclasses supply the single-index primitives ``__getsingleitem__``,
    ``__setsingleitem__`` and ``__delsingleitem__`` (plus ``__len__`` and
    ``insert`` as required by MutableSequence); slices are translated here
    into calls on those primitives.
    """

    __slots__ = ()

    def __getitem__(self, index):
        # type: (Any) -> Any
        """Return one item, or for a slice a new ``type(self)`` built from a list."""
        if not isinstance(index, slice):
            return self.__getsingleitem__(index)
        return type(self)([self[i] for i in range(*index.indices(len(self)))])  # type: ignore

    def __setitem__(self, index, value):
        # type: (Any, Any) -> None
        """Assign one item, or replace a slice with the elements of ``value``.

        A slice without a step may change the length of the sequence. A slice
        with a step needs exactly as many elements as it addresses, otherwise
        TypeError is raised before anything is modified. TypeError is also
        raised when ``value`` is not iterable.
        """
        if not isinstance(index, slice):
            return self.__setsingleitem__(index, value)
        # materialise once: rejects non-iterables and allows iterators
        values = list(value)
        # nprint(index.start, index.stop, index.step, index.indices(len(self)))
        if index.step is None:
            # normalise before deleting, a raw negative start would point at
            # a different position once the sequence has shrunk
            start, stop, _ = index.indices(len(self))
            del self[start:stop]
            for elem in reversed(values):
                self.insert(start, elem)
        else:
            range_parms = index.indices(len(self))
            # counting via range() is correct for negative steps as well
            nr_assigned_items = len(range(*range_parms))
            # need to test before changing, in case TypeError is caught
            if nr_assigned_items < len(values):
                raise TypeError(
                    'too many elements in value {} < {}'.format(nr_assigned_items, len(values))
                )
            elif nr_assigned_items > len(values):
                raise TypeError(
                    'not enough elements in value {} > {}'.format(
                        nr_assigned_items, len(values)
                    )
                )
            for idx, i in enumerate(range(*range_parms)):
                self[i] = values[idx]

    def __delitem__(self, index):
        # type: (Any) -> None
        """Delete one item, or every item addressed by a slice."""
        if not isinstance(index, slice):
            return self.__delsingleitem__(index)
        # nprint(index.start, index.stop, index.step, index.indices(len(self)))
        # go from the last addressed position to the first so that the
        # remaining positions stay valid while deleting
        for i in reversed(range(*index.indices(len(self)))):
            del self[i]

    @abstractmethod
    def __getsingleitem__(self, index):
        # type: (Any) -> Any
        """Return the item at a non-slice ``index``; must be overridden."""
        raise IndexError

    @abstractmethod
    def __setsingleitem__(self, index, value):
        # type: (Any, Any) -> None
        """Store ``value`` at a non-slice ``index``; must be overridden."""
        raise IndexError

    @abstractmethod
    def __delsingleitem__(self, index):
        # type: (Any) -> None
        """Remove the item at a non-slice ``index``; must be overridden."""
        raise IndexError