comparison planemo/lib/python3.7/site-packages/pip/_internal/utils/ui.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 from __future__ import absolute_import, division
2
3 import contextlib
4 import itertools
5 import logging
6 import sys
7 import time
8 from signal import SIGINT, default_int_handler, signal
9
10 from pip._vendor import six
11 from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR
12 from pip._vendor.progress.bar import Bar, FillingCirclesBar, IncrementalBar
13 from pip._vendor.progress.spinner import Spinner
14
15 from pip._internal.utils.compat import WINDOWS
16 from pip._internal.utils.logging import get_indentation
17 from pip._internal.utils.misc import format_size
18 from pip._internal.utils.typing import MYPY_CHECK_RUNNING
19
20 if MYPY_CHECK_RUNNING:
21 from typing import Any, Iterator, IO
22
23 try:
24 from pip._vendor import colorama
25 # Lots of different errors can come from this, including SystemError and
26 # ImportError.
27 except Exception:
28 colorama = None
29
30 logger = logging.getLogger(__name__)
31
32
33 def _select_progress_class(preferred, fallback):
34 encoding = getattr(preferred.file, "encoding", None)
35
36 # If we don't know what encoding this file is in, then we'll just assume
37 # that it doesn't support unicode and use the ASCII bar.
38 if not encoding:
39 return fallback
40
41 # Collect all of the possible characters we want to use with the preferred
42 # bar.
43 characters = [
44 getattr(preferred, "empty_fill", six.text_type()),
45 getattr(preferred, "fill", six.text_type()),
46 ]
47 characters += list(getattr(preferred, "phases", []))
48
49 # Try to decode the characters we're using for the bar using the encoding
50 # of the given file, if this works then we'll assume that we can use the
51 # fancier bar and if not we'll fall back to the plaintext bar.
52 try:
53 six.text_type().join(characters).encode(encoding)
54 except UnicodeEncodeError:
55 return fallback
56 else:
57 return preferred
58
59
60 _BaseBar = _select_progress_class(IncrementalBar, Bar) # type: Any
61
62
63 class InterruptibleMixin(object):
64 """
65 Helper to ensure that self.finish() gets called on keyboard interrupt.
66
67 This allows downloads to be interrupted without leaving temporary state
68 (like hidden cursors) behind.
69
70 This class is similar to the progress library's existing SigIntMixin
71 helper, but as of version 1.2, that helper has the following problems:
72
73 1. It calls sys.exit().
74 2. It discards the existing SIGINT handler completely.
75 3. It leaves its own handler in place even after an uninterrupted finish,
76 which will have unexpected delayed effects if the user triggers an
77 unrelated keyboard interrupt some time after a progress-displaying
78 download has already completed, for example.
79 """
80
81 def __init__(self, *args, **kwargs):
82 """
83 Save the original SIGINT handler for later.
84 """
85 super(InterruptibleMixin, self).__init__(*args, **kwargs)
86
87 self.original_handler = signal(SIGINT, self.handle_sigint)
88
89 # If signal() returns None, the previous handler was not installed from
90 # Python, and we cannot restore it. This probably should not happen,
91 # but if it does, we must restore something sensible instead, at least.
92 # The least bad option should be Python's default SIGINT handler, which
93 # just raises KeyboardInterrupt.
94 if self.original_handler is None:
95 self.original_handler = default_int_handler
96
97 def finish(self):
98 """
99 Restore the original SIGINT handler after finishing.
100
101 This should happen regardless of whether the progress display finishes
102 normally, or gets interrupted.
103 """
104 super(InterruptibleMixin, self).finish()
105 signal(SIGINT, self.original_handler)
106
107 def handle_sigint(self, signum, frame):
108 """
109 Call self.finish() before delegating to the original SIGINT handler.
110
111 This handler should only be in place while the progress display is
112 active.
113 """
114 self.finish()
115 self.original_handler(signum, frame)
116
117
118 class SilentBar(Bar):
119
120 def update(self):
121 pass
122
123
124 class BlueEmojiBar(IncrementalBar):
125
126 suffix = "%(percent)d%%"
127 bar_prefix = " "
128 bar_suffix = " "
129 phases = (u"\U0001F539", u"\U0001F537", u"\U0001F535") # type: Any
130
131
132 class DownloadProgressMixin(object):
133
134 def __init__(self, *args, **kwargs):
135 super(DownloadProgressMixin, self).__init__(*args, **kwargs)
136 self.message = (" " * (get_indentation() + 2)) + self.message
137
138 @property
139 def downloaded(self):
140 return format_size(self.index)
141
142 @property
143 def download_speed(self):
144 # Avoid zero division errors...
145 if self.avg == 0.0:
146 return "..."
147 return format_size(1 / self.avg) + "/s"
148
149 @property
150 def pretty_eta(self):
151 if self.eta:
152 return "eta %s" % self.eta_td
153 return ""
154
155 def iter(self, it, n=1):
156 for x in it:
157 yield x
158 self.next(n)
159 self.finish()
160
161
162 class WindowsMixin(object):
163
164 def __init__(self, *args, **kwargs):
165 # The Windows terminal does not support the hide/show cursor ANSI codes
166 # even with colorama. So we'll ensure that hide_cursor is False on
167 # Windows.
168 # This call needs to go before the super() call, so that hide_cursor
169 # is set in time. The base progress bar class writes the "hide cursor"
170 # code to the terminal in its init, so if we don't set this soon
171 # enough, we get a "hide" with no corresponding "show"...
172 if WINDOWS and self.hide_cursor:
173 self.hide_cursor = False
174
175 super(WindowsMixin, self).__init__(*args, **kwargs)
176
177 # Check if we are running on Windows and we have the colorama module,
178 # if we do then wrap our file with it.
179 if WINDOWS and colorama:
180 self.file = colorama.AnsiToWin32(self.file)
181 # The progress code expects to be able to call self.file.isatty()
182 # but the colorama.AnsiToWin32() object doesn't have that, so we'll
183 # add it.
184 self.file.isatty = lambda: self.file.wrapped.isatty()
185 # The progress code expects to be able to call self.file.flush()
186 # but the colorama.AnsiToWin32() object doesn't have that, so we'll
187 # add it.
188 self.file.flush = lambda: self.file.wrapped.flush()
189
190
191 class BaseDownloadProgressBar(WindowsMixin, InterruptibleMixin,
192 DownloadProgressMixin):
193
194 file = sys.stdout
195 message = "%(percent)d%%"
196 suffix = "%(downloaded)s %(download_speed)s %(pretty_eta)s"
197
198 # NOTE: The "type: ignore" comments on the following classes are there to
199 # work around https://github.com/python/typing/issues/241
200
201
202 class DefaultDownloadProgressBar(BaseDownloadProgressBar,
203 _BaseBar):
204 pass
205
206
207 class DownloadSilentBar(BaseDownloadProgressBar, SilentBar): # type: ignore
208 pass
209
210
211 class DownloadBar(BaseDownloadProgressBar, # type: ignore
212 Bar):
213 pass
214
215
216 class DownloadFillingCirclesBar(BaseDownloadProgressBar, # type: ignore
217 FillingCirclesBar):
218 pass
219
220
221 class DownloadBlueEmojiProgressBar(BaseDownloadProgressBar, # type: ignore
222 BlueEmojiBar):
223 pass
224
225
226 class DownloadProgressSpinner(WindowsMixin, InterruptibleMixin,
227 DownloadProgressMixin, Spinner):
228
229 file = sys.stdout
230 suffix = "%(downloaded)s %(download_speed)s"
231
232 def next_phase(self):
233 if not hasattr(self, "_phaser"):
234 self._phaser = itertools.cycle(self.phases)
235 return next(self._phaser)
236
237 def update(self):
238 message = self.message % self
239 phase = self.next_phase()
240 suffix = self.suffix % self
241 line = ''.join([
242 message,
243 " " if message else "",
244 phase,
245 " " if suffix else "",
246 suffix,
247 ])
248
249 self.writeln(line)
250
251
252 BAR_TYPES = {
253 "off": (DownloadSilentBar, DownloadSilentBar),
254 "on": (DefaultDownloadProgressBar, DownloadProgressSpinner),
255 "ascii": (DownloadBar, DownloadProgressSpinner),
256 "pretty": (DownloadFillingCirclesBar, DownloadProgressSpinner),
257 "emoji": (DownloadBlueEmojiProgressBar, DownloadProgressSpinner)
258 }
259
260
261 def DownloadProgressProvider(progress_bar, max=None):
262 if max is None or max == 0:
263 return BAR_TYPES[progress_bar][1]().iter
264 else:
265 return BAR_TYPES[progress_bar][0](max=max).iter
266
267
268 ################################################################
269 # Generic "something is happening" spinners
270 #
271 # We don't even try using progress.spinner.Spinner here because it's actually
272 # simpler to reimplement from scratch than to coerce their code into doing
273 # what we need.
274 ################################################################
275
276 @contextlib.contextmanager
277 def hidden_cursor(file):
278 # type: (IO) -> Iterator[None]
279 # The Windows terminal does not support the hide/show cursor ANSI codes,
280 # even via colorama. So don't even try.
281 if WINDOWS:
282 yield
283 # We don't want to clutter the output with control characters if we're
284 # writing to a file, or if the user is running with --quiet.
285 # See https://github.com/pypa/pip/issues/3418
286 elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
287 yield
288 else:
289 file.write(HIDE_CURSOR)
290 try:
291 yield
292 finally:
293 file.write(SHOW_CURSOR)
294
295
296 class RateLimiter(object):
297 def __init__(self, min_update_interval_seconds):
298 # type: (float) -> None
299 self._min_update_interval_seconds = min_update_interval_seconds
300 self._last_update = 0 # type: float
301
302 def ready(self):
303 # type: () -> bool
304 now = time.time()
305 delta = now - self._last_update
306 return delta >= self._min_update_interval_seconds
307
308 def reset(self):
309 # type: () -> None
310 self._last_update = time.time()
311
312
313 class SpinnerInterface(object):
314 def spin(self):
315 # type: () -> None
316 raise NotImplementedError()
317
318 def finish(self, final_status):
319 # type: (str) -> None
320 raise NotImplementedError()
321
322
323 class InteractiveSpinner(SpinnerInterface):
324 def __init__(self, message, file=None, spin_chars="-\\|/",
325 # Empirically, 8 updates/second looks nice
326 min_update_interval_seconds=0.125):
327 self._message = message
328 if file is None:
329 file = sys.stdout
330 self._file = file
331 self._rate_limiter = RateLimiter(min_update_interval_seconds)
332 self._finished = False
333
334 self._spin_cycle = itertools.cycle(spin_chars)
335
336 self._file.write(" " * get_indentation() + self._message + " ... ")
337 self._width = 0
338
339 def _write(self, status):
340 assert not self._finished
341 # Erase what we wrote before by backspacing to the beginning, writing
342 # spaces to overwrite the old text, and then backspacing again
343 backup = "\b" * self._width
344 self._file.write(backup + " " * self._width + backup)
345 # Now we have a blank slate to add our status
346 self._file.write(status)
347 self._width = len(status)
348 self._file.flush()
349 self._rate_limiter.reset()
350
351 def spin(self):
352 # type: () -> None
353 if self._finished:
354 return
355 if not self._rate_limiter.ready():
356 return
357 self._write(next(self._spin_cycle))
358
359 def finish(self, final_status):
360 # type: (str) -> None
361 if self._finished:
362 return
363 self._write(final_status)
364 self._file.write("\n")
365 self._file.flush()
366 self._finished = True
367
368
369 # Used for dumb terminals, non-interactive installs (no tty), etc.
370 # We still print updates occasionally (once every 60 seconds by default) to
371 # act as a keep-alive for systems like Travis-CI that take lack-of-output as
372 # an indication that a task has frozen.
373 class NonInteractiveSpinner(SpinnerInterface):
374 def __init__(self, message, min_update_interval_seconds=60):
375 # type: (str, float) -> None
376 self._message = message
377 self._finished = False
378 self._rate_limiter = RateLimiter(min_update_interval_seconds)
379 self._update("started")
380
381 def _update(self, status):
382 assert not self._finished
383 self._rate_limiter.reset()
384 logger.info("%s: %s", self._message, status)
385
386 def spin(self):
387 # type: () -> None
388 if self._finished:
389 return
390 if not self._rate_limiter.ready():
391 return
392 self._update("still running...")
393
394 def finish(self, final_status):
395 # type: (str) -> None
396 if self._finished:
397 return
398 self._update("finished with status '%s'" % (final_status,))
399 self._finished = True
400
401
402 @contextlib.contextmanager
403 def open_spinner(message):
404 # type: (str) -> Iterator[SpinnerInterface]
405 # Interactive spinner goes directly to sys.stdout rather than being routed
406 # through the logging system, but it acts like it has level INFO,
407 # i.e. it's only displayed if we're at level INFO or better.
408 # Non-interactive spinner goes through the logging system, so it is always
409 # in sync with logging configuration.
410 if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
411 spinner = InteractiveSpinner(message) # type: SpinnerInterface
412 else:
413 spinner = NonInteractiveSpinner(message)
414 try:
415 with hidden_cursor(sys.stdout):
416 yield spinner
417 except KeyboardInterrupt:
418 spinner.finish("canceled")
419 raise
420 except Exception:
421 spinner.finish("error")
422 raise
423 else:
424 spinner.finish("done")