Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/urllib3/contrib/pyopenssl.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """ | |
2 TLS with SNI_-support for Python 2. Follow these instructions if you would | |
3 like to verify TLS certificates in Python 2. Note, the default libraries do | |
4 *not* do certificate checking; you need to do additional work to validate | |
5 certificates yourself. | |
6 | |
7 This needs the following packages installed: | |
8 | |
9 * `pyOpenSSL`_ (tested with 16.0.0) | |
10 * `cryptography`_ (minimum 1.3.4, from pyopenssl) | |
11 * `idna`_ (minimum 2.0, from cryptography) | |
12 | |
13 However, pyopenssl depends on cryptography, which depends on idna, so while we | |
14 use all three directly here we end up having relatively few packages required. | |
15 | |
16 You can install them with the following command: | |
17 | |
18 .. code-block:: bash | |
19 | |
20 $ python -m pip install pyopenssl cryptography idna | |
21 | |
22 To activate certificate checking, call | |
23 :func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code | |
24 before you begin making HTTP requests. This can be done in a ``sitecustomize`` | |
25 module, or at any other time before your application begins using ``urllib3``, | |
26 like this: | |
27 | |
28 .. code-block:: python | |
29 | |
30 try: | |
31 import urllib3.contrib.pyopenssl | |
32 urllib3.contrib.pyopenssl.inject_into_urllib3() | |
33 except ImportError: | |
34 pass | |
35 | |
36 Now you can use :mod:`urllib3` as you normally would, and it will support SNI | |
37 when the required modules are installed. | |
38 | |
39 Activating this module also has the positive side effect of disabling SSL/TLS | |
40 compression in Python 2 (see `CRIME attack`_). | |
41 | |
42 .. _sni: https://en.wikipedia.org/wiki/Server_Name_Indication | |
43 .. _crime attack: https://en.wikipedia.org/wiki/CRIME_(security_exploit) | |
44 .. _pyopenssl: https://www.pyopenssl.org | |
45 .. _cryptography: https://cryptography.io | |
46 .. _idna: https://github.com/kjd/idna | |
47 """ | |
48 from __future__ import absolute_import | |
49 | |
50 import OpenSSL.SSL | |
51 from cryptography import x509 | |
52 from cryptography.hazmat.backends.openssl import backend as openssl_backend | |
53 from cryptography.hazmat.backends.openssl.x509 import _Certificate | |
54 | |
55 try: | |
56 from cryptography.x509 import UnsupportedExtension | |
57 except ImportError: | |
58 # UnsupportedExtension is gone in cryptography >= 2.1.0 | |
59 class UnsupportedExtension(Exception): | |
60 pass | |
61 | |
62 | |
63 from io import BytesIO | |
64 from socket import error as SocketError | |
65 from socket import timeout | |
66 | |
67 try: # Platform-specific: Python 2 | |
68 from socket import _fileobject | |
69 except ImportError: # Platform-specific: Python 3 | |
70 _fileobject = None | |
71 from ..packages.backports.makefile import backport_makefile | |
72 | |
73 import logging | |
74 import ssl | |
75 import sys | |
76 | |
77 from .. import util | |
78 from ..packages import six | |
79 | |
__all__ = ["inject_into_urllib3", "extract_from_urllib3"]

# SNI always works.
HAS_SNI = True

# Translation table from urllib3/stdlib protocol constants to the
# corresponding PyOpenSSL method constants.
_openssl_versions = {
    util.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD,
    ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
}

# The remaining protocol versions are optional: register each one only when
# both the stdlib ``ssl`` module and PyOpenSSL expose it.
if hasattr(ssl, "PROTOCOL_SSLv3") and hasattr(OpenSSL.SSL, "SSLv3_METHOD"):
    _openssl_versions.update({ssl.PROTOCOL_SSLv3: OpenSSL.SSL.SSLv3_METHOD})

if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"):
    _openssl_versions.update({ssl.PROTOCOL_TLSv1_1: OpenSSL.SSL.TLSv1_1_METHOD})

if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"):
    _openssl_versions.update({ssl.PROTOCOL_TLSv1_2: OpenSSL.SSL.TLSv1_2_METHOD})


# stdlib ``ssl.CERT_*`` verification modes -> PyOpenSSL verify flags, plus the
# inverse table used when reading the mode back from the context.
_stdlib_to_openssl_verify = {
    ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
    ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
    ssl.CERT_REQUIRED: (
        OpenSSL.SSL.VERIFY_PEER | OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT
    ),
}
_openssl_to_stdlib_verify = {
    openssl_mode: stdlib_mode
    for stdlib_mode, openssl_mode in _stdlib_to_openssl_verify.items()
}

# OpenSSL will only write 16K at a time
SSL_WRITE_BLOCKSIZE = 16384

# Remember the pre-injection values so extract_from_urllib3() can restore them.
orig_util_HAS_SNI = util.HAS_SNI
orig_util_SSLContext = util.ssl_.SSLContext


log = logging.getLogger(__name__)
117 | |
118 | |
def inject_into_urllib3():
    """Monkey-patch urllib3 with PyOpenSSL-backed SSL-support.

    Validates the PyOpenSSL/cryptography dependencies first (raising
    ``ImportError`` if they are insufficient), then installs
    ``PyOpenSSLContext`` and the related flags on both ``util`` and
    ``util.ssl_``.
    """
    _validate_dependencies_met()

    # The same three attributes are mirrored on both namespaces.
    for patched_module in (util, util.ssl_):
        patched_module.SSLContext = PyOpenSSLContext
        patched_module.HAS_SNI = HAS_SNI
        patched_module.IS_PYOPENSSL = True
130 | |
131 | |
def extract_from_urllib3():
    """Undo monkey-patching by :func:`inject_into_urllib3`.

    Restores the ``SSLContext`` and ``HAS_SNI`` values captured when this
    module was imported and clears the ``IS_PYOPENSSL`` flag on both ``util``
    and ``util.ssl_``.
    """
    for restored_module in (util, util.ssl_):
        restored_module.SSLContext = orig_util_SSLContext
        restored_module.HAS_SNI = orig_util_HAS_SNI
        restored_module.IS_PYOPENSSL = False
141 | |
142 | |
def _validate_dependencies_met():
    """
    Verifies that PyOpenSSL's package-level dependencies have been met.
    Throws `ImportError` if they are not met.
    """
    # Imported lazily so that a missing or outdated package surfaces here as
    # an ImportError rather than at module import time.
    from cryptography.x509.extensions import Extensions
    from OpenSSL.crypto import X509

    # Method added in `cryptography==1.1`; not available in older versions
    extension_lookup = getattr(Extensions, "get_extension_for_class", None)
    if extension_lookup is None:
        raise ImportError(
            "'cryptography' module missing required functionality. "
            "Try upgrading to v1.3.4 or newer."
        )

    # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509
    # attribute is only present on those versions.
    probe_certificate = X509()
    if getattr(probe_certificate, "_x509", None) is None:
        raise ImportError(
            "'pyOpenSSL' module missing required functionality. "
            "Try upgrading to v0.14 or newer."
        )
167 | |
168 | |
169 def _dnsname_to_stdlib(name): | |
170 """ | |
171 Converts a dNSName SubjectAlternativeName field to the form used by the | |
172 standard library on the given Python version. | |
173 | |
174 Cryptography produces a dNSName as a unicode string that was idna-decoded | |
175 from ASCII bytes. We need to idna-encode that string to get it back, and | |
176 then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib | |
177 uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8). | |
178 | |
179 If the name cannot be idna-encoded then we return None signalling that | |
180 the name given should be skipped. | |
181 """ | |
182 | |
183 def idna_encode(name): | |
184 """ | |
185 Borrowed wholesale from the Python Cryptography Project. It turns out | |
186 that we can't just safely call `idna.encode`: it can explode for | |
187 wildcard names. This avoids that problem. | |
188 """ | |
189 import idna | |
190 | |
191 try: | |
192 for prefix in [u"*.", u"."]: | |
193 if name.startswith(prefix): | |
194 name = name[len(prefix) :] | |
195 return prefix.encode("ascii") + idna.encode(name) | |
196 return idna.encode(name) | |
197 except idna.core.IDNAError: | |
198 return None | |
199 | |
200 # Don't send IPv6 addresses through the IDNA encoder. | |
201 if ":" in name: | |
202 return name | |
203 | |
204 name = idna_encode(name) | |
205 if name is None: | |
206 return None | |
207 elif sys.version_info >= (3, 0): | |
208 name = name.decode("utf-8") | |
209 return name | |
210 | |
211 | |
def get_subj_alt_name(peer_cert):
    """
    Given a PyOpenSSL certificate, provides all the subject alternative names.

    Returns a list of ``("DNS", name)`` and ``("IP Address", address)``
    tuples. The list is empty when the certificate has no
    SubjectAlternativeName extension or when the extension cannot be read.
    """
    # Pass the cert to cryptography, which has much better APIs for this.
    if hasattr(peer_cert, "to_cryptography"):
        cert = peer_cert.to_cryptography()
    else:
        # This is technically using private APIs, but should work across all
        # relevant versions before PyOpenSSL got a proper API for this.
        cert = _Certificate(openssl_backend, peer_cert._x509)

    # We want to find the SAN extension. Ask Cryptography to locate it (it's
    # faster than looping in Python)
    try:
        san_extension = cert.extensions.get_extension_for_class(
            x509.SubjectAlternativeName
        )
        san = san_extension.value
    except x509.ExtensionNotFound:
        # No such extension, return the empty list.
        return []
    except (
        x509.DuplicateExtension,
        UnsupportedExtension,
        x509.UnsupportedGeneralNameType,
        UnicodeError,
    ) as error:
        # A problem has been found with the quality of the certificate. Assume
        # no SAN field is present.
        log.warning(
            "A problem was encountered with the certificate that prevented "
            "urllib3 from finding the SubjectAlternativeName field. This can "
            "affect certificate validation. The error was %s",
            error,
        )
        return []

    # We want to return dNSName and iPAddress fields. The DNS names need to be
    # idna encoded and then, on Python 3, UTF-8 decoded, because that is what
    # the standard library does with certificates. Names which cannot be idna
    # encoded are skipped.
    names = []
    for dns_name in san.get_values_for_type(x509.DNSName):
        converted = _dnsname_to_stdlib(dns_name)
        if converted is not None:
            names.append(("DNS", converted))

    # IPs are cast back to strings because the match_hostname function wants
    # them as strings.
    for ip_address in san.get_values_for_type(x509.IPAddress):
        names.append(("IP Address", str(ip_address)))

    return names
264 | |
265 | |
class WrappedSocket(object):
    """API-compatibility wrapper for Python OpenSSL's Connection-class.

    Note: _makefile_refs, _drop() and _reuse() are needed for the garbage
    collector of pypy.
    """

    def __init__(self, connection, socket, suppress_ragged_eofs=True):
        """Store the PyOpenSSL connection and the raw socket it runs over.

        :param connection: object whose ``recv``/``send``/``close`` etc. are
            delegated to.
        :param socket: underlying socket, used for ``fileno``, timeouts and
            readiness waits.
        :param suppress_ragged_eofs: when true, an "Unexpected EOF" syscall
            error during a read is reported as end-of-stream.
        """
        self.connection = connection
        self.socket = socket
        self.suppress_ragged_eofs = suppress_ragged_eofs
        self._makefile_refs = 0
        self._closed = False

    def fileno(self):
        """Return the file descriptor of the underlying socket."""
        return self.socket.fileno()

    # Copy-pasted from Python 3.5 source code
    def _decref_socketios(self):
        """Drop one makefile reference and re-run close() if already closed."""
        if self._makefile_refs > 0:
            self._makefile_refs -= 1
        if self._closed:
            self.close()

    def recv(self, *args, **kwargs):
        """Read from the connection, translating PyOpenSSL errors.

        Returns ``b""`` for a suppressed ragged EOF or a received shutdown,
        retries after waiting when the connection wants to read, and raises
        ``SocketError``, ``timeout`` or ``ssl.SSLError`` otherwise.
        """
        try:
            return self.connection.recv(*args, **kwargs)
        except OpenSSL.SSL.SysCallError as error:
            if self.suppress_ragged_eofs and error.args == (-1, "Unexpected EOF"):
                return b""
            raise SocketError(str(error))
        except OpenSSL.SSL.ZeroReturnError:
            if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                return b""
            raise
        except OpenSSL.SSL.WantReadError:
            if util.wait_for_read(self.socket, self.socket.gettimeout()):
                return self.recv(*args, **kwargs)
            raise timeout("The read operation timed out")

        # TLS 1.3 post-handshake authentication
        except OpenSSL.SSL.Error as error:
            raise ssl.SSLError("read error: %r" % error)

    def recv_into(self, *args, **kwargs):
        """Read into a caller-supplied buffer, translating PyOpenSSL errors.

        Mirrors :meth:`recv`, returning ``0`` where that method returns
        ``b""``.
        """
        try:
            return self.connection.recv_into(*args, **kwargs)
        except OpenSSL.SSL.SysCallError as error:
            if self.suppress_ragged_eofs and error.args == (-1, "Unexpected EOF"):
                return 0
            raise SocketError(str(error))
        except OpenSSL.SSL.ZeroReturnError:
            if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                return 0
            raise
        except OpenSSL.SSL.WantReadError:
            if util.wait_for_read(self.socket, self.socket.gettimeout()):
                return self.recv_into(*args, **kwargs)
            raise timeout("The read operation timed out")

        # TLS 1.3 post-handshake authentication
        except OpenSSL.SSL.Error as error:
            raise ssl.SSLError("read error: %r" % error)

    def settimeout(self, timeout):
        """Set the timeout on the underlying socket."""
        return self.socket.settimeout(timeout)

    def _send_until_done(self, data):
        """Send ``data`` once, retrying while the connection wants to write.

        Returns whatever ``connection.send`` returns. Raises ``timeout`` if
        the socket does not become writable in time and ``SocketError`` on a
        syscall error.
        """
        while True:
            try:
                return self.connection.send(data)
            except OpenSSL.SSL.WantWriteError:
                writable = util.wait_for_write(self.socket, self.socket.gettimeout())
                if not writable:
                    raise timeout()
                # Socket is writable again: loop around and retry the send.
            except OpenSSL.SSL.SysCallError as error:
                raise SocketError(str(error))

    def sendall(self, data):
        """Send all of ``data`` in slices of at most SSL_WRITE_BLOCKSIZE."""
        offset = 0
        while offset < len(data):
            chunk = data[offset : offset + SSL_WRITE_BLOCKSIZE]
            offset += self._send_until_done(chunk)

    def shutdown(self):
        """Shut down the PyOpenSSL connection."""
        # FIXME rethrow compatible exceptions should we ever use this
        self.connection.shutdown()

    def close(self):
        """Close the connection once no makefile references remain.

        While references are outstanding, only the reference count is
        decremented. PyOpenSSL errors raised while closing are swallowed.
        """
        if self._makefile_refs >= 1:
            self._makefile_refs -= 1
            return None

        self._closed = True
        try:
            return self.connection.close()
        except OpenSSL.SSL.Error:
            return None

    def getpeercert(self, binary_form=False):
        """Return the peer certificate.

        A falsy certificate from the connection is returned as-is. With
        ``binary_form`` the ASN.1 dump is returned; otherwise a dict with the
        ``subject`` common name and the ``subjectAltName`` entries.
        """
        peer_certificate = self.connection.get_peer_certificate()

        if not peer_certificate:
            return peer_certificate

        if binary_form:
            return OpenSSL.crypto.dump_certificate(
                OpenSSL.crypto.FILETYPE_ASN1, peer_certificate
            )

        common_name = peer_certificate.get_subject().CN
        return {
            "subject": ((("commonName", common_name),),),
            "subjectAltName": get_subj_alt_name(peer_certificate),
        }

    def version(self):
        """Return the protocol version name reported by the connection."""
        return self.connection.get_protocol_version_name()

    def _reuse(self):
        """Register one more makefile reference."""
        self._makefile_refs += 1

    def _drop(self):
        """Release one makefile reference, closing when none are left."""
        if self._makefile_refs >= 1:
            self._makefile_refs -= 1
        else:
            self.close()
399 | |
400 | |
# Pick the ``makefile`` implementation for the running interpreter and attach
# it to WrappedSocket.
if not _fileobject:  # Platform-specific: Python 3
    makefile = backport_makefile

else:  # Platform-specific: Python 2

    def makefile(self, mode, bufsize=-1):
        # Each file object holds a reference that close()/_drop() must release.
        self._makefile_refs += 1
        return _fileobject(self, mode, bufsize, close=True)


WrappedSocket.makefile = makefile
412 | |
413 | |
class PyOpenSSLContext(object):
    """
    I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
    for translating the interface of the standard library ``SSLContext`` object
    to calls into PyOpenSSL.
    """

    def __init__(self, protocol):
        """Create the wrapped PyOpenSSL context.

        :param protocol: a key of ``_openssl_versions``; an unmapped value
            raises ``KeyError``.
        """
        self.protocol = _openssl_versions[protocol]
        self._ctx = OpenSSL.SSL.Context(self.protocol)
        self._options = 0
        self.check_hostname = False

    @property
    def options(self):
        """The option value most recently assigned to this context."""
        return self._options

    @options.setter
    def options(self, value):
        self._options = value
        self._ctx.set_options(value)

    @property
    def verify_mode(self):
        """The stdlib ``ssl.CERT_*`` mode matching the context's verify mode."""
        return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()]

    @verify_mode.setter
    def verify_mode(self, value):
        self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback)

    def set_default_verify_paths(self):
        """Delegate to the wrapped context's ``set_default_verify_paths``."""
        self._ctx.set_default_verify_paths()

    def set_ciphers(self, ciphers):
        """Set the cipher list, encoding text input as UTF-8 first."""
        if isinstance(ciphers, six.text_type):
            ciphers = ciphers.encode("utf-8")
        self._ctx.set_cipher_list(ciphers)

    def load_verify_locations(self, cafile=None, capath=None, cadata=None):
        """Load trusted certificates into the wrapped context.

        ``cafile`` and ``capath`` are UTF-8 encoded when given. Any
        ``OpenSSL.SSL.Error`` is re-raised as ``ssl.SSLError``.
        """
        if cafile is not None:
            cafile = cafile.encode("utf-8")
        if capath is not None:
            capath = capath.encode("utf-8")
        try:
            self._ctx.load_verify_locations(cafile, capath)
            if cadata is not None:
                self._ctx.load_verify_locations(BytesIO(cadata))
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError("unable to load trusted certificates: %r" % e)

    def load_cert_chain(self, certfile, keyfile=None, password=None):
        """Load a certificate chain and its private key.

        When ``keyfile`` is omitted the key is read from ``certfile``. A
        non-bytes ``password`` is UTF-8 encoded before being handed to the
        passphrase callback.
        """
        self._ctx.use_certificate_chain_file(certfile)
        if password is not None:
            if not isinstance(password, six.binary_type):
                password = password.encode("utf-8")
            self._ctx.set_passwd_cb(lambda *_: password)
        self._ctx.use_privatekey_file(keyfile or certfile)

    def set_alpn_protocols(self, protocols):
        """Set the ALPN protocols, coercing each entry to bytes."""
        protocols = [six.ensure_binary(p) for p in protocols]
        return self._ctx.set_alpn_protos(protocols)

    def wrap_socket(
        self,
        sock,
        server_side=False,
        do_handshake_on_connect=True,
        suppress_ragged_eofs=True,
        server_hostname=None,
    ):
        """Perform a client-side TLS handshake on ``sock`` and wrap it.

        ``server_side`` and ``do_handshake_on_connect`` are accepted for
        signature compatibility only; this method does not read them. The
        connection is always put in connect state and the handshake always
        runs before returning.

        :param suppress_ragged_eofs: forwarded to :class:`WrappedSocket`.
        :param server_hostname: if not ``None``, sent as the TLS SNI host name
            (text is UTF-8 encoded first).
        :raises timeout: the socket did not become readable during the
            handshake within its timeout.
        :raises ssl.SSLError: any other PyOpenSSL error during the handshake.
        """
        cnx = OpenSSL.SSL.Connection(self._ctx, sock)

        if isinstance(server_hostname, six.text_type):  # Platform-specific: Python 3
            server_hostname = server_hostname.encode("utf-8")

        if server_hostname is not None:
            cnx.set_tlsext_host_name(server_hostname)

        cnx.set_connect_state()

        while True:
            try:
                cnx.do_handshake()
            except OpenSSL.SSL.WantReadError:
                if not util.wait_for_read(sock, sock.gettimeout()):
                    raise timeout("select timed out")
                continue
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError("bad handshake: %r" % e)
            break

        # Forward the caller's choice; it used to be dropped here, which left
        # the wrapped socket on its default of True regardless of the argument.
        return WrappedSocket(cnx, sock, suppress_ragged_eofs=suppress_ragged_eofs)
506 | |
507 | |
508 def _verify_callback(cnx, x509, err_no, err_depth, return_code): | |
509 return err_no == 0 |