diff env/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,501 @@
+"""
+SSL with SNI_-support for Python 2. Follow these instructions if you would
+like to verify SSL certificates in Python 2. Note, the default libraries do
+*not* do certificate checking; you need to do additional work to validate
+certificates yourself.
+
+This needs the following packages installed:
+
+* pyOpenSSL (tested with 16.0.0)
+* cryptography (minimum 1.3.4, from pyopenssl)
+* idna (minimum 2.0, from cryptography)
+
+However, pyopenssl depends on cryptography, which depends on idna, so while we
+use all three directly here we end up having relatively few packages required.
+
+You can install them with the following command:
+
+    pip install pyopenssl cryptography idna
+
+To activate certificate checking, call
+:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code
+before you begin making HTTP requests. This can be done in a ``sitecustomize``
+module, or at any other time before your application begins using ``urllib3``,
+like this::
+
+    try:
+        import urllib3.contrib.pyopenssl
+        urllib3.contrib.pyopenssl.inject_into_urllib3()
+    except ImportError:
+        pass
+
+Now you can use :mod:`urllib3` as you normally would, and it will support SNI
+when the required modules are installed.
+
+Activating this module also has the positive side effect of disabling SSL/TLS
+compression in Python 2 (see `CRIME attack`_).
+
+If you want to configure the default list of supported cipher suites, you can
+set the ``urllib3.contrib.pyopenssl.DEFAULT_SSL_CIPHER_LIST`` variable.
+
+.. _sni: https://en.wikipedia.org/wiki/Server_Name_Indication
+.. _crime attack: https://en.wikipedia.org/wiki/CRIME_(security_exploit)
+"""
+from __future__ import absolute_import
+
+import OpenSSL.SSL
+from cryptography import x509
+from cryptography.hazmat.backends.openssl import backend as openssl_backend
+from cryptography.hazmat.backends.openssl.x509 import _Certificate
+
+try:
+    from cryptography.x509 import UnsupportedExtension
+except ImportError:
+    # UnsupportedExtension is gone in cryptography >= 2.1.0
+    class UnsupportedExtension(Exception):
+        pass
+
+
+from socket import timeout, error as SocketError
+from io import BytesIO
+
+try:  # Platform-specific: Python 2
+    from socket import _fileobject
+except ImportError:  # Platform-specific: Python 3
+    _fileobject = None
+    from ..packages.backports.makefile import backport_makefile
+
+import logging
+import ssl
+from ..packages import six
+import sys
+
+from .. import util
+
+
+__all__ = ["inject_into_urllib3", "extract_from_urllib3"]
+
+# SNI always works.
+HAS_SNI = True
+
+# Map from urllib3 to PyOpenSSL compatible parameter-values.
+_openssl_versions = {
+    util.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD,
+    ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
+}
+
+if hasattr(ssl, "PROTOCOL_SSLv3") and hasattr(OpenSSL.SSL, "SSLv3_METHOD"):
+    _openssl_versions[ssl.PROTOCOL_SSLv3] = OpenSSL.SSL.SSLv3_METHOD
+
+if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"):
+    _openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD
+
+if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"):
+    _openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD
+
+
+_stdlib_to_openssl_verify = {
+    ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
+    ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
+    ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER
+    + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+}
+_openssl_to_stdlib_verify = dict((v, k) for k, v in _stdlib_to_openssl_verify.items())
+
+# OpenSSL will only write 16K at a time
+SSL_WRITE_BLOCKSIZE = 16384
+
+orig_util_HAS_SNI = util.HAS_SNI
+orig_util_SSLContext = util.ssl_.SSLContext
+
+
+log = logging.getLogger(__name__)
+
+
+def inject_into_urllib3():
+    "Monkey-patch urllib3 with PyOpenSSL-backed SSL-support."
+
+    _validate_dependencies_met()
+
+    util.SSLContext = PyOpenSSLContext
+    util.ssl_.SSLContext = PyOpenSSLContext
+    util.HAS_SNI = HAS_SNI
+    util.ssl_.HAS_SNI = HAS_SNI
+    util.IS_PYOPENSSL = True
+    util.ssl_.IS_PYOPENSSL = True
+
+
+def extract_from_urllib3():
+    "Undo monkey-patching by :func:`inject_into_urllib3`."
+
+    util.SSLContext = orig_util_SSLContext
+    util.ssl_.SSLContext = orig_util_SSLContext
+    util.HAS_SNI = orig_util_HAS_SNI
+    util.ssl_.HAS_SNI = orig_util_HAS_SNI
+    util.IS_PYOPENSSL = False
+    util.ssl_.IS_PYOPENSSL = False
+
+
+def _validate_dependencies_met():
+    """
+    Verifies that PyOpenSSL's package-level dependencies have been met.
+    Throws `ImportError` if they are not met.
+    """
+    # Method added in `cryptography==1.1`; not available in older versions
+    from cryptography.x509.extensions import Extensions
+
+    if getattr(Extensions, "get_extension_for_class", None) is None:
+        raise ImportError(
+            "'cryptography' module missing required functionality.  "
+            "Try upgrading to v1.3.4 or newer."
+        )
+
+    # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509
+    # attribute is only present on those versions.
+    from OpenSSL.crypto import X509
+
+    x509 = X509()
+    if getattr(x509, "_x509", None) is None:
+        raise ImportError(
+            "'pyOpenSSL' module missing required functionality. "
+            "Try upgrading to v0.14 or newer."
+        )
+
+
+def _dnsname_to_stdlib(name):
+    """
+    Converts a dNSName SubjectAlternativeName field to the form used by the
+    standard library on the given Python version.
+
+    Cryptography produces a dNSName as a unicode string that was idna-decoded
+    from ASCII bytes. We need to idna-encode that string to get it back, and
+    then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib
+    uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8).
+
+    If the name cannot be idna-encoded then we return None signalling that
+    the name given should be skipped.
+    """
+
+    def idna_encode(name):
+        """
+        Borrowed wholesale from the Python Cryptography Project. It turns out
+        that we can't just safely call `idna.encode`: it can explode for
+        wildcard names. This avoids that problem.
+        """
+        import idna
+
+        try:
+            for prefix in [u"*.", u"."]:
+                if name.startswith(prefix):
+                    name = name[len(prefix) :]
+                    return prefix.encode("ascii") + idna.encode(name)
+            return idna.encode(name)
+        except idna.core.IDNAError:
+            return None
+
+    # Don't send IPv6 addresses through the IDNA encoder.
+    if ":" in name:
+        return name
+
+    name = idna_encode(name)
+    if name is None:
+        return None
+    elif sys.version_info >= (3, 0):
+        name = name.decode("utf-8")
+    return name
+
+
+def get_subj_alt_name(peer_cert):
+    """
+    Given an PyOpenSSL certificate, provides all the subject alternative names.
+    """
+    # Pass the cert to cryptography, which has much better APIs for this.
+    if hasattr(peer_cert, "to_cryptography"):
+        cert = peer_cert.to_cryptography()
+    else:
+        # This is technically using private APIs, but should work across all
+        # relevant versions before PyOpenSSL got a proper API for this.
+        cert = _Certificate(openssl_backend, peer_cert._x509)
+
+    # We want to find the SAN extension. Ask Cryptography to locate it (it's
+    # faster than looping in Python)
+    try:
+        ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
+    except x509.ExtensionNotFound:
+        # No such extension, return the empty list.
+        return []
+    except (
+        x509.DuplicateExtension,
+        UnsupportedExtension,
+        x509.UnsupportedGeneralNameType,
+        UnicodeError,
+    ) as e:
+        # A problem has been found with the quality of the certificate. Assume
+        # no SAN field is present.
+        log.warning(
+            "A problem was encountered with the certificate that prevented "
+            "urllib3 from finding the SubjectAlternativeName field. This can "
+            "affect certificate validation. The error was %s",
+            e,
+        )
+        return []
+
+    # We want to return dNSName and iPAddress fields. We need to cast the IPs
+    # back to strings because the match_hostname function wants them as
+    # strings.
+    # Sadly the DNS names need to be idna encoded and then, on Python 3, UTF-8
+    # decoded. This is pretty frustrating, but that's what the standard library
+    # does with certificates, and so we need to attempt to do the same.
+    # We also want to skip over names which cannot be idna encoded.
+    names = [
+        ("DNS", name)
+        for name in map(_dnsname_to_stdlib, ext.get_values_for_type(x509.DNSName))
+        if name is not None
+    ]
+    names.extend(
+        ("IP Address", str(name)) for name in ext.get_values_for_type(x509.IPAddress)
+    )
+
+    return names
+
+
+class WrappedSocket(object):
+    """API-compatibility wrapper for Python OpenSSL's Connection-class.
+
+    Note: _makefile_refs, _drop() and _reuse() are needed for the garbage
+    collector of pypy.
+    """
+
+    def __init__(self, connection, socket, suppress_ragged_eofs=True):
+        self.connection = connection
+        self.socket = socket
+        self.suppress_ragged_eofs = suppress_ragged_eofs
+        self._makefile_refs = 0
+        self._closed = False
+
+    def fileno(self):
+        return self.socket.fileno()
+
+    # Copy-pasted from Python 3.5 source code
+    def _decref_socketios(self):
+        if self._makefile_refs > 0:
+            self._makefile_refs -= 1
+        if self._closed:
+            self.close()
+
+    def recv(self, *args, **kwargs):
+        try:
+            data = self.connection.recv(*args, **kwargs)
+        except OpenSSL.SSL.SysCallError as e:
+            if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
+                return b""
+            else:
+                raise SocketError(str(e))
+        except OpenSSL.SSL.ZeroReturnError:
+            if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
+                return b""
+            else:
+                raise
+        except OpenSSL.SSL.WantReadError:
+            if not util.wait_for_read(self.socket, self.socket.gettimeout()):
+                raise timeout("The read operation timed out")
+            else:
+                return self.recv(*args, **kwargs)
+
+        # TLS 1.3 post-handshake authentication
+        except OpenSSL.SSL.Error as e:
+            raise ssl.SSLError("read error: %r" % e)
+        else:
+            return data
+
+    def recv_into(self, *args, **kwargs):
+        try:
+            return self.connection.recv_into(*args, **kwargs)
+        except OpenSSL.SSL.SysCallError as e:
+            if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
+                return 0
+            else:
+                raise SocketError(str(e))
+        except OpenSSL.SSL.ZeroReturnError:
+            if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
+                return 0
+            else:
+                raise
+        except OpenSSL.SSL.WantReadError:
+            if not util.wait_for_read(self.socket, self.socket.gettimeout()):
+                raise timeout("The read operation timed out")
+            else:
+                return self.recv_into(*args, **kwargs)
+
+        # TLS 1.3 post-handshake authentication
+        except OpenSSL.SSL.Error as e:
+            raise ssl.SSLError("read error: %r" % e)
+
+    def settimeout(self, timeout):
+        return self.socket.settimeout(timeout)
+
+    def _send_until_done(self, data):
+        while True:
+            try:
+                return self.connection.send(data)
+            except OpenSSL.SSL.WantWriteError:
+                if not util.wait_for_write(self.socket, self.socket.gettimeout()):
+                    raise timeout()
+                continue
+            except OpenSSL.SSL.SysCallError as e:
+                raise SocketError(str(e))
+
+    def sendall(self, data):
+        total_sent = 0
+        while total_sent < len(data):
+            sent = self._send_until_done(
+                data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE]
+            )
+            total_sent += sent
+
+    def shutdown(self):
+        # FIXME rethrow compatible exceptions should we ever use this
+        self.connection.shutdown()
+
+    def close(self):
+        if self._makefile_refs < 1:
+            try:
+                self._closed = True
+                return self.connection.close()
+            except OpenSSL.SSL.Error:
+                return
+        else:
+            self._makefile_refs -= 1
+
+    def getpeercert(self, binary_form=False):
+        x509 = self.connection.get_peer_certificate()
+
+        if not x509:
+            return x509
+
+        if binary_form:
+            return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, x509)
+
+        return {
+            "subject": ((("commonName", x509.get_subject().CN),),),
+            "subjectAltName": get_subj_alt_name(x509),
+        }
+
+    def version(self):
+        return self.connection.get_protocol_version_name()
+
+    def _reuse(self):
+        self._makefile_refs += 1
+
+    def _drop(self):
+        if self._makefile_refs < 1:
+            self.close()
+        else:
+            self._makefile_refs -= 1
+
+
+if _fileobject:  # Platform-specific: Python 2
+
+    def makefile(self, mode, bufsize=-1):
+        self._makefile_refs += 1
+        return _fileobject(self, mode, bufsize, close=True)
+
+
+else:  # Platform-specific: Python 3
+    makefile = backport_makefile
+
+WrappedSocket.makefile = makefile
+
+
+class PyOpenSSLContext(object):
+    """
+    I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
+    for translating the interface of the standard library ``SSLContext`` object
+    to calls into PyOpenSSL.
+    """
+
+    def __init__(self, protocol):
+        self.protocol = _openssl_versions[protocol]
+        self._ctx = OpenSSL.SSL.Context(self.protocol)
+        self._options = 0
+        self.check_hostname = False
+
+    @property
+    def options(self):
+        return self._options
+
+    @options.setter
+    def options(self, value):
+        self._options = value
+        self._ctx.set_options(value)
+
+    @property
+    def verify_mode(self):
+        return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()]
+
+    @verify_mode.setter
+    def verify_mode(self, value):
+        self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback)
+
+    def set_default_verify_paths(self):
+        self._ctx.set_default_verify_paths()
+
+    def set_ciphers(self, ciphers):
+        if isinstance(ciphers, six.text_type):
+            ciphers = ciphers.encode("utf-8")
+        self._ctx.set_cipher_list(ciphers)
+
+    def load_verify_locations(self, cafile=None, capath=None, cadata=None):
+        if cafile is not None:
+            cafile = cafile.encode("utf-8")
+        if capath is not None:
+            capath = capath.encode("utf-8")
+        try:
+            self._ctx.load_verify_locations(cafile, capath)
+            if cadata is not None:
+                self._ctx.load_verify_locations(BytesIO(cadata))
+        except OpenSSL.SSL.Error as e:
+            raise ssl.SSLError("unable to load trusted certificates: %r" % e)
+
+    def load_cert_chain(self, certfile, keyfile=None, password=None):
+        self._ctx.use_certificate_chain_file(certfile)
+        if password is not None:
+            if not isinstance(password, six.binary_type):
+                password = password.encode("utf-8")
+            self._ctx.set_passwd_cb(lambda *_: password)
+        self._ctx.use_privatekey_file(keyfile or certfile)
+
+    def wrap_socket(
+        self,
+        sock,
+        server_side=False,
+        do_handshake_on_connect=True,
+        suppress_ragged_eofs=True,
+        server_hostname=None,
+    ):
+        cnx = OpenSSL.SSL.Connection(self._ctx, sock)
+
+        if isinstance(server_hostname, six.text_type):  # Platform-specific: Python 3
+            server_hostname = server_hostname.encode("utf-8")
+
+        if server_hostname is not None:
+            cnx.set_tlsext_host_name(server_hostname)
+
+        cnx.set_connect_state()
+
+        while True:
+            try:
+                cnx.do_handshake()
+            except OpenSSL.SSL.WantReadError:
+                if not util.wait_for_read(sock, sock.gettimeout()):
+                    raise timeout("select timed out")
+                continue
+            except OpenSSL.SSL.Error as e:
+                raise ssl.SSLError("bad handshake: %r" % e)
+            break
+
+        return WrappedSocket(cnx, sock)
+
+
+def _verify_callback(cnx, x509, err_no, err_depth, return_code):
+    return err_no == 0