Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/click/_compat.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:18:57 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d30785e31577 |
---|---|
1 # flake8: noqa | |
2 import codecs | |
3 import io | |
4 import os | |
5 import re | |
6 import sys | |
7 from weakref import WeakKeyDictionary | |
8 | |
9 PY2 = sys.version_info[0] == 2 | |
10 CYGWIN = sys.platform.startswith("cygwin") | |
11 MSYS2 = sys.platform.startswith("win") and ("GCC" in sys.version) | |
12 # Determine local App Engine environment, per Google's own suggestion | |
13 APP_ENGINE = "APPENGINE_RUNTIME" in os.environ and "Development/" in os.environ.get( | |
14 "SERVER_SOFTWARE", "" | |
15 ) | |
16 WIN = sys.platform.startswith("win") and not APP_ENGINE and not MSYS2 | |
17 DEFAULT_COLUMNS = 80 | |
18 | |
19 | |
20 _ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]") | |
21 | |
22 | |
23 def get_filesystem_encoding(): | |
24 return sys.getfilesystemencoding() or sys.getdefaultencoding() | |
25 | |
26 | |
27 def _make_text_stream( | |
28 stream, encoding, errors, force_readable=False, force_writable=False | |
29 ): | |
30 if encoding is None: | |
31 encoding = get_best_encoding(stream) | |
32 if errors is None: | |
33 errors = "replace" | |
34 return _NonClosingTextIOWrapper( | |
35 stream, | |
36 encoding, | |
37 errors, | |
38 line_buffering=True, | |
39 force_readable=force_readable, | |
40 force_writable=force_writable, | |
41 ) | |
42 | |
43 | |
44 def is_ascii_encoding(encoding): | |
45 """Checks if a given encoding is ascii.""" | |
46 try: | |
47 return codecs.lookup(encoding).name == "ascii" | |
48 except LookupError: | |
49 return False | |
50 | |
51 | |
52 def get_best_encoding(stream): | |
53 """Returns the default stream encoding if not found.""" | |
54 rv = getattr(stream, "encoding", None) or sys.getdefaultencoding() | |
55 if is_ascii_encoding(rv): | |
56 return "utf-8" | |
57 return rv | |
58 | |
59 | |
60 class _NonClosingTextIOWrapper(io.TextIOWrapper): | |
61 def __init__( | |
62 self, | |
63 stream, | |
64 encoding, | |
65 errors, | |
66 force_readable=False, | |
67 force_writable=False, | |
68 **extra | |
69 ): | |
70 self._stream = stream = _FixupStream(stream, force_readable, force_writable) | |
71 io.TextIOWrapper.__init__(self, stream, encoding, errors, **extra) | |
72 | |
73 # The io module is a place where the Python 3 text behavior | |
74 # was forced upon Python 2, so we need to unbreak | |
75 # it to look like Python 2. | |
76 if PY2: | |
77 | |
78 def write(self, x): | |
79 if isinstance(x, str) or is_bytes(x): | |
80 try: | |
81 self.flush() | |
82 except Exception: | |
83 pass | |
84 return self.buffer.write(str(x)) | |
85 return io.TextIOWrapper.write(self, x) | |
86 | |
87 def writelines(self, lines): | |
88 for line in lines: | |
89 self.write(line) | |
90 | |
91 def __del__(self): | |
92 try: | |
93 self.detach() | |
94 except Exception: | |
95 pass | |
96 | |
97 def isatty(self): | |
98 # https://bitbucket.org/pypy/pypy/issue/1803 | |
99 return self._stream.isatty() | |
100 | |
101 | |
102 class _FixupStream(object): | |
103 """The new io interface needs more from streams than streams | |
104 traditionally implement. As such, this fix-up code is necessary in | |
105 some circumstances. | |
106 | |
107 The forcing of readable and writable flags are there because some tools | |
108 put badly patched objects on sys (one such offender are certain version | |
109 of jupyter notebook). | |
110 """ | |
111 | |
112 def __init__(self, stream, force_readable=False, force_writable=False): | |
113 self._stream = stream | |
114 self._force_readable = force_readable | |
115 self._force_writable = force_writable | |
116 | |
117 def __getattr__(self, name): | |
118 return getattr(self._stream, name) | |
119 | |
120 def read1(self, size): | |
121 f = getattr(self._stream, "read1", None) | |
122 if f is not None: | |
123 return f(size) | |
124 # We only dispatch to readline instead of read in Python 2 as we | |
125 # do not want cause problems with the different implementation | |
126 # of line buffering. | |
127 if PY2: | |
128 return self._stream.readline(size) | |
129 return self._stream.read(size) | |
130 | |
131 def readable(self): | |
132 if self._force_readable: | |
133 return True | |
134 x = getattr(self._stream, "readable", None) | |
135 if x is not None: | |
136 return x() | |
137 try: | |
138 self._stream.read(0) | |
139 except Exception: | |
140 return False | |
141 return True | |
142 | |
143 def writable(self): | |
144 if self._force_writable: | |
145 return True | |
146 x = getattr(self._stream, "writable", None) | |
147 if x is not None: | |
148 return x() | |
149 try: | |
150 self._stream.write("") | |
151 except Exception: | |
152 try: | |
153 self._stream.write(b"") | |
154 except Exception: | |
155 return False | |
156 return True | |
157 | |
158 def seekable(self): | |
159 x = getattr(self._stream, "seekable", None) | |
160 if x is not None: | |
161 return x() | |
162 try: | |
163 self._stream.seek(self._stream.tell()) | |
164 except Exception: | |
165 return False | |
166 return True | |
167 | |
168 | |
169 if PY2: | |
170 text_type = unicode | |
171 raw_input = raw_input | |
172 string_types = (str, unicode) | |
173 int_types = (int, long) | |
174 iteritems = lambda x: x.iteritems() | |
175 range_type = xrange | |
176 | |
177 def is_bytes(x): | |
178 return isinstance(x, (buffer, bytearray)) | |
179 | |
180 _identifier_re = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$") | |
181 | |
182 # For Windows, we need to force stdout/stdin/stderr to binary if it's | |
183 # fetched for that. This obviously is not the most correct way to do | |
184 # it as it changes global state. Unfortunately, there does not seem to | |
185 # be a clear better way to do it as just reopening the file in binary | |
186 # mode does not change anything. | |
187 # | |
188 # An option would be to do what Python 3 does and to open the file as | |
189 # binary only, patch it back to the system, and then use a wrapper | |
190 # stream that converts newlines. It's not quite clear what's the | |
191 # correct option here. | |
192 # | |
193 # This code also lives in _winconsole for the fallback to the console | |
194 # emulation stream. | |
195 # | |
196 # There are also Windows environments where the `msvcrt` module is not | |
197 # available (which is why we use try-catch instead of the WIN variable | |
198 # here), such as the Google App Engine development server on Windows. In | |
199 # those cases there is just nothing we can do. | |
200 def set_binary_mode(f): | |
201 return f | |
202 | |
203 try: | |
204 import msvcrt | |
205 except ImportError: | |
206 pass | |
207 else: | |
208 | |
209 def set_binary_mode(f): | |
210 try: | |
211 fileno = f.fileno() | |
212 except Exception: | |
213 pass | |
214 else: | |
215 msvcrt.setmode(fileno, os.O_BINARY) | |
216 return f | |
217 | |
218 try: | |
219 import fcntl | |
220 except ImportError: | |
221 pass | |
222 else: | |
223 | |
224 def set_binary_mode(f): | |
225 try: | |
226 fileno = f.fileno() | |
227 except Exception: | |
228 pass | |
229 else: | |
230 flags = fcntl.fcntl(fileno, fcntl.F_GETFL) | |
231 fcntl.fcntl(fileno, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) | |
232 return f | |
233 | |
234 def isidentifier(x): | |
235 return _identifier_re.search(x) is not None | |
236 | |
237 def get_binary_stdin(): | |
238 return set_binary_mode(sys.stdin) | |
239 | |
240 def get_binary_stdout(): | |
241 _wrap_std_stream("stdout") | |
242 return set_binary_mode(sys.stdout) | |
243 | |
244 def get_binary_stderr(): | |
245 _wrap_std_stream("stderr") | |
246 return set_binary_mode(sys.stderr) | |
247 | |
248 def get_text_stdin(encoding=None, errors=None): | |
249 rv = _get_windows_console_stream(sys.stdin, encoding, errors) | |
250 if rv is not None: | |
251 return rv | |
252 return _make_text_stream(sys.stdin, encoding, errors, force_readable=True) | |
253 | |
254 def get_text_stdout(encoding=None, errors=None): | |
255 _wrap_std_stream("stdout") | |
256 rv = _get_windows_console_stream(sys.stdout, encoding, errors) | |
257 if rv is not None: | |
258 return rv | |
259 return _make_text_stream(sys.stdout, encoding, errors, force_writable=True) | |
260 | |
261 def get_text_stderr(encoding=None, errors=None): | |
262 _wrap_std_stream("stderr") | |
263 rv = _get_windows_console_stream(sys.stderr, encoding, errors) | |
264 if rv is not None: | |
265 return rv | |
266 return _make_text_stream(sys.stderr, encoding, errors, force_writable=True) | |
267 | |
268 def filename_to_ui(value): | |
269 if isinstance(value, bytes): | |
270 value = value.decode(get_filesystem_encoding(), "replace") | |
271 return value | |
272 | |
273 | |
274 else: | |
275 import io | |
276 | |
277 text_type = str | |
278 raw_input = input | |
279 string_types = (str,) | |
280 int_types = (int,) | |
281 range_type = range | |
282 isidentifier = lambda x: x.isidentifier() | |
283 iteritems = lambda x: iter(x.items()) | |
284 | |
285 def is_bytes(x): | |
286 return isinstance(x, (bytes, memoryview, bytearray)) | |
287 | |
288 def _is_binary_reader(stream, default=False): | |
289 try: | |
290 return isinstance(stream.read(0), bytes) | |
291 except Exception: | |
292 return default | |
293 # This happens in some cases where the stream was already | |
294 # closed. In this case, we assume the default. | |
295 | |
296 def _is_binary_writer(stream, default=False): | |
297 try: | |
298 stream.write(b"") | |
299 except Exception: | |
300 try: | |
301 stream.write("") | |
302 return False | |
303 except Exception: | |
304 pass | |
305 return default | |
306 return True | |
307 | |
308 def _find_binary_reader(stream): | |
309 # We need to figure out if the given stream is already binary. | |
310 # This can happen because the official docs recommend detaching | |
311 # the streams to get binary streams. Some code might do this, so | |
312 # we need to deal with this case explicitly. | |
313 if _is_binary_reader(stream, False): | |
314 return stream | |
315 | |
316 buf = getattr(stream, "buffer", None) | |
317 | |
318 # Same situation here; this time we assume that the buffer is | |
319 # actually binary in case it's closed. | |
320 if buf is not None and _is_binary_reader(buf, True): | |
321 return buf | |
322 | |
323 def _find_binary_writer(stream): | |
324 # We need to figure out if the given stream is already binary. | |
325 # This can happen because the official docs recommend detatching | |
326 # the streams to get binary streams. Some code might do this, so | |
327 # we need to deal with this case explicitly. | |
328 if _is_binary_writer(stream, False): | |
329 return stream | |
330 | |
331 buf = getattr(stream, "buffer", None) | |
332 | |
333 # Same situation here; this time we assume that the buffer is | |
334 # actually binary in case it's closed. | |
335 if buf is not None and _is_binary_writer(buf, True): | |
336 return buf | |
337 | |
338 def _stream_is_misconfigured(stream): | |
339 """A stream is misconfigured if its encoding is ASCII.""" | |
340 # If the stream does not have an encoding set, we assume it's set | |
341 # to ASCII. This appears to happen in certain unittest | |
342 # environments. It's not quite clear what the correct behavior is | |
343 # but this at least will force Click to recover somehow. | |
344 return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii") | |
345 | |
346 def _is_compat_stream_attr(stream, attr, value): | |
347 """A stream attribute is compatible if it is equal to the | |
348 desired value or the desired value is unset and the attribute | |
349 has a value. | |
350 """ | |
351 stream_value = getattr(stream, attr, None) | |
352 return stream_value == value or (value is None and stream_value is not None) | |
353 | |
354 def _is_compatible_text_stream(stream, encoding, errors): | |
355 """Check if a stream's encoding and errors attributes are | |
356 compatible with the desired values. | |
357 """ | |
358 return _is_compat_stream_attr( | |
359 stream, "encoding", encoding | |
360 ) and _is_compat_stream_attr(stream, "errors", errors) | |
361 | |
362 def _force_correct_text_stream( | |
363 text_stream, | |
364 encoding, | |
365 errors, | |
366 is_binary, | |
367 find_binary, | |
368 force_readable=False, | |
369 force_writable=False, | |
370 ): | |
371 if is_binary(text_stream, False): | |
372 binary_reader = text_stream | |
373 else: | |
374 # If the stream looks compatible, and won't default to a | |
375 # misconfigured ascii encoding, return it as-is. | |
376 if _is_compatible_text_stream(text_stream, encoding, errors) and not ( | |
377 encoding is None and _stream_is_misconfigured(text_stream) | |
378 ): | |
379 return text_stream | |
380 | |
381 # Otherwise, get the underlying binary reader. | |
382 binary_reader = find_binary(text_stream) | |
383 | |
384 # If that's not possible, silently use the original reader | |
385 # and get mojibake instead of exceptions. | |
386 if binary_reader is None: | |
387 return text_stream | |
388 | |
389 # Default errors to replace instead of strict in order to get | |
390 # something that works. | |
391 if errors is None: | |
392 errors = "replace" | |
393 | |
394 # Wrap the binary stream in a text stream with the correct | |
395 # encoding parameters. | |
396 return _make_text_stream( | |
397 binary_reader, | |
398 encoding, | |
399 errors, | |
400 force_readable=force_readable, | |
401 force_writable=force_writable, | |
402 ) | |
403 | |
404 def _force_correct_text_reader(text_reader, encoding, errors, force_readable=False): | |
405 return _force_correct_text_stream( | |
406 text_reader, | |
407 encoding, | |
408 errors, | |
409 _is_binary_reader, | |
410 _find_binary_reader, | |
411 force_readable=force_readable, | |
412 ) | |
413 | |
414 def _force_correct_text_writer(text_writer, encoding, errors, force_writable=False): | |
415 return _force_correct_text_stream( | |
416 text_writer, | |
417 encoding, | |
418 errors, | |
419 _is_binary_writer, | |
420 _find_binary_writer, | |
421 force_writable=force_writable, | |
422 ) | |
423 | |
424 def get_binary_stdin(): | |
425 reader = _find_binary_reader(sys.stdin) | |
426 if reader is None: | |
427 raise RuntimeError("Was not able to determine binary stream for sys.stdin.") | |
428 return reader | |
429 | |
430 def get_binary_stdout(): | |
431 writer = _find_binary_writer(sys.stdout) | |
432 if writer is None: | |
433 raise RuntimeError( | |
434 "Was not able to determine binary stream for sys.stdout." | |
435 ) | |
436 return writer | |
437 | |
438 def get_binary_stderr(): | |
439 writer = _find_binary_writer(sys.stderr) | |
440 if writer is None: | |
441 raise RuntimeError( | |
442 "Was not able to determine binary stream for sys.stderr." | |
443 ) | |
444 return writer | |
445 | |
446 def get_text_stdin(encoding=None, errors=None): | |
447 rv = _get_windows_console_stream(sys.stdin, encoding, errors) | |
448 if rv is not None: | |
449 return rv | |
450 return _force_correct_text_reader( | |
451 sys.stdin, encoding, errors, force_readable=True | |
452 ) | |
453 | |
454 def get_text_stdout(encoding=None, errors=None): | |
455 rv = _get_windows_console_stream(sys.stdout, encoding, errors) | |
456 if rv is not None: | |
457 return rv | |
458 return _force_correct_text_writer( | |
459 sys.stdout, encoding, errors, force_writable=True | |
460 ) | |
461 | |
462 def get_text_stderr(encoding=None, errors=None): | |
463 rv = _get_windows_console_stream(sys.stderr, encoding, errors) | |
464 if rv is not None: | |
465 return rv | |
466 return _force_correct_text_writer( | |
467 sys.stderr, encoding, errors, force_writable=True | |
468 ) | |
469 | |
470 def filename_to_ui(value): | |
471 if isinstance(value, bytes): | |
472 value = value.decode(get_filesystem_encoding(), "replace") | |
473 else: | |
474 value = value.encode("utf-8", "surrogateescape").decode("utf-8", "replace") | |
475 return value | |
476 | |
477 | |
478 def get_streerror(e, default=None): | |
479 if hasattr(e, "strerror"): | |
480 msg = e.strerror | |
481 else: | |
482 if default is not None: | |
483 msg = default | |
484 else: | |
485 msg = str(e) | |
486 if isinstance(msg, bytes): | |
487 msg = msg.decode("utf-8", "replace") | |
488 return msg | |
489 | |
490 | |
491 def _wrap_io_open(file, mode, encoding, errors): | |
492 """On Python 2, :func:`io.open` returns a text file wrapper that | |
493 requires passing ``unicode`` to ``write``. Need to open the file in | |
494 binary mode then wrap it in a subclass that can write ``str`` and | |
495 ``unicode``. | |
496 | |
497 Also handles not passing ``encoding`` and ``errors`` in binary mode. | |
498 """ | |
499 binary = "b" in mode | |
500 | |
501 if binary: | |
502 kwargs = {} | |
503 else: | |
504 kwargs = {"encoding": encoding, "errors": errors} | |
505 | |
506 if not PY2 or binary: | |
507 return io.open(file, mode, **kwargs) | |
508 | |
509 f = io.open(file, "{}b".format(mode.replace("t", ""))) | |
510 return _make_text_stream(f, **kwargs) | |
511 | |
512 | |
513 def open_stream(filename, mode="r", encoding=None, errors="strict", atomic=False): | |
514 binary = "b" in mode | |
515 | |
516 # Standard streams first. These are simple because they don't need | |
517 # special handling for the atomic flag. It's entirely ignored. | |
518 if filename == "-": | |
519 if any(m in mode for m in ["w", "a", "x"]): | |
520 if binary: | |
521 return get_binary_stdout(), False | |
522 return get_text_stdout(encoding=encoding, errors=errors), False | |
523 if binary: | |
524 return get_binary_stdin(), False | |
525 return get_text_stdin(encoding=encoding, errors=errors), False | |
526 | |
527 # Non-atomic writes directly go out through the regular open functions. | |
528 if not atomic: | |
529 return _wrap_io_open(filename, mode, encoding, errors), True | |
530 | |
531 # Some usability stuff for atomic writes | |
532 if "a" in mode: | |
533 raise ValueError( | |
534 "Appending to an existing file is not supported, because that" | |
535 " would involve an expensive `copy`-operation to a temporary" | |
536 " file. Open the file in normal `w`-mode and copy explicitly" | |
537 " if that's what you're after." | |
538 ) | |
539 if "x" in mode: | |
540 raise ValueError("Use the `overwrite`-parameter instead.") | |
541 if "w" not in mode: | |
542 raise ValueError("Atomic writes only make sense with `w`-mode.") | |
543 | |
544 # Atomic writes are more complicated. They work by opening a file | |
545 # as a proxy in the same folder and then using the fdopen | |
546 # functionality to wrap it in a Python file. Then we wrap it in an | |
547 # atomic file that moves the file over on close. | |
548 import errno | |
549 import random | |
550 | |
551 try: | |
552 perm = os.stat(filename).st_mode | |
553 except OSError: | |
554 perm = None | |
555 | |
556 flags = os.O_RDWR | os.O_CREAT | os.O_EXCL | |
557 | |
558 if binary: | |
559 flags |= getattr(os, "O_BINARY", 0) | |
560 | |
561 while True: | |
562 tmp_filename = os.path.join( | |
563 os.path.dirname(filename), | |
564 ".__atomic-write{:08x}".format(random.randrange(1 << 32)), | |
565 ) | |
566 try: | |
567 fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm) | |
568 break | |
569 except OSError as e: | |
570 if e.errno == errno.EEXIST or ( | |
571 os.name == "nt" | |
572 and e.errno == errno.EACCES | |
573 and os.path.isdir(e.filename) | |
574 and os.access(e.filename, os.W_OK) | |
575 ): | |
576 continue | |
577 raise | |
578 | |
579 if perm is not None: | |
580 os.chmod(tmp_filename, perm) # in case perm includes bits in umask | |
581 | |
582 f = _wrap_io_open(fd, mode, encoding, errors) | |
583 return _AtomicFile(f, tmp_filename, os.path.realpath(filename)), True | |
584 | |
585 | |
586 # Used in a destructor call, needs extra protection from interpreter cleanup. | |
587 if hasattr(os, "replace"): | |
588 _replace = os.replace | |
589 _can_replace = True | |
590 else: | |
591 _replace = os.rename | |
592 _can_replace = not WIN | |
593 | |
594 | |
595 class _AtomicFile(object): | |
596 def __init__(self, f, tmp_filename, real_filename): | |
597 self._f = f | |
598 self._tmp_filename = tmp_filename | |
599 self._real_filename = real_filename | |
600 self.closed = False | |
601 | |
602 @property | |
603 def name(self): | |
604 return self._real_filename | |
605 | |
606 def close(self, delete=False): | |
607 if self.closed: | |
608 return | |
609 self._f.close() | |
610 if not _can_replace: | |
611 try: | |
612 os.remove(self._real_filename) | |
613 except OSError: | |
614 pass | |
615 _replace(self._tmp_filename, self._real_filename) | |
616 self.closed = True | |
617 | |
618 def __getattr__(self, name): | |
619 return getattr(self._f, name) | |
620 | |
621 def __enter__(self): | |
622 return self | |
623 | |
624 def __exit__(self, exc_type, exc_value, tb): | |
625 self.close(delete=exc_type is not None) | |
626 | |
627 def __repr__(self): | |
628 return repr(self._f) | |
629 | |
630 | |
631 auto_wrap_for_ansi = None | |
632 colorama = None | |
633 get_winterm_size = None | |
634 | |
635 | |
636 def strip_ansi(value): | |
637 return _ansi_re.sub("", value) | |
638 | |
639 | |
640 def _is_jupyter_kernel_output(stream): | |
641 if WIN: | |
642 # TODO: Couldn't test on Windows, should't try to support until | |
643 # someone tests the details wrt colorama. | |
644 return | |
645 | |
646 while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)): | |
647 stream = stream._stream | |
648 | |
649 return stream.__class__.__module__.startswith("ipykernel.") | |
650 | |
651 | |
652 def should_strip_ansi(stream=None, color=None): | |
653 if color is None: | |
654 if stream is None: | |
655 stream = sys.stdin | |
656 return not isatty(stream) and not _is_jupyter_kernel_output(stream) | |
657 return not color | |
658 | |
659 | |
660 # If we're on Windows, we provide transparent integration through | |
661 # colorama. This will make ANSI colors through the echo function | |
662 # work automatically. | |
663 if WIN: | |
664 # Windows has a smaller terminal | |
665 DEFAULT_COLUMNS = 79 | |
666 | |
667 from ._winconsole import _get_windows_console_stream, _wrap_std_stream | |
668 | |
669 def _get_argv_encoding(): | |
670 import locale | |
671 | |
672 return locale.getpreferredencoding() | |
673 | |
674 if PY2: | |
675 | |
676 def raw_input(prompt=""): | |
677 sys.stderr.flush() | |
678 if prompt: | |
679 stdout = _default_text_stdout() | |
680 stdout.write(prompt) | |
681 stdin = _default_text_stdin() | |
682 return stdin.readline().rstrip("\r\n") | |
683 | |
684 try: | |
685 import colorama | |
686 except ImportError: | |
687 pass | |
688 else: | |
689 _ansi_stream_wrappers = WeakKeyDictionary() | |
690 | |
691 def auto_wrap_for_ansi(stream, color=None): | |
692 """This function wraps a stream so that calls through colorama | |
693 are issued to the win32 console API to recolor on demand. It | |
694 also ensures to reset the colors if a write call is interrupted | |
695 to not destroy the console afterwards. | |
696 """ | |
697 try: | |
698 cached = _ansi_stream_wrappers.get(stream) | |
699 except Exception: | |
700 cached = None | |
701 if cached is not None: | |
702 return cached | |
703 strip = should_strip_ansi(stream, color) | |
704 ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip) | |
705 rv = ansi_wrapper.stream | |
706 _write = rv.write | |
707 | |
708 def _safe_write(s): | |
709 try: | |
710 return _write(s) | |
711 except: | |
712 ansi_wrapper.reset_all() | |
713 raise | |
714 | |
715 rv.write = _safe_write | |
716 try: | |
717 _ansi_stream_wrappers[stream] = rv | |
718 except Exception: | |
719 pass | |
720 return rv | |
721 | |
722 def get_winterm_size(): | |
723 win = colorama.win32.GetConsoleScreenBufferInfo( | |
724 colorama.win32.STDOUT | |
725 ).srWindow | |
726 return win.Right - win.Left, win.Bottom - win.Top | |
727 | |
728 | |
729 else: | |
730 | |
731 def _get_argv_encoding(): | |
732 return getattr(sys.stdin, "encoding", None) or get_filesystem_encoding() | |
733 | |
734 _get_windows_console_stream = lambda *x: None | |
735 _wrap_std_stream = lambda *x: None | |
736 | |
737 | |
738 def term_len(x): | |
739 return len(strip_ansi(x)) | |
740 | |
741 | |
742 def isatty(stream): | |
743 try: | |
744 return stream.isatty() | |
745 except Exception: | |
746 return False | |
747 | |
748 | |
749 def _make_cached_stream_func(src_func, wrapper_func): | |
750 cache = WeakKeyDictionary() | |
751 | |
752 def func(): | |
753 stream = src_func() | |
754 try: | |
755 rv = cache.get(stream) | |
756 except Exception: | |
757 rv = None | |
758 if rv is not None: | |
759 return rv | |
760 rv = wrapper_func() | |
761 try: | |
762 stream = src_func() # In case wrapper_func() modified the stream | |
763 cache[stream] = rv | |
764 except Exception: | |
765 pass | |
766 return rv | |
767 | |
768 return func | |
769 | |
770 | |
771 _default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin) | |
772 _default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout) | |
773 _default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr) | |
774 | |
775 | |
776 binary_streams = { | |
777 "stdin": get_binary_stdin, | |
778 "stdout": get_binary_stdout, | |
779 "stderr": get_binary_stderr, | |
780 } | |
781 | |
782 text_streams = { | |
783 "stdin": get_text_stdin, | |
784 "stdout": get_text_stdout, | |
785 "stderr": get_text_stderr, | |
786 } |