Mercurial > repos > guerler > hhblits
comparison lib/python3.8/site-packages/pip/_internal/utils/misc.py @ 1:64071f2a4cf0 draft default tip
Deleted selected files
author | guerler |
---|---|
date | Mon, 27 Jul 2020 03:55:49 -0400 |
parents | 9e54283cc701 |
children |
comparison
equal
deleted
inserted
replaced
0:9e54283cc701 | 1:64071f2a4cf0 |
---|---|
1 # The following comment should be removed at some point in the future. | |
2 # mypy: strict-optional=False | |
3 # mypy: disallow-untyped-defs=False | |
4 | |
5 from __future__ import absolute_import | |
6 | |
7 import contextlib | |
8 import errno | |
9 import getpass | |
10 import hashlib | |
11 import io | |
12 import logging | |
13 import os | |
14 import posixpath | |
15 import shutil | |
16 import stat | |
17 import sys | |
18 from collections import deque | |
19 | |
20 from pip._vendor import pkg_resources | |
21 # NOTE: retrying is not annotated in typeshed as on 2017-07-17, which is | |
22 # why we ignore the type on this import. | |
23 from pip._vendor.retrying import retry # type: ignore | |
24 from pip._vendor.six import PY2, text_type | |
25 from pip._vendor.six.moves import input | |
26 from pip._vendor.six.moves.urllib import parse as urllib_parse | |
27 from pip._vendor.six.moves.urllib.parse import unquote as urllib_unquote | |
28 | |
29 from pip import __version__ | |
30 from pip._internal.exceptions import CommandError | |
31 from pip._internal.locations import ( | |
32 get_major_minor_version, | |
33 site_packages, | |
34 user_site, | |
35 ) | |
36 from pip._internal.utils.compat import ( | |
37 WINDOWS, | |
38 expanduser, | |
39 stdlib_pkgs, | |
40 str_to_display, | |
41 ) | |
42 from pip._internal.utils.typing import MYPY_CHECK_RUNNING, cast | |
43 from pip._internal.utils.virtualenv import ( | |
44 running_under_virtualenv, | |
45 virtualenv_no_global, | |
46 ) | |
47 | |
48 if PY2: | |
49 from io import BytesIO as StringIO | |
50 else: | |
51 from io import StringIO | |
52 | |
53 if MYPY_CHECK_RUNNING: | |
54 from typing import ( | |
55 Any, AnyStr, Container, Iterable, List, Optional, Text, | |
56 Tuple, Union, | |
57 ) | |
58 from pip._vendor.pkg_resources import Distribution | |
59 | |
60 VersionInfo = Tuple[int, int, int] | |
61 | |
62 | |
63 __all__ = ['rmtree', 'display_path', 'backup_dir', | |
64 'ask', 'splitext', | |
65 'format_size', 'is_installable_dir', | |
66 'normalize_path', | |
67 'renames', 'get_prog', | |
68 'captured_stdout', 'ensure_dir', | |
69 'get_installed_version', 'remove_auth_from_url'] | |
70 | |
71 | |
72 logger = logging.getLogger(__name__) | |
73 | |
74 | |
def get_pip_version():
    # type: () -> str
    """Return a one-line description of the running pip.

    The line names pip's version, the absolute path two directory levels
    above this module's directory, and the Python major.minor version.
    """
    here = os.path.dirname(__file__)
    pip_pkg_dir = os.path.abspath(os.path.join(here, "..", ".."))

    template = 'pip {} from {} (python {})'
    return template.format(
        __version__, pip_pkg_dir, get_major_minor_version(),
    )
85 | |
86 | |
def normalize_version_info(py_version_info):
    # type: (Tuple[int, ...]) -> Tuple[int, int, int]
    """
    Convert a tuple of ints representing a Python version to one of length
    three.

    Shorter tuples are right-padded with zeros and longer tuples are
    truncated to their first three items.

    :param py_version_info: a tuple of ints representing a Python version,
        or None to specify no version. The tuple can have any length.

    :return: a tuple of length three if `py_version_info` is non-None.
        Otherwise, return `py_version_info` unchanged (i.e. None).
    """
    # Honour the documented contract: None means "no version" and is passed
    # straight through instead of failing inside len().
    if py_version_info is None:
        return None

    if len(py_version_info) < 3:
        py_version_info += (3 - len(py_version_info)) * (0,)
    elif len(py_version_info) > 3:
        py_version_info = py_version_info[:3]

    return cast('VersionInfo', py_version_info)
105 | |
106 | |
def ensure_dir(path):
    # type: (AnyStr) -> None
    """Create ``path`` with os.makedirs(), tolerating an existing directory.

    OSErrors whose errno is EEXIST or ENOTEMPTY are ignored; any other
    OSError propagates to the caller.
    """
    # Windows can raise spurious ENOTEMPTY errors. See #6426.
    tolerated = (errno.EEXIST, errno.ENOTEMPTY)
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno not in tolerated:
            raise
116 | |
117 | |
def get_prog():
    # type: () -> str
    """Return the program name to show in usage and help output.

    This is the basename of sys.argv[0], except that '__main__.py' and '-c'
    are reported as "<sys.executable> -m pip". If sys.argv cannot be read,
    the plain name 'pip' is returned.
    """
    try:
        script = os.path.basename(sys.argv[0])
        if script not in ('__main__.py', '-c'):
            return script
        return "%s -m pip" % sys.executable
    except (AttributeError, TypeError, IndexError):
        return 'pip'
129 | |
130 | |
# Retry every half second for up to 3 seconds
@retry(stop_max_delay=3000, wait_fixed=500)
def rmtree(dir, ignore_errors=False):
    # type: (str, bool) -> None
    """Delete the directory tree at ``dir`` using shutil.rmtree().

    Errors on individual entries are handed to rmtree_errorhandler(); the
    whole call is wrapped by the retry decorator above.
    """
    removal_options = {
        'ignore_errors': ignore_errors,
        'onerror': rmtree_errorhandler,
    }
    shutil.rmtree(dir, **removal_options)
137 | |
138 | |
def rmtree_errorhandler(func, path, exc_info):
    """Error callback for rmtree() that retries on read-only paths.

    On Windows, the files in .svn are read-only, so removing them fails.
    When ``path`` lacks the write bit, it is made writable and ``func`` is
    called on it again. When ``path`` is already writable, the exception
    currently being handled is re-raised. Paths that cannot be stat'ed are
    silently skipped.
    """
    try:
        mode = os.stat(path).st_mode
    except (IOError, OSError):
        # Same outcome as an os.path.exists() check: nothing left to fix.
        return

    if mode & stat.S_IWRITE:
        # Not a read-only problem, so let the active exception continue.
        raise

    # Convert to read/write, then repeat the operation that failed.
    os.chmod(path, stat.S_IWRITE)
    func(path)
157 | |
158 | |
def path_to_display(path):
    # type: (Optional[Union[str, Text]]) -> Optional[Text]
    """
    Return a text (unicode in Python 2) rendering of a bytes or text path,
    for display and logging purposes.

    None and text paths are returned unchanged. Bytes paths are decoded with
    the filesystem encoding; when that fails, a repr-style rendering of the
    raw bytes is returned instead, so this function should never error out.
    """
    if path is None or isinstance(path, text_type):
        return path

    # From here on, path is a bytes object (str in Python 2).
    try:
        return path.decode(sys.getfilesystemencoding(), 'strict')
    except UnicodeDecodeError:
        # Fall through and show the full bytes: hard to read, but it makes
        # troubleshooting easier.
        pass

    if PY2:
        # repr() gives a readable str, which is then converted to unicode.
        # The "b" prefix makes the Python 2 output look like the Python 3
        # output and signals that this is a bytes representation.
        return str_to_display('b{!r}'.format(path))

    # Silence the "F821 undefined name 'ascii'" flake8 error since
    # in Python 3 ascii() is a built-in.
    return ascii(path)  # noqa: F821
191 | |
192 | |
def display_path(path):
    # type: (Union[str, Text]) -> str
    """Return ``path`` in a form suitable for display.

    The path is made absolute and case-normalized; when it lies below the
    current working directory, that prefix is replaced with '.'.
    """
    shown = os.path.normcase(os.path.abspath(path))
    if sys.version_info[0] == 2:
        shown = shown.decode(sys.getfilesystemencoding(), 'replace')
        shown = shown.encode(sys.getdefaultencoding(), 'replace')

    cwd = os.getcwd()
    if not shown.startswith(cwd + os.path.sep):
        return shown
    return '.' + shown[len(cwd):]
204 | |
205 | |
def backup_dir(dir, ext='.bak'):
    # type: (str, str) -> str
    """Return the first unused backup name for ``dir``.

    Candidates are tried in the order dir + ext, dir + ext + '2',
    dir + ext + '3', ... and the first one that does not exist is returned.
    """
    candidate = dir + ext
    counter = 1
    while os.path.exists(candidate):
        counter += 1
        candidate = dir + ext + str(counter)
    return candidate
216 | |
217 | |
def ask_path_exists(message, options):
    # type: (str, Iterable[str]) -> str
    """Choose what to do about an existing path.

    The first whitespace-separated entry of $PIP_EXISTS_ACTION that is one
    of ``options`` is returned; when there is none, the user is asked
    through ask().
    """
    configured = os.environ.get('PIP_EXISTS_ACTION', '').split()
    preselected = next(
        (action for action in configured if action in options), None
    )
    if preselected is not None:
        return preselected
    return ask(message, options)
224 | |
225 | |
def _check_no_input(message):
    # type: (str) -> None
    """Raise an Exception quoting ``message`` when $PIP_NO_INPUT is set.

    Returns None when the variable is unset or empty.
    """
    if not os.environ.get('PIP_NO_INPUT'):
        return
    raise Exception(
        'No input was expected ($PIP_NO_INPUT set); question: %s' %
        message
    )
234 | |
235 | |
def ask(message, options):
    # type: (str, Iterable[str]) -> str
    """Prompt with ``message`` until the reply is one of ``options``.

    Each reply is stripped and lower-cased before it is compared; a reply
    that is not accepted is reported on stdout and the prompt is repeated.
    The no-input check runs before every prompt.
    """
    while True:
        _check_no_input(message)
        response = input(message).strip().lower()
        if response in options:
            return response
        print(
            'Your response (%r) was not one of the expected responses: '
            '%s' % (response, ', '.join(options))
        )
250 | |
251 | |
def ask_input(message):
    # type: (str) -> str
    """Prompt with ``message`` and return what the user typed.

    The no-input check runs first and may raise instead of prompting.
    """
    _check_no_input(message)
    answer = input(message)
    return answer
257 | |
258 | |
def ask_password(message):
    # type: (str) -> str
    """Prompt with ``message`` via getpass and return the entered password.

    The no-input check runs first and may raise instead of prompting.
    """
    _check_no_input(message)
    secret = getpass.getpass(message)
    return secret
264 | |
265 | |
def format_size(bytes):
    # type: (float) -> str
    """Render a byte count as a short human-readable string.

    Decimal units are used (1 kB = 1000 bytes). Values above 1,000,000 are
    shown in MB with one decimal, above 10,000 in whole kB, above 1,000 in
    kB with one decimal, and everything else in whole bytes.
    """
    if bytes > 1000 * 1000:
        return '%.1f MB' % (bytes / 1000.0 / 1000)
    if bytes > 10 * 1000:
        return '%i kB' % (bytes / 1000)
    if bytes > 1000:
        return '%.1f kB' % (bytes / 1000.0)
    return '%i bytes' % bytes
276 | |
277 | |
def is_installable_dir(path):
    # type: (str) -> bool
    """Return True if ``path`` is a directory that contains a setup.py or
    a pyproject.toml file.
    """
    if not os.path.isdir(path):
        return False
    return any(
        os.path.isfile(os.path.join(path, marker))
        for marker in ('setup.py', 'pyproject.toml')
    )
291 | |
292 | |
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive ``file.read(size)`` results.

    Iteration stops at the first empty (falsy) read, i.e. at EOF.
    """
    chunk = file.read(size)
    while chunk:
        yield chunk
        chunk = file.read(size)
300 | |
301 | |
def normalize_path(path, resolve_symlinks=True):
    # type: (str, bool) -> str
    """
    Return the absolute, case-normalized form of ``path`` after user-home
    expansion.

    With ``resolve_symlinks`` (the default) os.path.realpath() is used, so
    the result is also canonical; otherwise os.path.abspath() is used.
    """
    expanded = expanduser(path)
    make_absolute = os.path.realpath if resolve_symlinks else os.path.abspath
    return os.path.normcase(make_absolute(expanded))
314 | |
315 | |
def splitext(path):
    # type: (str) -> Tuple[str, str]
    """Split ``path`` into (base, extension) like posixpath.splitext(), but
    treat a trailing '.tar' on the base (any letter case) as part of the
    extension.
    """
    base, ext = posixpath.splitext(path)
    if not base.lower().endswith('.tar'):
        return base, ext
    return base[:-4], base[-4:] + ext
324 | |
325 | |
def renames(old, new):
    # type: (str, str) -> None
    """Move ``old`` to ``new`` like os.renames(), but via shutil.move() so
    that renaming across devices works.

    Missing parent directories of ``new`` are created first; afterwards the
    now-empty parent directories of ``old`` are pruned on a best-effort
    basis.
    """
    # Implementation borrowed from os.renames().
    new_parent, new_leaf = os.path.split(new)
    if new_parent and new_leaf and not os.path.exists(new_parent):
        os.makedirs(new_parent)

    shutil.move(old, new)

    old_parent, old_leaf = os.path.split(old)
    if not (old_parent and old_leaf):
        return
    try:
        os.removedirs(old_parent)
    except OSError:
        # Pruning stops at the first directory that cannot be removed.
        pass
342 | |
343 | |
def is_local(path):
    # type: (str) -> bool
    """
    Tell whether ``path`` counts as "local".

    Outside a virtualenv every path is local. Inside one, a path is local
    when it starts with the normalized sys.prefix.

    Caution: this function assumes the head of path has been normalized
    with normalize_path.
    """
    if running_under_virtualenv():
        return path.startswith(normalize_path(sys.prefix))
    return True
357 | |
358 | |
def dist_is_local(dist):
    # type: (Distribution) -> bool
    """
    Tell whether the given Distribution is installed locally, i.e. whether
    is_local() accepts its dist_location().

    Always True if we're not in a virtualenv.
    """
    location = dist_location(dist)
    return is_local(location)
369 | |
370 | |
def dist_in_usersite(dist):
    # type: (Distribution) -> bool
    """
    Tell whether the given Distribution's location starts with the
    normalized user site directory.
    """
    user_site_prefix = normalize_path(user_site)
    return dist_location(dist).startswith(user_site_prefix)
377 | |
378 | |
def dist_in_site_packages(dist):
    # type: (Distribution) -> bool
    """
    Tell whether the given Distribution's location starts with the
    normalized ``site_packages`` directory.
    """
    site_packages_prefix = normalize_path(site_packages)
    return dist_location(dist).startswith(site_packages_prefix)
386 | |
387 | |
def dist_is_editable(dist):
    # type: (Distribution) -> bool
    """
    Tell whether the given Distribution is an editable install, i.e. whether
    any sys.path entry holds a "<project_name>.egg-link" file.
    """
    return any(
        os.path.isfile(
            os.path.join(entry, dist.project_name + '.egg-link')
        )
        for entry in sys.path
    )
398 | |
399 | |
def get_installed_distributions(
        local_only=True,  # type: bool
        skip=stdlib_pkgs,  # type: Container[str]
        include_editables=True,  # type: bool
        editables_only=False,  # type: bool
        user_only=False,  # type: bool
        paths=None  # type: Optional[List[str]]
):
    # type: (...) -> List[Distribution]
    """
    Return the installed Distribution objects that pass every active filter.

    ``local_only`` (default True) keeps only installations local to the
    current virtualenv, if in a virtualenv.

    ``skip`` is a container of lower-case project names to ignore; it
    defaults to stdlib_pkgs.

    ``include_editables`` set to False drops editable installs.

    ``editables_only`` set to True keeps only editable installs.

    ``user_only`` set to True keeps only installations in the user site
    directory.

    ``paths``, when set, restricts the search to distributions present at
    the specified list of locations.
    """
    if paths:
        working_set = pkg_resources.WorkingSet(paths)
    else:
        working_set = pkg_resources.working_set

    # Only the filters that are switched on are collected, in the order in
    # which they have to be evaluated for each distribution.
    checks = []
    if local_only:
        checks.append(dist_is_local)
    checks.append(lambda d: d.key not in skip)
    if not include_editables:
        checks.append(lambda d: not dist_is_editable(d))
    if editables_only:
        checks.append(dist_is_editable)
    if user_only:
        checks.append(dist_in_usersite)

    return [
        d for d in working_set
        if all(check(d) for check in checks)
    ]
466 | |
467 | |
def egg_link_path(dist):
    # type: (Distribution) -> Optional[str]
    """
    Return the path of the first "<project_name>.egg-link" file found for
    ``dist``, or None when there is none.

    The directories searched, in order, depend on the environment:
    1) not in a virtualenv
       user_site (if set), then site_packages
    2) in a no-global virtualenv
       site_packages only
    3) in a yes-global virtualenv
       site_packages, then user_site (if set)
       (don't look in global location)

    For #1 and #3, there could be odd cases where an egg-link exists in
    both locations; only the first one found is returned.
    """
    if running_under_virtualenv():
        sites = [site_packages]
        if not virtualenv_no_global() and user_site:
            sites.append(user_site)
    else:
        sites = [user_site] if user_site else []
        sites.append(site_packages)

    candidates = (
        os.path.join(site, dist.project_name) + '.egg-link'
        for site in sites
    )
    return next(
        (link for link in candidates if os.path.isfile(link)), None
    )
502 | |
503 | |
def dist_location(dist):
    # type: (Distribution) -> str
    """
    Return the normalized site-packages location of this distribution.

    Generally this is dist.location, except for develop-installed packages,
    where dist.location is the source code location and the path of the
    egg-link file is used instead.

    The result has been passed through normalize_path() (in particular,
    symlinks are removed).
    """
    egg_link = egg_link_path(dist)
    return normalize_path(egg_link if egg_link else dist.location)
518 | |
519 | |
def write_output(msg, *args):
    # type: (str, str) -> None
    """Log ``msg`` at INFO level on this module's logger.

    ``args`` are forwarded unchanged as the logging format arguments.
    """
    emit = logger.info
    emit(msg, *args)
523 | |
524 | |
class FakeFile(object):
    """Wrap an iterable of lines in an object with readline() to make
    ConfigParser happy."""
    def __init__(self, lines):
        # A single shared generator, so readline() and iteration consume
        # the same stream of lines.
        self._gen = (line for line in lines)

    def readline(self):
        """Return the next line, or '' once the lines are exhausted."""
        # The two-argument next() replaces the old ``self._gen.next()``
        # fallback, which on Python 3 turned a NameError raised by the
        # wrapped iterable into an unrelated AttributeError.
        return next(self._gen, '')

    def __iter__(self):
        """Return the underlying generator over the remaining lines."""
        return self._gen
542 | |
543 | |
class StreamWrapper(StringIO):
    """In-memory stream that reports the encoding of the stream it
    stands in for."""

    @classmethod
    def from_stream(cls, orig_stream):
        """Return a new, empty wrapper that remembers ``orig_stream``."""
        # Keep the reference on the instance, not on the class: a class
        # attribute would be overwritten by every later from_stream() call
        # and change the encoding reported by wrappers created earlier.
        wrapper = cls()
        wrapper.orig_stream = orig_stream
        return wrapper

    # compileall.compile_dir() needs stdout.encoding to print to stdout
    @property
    def encoding(self):
        """The encoding of the stream passed to from_stream()."""
        return self.orig_stream.encoding
555 | |
556 | |
@contextlib.contextmanager
def captured_output(stream_name):
    """Context manager that temporarily swaps the sys stream named
    *stream_name* for a StreamWrapper and yields the replacement.

    The original stream is put back on exit, even if the body raises.

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    original = getattr(sys, stream_name)
    replacement = StreamWrapper.from_stream(original)
    setattr(sys, stream_name, replacement)
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, original)
570 | |
571 | |
def captured_stdout():
    """Return a context manager that captures sys.stdout:

       with captured_stdout() as stdout:
           print('hello')
       self.assertEqual(stdout.getvalue(), 'hello\n')

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    stream_name = 'stdout'
    return captured_output(stream_name)
582 | |
583 | |
def captured_stderr():
    """
    Return a context manager that captures sys.stderr.

    See captured_stdout().
    """
    stream_name = 'stderr'
    return captured_output(stream_name)
589 | |
590 | |
class cached_property(object):
    """Descriptor that computes a value once per instance and then stores
    it in the instance ``__dict__`` under the wrapped function's name, so
    later lookups find the ordinary attribute. Deleting the attribute
    resets the property.

    Source: https://github.com/bottlepy/bottle/blob/0.11.5/bottle.py#L175
    """

    def __init__(self, func):
        self.func = func
        self.__doc__ = func.__doc__

    def __get__(self, obj, cls):
        if obj is None:
            # Looked up on the class rather than on an instance.
            return self
        computed = self.func(obj)
        obj.__dict__[self.func.__name__] = computed
        return computed
609 | |
610 | |
def get_installed_version(dist_name, working_set=None):
    """Return the installed version of ``dist_name``, or None when no
    matching distribution is found.

    When ``working_set`` is not given, a fresh pkg_resources.WorkingSet is
    built on every call so that the lookup avoids the pkg_resources cache.
    """
    # The requirement that will be looked up inside of setuptools.
    requirement = pkg_resources.Requirement.parse(dist_name)

    if working_set is None:
        search_in = pkg_resources.WorkingSet()
    else:
        search_in = working_set

    found = search_in.find(requirement)
    if not found:
        return None
    return found.version
627 | |
628 | |
def consume(iterator):
    """Exhaust ``iterator`` at C speed, discarding every item."""
    # A deque with maxlen=0 stores nothing, so extending it simply drains
    # the iterable inside deque's own loop.
    sink = deque(maxlen=0)
    sink.extend(iterator)
632 | |
633 | |
# Simulates an enum
def enum(*sequential, **named):
    """Build a class named 'Enum' whose attributes are the given members.

    Positional names are numbered from 0 in order; keyword arguments supply
    explicit values. The class also gets a ``reverse_mapping`` dict from
    each value back to its name.
    """
    members = {name: index for index, name in enumerate(sequential)}
    members.update(named)
    by_value = {value: name for name, value in members.items()}
    members['reverse_mapping'] = by_value
    return type('Enum', (), members)
640 | |
641 | |
def build_netloc(host, port):
    # type: (str, Optional[int]) -> str
    """
    Join a host and an optional port into a netloc string.

    Without a port the host is returned unchanged. With a port, a host
    containing ':' (IPv6) is wrapped in square brackets first.
    """
    if port is None:
        return host
    bracketed = '[{}]'.format(host) if ':' in host else host
    return '{}:{}'.format(bracketed, port)
653 | |
654 | |
def build_url_from_netloc(netloc, scheme='https'):
    # type: (str, str) -> str
    """
    Return "<scheme>://<netloc>" for the given netloc.

    A netloc with at least two ':' and neither '@' nor '[' is taken to be a
    bare IPv6 address and is wrapped in square brackets first.
    """
    looks_like_bare_ipv6 = (
        netloc.count(':') >= 2 and
        '@' not in netloc and
        '[' not in netloc
    )
    if looks_like_bare_ipv6:
        netloc = '[{}]'.format(netloc)
    return '{}://{}'.format(scheme, netloc)
664 | |
665 | |
def parse_netloc(netloc):
    # type: (str) -> Tuple[str, Optional[int]]
    """
    Return the (hostname, port) pair parsed from a netloc.

    The netloc is first turned into a URL with build_url_from_netloc() and
    then parsed with urlparse().
    """
    parsed = urllib_parse.urlparse(build_url_from_netloc(netloc))
    host, port = parsed.hostname, parsed.port
    return host, port
674 | |
675 | |
def split_auth_from_netloc(netloc):
    """
    Separate the auth information from a netloc.

    Returns: (netloc_without_auth, (username, password)). Username and
    password are percent-decoded; each is None when absent.
    """
    # Split on the LAST '@' because that's how urllib.parse.urlsplit()
    # behaves if more than one @ is present (which can be checked using
    # the password attribute of urlsplit()'s return value).
    auth, at_sign, host = netloc.rpartition('@')
    if not at_sign:
        return netloc, (None, None)

    # Split on the FIRST ':' because that's how urllib.parse.urlsplit()
    # behaves if more than one : is present (which again can be checked
    # using the password attribute of the return value).
    username, colon, secret = auth.partition(':')
    password = urllib_unquote(secret) if colon else None

    return host, (urllib_unquote(username), password)
702 | |
703 | |
def redact_netloc(netloc):
    # type: (str) -> str
    """
    Return ``netloc`` with its sensitive part replaced by "****".

    A netloc without auth is returned as is. With both user and password,
    only the password is masked; a lone user (e.g. an access token) is
    masked entirely.

    For example:
        - "user:pass@example.com" returns "user:****@example.com"
        - "accesstoken@example.com" returns "****@example.com"
    """
    netloc, (user, password) = split_auth_from_netloc(netloc)
    if user is None:
        return netloc

    if password is None:
        shown_user, shown_password = '****', ''
    else:
        shown_user, shown_password = urllib_parse.quote(user), ':****'

    return '{user}{password}@{netloc}'.format(
        user=shown_user, password=shown_password, netloc=netloc,
    )
725 | |
726 | |
def _transform_url(url, transform_netloc):
    """Rebuild ``url`` with its netloc replaced.

    transform_netloc is a function taking the netloc and returning a
    tuple whose first element is the new netloc.

    Returns a 2-tuple: the rebuilt url as item 0 and the complete tuple
    returned by transform_netloc as item 1.
    """
    parts = urllib_parse.urlsplit(url)
    netloc_tuple = transform_netloc(parts.netloc)
    new_netloc = netloc_tuple[0]
    # All other components are carried over from the original url.
    rebuilt = urllib_parse.urlunsplit((
        parts.scheme,
        new_netloc,
        parts.path,
        parts.query,
        parts.fragment,
    ))
    return rebuilt, netloc_tuple
745 | |
746 | |
def _get_netloc(netloc):
    """Netloc transform for _transform_url() that strips the auth part.

    Returns whatever split_auth_from_netloc() returns for ``netloc``.
    """
    netloc_and_auth = split_auth_from_netloc(netloc)
    return netloc_and_auth
749 | |
750 | |
def _redact_netloc(netloc):
    """Netloc transform for _transform_url() that masks the auth part.

    Returns a 1-tuple holding redact_netloc(netloc).
    """
    redacted = redact_netloc(netloc)
    return (redacted,)
753 | |
754 | |
def split_auth_netloc_from_url(url):
    # type: (str) -> Tuple[str, str, Tuple[str, str]]
    """
    Split a url into the url without auth, its netloc, and its auth.

    Returns: (url_without_auth, netloc, (username, password))
    """
    url_without_auth, netloc_and_auth = _transform_url(url, _get_netloc)
    netloc, auth = netloc_and_auth
    return url_without_auth, netloc, auth
764 | |
765 | |
def remove_auth_from_url(url):
    # type: (str) -> str
    """Return a copy of url with 'username:password@' removed."""
    # username/pass params are passed to subversion through flags
    # and are not recognized in the url.
    stripped_url, _ = _transform_url(url, _get_netloc)
    return stripped_url
772 | |
773 | |
def redact_auth_from_url(url):
    # type: (str) -> str
    """Return a copy of url whose netloc went through redact_netloc()."""
    redacted_url, _ = _transform_url(url, _redact_netloc)
    return redacted_url
778 | |
779 | |
class HiddenText(object):
    """A secret string paired with a redacted stand-in.

    str() and repr() only ever expose the redacted form; the raw value
    stays available as the ``secret`` attribute.
    """

    def __init__(self, secret, redacted):
        # type: (str, str) -> None
        self.secret = secret
        self.redacted = redacted

    def __repr__(self):
        # type: (...) -> str
        shown = str(self)
        return '<HiddenText {!r}>'.format(shown)

    def __str__(self):
        # type: (...) -> str
        return self.redacted

    # This is useful for testing.
    def __eq__(self, other):
        # type: (Any) -> bool
        # Only the raw, original string has to match; the string used for
        # redaction is deliberately left out of the comparison.
        return (
            False if type(self) != type(other)
            else self.secret == other.secret
        )

    # We need to provide an explicit __ne__ implementation for Python 2.
    # TODO: remove this when we drop PY2 support.
    def __ne__(self, other):
        # type: (Any) -> bool
        return not (self == other)
813 | |
814 | |
def hide_value(value):
    # type: (str) -> HiddenText
    """Wrap ``value`` in a HiddenText that displays as '****'."""
    mask = '****'
    return HiddenText(value, redacted=mask)
818 | |
819 | |
def hide_url(url):
    # type: (str) -> HiddenText
    """Wrap ``url`` in a HiddenText that displays as the url returned by
    redact_auth_from_url()."""
    return HiddenText(url, redacted=redact_auth_from_url(url))
824 | |
825 | |
def protect_pip_from_modification_on_windows(modifying_pip):
    # type: (bool) -> None
    """Protection of pip.exe from modification on Windows

    On Windows, any operation modifying pip should be run as:
        python -m pip ...

    Raises CommandError with the command to run instead when pip is being
    modified on Windows through one of the pip*.exe launchers.
    """
    # See https://github.com/pypa/pip/issues/1299 for more discussion
    if not (modifying_pip and WINDOWS):
        return

    launcher_names = (
        "pip.exe",
        "pip{}.exe".format(sys.version_info[0]),
        "pip{}.{}.exe".format(*sys.version_info[:2]),
    )
    if os.path.basename(sys.argv[0]) not in launcher_names:
        return

    new_command = [sys.executable, "-m", "pip"] + sys.argv[1:]
    raise CommandError(
        'To modify pip, please run the following command:\n{}'
        .format(" ".join(new_command))
    )
854 | |
855 | |
def is_console_interactive():
    # type: () -> bool
    """Tell whether sys.stdin exists and reports itself as a tty."""
    if sys.stdin is None:
        return False
    return sys.stdin.isatty()
861 | |
862 | |
def hash_file(path, blocksize=1 << 20):
    # type: (str, int) -> Tuple[Any, int]
    """Return (hash, length) for the file at ``path``.

    ``hash`` is the hashlib.sha256() object fed with the file's contents
    and ``length`` is the number of bytes read. The file is read in binary
    mode, ``blocksize`` bytes at a time.
    """
    digest = hashlib.sha256()
    total = 0
    with open(path, 'rb') as stream:
        for piece in read_chunks(stream, size=blocksize):
            digest.update(piece)
            total += len(piece)
    return digest, total
875 | |
876 | |
def is_wheel_installed():
    """
    Tell whether the ``wheel`` package can be imported.
    """
    try:
        import wheel  # noqa: F401
    except ImportError:
        return False
    else:
        return True