Mercurial > repos > guerler > hhblits
comparison lib/python3.8/site-packages/pip/_internal/utils/ui.py @ 0:9e54283cc701 draft
"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author | guerler |
---|---|
date | Mon, 27 Jul 2020 03:47:31 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:9e54283cc701 |
---|---|
1 # The following comment should be removed at some point in the future. | |
2 # mypy: strict-optional=False | |
3 # mypy: disallow-untyped-defs=False | |
4 | |
5 from __future__ import absolute_import, division | |
6 | |
7 import contextlib | |
8 import itertools | |
9 import logging | |
10 import sys | |
11 import time | |
12 from signal import SIGINT, default_int_handler, signal | |
13 | |
14 from pip._vendor import six | |
15 from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR | |
16 from pip._vendor.progress.bar import Bar, FillingCirclesBar, IncrementalBar | |
17 from pip._vendor.progress.spinner import Spinner | |
18 | |
19 from pip._internal.utils.compat import WINDOWS | |
20 from pip._internal.utils.logging import get_indentation | |
21 from pip._internal.utils.misc import format_size | |
22 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
23 | |
24 if MYPY_CHECK_RUNNING: | |
25 from typing import Any, Iterator, IO | |
26 | |
27 try: | |
28 from pip._vendor import colorama | |
29 # Lots of different errors can come from this, including SystemError and | |
30 # ImportError. | |
31 except Exception: | |
32 colorama = None | |
33 | |
34 logger = logging.getLogger(__name__) | |
35 | |
36 | |
37 def _select_progress_class(preferred, fallback): | |
38 encoding = getattr(preferred.file, "encoding", None) | |
39 | |
40 # If we don't know what encoding this file is in, then we'll just assume | |
41 # that it doesn't support unicode and use the ASCII bar. | |
42 if not encoding: | |
43 return fallback | |
44 | |
45 # Collect all of the possible characters we want to use with the preferred | |
46 # bar. | |
47 characters = [ | |
48 getattr(preferred, "empty_fill", six.text_type()), | |
49 getattr(preferred, "fill", six.text_type()), | |
50 ] | |
51 characters += list(getattr(preferred, "phases", [])) | |
52 | |
53 # Try to decode the characters we're using for the bar using the encoding | |
54 # of the given file, if this works then we'll assume that we can use the | |
55 # fancier bar and if not we'll fall back to the plaintext bar. | |
56 try: | |
57 six.text_type().join(characters).encode(encoding) | |
58 except UnicodeEncodeError: | |
59 return fallback | |
60 else: | |
61 return preferred | |
62 | |
63 | |
# Chosen once at import time: IncrementalBar when the characters it draws can
# be encoded for its output file, otherwise Bar.
_BaseBar = _select_progress_class(IncrementalBar, Bar)  # type: Any
65 | |
66 | |
class InterruptibleMixin(object):
    """
    Helper to ensure that self.finish() gets called on keyboard interrupt.

    This allows downloads to be interrupted without leaving temporary state
    (like hidden cursors) behind.

    This class is similar to the progress library's existing SigIntMixin
    helper, but as of version 1.2, that helper has the following problems:

    1. It calls sys.exit().
    2. It discards the existing SIGINT handler completely.
    3. It leaves its own handler in place even after an uninterrupted finish,
       which will have unexpected delayed effects if the user triggers an
       unrelated keyboard interrupt some time after a progress-displaying
       download has already completed, for example.
    """

    def __init__(self, *args, **kwargs):
        """
        Install handle_sigint for SIGINT and save the previous handler.
        """
        super(InterruptibleMixin, self).__init__(*args, **kwargs)

        self.original_handler = signal(SIGINT, self.handle_sigint)

        # If signal() returns None, the previous handler was not installed from
        # Python, and we cannot restore it. This probably should not happen,
        # but if it does, we must restore something sensible instead, at least.
        # The least bad option should be Python's default SIGINT handler, which
        # just raises KeyboardInterrupt.
        if self.original_handler is None:
            self.original_handler = default_int_handler

    def finish(self):
        """
        Restore the original SIGINT handler after finishing.

        This should happen regardless of whether the progress display finishes
        normally, gets interrupted, or fails while finishing, hence the
        try/finally around the parent implementation.
        """
        try:
            super(InterruptibleMixin, self).finish()
        finally:
            signal(SIGINT, self.original_handler)

    def handle_sigint(self, signum, frame):
        """
        Call self.finish() before delegating to the original SIGINT handler.

        This handler should only be in place while the progress display is
        active.
        """
        self.finish()
        # The saved handler may be a non-callable disposition (SIG_IGN or
        # SIG_DFL) rather than a Python function. finish() has already
        # reinstated it, so there is nothing further to invoke in that case.
        if callable(self.original_handler):
            self.original_handler(signum, frame)
120 | |
121 | |
class SilentBar(Bar):
    """Bar whose update() draws nothing."""

    def update(self):
        """Intentionally do nothing."""
        return None
126 | |
127 | |
class BlueEmojiBar(IncrementalBar):
    """IncrementalBar configured with blue emoji as its phase characters."""

    # Only the integer percentage is shown after the bar.
    suffix = "%(percent)d%%"
    bar_prefix = " "
    bar_suffix = " "
    # U+1F539 small blue diamond, U+1F537 large blue diamond,
    # U+1F535 large blue circle.
    phases = (u"\U0001F539", u"\U0001F537", u"\U0001F535")  # type: Any
134 | |
135 | |
class DownloadProgressMixin(object):
    """
    Mixin adding download-oriented display fields and a chunk iterator.

    It relies on attributes supplied by the class it is mixed into
    (message, index, avg, eta, eta_td, next() and finish()).
    """

    def __init__(self, *args, **kwargs):
        super(DownloadProgressMixin, self).__init__(*args, **kwargs)
        # Indent the message two columns past the current log indentation.
        padding = " " * (get_indentation() + 2)
        self.message = padding + self.message

    @property
    def downloaded(self):
        """self.index rendered through format_size."""
        return format_size(self.index)

    @property
    def download_speed(self):
        """Formatted rate with a "/s" suffix, or "..." while avg is zero."""
        # Guard against dividing by zero before any average is available.
        if self.avg != 0.0:
            return format_size(1 / self.avg) + "/s"
        return "..."

    @property
    def pretty_eta(self):
        """"eta <eta_td>" when eta is truthy, otherwise an empty string."""
        return "eta %s" % self.eta_td if self.eta else ""

    def iter(self, it):
        """
        Yield each item of ``it``, advancing the display by len(item) after
        the consumer has handled it, then call finish() once ``it`` is
        exhausted.
        """
        for chunk in it:
            yield chunk
            self.next(len(chunk))
        self.finish()
164 | |
165 | |
class WindowsMixin(object):
    """Mixin applying Windows-specific adjustments to a progress display."""

    def __init__(self, *args, **kwargs):
        # The Windows terminal does not support the hide/show cursor ANSI
        # codes even with colorama, so force hide_cursor off there.
        # This has to run before the super() call: the base progress bar
        # class writes the "hide cursor" code in its own __init__, and doing
        # this any later would leave a "hide" with no matching "show".
        if WINDOWS and self.hide_cursor:
            self.hide_cursor = False

        super(WindowsMixin, self).__init__(*args, **kwargs)

        # Nothing more to do unless we are on Windows with colorama available.
        if not (WINDOWS and colorama):
            return

        self.file = colorama.AnsiToWin32(self.file)

        # The progress code expects self.file.isatty() and self.file.flush()
        # to exist, but the colorama.AnsiToWin32() object doesn't provide
        # them, so forward both to the wrapped stream.
        def _isatty():
            return self.file.wrapped.isatty()

        def _flush():
            return self.file.wrapped.flush()

        self.file.isatty = _isatty
        self.file.flush = _flush
193 | |
194 | |
class BaseDownloadProgressBar(WindowsMixin, InterruptibleMixin,
                              DownloadProgressMixin):
    """
    Shared configuration for the download bar classes: combines the three
    mixins and sets the output stream and display templates.
    """

    file = sys.stdout
    message = "%(percent)d%%"
    # These field names match the properties defined on DownloadProgressMixin.
    suffix = "%(downloaded)s %(download_speed)s %(pretty_eta)s"
201 | |
202 # NOTE: The "type: ignore" comments on the following classes are there to | |
203 # work around https://github.com/python/typing/issues/241 | |
204 | |
205 | |
class DefaultDownloadProgressBar(BaseDownloadProgressBar,
                                 _BaseBar):
    """Download bar built on _BaseBar, the bar class selected at import."""
    pass
209 | |
210 | |
class DownloadSilentBar(BaseDownloadProgressBar, SilentBar):  # type: ignore
    """Download bar built on SilentBar, whose update() does nothing."""
    pass
213 | |
214 | |
class DownloadBar(BaseDownloadProgressBar,  # type: ignore
                  Bar):
    """Download bar built on the plain Bar class."""
    pass
218 | |
219 | |
class DownloadFillingCirclesBar(BaseDownloadProgressBar,  # type: ignore
                                FillingCirclesBar):
    """Download bar built on FillingCirclesBar."""
    pass
223 | |
224 | |
class DownloadBlueEmojiProgressBar(BaseDownloadProgressBar,  # type: ignore
                                   BlueEmojiBar):
    """Download bar built on BlueEmojiBar."""
    pass
228 | |
229 | |
class DownloadProgressSpinner(WindowsMixin, InterruptibleMixin,
                              DownloadProgressMixin, Spinner):
    """Spinner-based download display showing amount and speed."""

    file = sys.stdout
    suffix = "%(downloaded)s %(download_speed)s"

    def next_phase(self):
        """Return the next spinner character, cycling through self.phases."""
        # The cycle is created lazily on first use and cached on the instance.
        try:
            phaser = self._phaser
        except AttributeError:
            phaser = self._phaser = itertools.cycle(self.phases)
        return next(phaser)

    def update(self):
        """Write one line: message, spinner phase and suffix."""
        message = self.message % self
        phase = self.next_phase()
        suffix = self.suffix % self

        # A separating space is emitted only next to a non-empty part.
        pieces = [message]
        if message:
            pieces.append(" ")
        pieces.append(phase)
        if suffix:
            pieces.append(" ")
        pieces.append(suffix)

        self.writeln(''.join(pieces))
254 | |
255 | |
# Maps a progress-bar style name to a pair of classes. DownloadProgressProvider
# uses the first entry when a non-zero total is given and the second entry
# when the total is None or 0.
BAR_TYPES = {
    "off": (DownloadSilentBar, DownloadSilentBar),
    "on": (DefaultDownloadProgressBar, DownloadProgressSpinner),
    "ascii": (DownloadBar, DownloadProgressSpinner),
    "pretty": (DownloadFillingCirclesBar, DownloadProgressSpinner),
    "emoji": (DownloadBlueEmojiProgressBar, DownloadProgressSpinner)
}
263 | |
264 | |
def DownloadProgressProvider(progress_bar, max=None):
    """
    Return the bound ``iter`` method of a new progress display.

    ``progress_bar`` is a key of BAR_TYPES. When ``max`` is None or 0 the
    second class of the pair is instantiated without arguments; otherwise the
    first class is instantiated with ``max=max``.
    """
    known_size_class, unknown_size_class = BAR_TYPES[progress_bar]
    if max is None or max == 0:
        return unknown_size_class().iter
    return known_size_class(max=max).iter
270 | |
271 | |
272 ################################################################ | |
273 # Generic "something is happening" spinners | |
274 # | |
275 # We don't even try using progress.spinner.Spinner here because it's actually | |
276 # simpler to reimplement from scratch than to coerce their code into doing | |
277 # what we need. | |
278 ################################################################ | |
279 | |
@contextlib.contextmanager
def hidden_cursor(file):
    # type: (IO[Any]) -> Iterator[None]
    """
    Context manager that hides the cursor on ``file`` for the duration of
    the block and shows it again on exit, even if the block raises.

    No control codes are written in any of these cases:

    * on Windows, whose terminal does not support the hide/show cursor ANSI
      codes, even via colorama;
    * when ``file`` is not a tty, to avoid cluttering redirected output;
    * when the logger's effective level is above INFO (e.g. --quiet).

    See https://github.com/pypa/pip/issues/3418
    """
    if (
        WINDOWS
        or not file.isatty()
        or logger.getEffectiveLevel() > logging.INFO
    ):
        yield
        return

    file.write(HIDE_CURSOR)
    try:
        yield
    finally:
        file.write(SHOW_CURSOR)
298 | |
299 | |
class RateLimiter(object):
    """Tracks whether a minimum interval has passed since the last reset()."""

    def __init__(self, min_update_interval_seconds):
        # type: (float) -> None
        self._min_update_interval_seconds = min_update_interval_seconds
        # Starts at 0, i.e. before any reset() has recorded a time.time() value.
        self._last_update = 0  # type: float

    def ready(self):
        # type: () -> bool
        """Return True once the minimum interval has elapsed since reset()."""
        elapsed = time.time() - self._last_update
        return elapsed >= self._min_update_interval_seconds

    def reset(self):
        # type: () -> None
        """Record the current time as the moment of the last update."""
        self._last_update = time.time()
315 | |
316 | |
class SpinnerInterface(object):
    """Interface for spinners: subclasses must override both methods."""

    def spin(self):
        # type: () -> None
        """Advance the spinner; not implemented here."""
        raise NotImplementedError

    def finish(self, final_status):
        # type: (str) -> None
        """Report ``final_status`` and stop; not implemented here."""
        raise NotImplementedError
325 | |
326 | |
class InteractiveSpinner(SpinnerInterface):
    """
    Spinner that rewrites its status in place on a stream using backspaces.
    """

    def __init__(self, message, file=None, spin_chars="-\\|/",
                 # Empirically, 8 updates/second looks nice
                 min_update_interval_seconds=0.125):
        self._message = message
        self._file = sys.stdout if file is None else file
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._finished = False
        self._spin_cycle = itertools.cycle(spin_chars)

        # Write the indented "<message> ... " prefix once; later writes only
        # replace the status that follows it.
        prefix = " " * get_indentation() + self._message + " ... "
        self._file.write(prefix)
        self._width = 0

    def _write(self, status):
        """Replace the currently displayed status text with ``status``."""
        assert not self._finished
        # Erase the previous status: backspace to its start, overwrite it
        # with spaces, then backspace again.
        rewind = "\b" * self._width
        blank = " " * self._width
        self._file.write(rewind + blank + rewind)
        # Now we have a blank slate to add our status.
        self._file.write(status)
        self._width = len(status)
        self._file.flush()
        self._rate_limiter.reset()

    def spin(self):
        # type: () -> None
        """Show the next spin character, unless finished or rate-limited."""
        if self._finished or not self._rate_limiter.ready():
            return
        self._write(next(self._spin_cycle))

    def finish(self, final_status):
        # type: (str) -> None
        """Write ``final_status`` and a newline; later calls do nothing."""
        if self._finished:
            return
        self._write(final_status)
        self._file.write("\n")
        self._file.flush()
        self._finished = True
371 | |
372 | |
373 # Used for dumb terminals, non-interactive installs (no tty), etc. | |
374 # We still print updates occasionally (once every 60 seconds by default) to | |
375 # act as a keep-alive for systems like Travis-CI that take lack-of-output as | |
376 # an indication that a task has frozen. | |
class NonInteractiveSpinner(SpinnerInterface):
    """Spinner that reports progress through the module logger."""

    def __init__(self, message, min_update_interval_seconds=60):
        # type: (str, float) -> None
        self._message = message
        self._finished = False
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._update("started")

    def _update(self, status):
        """Log "<message>: <status>" at INFO and restart the rate limiter."""
        assert not self._finished
        self._rate_limiter.reset()
        logger.info("%s: %s", self._message, status)

    def spin(self):
        # type: () -> None
        """Log a "still running" line, unless finished or rate-limited."""
        if self._finished or not self._rate_limiter.ready():
            return
        self._update("still running...")

    def finish(self, final_status):
        # type: (str) -> None
        """Log the final status once; later calls do nothing."""
        if self._finished:
            return
        self._update("finished with status '%s'" % (final_status,))
        self._finished = True
404 | |
405 | |
@contextlib.contextmanager
def open_spinner(message):
    # type: (str) -> Iterator[SpinnerInterface]
    """
    Context manager yielding a spinner for ``message`` and finishing it with
    "done", "canceled" (KeyboardInterrupt) or "error" (any other Exception).
    The exception, if any, is re-raised.
    """
    # Interactive spinner goes directly to sys.stdout rather than being routed
    # through the logging system, but it acts like it has level INFO,
    # i.e. it's only displayed if we're at level INFO or better.
    # Non-interactive spinner goes through the logging system, so it is always
    # in sync with logging configuration.
    interactive = (
        sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO
    )
    if interactive:
        spinner = InteractiveSpinner(message)  # type: SpinnerInterface
    else:
        spinner = NonInteractiveSpinner(message)

    try:
        with hidden_cursor(sys.stdout):
            yield spinner
    except KeyboardInterrupt:
        spinner.finish("canceled")
        raise
    except Exception:
        spinner.finish("error")
        raise
    # Reached only when the managed block completed without raising.
    spinner.finish("done")