comparison env/lib/python3.7/site-packages/click/_termui_impl.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
comparison
equal deleted inserted replaced
4:79f47841a781 5:9b1c78e6ba9c
1 # -*- coding: utf-8 -*-
2 """
3 This module contains implementations for the termui module. To keep the
4 import time of Click down, some infrequently used functionality is
5 placed in this module and only imported as needed.
6 """
7 import contextlib
8 import math
9 import os
10 import sys
11 import time
12
13 from ._compat import _default_text_stdout
14 from ._compat import CYGWIN
15 from ._compat import get_best_encoding
16 from ._compat import int_types
17 from ._compat import isatty
18 from ._compat import open_stream
19 from ._compat import range_type
20 from ._compat import strip_ansi
21 from ._compat import term_len
22 from ._compat import WIN
23 from .exceptions import ClickException
24 from .utils import echo
25
26 if os.name == "nt":
27 BEFORE_BAR = "\r"
28 AFTER_BAR = "\n"
29 else:
30 BEFORE_BAR = "\r\033[?25l"
31 AFTER_BAR = "\033[?25h\n"
32
33
34 def _length_hint(obj):
35 """Returns the length hint of an object."""
36 try:
37 return len(obj)
38 except (AttributeError, TypeError):
39 try:
40 get_hint = type(obj).__length_hint__
41 except AttributeError:
42 return None
43 try:
44 hint = get_hint(obj)
45 except TypeError:
46 return None
47 if hint is NotImplemented or not isinstance(hint, int_types) or hint < 0:
48 return None
49 return hint
50
51
52 class ProgressBar(object):
53 def __init__(
54 self,
55 iterable,
56 length=None,
57 fill_char="#",
58 empty_char=" ",
59 bar_template="%(bar)s",
60 info_sep=" ",
61 show_eta=True,
62 show_percent=None,
63 show_pos=False,
64 item_show_func=None,
65 label=None,
66 file=None,
67 color=None,
68 width=30,
69 ):
70 self.fill_char = fill_char
71 self.empty_char = empty_char
72 self.bar_template = bar_template
73 self.info_sep = info_sep
74 self.show_eta = show_eta
75 self.show_percent = show_percent
76 self.show_pos = show_pos
77 self.item_show_func = item_show_func
78 self.label = label or ""
79 if file is None:
80 file = _default_text_stdout()
81 self.file = file
82 self.color = color
83 self.width = width
84 self.autowidth = width == 0
85
86 if length is None:
87 length = _length_hint(iterable)
88 if iterable is None:
89 if length is None:
90 raise TypeError("iterable or length is required")
91 iterable = range_type(length)
92 self.iter = iter(iterable)
93 self.length = length
94 self.length_known = length is not None
95 self.pos = 0
96 self.avg = []
97 self.start = self.last_eta = time.time()
98 self.eta_known = False
99 self.finished = False
100 self.max_width = None
101 self.entered = False
102 self.current_item = None
103 self.is_hidden = not isatty(self.file)
104 self._last_line = None
105 self.short_limit = 0.5
106
107 def __enter__(self):
108 self.entered = True
109 self.render_progress()
110 return self
111
112 def __exit__(self, exc_type, exc_value, tb):
113 self.render_finish()
114
115 def __iter__(self):
116 if not self.entered:
117 raise RuntimeError("You need to use progress bars in a with block.")
118 self.render_progress()
119 return self.generator()
120
121 def __next__(self):
122 # Iteration is defined in terms of a generator function,
123 # returned by iter(self); use that to define next(). This works
124 # because `self.iter` is an iterable consumed by that generator,
125 # so it is re-entry safe. Calling `next(self.generator())`
126 # twice works and does "what you want".
127 return next(iter(self))
128
129 # Python 2 compat
130 next = __next__
131
132 def is_fast(self):
133 return time.time() - self.start <= self.short_limit
134
135 def render_finish(self):
136 if self.is_hidden or self.is_fast():
137 return
138 self.file.write(AFTER_BAR)
139 self.file.flush()
140
141 @property
142 def pct(self):
143 if self.finished:
144 return 1.0
145 return min(self.pos / (float(self.length) or 1), 1.0)
146
147 @property
148 def time_per_iteration(self):
149 if not self.avg:
150 return 0.0
151 return sum(self.avg) / float(len(self.avg))
152
153 @property
154 def eta(self):
155 if self.length_known and not self.finished:
156 return self.time_per_iteration * (self.length - self.pos)
157 return 0.0
158
159 def format_eta(self):
160 if self.eta_known:
161 t = int(self.eta)
162 seconds = t % 60
163 t //= 60
164 minutes = t % 60
165 t //= 60
166 hours = t % 24
167 t //= 24
168 if t > 0:
169 return "{}d {:02}:{:02}:{:02}".format(t, hours, minutes, seconds)
170 else:
171 return "{:02}:{:02}:{:02}".format(hours, minutes, seconds)
172 return ""
173
174 def format_pos(self):
175 pos = str(self.pos)
176 if self.length_known:
177 pos += "/{}".format(self.length)
178 return pos
179
180 def format_pct(self):
181 return "{: 4}%".format(int(self.pct * 100))[1:]
182
183 def format_bar(self):
184 if self.length_known:
185 bar_length = int(self.pct * self.width)
186 bar = self.fill_char * bar_length
187 bar += self.empty_char * (self.width - bar_length)
188 elif self.finished:
189 bar = self.fill_char * self.width
190 else:
191 bar = list(self.empty_char * (self.width or 1))
192 if self.time_per_iteration != 0:
193 bar[
194 int(
195 (math.cos(self.pos * self.time_per_iteration) / 2.0 + 0.5)
196 * self.width
197 )
198 ] = self.fill_char
199 bar = "".join(bar)
200 return bar
201
202 def format_progress_line(self):
203 show_percent = self.show_percent
204
205 info_bits = []
206 if self.length_known and show_percent is None:
207 show_percent = not self.show_pos
208
209 if self.show_pos:
210 info_bits.append(self.format_pos())
211 if show_percent:
212 info_bits.append(self.format_pct())
213 if self.show_eta and self.eta_known and not self.finished:
214 info_bits.append(self.format_eta())
215 if self.item_show_func is not None:
216 item_info = self.item_show_func(self.current_item)
217 if item_info is not None:
218 info_bits.append(item_info)
219
220 return (
221 self.bar_template
222 % {
223 "label": self.label,
224 "bar": self.format_bar(),
225 "info": self.info_sep.join(info_bits),
226 }
227 ).rstrip()
228
229 def render_progress(self):
230 from .termui import get_terminal_size
231
232 if self.is_hidden:
233 return
234
235 buf = []
236 # Update width in case the terminal has been resized
237 if self.autowidth:
238 old_width = self.width
239 self.width = 0
240 clutter_length = term_len(self.format_progress_line())
241 new_width = max(0, get_terminal_size()[0] - clutter_length)
242 if new_width < old_width:
243 buf.append(BEFORE_BAR)
244 buf.append(" " * self.max_width)
245 self.max_width = new_width
246 self.width = new_width
247
248 clear_width = self.width
249 if self.max_width is not None:
250 clear_width = self.max_width
251
252 buf.append(BEFORE_BAR)
253 line = self.format_progress_line()
254 line_len = term_len(line)
255 if self.max_width is None or self.max_width < line_len:
256 self.max_width = line_len
257
258 buf.append(line)
259 buf.append(" " * (clear_width - line_len))
260 line = "".join(buf)
261 # Render the line only if it changed.
262
263 if line != self._last_line and not self.is_fast():
264 self._last_line = line
265 echo(line, file=self.file, color=self.color, nl=False)
266 self.file.flush()
267
268 def make_step(self, n_steps):
269 self.pos += n_steps
270 if self.length_known and self.pos >= self.length:
271 self.finished = True
272
273 if (time.time() - self.last_eta) < 1.0:
274 return
275
276 self.last_eta = time.time()
277
278 # self.avg is a rolling list of length <= 7 of steps where steps are
279 # defined as time elapsed divided by the total progress through
280 # self.length.
281 if self.pos:
282 step = (time.time() - self.start) / self.pos
283 else:
284 step = time.time() - self.start
285
286 self.avg = self.avg[-6:] + [step]
287
288 self.eta_known = self.length_known
289
290 def update(self, n_steps):
291 self.make_step(n_steps)
292 self.render_progress()
293
294 def finish(self):
295 self.eta_known = 0
296 self.current_item = None
297 self.finished = True
298
299 def generator(self):
300 """Return a generator which yields the items added to the bar
301 during construction, and updates the progress bar *after* the
302 yielded block returns.
303 """
304 # WARNING: the iterator interface for `ProgressBar` relies on
305 # this and only works because this is a simple generator which
306 # doesn't create or manage additional state. If this function
307 # changes, the impact should be evaluated both against
308 # `iter(bar)` and `next(bar)`. `next()` in particular may call
309 # `self.generator()` repeatedly, and this must remain safe in
310 # order for that interface to work.
311 if not self.entered:
312 raise RuntimeError("You need to use progress bars in a with block.")
313
314 if self.is_hidden:
315 for rv in self.iter:
316 yield rv
317 else:
318 for rv in self.iter:
319 self.current_item = rv
320 yield rv
321 self.update(1)
322 self.finish()
323 self.render_progress()
324
325
326 def pager(generator, color=None):
327 """Decide what method to use for paging through text."""
328 stdout = _default_text_stdout()
329 if not isatty(sys.stdin) or not isatty(stdout):
330 return _nullpager(stdout, generator, color)
331 pager_cmd = (os.environ.get("PAGER", None) or "").strip()
332 if pager_cmd:
333 if WIN:
334 return _tempfilepager(generator, pager_cmd, color)
335 return _pipepager(generator, pager_cmd, color)
336 if os.environ.get("TERM") in ("dumb", "emacs"):
337 return _nullpager(stdout, generator, color)
338 if WIN or sys.platform.startswith("os2"):
339 return _tempfilepager(generator, "more <", color)
340 if hasattr(os, "system") and os.system("(less) 2>/dev/null") == 0:
341 return _pipepager(generator, "less", color)
342
343 import tempfile
344
345 fd, filename = tempfile.mkstemp()
346 os.close(fd)
347 try:
348 if hasattr(os, "system") and os.system('more "{}"'.format(filename)) == 0:
349 return _pipepager(generator, "more", color)
350 return _nullpager(stdout, generator, color)
351 finally:
352 os.unlink(filename)
353
354
355 def _pipepager(generator, cmd, color):
356 """Page through text by feeding it to another program. Invoking a
357 pager through this might support colors.
358 """
359 import subprocess
360
361 env = dict(os.environ)
362
363 # If we're piping to less we might support colors under the
364 # condition that
365 cmd_detail = cmd.rsplit("/", 1)[-1].split()
366 if color is None and cmd_detail[0] == "less":
367 less_flags = "{}{}".format(os.environ.get("LESS", ""), " ".join(cmd_detail[1:]))
368 if not less_flags:
369 env["LESS"] = "-R"
370 color = True
371 elif "r" in less_flags or "R" in less_flags:
372 color = True
373
374 c = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, env=env)
375 encoding = get_best_encoding(c.stdin)
376 try:
377 for text in generator:
378 if not color:
379 text = strip_ansi(text)
380
381 c.stdin.write(text.encode(encoding, "replace"))
382 except (IOError, KeyboardInterrupt):
383 pass
384 else:
385 c.stdin.close()
386
387 # Less doesn't respect ^C, but catches it for its own UI purposes (aborting
388 # search or other commands inside less).
389 #
390 # That means when the user hits ^C, the parent process (click) terminates,
391 # but less is still alive, paging the output and messing up the terminal.
392 #
393 # If the user wants to make the pager exit on ^C, they should set
394 # `LESS='-K'`. It's not our decision to make.
395 while True:
396 try:
397 c.wait()
398 except KeyboardInterrupt:
399 pass
400 else:
401 break
402
403
404 def _tempfilepager(generator, cmd, color):
405 """Page through text by invoking a program on a temporary file."""
406 import tempfile
407
408 filename = tempfile.mktemp()
409 # TODO: This never terminates if the passed generator never terminates.
410 text = "".join(generator)
411 if not color:
412 text = strip_ansi(text)
413 encoding = get_best_encoding(sys.stdout)
414 with open_stream(filename, "wb")[0] as f:
415 f.write(text.encode(encoding))
416 try:
417 os.system('{} "{}"'.format(cmd, filename))
418 finally:
419 os.unlink(filename)
420
421
422 def _nullpager(stream, generator, color):
423 """Simply print unformatted text. This is the ultimate fallback."""
424 for text in generator:
425 if not color:
426 text = strip_ansi(text)
427 stream.write(text)
428
429
430 class Editor(object):
431 def __init__(self, editor=None, env=None, require_save=True, extension=".txt"):
432 self.editor = editor
433 self.env = env
434 self.require_save = require_save
435 self.extension = extension
436
437 def get_editor(self):
438 if self.editor is not None:
439 return self.editor
440 for key in "VISUAL", "EDITOR":
441 rv = os.environ.get(key)
442 if rv:
443 return rv
444 if WIN:
445 return "notepad"
446 for editor in "sensible-editor", "vim", "nano":
447 if os.system("which {} >/dev/null 2>&1".format(editor)) == 0:
448 return editor
449 return "vi"
450
451 def edit_file(self, filename):
452 import subprocess
453
454 editor = self.get_editor()
455 if self.env:
456 environ = os.environ.copy()
457 environ.update(self.env)
458 else:
459 environ = None
460 try:
461 c = subprocess.Popen(
462 '{} "{}"'.format(editor, filename), env=environ, shell=True,
463 )
464 exit_code = c.wait()
465 if exit_code != 0:
466 raise ClickException("{}: Editing failed!".format(editor))
467 except OSError as e:
468 raise ClickException("{}: Editing failed: {}".format(editor, e))
469
470 def edit(self, text):
471 import tempfile
472
473 text = text or ""
474 if text and not text.endswith("\n"):
475 text += "\n"
476
477 fd, name = tempfile.mkstemp(prefix="editor-", suffix=self.extension)
478 try:
479 if WIN:
480 encoding = "utf-8-sig"
481 text = text.replace("\n", "\r\n")
482 else:
483 encoding = "utf-8"
484 text = text.encode(encoding)
485
486 f = os.fdopen(fd, "wb")
487 f.write(text)
488 f.close()
489 timestamp = os.path.getmtime(name)
490
491 self.edit_file(name)
492
493 if self.require_save and os.path.getmtime(name) == timestamp:
494 return None
495
496 f = open(name, "rb")
497 try:
498 rv = f.read()
499 finally:
500 f.close()
501 return rv.decode("utf-8-sig").replace("\r\n", "\n")
502 finally:
503 os.unlink(name)
504
505
506 def open_url(url, wait=False, locate=False):
507 import subprocess
508
509 def _unquote_file(url):
510 try:
511 import urllib
512 except ImportError:
513 import urllib
514 if url.startswith("file://"):
515 url = urllib.unquote(url[7:])
516 return url
517
518 if sys.platform == "darwin":
519 args = ["open"]
520 if wait:
521 args.append("-W")
522 if locate:
523 args.append("-R")
524 args.append(_unquote_file(url))
525 null = open("/dev/null", "w")
526 try:
527 return subprocess.Popen(args, stderr=null).wait()
528 finally:
529 null.close()
530 elif WIN:
531 if locate:
532 url = _unquote_file(url)
533 args = 'explorer /select,"{}"'.format(_unquote_file(url.replace('"', "")))
534 else:
535 args = 'start {} "" "{}"'.format(
536 "/WAIT" if wait else "", url.replace('"', "")
537 )
538 return os.system(args)
539 elif CYGWIN:
540 if locate:
541 url = _unquote_file(url)
542 args = 'cygstart "{}"'.format(os.path.dirname(url).replace('"', ""))
543 else:
544 args = 'cygstart {} "{}"'.format("-w" if wait else "", url.replace('"', ""))
545 return os.system(args)
546
547 try:
548 if locate:
549 url = os.path.dirname(_unquote_file(url)) or "."
550 else:
551 url = _unquote_file(url)
552 c = subprocess.Popen(["xdg-open", url])
553 if wait:
554 return c.wait()
555 return 0
556 except OSError:
557 if url.startswith(("http://", "https://")) and not locate and not wait:
558 import webbrowser
559
560 webbrowser.open(url)
561 return 0
562 return 1
563
564
565 def _translate_ch_to_exc(ch):
566 if ch == u"\x03":
567 raise KeyboardInterrupt()
568 if ch == u"\x04" and not WIN: # Unix-like, Ctrl+D
569 raise EOFError()
570 if ch == u"\x1a" and WIN: # Windows, Ctrl+Z
571 raise EOFError()
572
573
574 if WIN:
575 import msvcrt
576
577 @contextlib.contextmanager
578 def raw_terminal():
579 yield
580
581 def getchar(echo):
582 # The function `getch` will return a bytes object corresponding to
583 # the pressed character. Since Windows 10 build 1803, it will also
584 # return \x00 when called a second time after pressing a regular key.
585 #
586 # `getwch` does not share this probably-bugged behavior. Moreover, it
587 # returns a Unicode object by default, which is what we want.
588 #
589 # Either of these functions will return \x00 or \xe0 to indicate
590 # a special key, and you need to call the same function again to get
591 # the "rest" of the code. The fun part is that \u00e0 is
592 # "latin small letter a with grave", so if you type that on a French
593 # keyboard, you _also_ get a \xe0.
594 # E.g., consider the Up arrow. This returns \xe0 and then \x48. The
595 # resulting Unicode string reads as "a with grave" + "capital H".
596 # This is indistinguishable from when the user actually types
597 # "a with grave" and then "capital H".
598 #
599 # When \xe0 is returned, we assume it's part of a special-key sequence
600 # and call `getwch` again, but that means that when the user types
601 # the \u00e0 character, `getchar` doesn't return until a second
602 # character is typed.
603 # The alternative is returning immediately, but that would mess up
604 # cross-platform handling of arrow keys and others that start with
605 # \xe0. Another option is using `getch`, but then we can't reliably
606 # read non-ASCII characters, because return values of `getch` are
607 # limited to the current 8-bit codepage.
608 #
609 # Anyway, Click doesn't claim to do this Right(tm), and using `getwch`
610 # is doing the right thing in more situations than with `getch`.
611 if echo:
612 func = msvcrt.getwche
613 else:
614 func = msvcrt.getwch
615
616 rv = func()
617 if rv in (u"\x00", u"\xe0"):
618 # \x00 and \xe0 are control characters that indicate special key,
619 # see above.
620 rv += func()
621 _translate_ch_to_exc(rv)
622 return rv
623
624
625 else:
626 import tty
627 import termios
628
629 @contextlib.contextmanager
630 def raw_terminal():
631 if not isatty(sys.stdin):
632 f = open("/dev/tty")
633 fd = f.fileno()
634 else:
635 fd = sys.stdin.fileno()
636 f = None
637 try:
638 old_settings = termios.tcgetattr(fd)
639 try:
640 tty.setraw(fd)
641 yield fd
642 finally:
643 termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
644 sys.stdout.flush()
645 if f is not None:
646 f.close()
647 except termios.error:
648 pass
649
650 def getchar(echo):
651 with raw_terminal() as fd:
652 ch = os.read(fd, 32)
653 ch = ch.decode(get_best_encoding(sys.stdin), "replace")
654 if echo and isatty(sys.stdout):
655 sys.stdout.write(ch)
656 _translate_ch_to_exc(ch)
657 return ch