diff env/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,501 +0,0 @@
-"""
-SSL with SNI_-support for Python 2. Follow these instructions if you would
-like to verify SSL certificates in Python 2. Note, the default libraries do
-*not* do certificate checking; you need to do additional work to validate
-certificates yourself.
-
-This needs the following packages installed:
-
-* pyOpenSSL (tested with 16.0.0)
-* cryptography (minimum 1.3.4, from pyopenssl)
-* idna (minimum 2.0, from cryptography)
-
-However, pyopenssl depends on cryptography, which depends on idna, so while we
-use all three directly here we end up having relatively few packages required.
-
-You can install them with the following command:
-
-    pip install pyopenssl cryptography idna
-
-To activate certificate checking, call
-:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code
-before you begin making HTTP requests. This can be done in a ``sitecustomize``
-module, or at any other time before your application begins using ``urllib3``,
-like this::
-
-    try:
-        import urllib3.contrib.pyopenssl
-        urllib3.contrib.pyopenssl.inject_into_urllib3()
-    except ImportError:
-        pass
-
-Now you can use :mod:`urllib3` as you normally would, and it will support SNI
-when the required modules are installed.
-
-Activating this module also has the positive side effect of disabling SSL/TLS
-compression in Python 2 (see `CRIME attack`_).
-
-If you want to configure the default list of supported cipher suites, you can
-set the ``urllib3.contrib.pyopenssl.DEFAULT_SSL_CIPHER_LIST`` variable.
-
-.. _sni: https://en.wikipedia.org/wiki/Server_Name_Indication
-.. _crime attack: https://en.wikipedia.org/wiki/CRIME_(security_exploit)
-"""
-from __future__ import absolute_import
-
-import OpenSSL.SSL
-from cryptography import x509
-from cryptography.hazmat.backends.openssl import backend as openssl_backend
-from cryptography.hazmat.backends.openssl.x509 import _Certificate
-
-try:
-    from cryptography.x509 import UnsupportedExtension
-except ImportError:
-    # UnsupportedExtension is gone in cryptography >= 2.1.0
-    class UnsupportedExtension(Exception):
-        pass
-
-
-from socket import timeout, error as SocketError
-from io import BytesIO
-
-try:  # Platform-specific: Python 2
-    from socket import _fileobject
-except ImportError:  # Platform-specific: Python 3
-    _fileobject = None
-    from ..packages.backports.makefile import backport_makefile
-
-import logging
-import ssl
-from ..packages import six
-import sys
-
-from .. import util
-
-
-__all__ = ["inject_into_urllib3", "extract_from_urllib3"]
-
-# SNI always works.
-HAS_SNI = True
-
-# Map from urllib3 to PyOpenSSL compatible parameter-values.
-_openssl_versions = {
-    util.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD,
-    ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
-}
-
-if hasattr(ssl, "PROTOCOL_SSLv3") and hasattr(OpenSSL.SSL, "SSLv3_METHOD"):
-    _openssl_versions[ssl.PROTOCOL_SSLv3] = OpenSSL.SSL.SSLv3_METHOD
-
-if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"):
-    _openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD
-
-if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"):
-    _openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD
-
-
-_stdlib_to_openssl_verify = {
-    ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
-    ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
-    ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER
-    + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
-}
-_openssl_to_stdlib_verify = dict((v, k) for k, v in _stdlib_to_openssl_verify.items())
-
-# OpenSSL will only write 16K at a time
-SSL_WRITE_BLOCKSIZE = 16384
-
-orig_util_HAS_SNI = util.HAS_SNI
-orig_util_SSLContext = util.ssl_.SSLContext
-
-
-log = logging.getLogger(__name__)
-
-
-def inject_into_urllib3():
-    "Monkey-patch urllib3 with PyOpenSSL-backed SSL-support."
-
-    _validate_dependencies_met()
-
-    util.SSLContext = PyOpenSSLContext
-    util.ssl_.SSLContext = PyOpenSSLContext
-    util.HAS_SNI = HAS_SNI
-    util.ssl_.HAS_SNI = HAS_SNI
-    util.IS_PYOPENSSL = True
-    util.ssl_.IS_PYOPENSSL = True
-
-
-def extract_from_urllib3():
-    "Undo monkey-patching by :func:`inject_into_urllib3`."
-
-    util.SSLContext = orig_util_SSLContext
-    util.ssl_.SSLContext = orig_util_SSLContext
-    util.HAS_SNI = orig_util_HAS_SNI
-    util.ssl_.HAS_SNI = orig_util_HAS_SNI
-    util.IS_PYOPENSSL = False
-    util.ssl_.IS_PYOPENSSL = False
-
-
-def _validate_dependencies_met():
-    """
-    Verifies that PyOpenSSL's package-level dependencies have been met.
-    Throws `ImportError` if they are not met.
-    """
-    # Method added in `cryptography==1.1`; not available in older versions
-    from cryptography.x509.extensions import Extensions
-
-    if getattr(Extensions, "get_extension_for_class", None) is None:
-        raise ImportError(
-            "'cryptography' module missing required functionality.  "
-            "Try upgrading to v1.3.4 or newer."
-        )
-
-    # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509
-    # attribute is only present on those versions.
-    from OpenSSL.crypto import X509
-
-    x509 = X509()
-    if getattr(x509, "_x509", None) is None:
-        raise ImportError(
-            "'pyOpenSSL' module missing required functionality. "
-            "Try upgrading to v0.14 or newer."
-        )
-
-
-def _dnsname_to_stdlib(name):
-    """
-    Converts a dNSName SubjectAlternativeName field to the form used by the
-    standard library on the given Python version.
-
-    Cryptography produces a dNSName as a unicode string that was idna-decoded
-    from ASCII bytes. We need to idna-encode that string to get it back, and
-    then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib
-    uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8).
-
-    If the name cannot be idna-encoded then we return None signalling that
-    the name given should be skipped.
-    """
-
-    def idna_encode(name):
-        """
-        Borrowed wholesale from the Python Cryptography Project. It turns out
-        that we can't just safely call `idna.encode`: it can explode for
-        wildcard names. This avoids that problem.
-        """
-        import idna
-
-        try:
-            for prefix in [u"*.", u"."]:
-                if name.startswith(prefix):
-                    name = name[len(prefix) :]
-                    return prefix.encode("ascii") + idna.encode(name)
-            return idna.encode(name)
-        except idna.core.IDNAError:
-            return None
-
-    # Don't send IPv6 addresses through the IDNA encoder.
-    if ":" in name:
-        return name
-
-    name = idna_encode(name)
-    if name is None:
-        return None
-    elif sys.version_info >= (3, 0):
-        name = name.decode("utf-8")
-    return name
-
-
-def get_subj_alt_name(peer_cert):
-    """
-    Given an PyOpenSSL certificate, provides all the subject alternative names.
-    """
-    # Pass the cert to cryptography, which has much better APIs for this.
-    if hasattr(peer_cert, "to_cryptography"):
-        cert = peer_cert.to_cryptography()
-    else:
-        # This is technically using private APIs, but should work across all
-        # relevant versions before PyOpenSSL got a proper API for this.
-        cert = _Certificate(openssl_backend, peer_cert._x509)
-
-    # We want to find the SAN extension. Ask Cryptography to locate it (it's
-    # faster than looping in Python)
-    try:
-        ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
-    except x509.ExtensionNotFound:
-        # No such extension, return the empty list.
-        return []
-    except (
-        x509.DuplicateExtension,
-        UnsupportedExtension,
-        x509.UnsupportedGeneralNameType,
-        UnicodeError,
-    ) as e:
-        # A problem has been found with the quality of the certificate. Assume
-        # no SAN field is present.
-        log.warning(
-            "A problem was encountered with the certificate that prevented "
-            "urllib3 from finding the SubjectAlternativeName field. This can "
-            "affect certificate validation. The error was %s",
-            e,
-        )
-        return []
-
-    # We want to return dNSName and iPAddress fields. We need to cast the IPs
-    # back to strings because the match_hostname function wants them as
-    # strings.
-    # Sadly the DNS names need to be idna encoded and then, on Python 3, UTF-8
-    # decoded. This is pretty frustrating, but that's what the standard library
-    # does with certificates, and so we need to attempt to do the same.
-    # We also want to skip over names which cannot be idna encoded.
-    names = [
-        ("DNS", name)
-        for name in map(_dnsname_to_stdlib, ext.get_values_for_type(x509.DNSName))
-        if name is not None
-    ]
-    names.extend(
-        ("IP Address", str(name)) for name in ext.get_values_for_type(x509.IPAddress)
-    )
-
-    return names
-
-
-class WrappedSocket(object):
-    """API-compatibility wrapper for Python OpenSSL's Connection-class.
-
-    Note: _makefile_refs, _drop() and _reuse() are needed for the garbage
-    collector of pypy.
-    """
-
-    def __init__(self, connection, socket, suppress_ragged_eofs=True):
-        self.connection = connection
-        self.socket = socket
-        self.suppress_ragged_eofs = suppress_ragged_eofs
-        self._makefile_refs = 0
-        self._closed = False
-
-    def fileno(self):
-        return self.socket.fileno()
-
-    # Copy-pasted from Python 3.5 source code
-    def _decref_socketios(self):
-        if self._makefile_refs > 0:
-            self._makefile_refs -= 1
-        if self._closed:
-            self.close()
-
-    def recv(self, *args, **kwargs):
-        try:
-            data = self.connection.recv(*args, **kwargs)
-        except OpenSSL.SSL.SysCallError as e:
-            if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
-                return b""
-            else:
-                raise SocketError(str(e))
-        except OpenSSL.SSL.ZeroReturnError:
-            if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
-                return b""
-            else:
-                raise
-        except OpenSSL.SSL.WantReadError:
-            if not util.wait_for_read(self.socket, self.socket.gettimeout()):
-                raise timeout("The read operation timed out")
-            else:
-                return self.recv(*args, **kwargs)
-
-        # TLS 1.3 post-handshake authentication
-        except OpenSSL.SSL.Error as e:
-            raise ssl.SSLError("read error: %r" % e)
-        else:
-            return data
-
-    def recv_into(self, *args, **kwargs):
-        try:
-            return self.connection.recv_into(*args, **kwargs)
-        except OpenSSL.SSL.SysCallError as e:
-            if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
-                return 0
-            else:
-                raise SocketError(str(e))
-        except OpenSSL.SSL.ZeroReturnError:
-            if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
-                return 0
-            else:
-                raise
-        except OpenSSL.SSL.WantReadError:
-            if not util.wait_for_read(self.socket, self.socket.gettimeout()):
-                raise timeout("The read operation timed out")
-            else:
-                return self.recv_into(*args, **kwargs)
-
-        # TLS 1.3 post-handshake authentication
-        except OpenSSL.SSL.Error as e:
-            raise ssl.SSLError("read error: %r" % e)
-
-    def settimeout(self, timeout):
-        return self.socket.settimeout(timeout)
-
-    def _send_until_done(self, data):
-        while True:
-            try:
-                return self.connection.send(data)
-            except OpenSSL.SSL.WantWriteError:
-                if not util.wait_for_write(self.socket, self.socket.gettimeout()):
-                    raise timeout()
-                continue
-            except OpenSSL.SSL.SysCallError as e:
-                raise SocketError(str(e))
-
-    def sendall(self, data):
-        total_sent = 0
-        while total_sent < len(data):
-            sent = self._send_until_done(
-                data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE]
-            )
-            total_sent += sent
-
-    def shutdown(self):
-        # FIXME rethrow compatible exceptions should we ever use this
-        self.connection.shutdown()
-
-    def close(self):
-        if self._makefile_refs < 1:
-            try:
-                self._closed = True
-                return self.connection.close()
-            except OpenSSL.SSL.Error:
-                return
-        else:
-            self._makefile_refs -= 1
-
-    def getpeercert(self, binary_form=False):
-        x509 = self.connection.get_peer_certificate()
-
-        if not x509:
-            return x509
-
-        if binary_form:
-            return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, x509)
-
-        return {
-            "subject": ((("commonName", x509.get_subject().CN),),),
-            "subjectAltName": get_subj_alt_name(x509),
-        }
-
-    def version(self):
-        return self.connection.get_protocol_version_name()
-
-    def _reuse(self):
-        self._makefile_refs += 1
-
-    def _drop(self):
-        if self._makefile_refs < 1:
-            self.close()
-        else:
-            self._makefile_refs -= 1
-
-
-if _fileobject:  # Platform-specific: Python 2
-
-    def makefile(self, mode, bufsize=-1):
-        self._makefile_refs += 1
-        return _fileobject(self, mode, bufsize, close=True)
-
-
-else:  # Platform-specific: Python 3
-    makefile = backport_makefile
-
-WrappedSocket.makefile = makefile
-
-
-class PyOpenSSLContext(object):
-    """
-    I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
-    for translating the interface of the standard library ``SSLContext`` object
-    to calls into PyOpenSSL.
-    """
-
-    def __init__(self, protocol):
-        self.protocol = _openssl_versions[protocol]
-        self._ctx = OpenSSL.SSL.Context(self.protocol)
-        self._options = 0
-        self.check_hostname = False
-
-    @property
-    def options(self):
-        return self._options
-
-    @options.setter
-    def options(self, value):
-        self._options = value
-        self._ctx.set_options(value)
-
-    @property
-    def verify_mode(self):
-        return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()]
-
-    @verify_mode.setter
-    def verify_mode(self, value):
-        self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback)
-
-    def set_default_verify_paths(self):
-        self._ctx.set_default_verify_paths()
-
-    def set_ciphers(self, ciphers):
-        if isinstance(ciphers, six.text_type):
-            ciphers = ciphers.encode("utf-8")
-        self._ctx.set_cipher_list(ciphers)
-
-    def load_verify_locations(self, cafile=None, capath=None, cadata=None):
-        if cafile is not None:
-            cafile = cafile.encode("utf-8")
-        if capath is not None:
-            capath = capath.encode("utf-8")
-        try:
-            self._ctx.load_verify_locations(cafile, capath)
-            if cadata is not None:
-                self._ctx.load_verify_locations(BytesIO(cadata))
-        except OpenSSL.SSL.Error as e:
-            raise ssl.SSLError("unable to load trusted certificates: %r" % e)
-
-    def load_cert_chain(self, certfile, keyfile=None, password=None):
-        self._ctx.use_certificate_chain_file(certfile)
-        if password is not None:
-            if not isinstance(password, six.binary_type):
-                password = password.encode("utf-8")
-            self._ctx.set_passwd_cb(lambda *_: password)
-        self._ctx.use_privatekey_file(keyfile or certfile)
-
-    def wrap_socket(
-        self,
-        sock,
-        server_side=False,
-        do_handshake_on_connect=True,
-        suppress_ragged_eofs=True,
-        server_hostname=None,
-    ):
-        cnx = OpenSSL.SSL.Connection(self._ctx, sock)
-
-        if isinstance(server_hostname, six.text_type):  # Platform-specific: Python 3
-            server_hostname = server_hostname.encode("utf-8")
-
-        if server_hostname is not None:
-            cnx.set_tlsext_host_name(server_hostname)
-
-        cnx.set_connect_state()
-
-        while True:
-            try:
-                cnx.do_handshake()
-            except OpenSSL.SSL.WantReadError:
-                if not util.wait_for_read(sock, sock.gettimeout()):
-                    raise timeout("select timed out")
-                continue
-            except OpenSSL.SSL.Error as e:
-                raise ssl.SSLError("bad handshake: %r" % e)
-            break
-
-        return WrappedSocket(cnx, sock)
-
-
-def _verify_callback(cnx, x509, err_no, err_depth, return_code):
-    return err_no == 0