Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/pip/_internal/utils/compat.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 """Stuff that differs in different Python versions and platform | |
2 distributions.""" | |
3 from __future__ import absolute_import, division | |
4 | |
5 import codecs | |
6 import locale | |
7 import logging | |
8 import os | |
9 import shutil | |
10 import sys | |
11 | |
12 from pip._vendor.six import text_type | |
13 from pip._vendor.urllib3.util import IS_PYOPENSSL | |
14 | |
15 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
16 | |
17 if MYPY_CHECK_RUNNING: | |
18 from typing import Optional, Text, Tuple, Union | |
19 | |
20 try: | |
21 import _ssl # noqa | |
22 except ImportError: | |
23 ssl = None | |
24 else: | |
25 # This additional assignment was needed to prevent a mypy error. | |
26 ssl = _ssl | |
27 | |
28 try: | |
29 import ipaddress | |
30 except ImportError: | |
31 try: | |
32 from pip._vendor import ipaddress # type: ignore | |
33 except ImportError: | |
34 import ipaddr as ipaddress # type: ignore | |
35 ipaddress.ip_address = ipaddress.IPAddress # type: ignore | |
36 ipaddress.ip_network = ipaddress.IPNetwork # type: ignore | |
37 | |
38 | |
39 __all__ = [ | |
40 "ipaddress", "uses_pycache", "console_to_str", "native_str", | |
41 "get_path_uid", "stdlib_pkgs", "WINDOWS", "samefile", "get_terminal_size", | |
42 "get_extension_suffixes", | |
43 ] | |
44 | |
45 | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# TLS counts as available when the low-level ``_ssl`` module imported;
# otherwise fall back to the IS_PYOPENSSL flag imported from urllib3.
HAS_TLS = True if ssl is not None else IS_PYOPENSSL
49 | |
# Resolve ``cache_from_source`` and record in ``uses_pycache`` whether it is
# available on this interpreter.
if sys.version_info < (3, 4):
    import imp

    # An ``imp`` module without cache_from_source means the interpreter
    # does not use __pycache__; None marks that case.
    cache_from_source = getattr(imp, "cache_from_source", None)  # type: ignore
    uses_pycache = cache_from_source is not None
else:
    from importlib.util import cache_from_source
    uses_pycache = True
63 | |
64 | |
def backslashreplace_decode_fn(err):
    """Codec error handler that backslash-escapes undecodable bytes.

    :param err: The ``UnicodeDecodeError`` describing the failure; its
        ``object``, ``start`` and ``end`` attributes identify the bytes
        that could not be decoded.
    :return: A ``(replacement_text, resume_position)`` tuple, the shape
        expected from handlers passed to ``codecs.register_error``.

    Every offending byte is rendered as ``\\xNN`` with exactly two hex
    digits, so the output matches the built-in "backslashreplace" handler
    even for byte values below 0x10.
    """
    # bytearray yields integers on both Python 2 and Python 3, so no
    # per-version conversion of the offending bytes is needed.
    raw_bytes = bytearray(err.object[err.start:err.end])
    return u"".join(u"\\x%02x" % c for c in raw_bytes), err.end


if sys.version_info >= (3, 5):
    backslashreplace_decode = "backslashreplace"
else:
    # In version 3.4 and older, backslashreplace exists
    # but does not support use for decoding.
    # We register our own replace handler for this
    # situation, so that we can consistently use
    # backslash replacement for all versions.
    codecs.register_error(
        "backslashreplace_decode",
        backslashreplace_decode_fn,
    )
    backslashreplace_decode = "backslashreplace_decode"
84 | |
85 | |
def str_to_display(data, desc=None):
    # type: (Union[bytes, Text], Optional[str]) -> Text
    """
    For display or logging purposes, convert a bytes object (or text) to
    text (e.g. unicode in Python 2) safe for output.

    :param data: The bytes (or text) to convert. Text is returned unchanged.
    :param desc: An optional phrase describing the input data, for use in
        the log message if a warning is logged. Defaults to "Bytes object".
    :return: The decoded text.

    This function should never error out and so can take a best effort
    approach. It is okay to be lossy if needed since the return value is
    just for display.

    We assume the data is in the locale preferred encoding. If it won't
    decode properly, we warn the user but decode as best we can.

    We also ensure that the output can be safely written to standard output
    without encoding errors.
    """
    if isinstance(data, text_type):
        return data

    # Otherwise, data is a bytes object (str in Python 2).
    # First, get the encoding we assume. This is the preferred
    # encoding for the locale, unless that is not found, or
    # it is ASCII, in which case assume UTF-8.
    encoding = locale.getpreferredencoding()
    try:
        use_utf8 = (not encoding) or codecs.lookup(encoding).name == "ascii"
    except LookupError:
        # The locale named a codec Python does not know; treat that the
        # same as "not found" instead of raising.
        use_utf8 = True
    if use_utf8:
        encoding = "utf-8"

    # Now try to decode the data - if we fail, warn the user and
    # decode with replacement.
    try:
        decoded_data = data.decode(encoding)
    except UnicodeDecodeError:
        if desc is None:
            desc = 'Bytes object'
        # Pass desc as a logging argument rather than splicing it into the
        # format string, so a '%' inside desc cannot break formatting.
        logger.warning(
            '%s does not appear to be encoded as %s', desc, encoding,
        )
        decoded_data = data.decode(encoding, errors=backslashreplace_decode)

    # Make sure we can print the output, by encoding it to the output
    # encoding with replacement of unencodable characters, and then
    # decoding again.
    # We use stderr's encoding because it's less likely to be
    # redirected and if we don't find an encoding we skip this
    # step (on the assumption that output is wrapped by something
    # that won't fail).
    # The double getattr is to deal with the possibility that we're
    # being called in a situation where sys.__stderr__ doesn't exist,
    # or doesn't have an encoding attribute. Neither of these cases
    # should occur in normal pip use, but there's no harm in checking
    # in case people use pip in (unsupported) unusual situations.
    output_encoding = getattr(getattr(sys, "__stderr__", None),
                              "encoding", None)

    if output_encoding:
        output_encoded = decoded_data.encode(
            output_encoding,
            errors="backslashreplace"
        )
        decoded_data = output_encoded.decode(output_encoding)

    return decoded_data
150 | |
151 | |
def console_to_str(data):
    # type: (bytes) -> Text
    """Convert subprocess output into a string that is safe for output."""
    description = 'Subprocess output'
    return str_to_display(data, desc=description)
157 | |
158 | |
if sys.version_info < (3,):
    def native_str(s, replace=False):
        # type: (str, bool) -> str
        """Return ``s`` as a native (byte) string, UTF-8 encoding text."""
        # ``replace`` is ignored -- unicode to UTF-8 can't fail.
        return s.encode('utf-8') if isinstance(s, text_type) else s

else:
    def native_str(s, replace=False):
        # type: (str, bool) -> str
        """Return ``s`` as a native (text) string, UTF-8 decoding bytes.

        With ``replace`` true, undecodable bytes are replaced instead of
        raising; anything that is not bytes is returned unchanged.
        """
        if not isinstance(s, bytes):
            return s
        error_mode = 'replace' if replace else 'strict'
        return s.decode('utf-8', error_mode)
173 | |
174 | |
def get_path_uid(path):
    # type: (str) -> int
    """
    Return path's uid.

    Does not follow symlinks:
        https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in compat due to differences on AIX and
    Jython, that should eventually go away.

    :param path: Filesystem path to inspect.
    :return: The ``st_uid`` of the file at ``path``.
    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        try:
            return os.fstat(fd).st_uid
        finally:
            # Close the descriptor even when fstat raises, so it cannot leak.
            os.close(fd)

    # AIX and Jython
    # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
    if os.path.islink(path):
        # raise OSError for parity with os.O_NOFOLLOW above
        raise OSError(
            "%s is a symlink; Will not return uid for symlinks" % path
        )
    # older versions of Jython don't have `os.fstat`
    return os.stat(path).st_uid
203 | |
204 | |
if sys.version_info < (3, 4):
    from imp import get_suffixes

    def get_extension_suffixes():
        """Return the first field of every entry from imp.get_suffixes()."""
        suffixes = []
        for entry in get_suffixes():
            suffixes.append(entry[0])
        return suffixes
else:
    from importlib.machinery import EXTENSION_SUFFIXES

    def get_extension_suffixes():
        """Return importlib.machinery.EXTENSION_SUFFIXES."""
        return EXTENSION_SUFFIXES
215 | |
216 | |
def expanduser(path):
    # type: (str) -> str
    """
    Expand ~ and ~user constructions in ``path``.

    Works around https://bugs.python.org/issue14768 by dropping one leading
    slash when expanding a "~/" path yields a result starting with "//".
    """
    result = os.path.expanduser(path)
    needs_fix = path.startswith('~/') and result.startswith('//')
    return result[1:] if needs_fix else result
228 | |
229 | |
# Stdlib packages that may ship installation metadata but must not be
# treated as 'installed'. In theory this could be derived from dist.location
# (py27: `sysconfig.get_paths()['stdlib']`,
# py26: sysconfig.get_config_vars('LIBDEST')), but platform variation could
# make that unreliable, so the names are hard-coded.
stdlib_pkgs = {"python", "wsgiref", "argparse"}


# Windows detection; covers CPython and IronPython.
WINDOWS = (
    sys.platform.startswith("win")
    or (os.name == 'nt' and sys.platform == 'cli')
)
241 | |
242 | |
def samefile(file1, file2):
    # type: (str, str) -> bool
    """Report whether two paths refer to the same file.

    Uses os.path.samefile when it exists; otherwise (Windows/Python2)
    compares the case-normalized absolute paths.
    """
    if not hasattr(os.path, 'samefile'):
        first, second = (
            os.path.normcase(os.path.abspath(name))
            for name in (file1, file2)
        )
        return first == second
    return os.path.samefile(file1, file2)
252 | |
253 | |
if not hasattr(shutil, 'get_terminal_size'):
    def get_terminal_size():
        # type: () -> Tuple[int, int]
        """
        Returns a tuple (x, y) representing the width(x) and the height(y)
        in characters of the terminal window.
        """
        def query_winsize(fd):
            # Ask the terminal behind ``fd`` for its size; None on any
            # failure or when it reports (0, 0).
            try:
                import fcntl
                import termios
                import struct
                packed = fcntl.ioctl(fd, termios.TIOCGWINSZ, '12345678')
                size = struct.unpack_from('hh', packed)
            except Exception:
                return None
            return None if size == (0, 0) else size

        # Try stdin, stdout and stderr in turn.
        size = None
        for std_fd in (0, 1, 2):
            size = query_winsize(std_fd)
            if size:
                break
        if not size:
            # Next, try the controlling terminal.
            try:
                fd = os.open(os.ctermid(), os.O_RDONLY)
                size = query_winsize(fd)
                os.close(fd)
            except Exception:
                pass
        if not size:
            # Last resort: environment variables, else 25 rows x 80 columns.
            size = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
        # ``size`` is (rows, columns); callers get (columns, rows).
        return int(size[1]), int(size[0])
else:
    def get_terminal_size():
        # type: () -> Tuple[int, int]
        """
        Returns a tuple (x, y) representing the width(x) and the height(y)
        in characters of the terminal window.
        """
        size = shutil.get_terminal_size()
        return tuple(size)  # type: ignore