comparison lib/python3.8/site-packages/pip/_internal/network/session.py @ 0:9e54283cc701 draft

"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author guerler
date Mon, 27 Jul 2020 03:47:31 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:9e54283cc701
1 """PipSession and supporting code, containing all pip-specific
2 network request configuration and behavior.
3 """
4
5 # The following comment should be removed at some point in the future.
6 # mypy: disallow-untyped-defs=False
7
8 import email.utils
9 import json
10 import logging
11 import mimetypes
12 import os
13 import platform
14 import sys
15 import warnings
16
17 from pip._vendor import requests, six, urllib3
18 from pip._vendor.cachecontrol import CacheControlAdapter
19 from pip._vendor.requests.adapters import BaseAdapter, HTTPAdapter
20 from pip._vendor.requests.models import Response
21 from pip._vendor.requests.structures import CaseInsensitiveDict
22 from pip._vendor.six.moves.urllib import parse as urllib_parse
23 from pip._vendor.urllib3.exceptions import InsecureRequestWarning
24
25 from pip import __version__
26 from pip._internal.network.auth import MultiDomainBasicAuth
27 from pip._internal.network.cache import SafeFileCache
28 # Import ssl from compat so the initial import occurs in only one place.
29 from pip._internal.utils.compat import has_tls, ipaddress
30 from pip._internal.utils.glibc import libc_ver
31 from pip._internal.utils.misc import (
32 build_url_from_netloc,
33 get_installed_version,
34 parse_netloc,
35 )
36 from pip._internal.utils.typing import MYPY_CHECK_RUNNING
37 from pip._internal.utils.urls import url_to_path
38
if MYPY_CHECK_RUNNING:
    # Everything in this branch is referenced only from the `# type:`
    # comments found elsewhere in this module.
    from typing import Iterator, List, Optional, Tuple, Union

    from pip._internal.models.link import Link

    # A (protocol, hostname, port) triple describing one secure origin.
    SecureOrigin = Tuple[str, str, Optional[Union[int, str]]]
47
48
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Silence the warning that is raised when --trusted-host is in use.
warnings.filterwarnings(action="ignore", category=InsecureRequestWarning)
54
55
SECURE_ORIGINS = [
    # Every entry is a (protocol, hostname, port) triple.
    # The entries follow Chrome's list of secure origins
    # (See: http://bit.ly/1qrySKC).
    ("https", "*", "*"),
    ("*", "localhost", "*"),
    ("*", "127.0.0.0/8", "*"),
    ("*", "::1/128", "*"),
    ("file", "*", None),
    # ssh is always secure.
    ("ssh", "*", "*"),
]  # type: List[SecureOrigin]
67
68
# Environment variables that are present when running under various CI
# systems; each name is annotated with some of the systems that set it.
# The names were picked so that every one of a number of popular systems
# sets at least one of them. The list gives an indication of, and a lower
# bound for, CI traffic to PyPI, so it does not need to be comprehensive.
# Background: https://github.com/pypa/pip/issues/5499
CI_ENVIRONMENT_VARIABLES = (
    'BUILD_BUILDID',  # Azure Pipelines
    'BUILD_ID',  # Jenkins
    # AppVeyor, CircleCI, Codeship, Gitlab CI, Shippable, Travis CI
    'CI',
    'PIP_IS_CI',  # Explicit environment variable.
)
86
87
def looks_like_ci():
    # type: () -> bool
    """
    Report whether pip appears to be running under a CI system.

    The answer is True as soon as one of the names listed in
    CI_ENVIRONMENT_VARIABLES is found in the process environment, and
    False when none of them is set.
    """
    # Probing for a tty (e.g. using isatty()) is deliberately not done:
    # some CI systems mimic a tty (e.g. Travis CI), so that probe is not
    # conclusive in either direction.
    for variable in CI_ENVIRONMENT_VARIABLES:
        if variable in os.environ:
            return True
    return False
97
98
def user_agent():
    """
    Build the User-Agent string describing this pip installation.

    The string is "<installer name>/<installer version>", a space, and
    then a compact JSON object (sorted keys, no extra whitespace) holding
    the collected details: installer, Python version and implementation,
    Linux/macOS distribution, system, CPU, OpenSSL and setuptools
    versions, the CI flag and any user-supplied data.
    """
    implementation = {"name": platform.python_implementation()}
    data = {
        "installer": {"name": "pip", "version": __version__},
        "python": platform.python_version(),
        "implementation": implementation,
    }

    implementation_name = implementation["name"]
    if implementation_name in ('CPython', 'Jython', 'IronPython'):
        # For Jython and IronPython this value is a complete guess.
        implementation["version"] = platform.python_version()
    elif implementation_name == 'PyPy':
        pypy_version = sys.pypy_version_info
        if pypy_version.releaselevel == 'final':
            # Final releases are reported by their first three fields only.
            pypy_version = pypy_version[:3]
        implementation["version"] = ".".join(
            str(part) for part in pypy_version
        )

    if sys.platform.startswith("linux"):
        from pip._vendor import distro

        # Only the fields that carry a non-empty value are reported.
        distro_infos = {
            key: value
            for key, value in zip(
                ["name", "version", "id"], distro.linux_distribution()
            )
            if value
        }
        libc = {
            key: value
            for key, value in zip(["lib", "version"], libc_ver())
            if value
        }
        if libc:
            distro_infos["libc"] = libc
        if distro_infos:
            data["distro"] = distro_infos

    if sys.platform.startswith("darwin"):
        mac_version = platform.mac_ver()[0]
        if mac_version:
            data["distro"] = {"name": "macOS", "version": mac_version}

    system_name = platform.system()
    if system_name:
        data.setdefault("system", {})["name"] = system_name

    system_release = platform.release()
    if system_release:
        data.setdefault("system", {})["release"] = system_release

    machine = platform.machine()
    if machine:
        data["cpu"] = machine

    if has_tls():
        import _ssl as ssl
        data["openssl_version"] = ssl.OPENSSL_VERSION

    setuptools_version = get_installed_version("setuptools")
    if setuptools_version is not None:
        data["setuptools_version"] = setuptools_version

    # None (rather than False) is reported when no CI marker is found, so
    # as not to suggest pip knows it is *not* under CI: the result is null
    # or inconclusive. A value is always included so that it is easy to
    # tell that the check has been run.
    data["ci"] = True if looks_like_ci() else None

    user_data = os.environ.get("PIP_USER_AGENT_USER_DATA")
    if user_data is not None:
        data["user_data"] = user_data

    return "{data[installer][name]}/{data[installer][version]} {json}".format(
        data=data,
        json=json.dumps(data, separators=(",", ":"), sort_keys=True),
    )
177
178
class LocalFSAdapter(BaseAdapter):
    """Transport adapter that answers requests from the local filesystem."""

    def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        """
        Build a Response for the local file that ``request.url`` maps to.

        :param request: The prepared request; only its ``url`` is used.
        :param stream: Unused.
        :param timeout: Unused.
        :param verify: Unused.
        :param cert: Unused.
        :param proxies: Unused.
        :return: A Response with status 200 whose ``raw`` is the file opened
            for binary reading, or, when ``os.stat`` fails, a Response with
            status 404 whose ``reason`` is the exception class name and
            whose ``raw`` is an in-memory stream holding the error text.
        """
        # Imported here so that this block does not depend on a
        # module-level import being present.
        import io

        pathname = url_to_path(request.url)

        resp = Response()
        resp.status_code = 200
        resp.url = request.url

        try:
            stats = os.stat(pathname)
        except OSError as exc:
            # ``raw`` must be a file-like object: storing the exception
            # itself would make reading or closing the response fail with
            # AttributeError. Expose the error as the body instead.
            resp.status_code = 404
            resp.reason = type(exc).__name__
            resp.raw = io.BytesIO(
                six.ensure_binary(
                    "{}: {}".format(resp.reason, exc), "utf8", "replace"
                )
            )
        else:
            modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
            # Fall back to text/plain when the type cannot be guessed.
            content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
            resp.headers = CaseInsensitiveDict({
                "Content-Type": content_type,
                "Content-Length": stats.st_size,
                "Last-Modified": modified,
            })

            resp.raw = open(pathname, "rb")
            # Closing the response closes the underlying file handle.
            resp.close = resp.raw.close

        return resp

    def close(self):
        """Do nothing; the adapter itself holds no resources."""
        pass
210
211
class InsecureHTTPAdapter(HTTPAdapter):
    """HTTPAdapter whose certificate setup always has verification off."""

    def cert_verify(self, conn, url, verify, cert):
        """
        Delegate to HTTPAdapter.cert_verify with ``verify`` forced to False.

        The ``verify`` value supplied by the caller is ignored.
        """
        parent = super(InsecureHTTPAdapter, self)
        parent.cert_verify(conn=conn, url=url, verify=False, cert=cert)
218
219
class PipSession(requests.Session):
    """requests.Session carrying pip's adapters, auth, retries and origins."""

    # Default timeout that request() applies when the caller passes none.
    timeout = None  # type: Optional[int]

    def __init__(self, *args, **kwargs):
        """
        :param retries: Total number of retries a particular request can
            have (default 0).
        :param cache: When truthy, handed to SafeFileCache to back the
            caching adapter used for https:// URLs.
        :param trusted_hosts: Domains not to emit warnings for when not using
            HTTPS.
        :param index_urls: Passed to MultiDomainBasicAuth.

        All remaining arguments are forwarded to requests.Session.
        """
        retries = kwargs.pop("retries", 0)
        cache = kwargs.pop("cache", None)
        trusted_hosts = kwargs.pop("trusted_hosts", [])  # type: List[str]
        index_urls = kwargs.pop("index_urls", None)

        super(PipSession, self).__init__(*args, **kwargs)

        # The "pip_" prefix namespaces the attribute, just in case, to
        # prevent possible conflicts with the base class.
        self.pip_trusted_origins = []  # type: List[Tuple[str, Optional[int]]]

        # Every request carries pip's User-Agent.
        self.headers["User-Agent"] = user_agent()

        # Every request goes through pip's authentication handler.
        self.auth = MultiDomainBasicAuth(index_urls=index_urls)

        # The urllib3.Retry instance lets us customize how retries are
        # handled.
        retry_policy = urllib3.Retry(
            # Total number of retries that a particular request can have.
            total=retries,

            # A 503 from PyPI typically means that the Fastly -> Origin
            # connection got interrupted in some way, and a 503 in general
            # is typically considered transient, so it is retried.
            # A 500 may indicate a transient error in Amazon S3.
            # A 520 or 527 may indicate a transient error in CloudFlare.
            status_forcelist=[500, 503, 520, 527],

            # A small amount of back off between failed requests, to
            # prevent hammering the service.
            backoff_factor=0.25,
        )

        # Responses are cached _only_ for securely fetched origins: the
        # response of an insecurely fetched origin cannot be validated, and
        # nobody should be able to poison the cache in a way that requires
        # manual eviction to fix.
        if not cache:
            secure_adapter = HTTPAdapter(max_retries=retry_policy)
        else:
            secure_adapter = CacheControlAdapter(
                cache=SafeFileCache(cache),
                max_retries=retry_policy,
            )

        # The insecure adapter disables HTTPS validation and does not
        # support caching (see above). It serves all http:// URLs as well
        # as any https:// host marked as ignoring TLS errors. It is kept on
        # the instance for later use by add_trusted_host().
        self._insecure_adapter = InsecureHTTPAdapter(max_retries=retry_policy)

        self.mount("https://", secure_adapter)
        self.mount("http://", self._insecure_adapter)

        # Enable file:// urls
        self.mount("file://", LocalFSAdapter())

        for trusted_host in trusted_hosts:
            self.add_trusted_host(trusted_host, suppress_logging=True)

    def add_trusted_host(self, host, source=None, suppress_logging=False):
        # type: (str, Optional[str], bool) -> None
        """
        Record a host as trusted and route it through the insecure adapter.

        :param host: It is okay to provide a host that has previously been
            added.
        :param source: An optional source string, for logging where the host
            string came from.
        :param suppress_logging: When true, the informational log message
            is not emitted.
        """
        if not suppress_logging:
            origin_note = (
                '' if source is None else ' (from {})'.format(source)
            )
            logger.info('adding trusted host: {!r}'.format(host) + origin_note)

        host_port = parse_netloc(host)
        if host_port not in self.pip_trusted_origins:
            self.pip_trusted_origins.append(host_port)

        base_url = build_url_from_netloc(host)
        self.mount(base_url + '/', self._insecure_adapter)
        if not host_port[1]:
            # No port was given: mount wildcard ports for the same host.
            self.mount(base_url + ':', self._insecure_adapter)

    def iter_secure_origins(self):
        # type: () -> Iterator[SecureOrigin]
        """
        Yield the entries of SECURE_ORIGINS, then one entry per trusted
        origin recorded on this session (any protocol; any port when the
        recorded port is None).
        """
        for builtin_origin in SECURE_ORIGINS:
            yield builtin_origin
        for trusted_host, trusted_port in self.pip_trusted_origins:
            if trusted_port is None:
                trusted_port = '*'
            yield ('*', trusted_host, trusted_port)

    def is_secure_origin(self, location):
        # type: (Link) -> bool
        """
        Return True when ``location`` matches one of the secure origins.

        When nothing matches, a warning naming the host is logged and False
        is returned.
        """
        # Determine if this url used a secure transport mechanism
        url_parts = urllib_parse.urlparse(str(location))
        origin_host = url_parts.hostname
        origin_port = url_parts.port

        # The repository type is not part of the protocol: in cases such as
        # "git+ssh", only "ssh" (the last scheme) is verified.
        origin_protocol = url_parts.scheme.rsplit('+', 1)[-1]

        # Walk the hardcoded list of secure origins as well as any
        # additional ones configured on this session.
        for protocol, host, port in self.iter_secure_origins():
            if protocol not in ("*", origin_protocol):
                continue

            try:
                addr = ipaddress.ip_address(
                    None
                    if origin_host is None
                    else six.ensure_text(origin_host)
                )
                network = ipaddress.ip_network(six.ensure_text(host))
            except ValueError:
                # There is no valid address/network pair, so the origin is
                # compared by hostname instead.
                host_differs = (
                    origin_host and
                    origin_host.lower() != host.lower() and
                    host != "*"
                )
                if host_differs:
                    continue
            else:
                # Both parsed: the address must lie within the network.
                if addr not in network:
                    continue

            # The port matches when the secure origin accepts any port
            # ("*" or None) or names exactly the origin's port.
            port_matches = (
                port is None or port == "*" or port == origin_port
            )
            if not port_matches:
                continue

            # Protocol, host and port all match this secure origin.
            return True

        # The origin isn't secure, so it will not be accepted as a valid
        # location to search; a warning says that it is being ignored.
        logger.warning(
            "The repository located at %s is not a trusted or secure host and "
            "is being ignored. If this repository is available via HTTPS we "
            "recommend you use HTTPS instead, otherwise you may silence "
            "this warning and allow it anyway with '--trusted-host %s'.",
            origin_host,
            origin_host,
        )

        return False

    def request(self, method, url, *args, **kwargs):
        """
        Dispatch a request, applying the session's default timeout when the
        caller did not pass a ``timeout`` keyword.
        """
        if "timeout" not in kwargs:
            kwargs["timeout"] = self.timeout

        return super(PipSession, self).request(method, url, *args, **kwargs)