Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/ruamel/yaml/compat.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 # coding: utf-8 | |
2 | |
3 from __future__ import print_function | |
4 | |
5 # partially from package six by Benjamin Peterson | |
6 | |
7 import sys | |
8 import os | |
9 import types | |
10 import traceback | |
11 from abc import abstractmethod | |
12 | |
13 | |
14 # fmt: off | |
15 if False: # MYPY | |
16 from typing import Any, Dict, Optional, List, Union, BinaryIO, IO, Text, Tuple, Optional # NOQA | |
17 # fmt: on | |
18 | |
19 _DEFAULT_YAML_VERSION = (1, 2) | |
20 | |
try:
    from ruamel.ordereddict import ordereddict
except:  # NOQA
    try:
        from collections import OrderedDict
    except ImportError:
        from ordereddict import OrderedDict  # type: ignore
    # to get the right name import ... as ordereddict doesn't do that

    class ordereddict(OrderedDict):  # type: ignore
        """Fallback ``ordereddict`` used when ``ruamel.ordereddict`` cannot be imported.

        It is a plain ``OrderedDict`` subclass that gains an ``insert`` method
        when the base class does not already provide one.
        """

        if not hasattr(OrderedDict, 'insert'):

            def insert(self, pos, key, value):
                # type: (int, Any, Any) -> None
                """Place ``key``/``value`` at position ``pos`` in the ordering.

                ``pos`` follows ``list.insert`` semantics: a value at or past
                the end appends, a negative value counts from the end.  If
                ``key`` is already present it is moved to ``pos`` (computed
                among the remaining keys) and gets the new ``value``.

                Earlier behaviour that is fixed here: a negative ``pos``
                silently dropped the new key, and re-inserting an existing
                key either restored its old value or left it in place.
                """
                if pos >= len(self) and key not in self:
                    # plain append, no reordering needed
                    self[key] = value
                    return
                # Snapshot every other entry, then splice the new one in.
                entries = [(k, self[k]) for k in self if k != key]
                entries.insert(pos, (key, value))
                # Rebuild through item deletion/assignment (not ``clear``) so
                # that subclasses overriding ``__delitem__``/``__setitem__``
                # still see every change, as they did before.
                for k in list(self):
                    del self[k]
                for k, v in entries:
                    self[k] = v
46 | |
47 | |
# Exactly one of these is True on Python 2.x / 3.x; both are False otherwise.
PY2, PY3 = (sys.version_info[0] == major for major in (2, 3))


if not PY3:
    if False:
        # never executed; only binds the name ``unicode`` for static checkers
        unicode = str

    def utf8(s):
        # type: (unicode) -> str
        """Return ``s`` encoded as UTF-8."""
        return s.encode('utf-8')

    def to_str(s):
        # type: (str) -> str
        """Return ``str(s)``."""
        return str(s)

    def to_unicode(s):
        # type: (str) -> unicode
        """Return ``unicode(s)``."""
        return unicode(s)  # NOQA


else:

    def utf8(s):
        # type: (str) -> str
        """Return ``s`` unchanged (no conversion is done on Python 3)."""
        return s

    def to_str(s):
        # type: (str) -> str
        """Return ``s`` unchanged (no conversion is done on Python 3)."""
        return s

    def to_unicode(s):
        # type: (str) -> str
        """Return ``s`` unchanged (no conversion is done on Python 3)."""
        return s
82 | |
83 | |
# Version-neutral aliases.  Both branches bind the same set of names
# (string_types, integer_types, class_types, text_type, binary_type, unichr,
# StringIO, BytesIO, no_limit_int and the collection ABCs) so the rest of the
# package can import them from here without testing the interpreter version.
# NOTE: MAXSIZE is bound only in the PY3 branch.
if PY3:
    string_types = str
    integer_types = int
    class_types = type
    text_type = str
    binary_type = bytes

    MAXSIZE = sys.maxsize
    unichr = chr
    import io

    StringIO = io.StringIO
    BytesIO = io.BytesIO
    # have unlimited precision
    no_limit_int = int
    # the container ABCs are taken from collections.abc in this branch
    from collections.abc import Hashable, MutableSequence, MutableMapping, Mapping  # NOQA

else:
    # The names basestring, long and unicode used below are not defined on
    # Python 3, hence the NOQA markers.
    string_types = basestring  # NOQA
    integer_types = (int, long)  # NOQA
    class_types = (type, types.ClassType)
    text_type = unicode  # NOQA
    binary_type = str

    # to allow importing
    unichr = unichr  # type: ignore
    from StringIO import StringIO as _StringIO

    StringIO = _StringIO
    import cStringIO

    BytesIO = cStringIO.StringIO
    # have unlimited precision
    no_limit_int = long  # NOQA not available on Python 3
    # in this branch the container ABCs come straight from collections
    from collections import Hashable, MutableSequence, MutableMapping, Mapping  # NOQA
119 | |
# The guard is a constant False, so nothing below is ever executed and none of
# these names exist at runtime.  They are type aliases for tools that read the
# source (marked MYPY), built from typing names imported under the same kind
# of guard at the top of the file.
if False:  # MYPY
    # StreamType = Union[BinaryIO, IO[str], IO[unicode], StringIO]
    # StreamType = Union[BinaryIO, IO[str], StringIO]  # type: ignore
    # the two narrower definitions above are disabled; Any is used instead
    StreamType = Any

    # either text or a stream
    StreamTextType = Union[Text, StreamType]
    # a version given as a list of ints, a string, or a pair of ints
    VersionType = Union[List[int], str, Tuple[int, int]]
127 | |
# Name of the module that holds the built-ins on the running interpreter.
builtins_module = 'builtins' if PY3 else '__builtin__'

# 2 when sys.maxunicode fits in 16 bits, otherwise 4.
UNICODE_SIZE = 2 if sys.maxunicode <= 65535 else 4
134 | |
135 | |
def with_metaclass(meta, *bases):
    # type: (Any, Any) -> Any
    """Return a new class named ``NewBase`` created by calling ``meta``.

    The class is built with the given ``bases`` and an empty namespace, so it
    can be used as a base class that carries the metaclass.
    """
    placeholder_name = 'NewBase'
    empty_namespace = {}  # type: Dict[str, Any]
    return meta(placeholder_name, bases, empty_namespace)
140 | |
141 | |
# Bit flags; dbg() masks the debug level with the flag it is given.
DBG_TOKEN = 1
DBG_EVENT = 2
DBG_NODE = 4


# Debug level.  It stays None unless RUAMELDEBUG is set in the environment;
# dbg() fills in a value on first use when it is still None.
_debug = None  # type: Optional[int]
if 'RUAMELDEBUG' in os.environ:
    # The variable is known to be present here, so the former "value is
    # None" fallback could never run and has been dropped.  A non-integer
    # value still raises ValueError, as before.
    _debugx = os.environ['RUAMELDEBUG']
    _debug = int(_debugx)
154 | |
155 | |
# Only defined when the module-level debug level is non-zero.
if bool(_debug):

    class ObjectCounter(object):
        """Count how many times each key has been passed to the instance."""

        def __init__(self):
            # type: () -> None
            # key -> number of calls made with that key
            self.map = {}  # type: Dict[Any, Any]

        def __call__(self, k):
            # type: (Any) -> None
            """Increment the counter for ``k``."""
            self.map[k] = self.map.get(k, 0) + 1

        def dump(self):
            # type: () -> None
            """Write one ``key -> count`` line per key to stdout, in sorted key order."""
            for k in sorted(self.map):
                # terminate each entry; previously all entries ran together
                # on a single line
                sys.stdout.write('{} -> {}\n'.format(k, self.map[k]))

    object_counter = ObjectCounter()
173 | |
174 | |
175 # used from yaml util when testing | |
def dbg(val=None):
    # type: (Any) -> Any
    """Return the debug level, or the level masked with ``val``.

    If the module-level ``_debug`` is still None it is initialised first from
    the ``YAMLDEBUG`` environment variable (0 when the variable is unset).
    """
    global _debug
    if _debug is None:
        # set to true or false
        env_value = os.environ.get('YAMLDEBUG')
        _debug = 0 if env_value is None else int(env_value)
    return _debug if val is None else _debug & val
189 | |
190 | |
class Nprint(object):
    """Debug printer that does nothing unless the module-level ``_debug`` is truthy.

    Output goes to ``sys.stdout`` or, when a file name was given, is appended
    to that file (opened and closed again on every call).
    """

    def __init__(self, file_name=None):
        # type: (Any) -> None
        self._max_print = None  # type: Any
        self._count = None  # type: Any
        self._file_name = file_name

    def __call__(self, *args, **kw):
        # type: (Any, Any) -> None
        """Print ``args`` like ``print`` does; any ``file`` keyword is overridden.

        When a print limit was set with ``set_max_print`` and the countdown
        reaches zero, a stack trace is printed and the process exits via
        ``sys.exit(0)``.
        """
        if not bool(_debug):
            return
        to_file = self._file_name is not None
        out = open(self._file_name, 'a') if to_file else sys.stdout
        # try/finally guarantees the log file is closed even when printing
        # raises or the forced exit below is taken (it used to leak then).
        try:
            dbgprint = print  # to fool checking for print statements by dv utility
            kw1 = kw.copy()
            kw1['file'] = out
            dbgprint(*args, **kw1)
            out.flush()
            if self._max_print is not None:
                if self._count is None:
                    # first call after set_max_print(): start the countdown
                    self._count = self._max_print
                self._count -= 1
                if self._count == 0:
                    dbgprint('forced exit\n')
                    traceback.print_stack()
                    out.flush()
                    sys.exit(0)
        finally:
            if to_file:
                out.close()

    def set_max_print(self, i):
        # type: (int) -> None
        """Set the print limit to ``i`` and restart the countdown."""
        self._max_print = i
        self._count = None


nprint = Nprint()
nprintf = Nprint('/var/tmp/ruamel.yaml.log')
228 | |
229 # char checkers following production rules | |
230 | |
231 | |
def check_namespace_char(ch):
    # type: (Any) -> bool
    """Return True if ``ch`` lies in one of the accepted character ranges.

    Accepted: U+0021..U+007E, U+00A0..U+D7FF, U+E000..U+FFFD except the byte
    order mark U+FEFF, and U+10000..U+10FFFF.
    """
    # ! to ~, then the block up to the start of the surrogates
    if u'\x21' <= ch <= u'\x7E' or u'\xA0' <= ch <= u'\uD7FF':
        return True
    # excl. byte order mark
    if u'\uE000' <= ch <= u'\uFFFD' and ch != u'\uFEFF':
        return True
    return bool(u'\U00010000' <= ch <= u'\U0010FFFF')
243 | |
244 | |
def check_anchorname_char(ch):
    # type: (Any) -> bool
    """Return False if ``ch`` occurs in ``,[]{}``; otherwise defer to check_namespace_char."""
    excluded = u',[]{}'
    return False if ch in excluded else check_namespace_char(ch)
250 | |
251 | |
def version_tnf(t1, t2=None):
    # type: (Any, Any) -> Any
    """
    Compare the ruamel.yaml version_info with one or two version tuples.

    return True if version_info < t1; otherwise None if t2 is given and
    version_info < t2; otherwise False
    """
    from ruamel.yaml import version_info  # NOQA

    if version_info < t1:
        return True
    if t2 is None or not (version_info < t2):
        return False
    return None
264 | |
265 | |
class MutableSliceableSequence(MutableSequence):  # type: ignore
    """MutableSequence base that implements slicing on top of single-item access.

    Subclasses provide ``__getsingleitem__``, ``__setsingleitem__`` and
    ``__delsingleitem__`` (plus ``__len__`` and ``insert``); this class maps
    slice reads, writes and deletes onto them.
    """

    __slots__ = ()

    def __getitem__(self, index):
        # type: (Any) -> Any
        """Return one item, or for a slice a new ``type(self)`` built from the selected items."""
        if not isinstance(index, slice):
            return self.__getsingleitem__(index)
        return type(self)([self[i] for i in range(*index.indices(len(self)))])  # type: ignore

    def __setitem__(self, index, value):
        # type: (Any, Any) -> None
        """Assign one item, or replace the items selected by a slice.

        For a slice ``value`` may be any iterable.  A slice without a step can
        grow or shrink the sequence.  A slice with an explicit step needs
        exactly as many values as positions, otherwise TypeError is raised
        before anything is changed.
        """
        if not isinstance(index, slice):
            return self.__setsingleitem__(index, value)
        # Materialise once: accepts any iterable (raises TypeError for a
        # non-iterable) and is safe when value is derived from self.
        items = list(value)
        # nprint(index.start, index.stop, index.step, index.indices(len(self)))
        # Normalise before mutating.  A raw negative start used to be handed
        # to insert() after the deletion had already shortened the sequence,
        # which put the new items in the wrong place.
        start, stop, step = index.indices(len(self))
        if index.step is None:
            del self[start:stop]
            for elem in reversed(items):
                self.insert(start, elem)
            return
        positions = range(start, stop, step)
        # len(range) is also right for negative steps, where the old
        # arithmetic over-counted
        nr_assigned_items = len(positions)
        # need to test before changing, in case TypeError is caught
        if nr_assigned_items < len(items):
            raise TypeError(
                'too many elements in value {} < {}'.format(nr_assigned_items, len(items))
            )
        elif nr_assigned_items > len(items):
            raise TypeError(
                'not enough elements in value {} > {}'.format(
                    nr_assigned_items, len(items)
                )
            )
        for pos, elem in zip(positions, items):
            self[pos] = elem

    def __delitem__(self, index):
        # type: (Any) -> None
        """Delete one item, or every item selected by a slice."""
        if not isinstance(index, slice):
            return self.__delsingleitem__(index)
        # nprint(index.start, index.stop, index.step, index.indices(len(self)))
        start, stop, step = index.indices(len(self))
        positions = range(start, stop, step)
        # Always delete from the highest index down so earlier deletions do
        # not shift the ones still pending; a negative-step range is already
        # in descending order and must not be reversed.
        if step > 0:
            positions = reversed(positions)  # type: ignore
        for i in positions:
            del self[i]

    @abstractmethod
    def __getsingleitem__(self, index):
        # type: (Any) -> Any
        """Return the item at a single (non-slice) index."""
        raise IndexError

    @abstractmethod
    def __setsingleitem__(self, index, value):
        # type: (Any, Any) -> None
        """Store ``value`` at a single (non-slice) index."""
        raise IndexError

    @abstractmethod
    def __delsingleitem__(self, index):
        # type: (Any) -> None
        """Remove the item at a single (non-slice) index."""
        raise IndexError