comparison lib/python3.8/site-packages/pip/_internal/utils/ui.py @ 0:9e54283cc701 draft

"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author guerler
date Mon, 27 Jul 2020 03:47:31 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:9e54283cc701
1 # The following comment should be removed at some point in the future.
2 # mypy: strict-optional=False
3 # mypy: disallow-untyped-defs=False
4
5 from __future__ import absolute_import, division
6
7 import contextlib
8 import itertools
9 import logging
10 import sys
11 import time
12 from signal import SIGINT, default_int_handler, signal
13
14 from pip._vendor import six
15 from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR
16 from pip._vendor.progress.bar import Bar, FillingCirclesBar, IncrementalBar
17 from pip._vendor.progress.spinner import Spinner
18
19 from pip._internal.utils.compat import WINDOWS
20 from pip._internal.utils.logging import get_indentation
21 from pip._internal.utils.misc import format_size
22 from pip._internal.utils.typing import MYPY_CHECK_RUNNING
23
24 if MYPY_CHECK_RUNNING:
25 from typing import Any, Iterator, IO
26
27 try:
28 from pip._vendor import colorama
29 # Lots of different errors can come from this, including SystemError and
30 # ImportError.
31 except Exception:
32 colorama = None
33
34 logger = logging.getLogger(__name__)
35
36
37 def _select_progress_class(preferred, fallback):
38 encoding = getattr(preferred.file, "encoding", None)
39
40 # If we don't know what encoding this file is in, then we'll just assume
41 # that it doesn't support unicode and use the ASCII bar.
42 if not encoding:
43 return fallback
44
45 # Collect all of the possible characters we want to use with the preferred
46 # bar.
47 characters = [
48 getattr(preferred, "empty_fill", six.text_type()),
49 getattr(preferred, "fill", six.text_type()),
50 ]
51 characters += list(getattr(preferred, "phases", []))
52
53 # Try to decode the characters we're using for the bar using the encoding
54 # of the given file, if this works then we'll assume that we can use the
55 # fancier bar and if not we'll fall back to the plaintext bar.
56 try:
57 six.text_type().join(characters).encode(encoding)
58 except UnicodeEncodeError:
59 return fallback
60 else:
61 return preferred
62
63
# Resolved once, at import time: IncrementalBar when the output encoding can
# represent its characters, the plain Bar otherwise.
_BaseBar = _select_progress_class(IncrementalBar, Bar)  # type: Any
65
66
class InterruptibleMixin(object):
    """
    Helper to ensure that self.finish() gets called on keyboard interrupt.

    This allows downloads to be interrupted without leaving temporary state
    (like hidden cursors) behind.

    This class is similar to the progress library's existing SigIntMixin
    helper, but as of version 1.2, that helper has the following problems:

    1. It calls sys.exit().
    2. It discards the existing SIGINT handler completely.
    3. It leaves its own handler in place even after an uninterrupted finish,
       which will have unexpected delayed effects if the user triggers an
       unrelated keyboard interrupt some time after a progress-displaying
       download has already completed, for example.
    """

    def __init__(self, *args, **kwargs):
        """
        Install handle_sigint() for SIGINT and save the handler it replaces.

        All arguments are forwarded unchanged to the next class in the MRO.
        """
        super(InterruptibleMixin, self).__init__(*args, **kwargs)

        self.original_handler = signal(SIGINT, self.handle_sigint)

        # If signal() returns None, the previous handler was not installed from
        # Python, and we cannot restore it. This probably should not happen,
        # but if it does, we must restore something sensible instead, at least.
        # The least bad option should be Python's default SIGINT handler, which
        # just raises KeyboardInterrupt.
        if self.original_handler is None:
            self.original_handler = default_int_handler

    def finish(self):
        """
        Restore the original SIGINT handler after finishing.

        This should happen regardless of whether the progress display finishes
        normally, or gets interrupted.  The restore is done in a ``finally``
        clause so that it also happens when the parent finish() raises.
        """
        try:
            super(InterruptibleMixin, self).finish()
        finally:
            signal(SIGINT, self.original_handler)

    def handle_sigint(self, signum, frame):
        """
        Call self.finish() before delegating to the original SIGINT handler.

        This handler should only be in place while the progress display is
        active.  The original handler is invoked even if finish() raises, so
        the interrupt itself is never lost.
        """
        try:
            self.finish()
        finally:
            self.original_handler(signum, frame)
120
121
class SilentBar(Bar):
    """A Bar whose update() is a no-op."""

    def update(self):
        """Do nothing instead of redrawing."""
        return None
126
127
class BlueEmojiBar(IncrementalBar):
    """IncrementalBar drawn with three emoji phases and a percent suffix."""

    # Text shown after the bar.
    suffix = "%(percent)d%%"
    # A single space on each side of the bar.
    bar_prefix = " "
    bar_suffix = " "
    # Characters the bar is drawn with.
    phases = (u"\U0001F539", u"\U0001F537", u"\U0001F535")  # type: Any
134
135
class DownloadProgressMixin(object):
    """
    Adds download-oriented template fields and an iteration helper.

    Relies on attributes supplied by the class it is mixed into: ``message``,
    ``index``, ``avg``, ``eta``, ``eta_td``, ``next()`` and ``finish()``.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments, then indent the message."""
        super(DownloadProgressMixin, self).__init__(*args, **kwargs)
        # Two columns deeper than the current logging indentation.
        padding = " " * (get_indentation() + 2)
        self.message = padding + self.message

    @property
    def downloaded(self):
        """``self.index`` rendered by format_size()."""
        return format_size(self.index)

    @property
    def download_speed(self):
        """Formatted rate with a "/s" suffix, or "..." while avg is zero."""
        # Avoid zero division errors...
        if self.avg == 0.0:
            return "..."
        rate = format_size(1 / self.avg)
        return rate + "/s"

    @property
    def pretty_eta(self):
        """"eta <eta_td>" when ``self.eta`` is truthy, otherwise ""."""
        return "eta %s" % self.eta_td if self.eta else ""

    def iter(self, it):
        """
        Yield every item of ``it``, advancing by ``len(item)`` after each one.

        finish() is called once ``it`` has been exhausted.
        """
        for chunk in it:
            yield chunk
            self.next(len(chunk))
        self.finish()
164
165
class WindowsMixin(object):
    """Windows-specific adjustments: no cursor hiding, colorama-wrapped file."""

    def __init__(self, *args, **kwargs):
        """Adjust ``hide_cursor`` and ``file`` around the parent initialiser."""
        # The Windows terminal does not support the hide/show cursor ANSI codes
        # even with colorama. So we'll ensure that hide_cursor is False on
        # Windows.
        # This has to happen before the super() call, so that hide_cursor
        # is set in time. The base progress bar class writes the "hide cursor"
        # code to the terminal in its init, so if we don't set this soon
        # enough, we get a "hide" with no corresponding "show"...
        if WINDOWS and self.hide_cursor:
            self.hide_cursor = False

        super(WindowsMixin, self).__init__(*args, **kwargs)

        # Nothing more to do unless we are on Windows and colorama imported.
        if not (WINDOWS and colorama):
            return

        # Wrap our file with colorama.
        self.file = colorama.AnsiToWin32(self.file)

        # The progress code expects to be able to call self.file.isatty() and
        # self.file.flush(), but the colorama.AnsiToWin32() object doesn't
        # have them, so we add both, delegating to the wrapped stream.
        def _isatty():
            return self.file.wrapped.isatty()

        def _flush():
            return self.file.wrapped.flush()

        self.file.isatty = _isatty
        self.file.flush = _flush
193
194
class BaseDownloadProgressBar(WindowsMixin, InterruptibleMixin,
                              DownloadProgressMixin):
    """Mixin stack and display templates shared by the download bars."""

    # Stream the bar is written to.
    file = sys.stdout
    # Leading text: the completion percentage.
    message = "%(percent)d%%"
    # Trailing text, filled from the DownloadProgressMixin properties.
    suffix = "%(downloaded)s %(download_speed)s %(pretty_eta)s"
201
202 # NOTE: The "type: ignore" comments on the following classes are there to
203 # work around https://github.com/python/typing/issues/241
204
205
class DefaultDownloadProgressBar(BaseDownloadProgressBar,
                                 _BaseBar):
    """Download bar built on whichever bar class ``_BaseBar`` resolved to."""
209
210
class DownloadSilentBar(BaseDownloadProgressBar, SilentBar):  # type: ignore
    """Download bar built on SilentBar."""
213
214
class DownloadBar(BaseDownloadProgressBar,  # type: ignore
                  Bar):
    """Download bar built on the plain Bar."""
218
219
class DownloadFillingCirclesBar(BaseDownloadProgressBar,  # type: ignore
                                FillingCirclesBar):
    """Download bar built on FillingCirclesBar."""
223
224
class DownloadBlueEmojiProgressBar(BaseDownloadProgressBar,  # type: ignore
                                   BlueEmojiBar):
    """Download bar built on BlueEmojiBar."""
228
229
class DownloadProgressSpinner(WindowsMixin, InterruptibleMixin,
                              DownloadProgressMixin, Spinner):
    """Spinner-based download display showing amount and speed."""

    file = sys.stdout
    suffix = "%(downloaded)s %(download_speed)s"

    def next_phase(self):
        """Return the next spinner character, cycling through ``phases``."""
        # The cycle is created lazily on first use and then kept.
        try:
            phaser = self._phaser
        except AttributeError:
            phaser = self._phaser = itertools.cycle(self.phases)
        return next(phaser)

    def update(self):
        """Write one line: message, spinner phase and suffix."""
        message = self.message % self
        phase = self.next_phase()
        suffix = self.suffix % self

        # A separating space is only added next to a non-empty part.
        pieces = [message]
        if message:
            pieces.append(" ")
        pieces.append(phase)
        if suffix:
            pieces.append(" ")
        pieces.append(suffix)

        self.writeln(''.join(pieces))
254
255
# Maps a progress-bar style name to a pair of classes:
# (class used when a maximum is given, class used when it is not).
BAR_TYPES = {
    "off": (DownloadSilentBar, DownloadSilentBar),
    "on": (DefaultDownloadProgressBar, DownloadProgressSpinner),
    "ascii": (DownloadBar, DownloadProgressSpinner),
    "pretty": (DownloadFillingCirclesBar, DownloadProgressSpinner),
    "emoji": (DownloadBlueEmojiProgressBar, DownloadProgressSpinner),
}
263
264
def DownloadProgressProvider(progress_bar, max=None):
    """
    Return the bound ``iter`` method of a new progress display.

    ``progress_bar`` is a key of BAR_TYPES.  When ``max`` is None or 0 the
    second class of the pair is instantiated without arguments; otherwise
    the first class is instantiated with ``max=max``.
    """
    choices = BAR_TYPES[progress_bar]
    if max is None or max == 0:
        return choices[1]().iter
    return choices[0](max=max).iter
270
271
272 ################################################################
273 # Generic "something is happening" spinners
274 #
275 # We don't even try using progress.spinner.Spinner here because it's actually
276 # simpler to reimplement from scratch than to coerce their code into doing
277 # what we need.
278 ################################################################
279
@contextlib.contextmanager
def hidden_cursor(file):
    # type: (IO[Any]) -> Iterator[None]
    """
    Context manager that hides the cursor on ``file`` for its duration.

    Does nothing at all on Windows, when ``file`` is not a tty, or when the
    effective log level is above INFO.
    """
    # The Windows terminal does not support the hide/show cursor ANSI codes,
    # even via colorama. So don't even try.
    # We also don't want to clutter the output with control characters if
    # we're writing to a file, or if the user is running with --quiet.
    # See https://github.com/pypa/pip/issues/3418
    leave_cursor_alone = (
        WINDOWS
        or not file.isatty()
        or logger.getEffectiveLevel() > logging.INFO
    )
    if leave_cursor_alone:
        yield
        return

    file.write(HIDE_CURSOR)
    try:
        yield
    finally:
        # Always show the cursor again, even if the body raised.
        file.write(SHOW_CURSOR)
298
299
class RateLimiter(object):
    """
    Tells callers whether a minimum interval has passed since reset().

    The last-update time starts at 0, so ready() compares against the epoch
    until reset() has been called for the first time.
    """

    def __init__(self, min_update_interval_seconds):
        # type: (float) -> None
        self._min_update_interval_seconds = min_update_interval_seconds
        self._last_update = 0  # type: float

    def ready(self):
        # type: () -> bool
        """Return True once at least the minimum interval has elapsed."""
        elapsed = time.time() - self._last_update
        return elapsed >= self._min_update_interval_seconds

    def reset(self):
        # type: () -> None
        """Record the current time as the moment of the last update."""
        self._last_update = time.time()
315
316
class SpinnerInterface(object):
    """Methods every spinner must provide; both raise until overridden."""

    def spin(self):
        # type: () -> None
        """Signal that work is still in progress."""
        raise NotImplementedError()

    def finish(self, final_status):
        # type: (str) -> None
        """Report ``final_status`` as the outcome."""
        raise NotImplementedError()
325
326
327 class InteractiveSpinner(SpinnerInterface):
328 def __init__(self, message, file=None, spin_chars="-\\|/",
329 # Empirically, 8 updates/second looks nice
330 min_update_interval_seconds=0.125):
331 self._message = message
332 if file is None:
333 file = sys.stdout
334 self._file = file
335 self._rate_limiter = RateLimiter(min_update_interval_seconds)
336 self._finished = False
337
338 self._spin_cycle = itertools.cycle(spin_chars)
339
340 self._file.write(" " * get_indentation() + self._message + " ... ")
341 self._width = 0
342
343 def _write(self, status):
344 assert not self._finished
345 # Erase what we wrote before by backspacing to the beginning, writing
346 # spaces to overwrite the old text, and then backspacing again
347 backup = "\b" * self._width
348 self._file.write(backup + " " * self._width + backup)
349 # Now we have a blank slate to add our status
350 self._file.write(status)
351 self._width = len(status)
352 self._file.flush()
353 self._rate_limiter.reset()
354
355 def spin(self):
356 # type: () -> None
357 if self._finished:
358 return
359 if not self._rate_limiter.ready():
360 return
361 self._write(next(self._spin_cycle))
362
363 def finish(self, final_status):
364 # type: (str) -> None
365 if self._finished:
366 return
367 self._write(final_status)
368 self._file.write("\n")
369 self._file.flush()
370 self._finished = True
371
372
373 # Used for dumb terminals, non-interactive installs (no tty), etc.
374 # We still print updates occasionally (once every 60 seconds by default) to
375 # act as a keep-alive for systems like Travis-CI that take lack-of-output as
376 # an indication that a task has frozen.
class NonInteractiveSpinner(SpinnerInterface):
    """Spinner that reports through the logger instead of drawing in place."""

    def __init__(self, message, min_update_interval_seconds=60):
        # type: (str, float) -> None
        """Remember ``message`` and immediately log a "started" status."""
        self._message = message
        self._finished = False
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._update("started")

    def _update(self, status):
        """Restart the rate limiter and log "<message>: <status>" at INFO."""
        assert not self._finished
        self._rate_limiter.reset()
        logger.info("%s: %s", self._message, status)

    def spin(self):
        # type: () -> None
        """Log a keep-alive line unless finished or rate-limited."""
        if self._finished or not self._rate_limiter.ready():
            return
        self._update("still running...")

    def finish(self, final_status):
        # type: (str) -> None
        """Log the final status once; later calls do nothing."""
        if self._finished:
            return
        closing_line = "finished with status '%s'" % (final_status,)
        self._update(closing_line)
        self._finished = True
404
405
@contextlib.contextmanager
def open_spinner(message):
    # type: (str) -> Iterator[SpinnerInterface]
    """
    Yield a spinner for ``message`` and finish it when the block exits.

    The spinner is finished with "done" on success, "canceled" on
    KeyboardInterrupt and "error" on any other Exception; the exception is
    re-raised in both failure cases.
    """
    # Interactive spinner goes directly to sys.stdout rather than being routed
    # through the logging system, but it acts like it has level INFO,
    # i.e. it's only displayed if we're at level INFO or better.
    # Non-interactive spinner goes through the logging system, so it is always
    # in sync with logging configuration.
    interactive = (
        sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO
    )
    if interactive:
        spinner = InteractiveSpinner(message)  # type: SpinnerInterface
    else:
        spinner = NonInteractiveSpinner(message)

    try:
        with hidden_cursor(sys.stdout):
            yield spinner
    except KeyboardInterrupt:
        spinner.finish("canceled")
        raise
    except Exception:
        spinner.finish("error")
        raise
    else:
        spinner.finish("done")