diff env/lib/python3.7/site-packages/requests_toolbelt/adapters/x509.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/requests_toolbelt/adapters/x509.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,178 +0,0 @@
-# -*- coding: utf-8 -*-
-"""A X509Adapter for use with the requests library.
-
-This file contains an implementation of the X509Adapter that will
-allow users to authenticate a request using an arbitrary
-X.509 certificate without needing to convert it to a .pem file
-
-"""
-
-from OpenSSL.crypto import PKey, X509
-from cryptography import x509
-from cryptography.hazmat.primitives.serialization import (load_pem_private_key,
-                                                          load_der_private_key)
-from cryptography.hazmat.primitives.serialization import Encoding
-from cryptography.hazmat.backends import default_backend
-
-from datetime import datetime
-from requests.adapters import HTTPAdapter
-import requests
-
-from .._compat import PyOpenSSLContext
-from .. import exceptions as exc
-
-"""
-importing the protocol constants from _ssl instead of ssl because only the
-constants are needed and to handle issues caused by importing from ssl on
-the 2.7.x line.
-"""
-try:
-    from _ssl import PROTOCOL_TLS as PROTOCOL
-except ImportError:
-    from _ssl import PROTOCOL_SSLv23 as PROTOCOL
-
-
-class X509Adapter(HTTPAdapter):
-    r"""Adapter for use with X.509 certificates.
-
-    Provides an interface for Requests sessions to contact HTTPS urls and
-    authenticate  with an X.509 cert by implementing the Transport Adapter
-    interface. This class will need to be manually instantiated and mounted
-    to the session
-
-    :param pool_connections: The number of urllib3 connection pools to
-           cache.
-    :param pool_maxsize: The maximum number of connections to save in the
-            pool.
-    :param max_retries: The maximum number of retries each connection
-        should attempt. Note, this applies only to failed DNS lookups,
-        socket connections and connection timeouts, never to requests where
-        data has made it to the server. By default, Requests does not retry
-        failed connections. If you need granular control over the
-        conditions under which we retry a request, import urllib3's
-        ``Retry`` class and pass that instead.
-    :param pool_block: Whether the connection pool should block for
-            connections.
-
-    :param bytes cert_bytes:
-        bytes object containing contents of a cryptography.x509Certificate
-        object using the encoding specified by the ``encoding`` parameter.
-    :param bytes pk_bytes:
-        bytes object containing contents of a object that implements
-        ``cryptography.hazmat.primitives.serialization.PrivateFormat``
-        using the encoding specified by the ``encoding`` parameter.
-    :param password:
-        string or utf8 encoded bytes containing the passphrase used for the
-        private key. None if unencrypted. Defaults to None.
-    :param encoding:
-        Enumeration detailing the encoding method used on the ``cert_bytes``
-        parameter. Can be either PEM or DER. Defaults to PEM.
-    :type encoding:
-        :class: `cryptography.hazmat.primitives.serialization.Encoding`
-
-    Usage::
-
-      >>> import requests
-      >>> from requests_toolbelt.adapters.x509 import X509Adapter
-      >>> s = requests.Session()
-      >>> a = X509Adapter(max_retries=3,
-                cert_bytes=b'...', pk_bytes=b'...', encoding='...'
-      >>> s.mount('https://', a)
-    """
-
-    def __init__(self, *args, **kwargs):
-        self._check_version()
-        cert_bytes = kwargs.pop('cert_bytes', None)
-        pk_bytes = kwargs.pop('pk_bytes', None)
-        password = kwargs.pop('password', None)
-        encoding = kwargs.pop('encoding', Encoding.PEM)
-
-        password_bytes = None
-
-        if cert_bytes is None or not isinstance(cert_bytes, bytes):
-            raise ValueError('Invalid cert content provided. '
-                             'You must provide an X.509 cert '
-                             'formatted as a byte array.')
-        if pk_bytes is None or not isinstance(pk_bytes, bytes):
-            raise ValueError('Invalid private key content provided. '
-                             'You must provide a private key '
-                             'formatted as a byte array.')
-
-        if isinstance(password, bytes):
-            password_bytes = password
-        elif password:
-            password_bytes = password.encode('utf8')
-
-        self.ssl_context = create_ssl_context(cert_bytes, pk_bytes,
-                                              password_bytes, encoding)
-
-        super(X509Adapter, self).__init__(*args, **kwargs)
-
-    def init_poolmanager(self, *args, **kwargs):
-        if self.ssl_context:
-            kwargs['ssl_context'] = self.ssl_context
-        return super(X509Adapter, self).init_poolmanager(*args, **kwargs)
-
-    def proxy_manager_for(self, *args, **kwargs):
-        if self.ssl_context:
-            kwargs['ssl_context'] = self.ssl_context
-        return super(X509Adapter, self).proxy_manager_for(*args, **kwargs)
-
-    def _check_version(self):
-        if PyOpenSSLContext is None:
-            raise exc.VersionMismatchError(
-                "The X509Adapter requires at least Requests 2.12.0 to be "
-                "installed. Version {0} was found instead.".format(
-                    requests.__version__
-                )
-            )
-
-
-def check_cert_dates(cert):
-    """Verify that the supplied client cert is not invalid."""
-
-    now = datetime.utcnow()
-    if cert.not_valid_after < now or cert.not_valid_before > now:
-        raise ValueError('Client certificate expired: Not After: '
-                         '{0:%Y-%m-%d %H:%M:%SZ} '
-                         'Not Before: {1:%Y-%m-%d %H:%M:%SZ}'
-                         .format(cert.not_valid_after, cert.not_valid_before))
-
-
-def create_ssl_context(cert_byes, pk_bytes, password=None,
-                       encoding=Encoding.PEM):
-    """Create an SSL Context with the supplied cert/password.
-
-    :param cert_bytes array of bytes containing the cert encoded
-           using the method supplied in the ``encoding`` parameter
-    :param pk_bytes array of bytes containing the private key encoded
-           using the method supplied in the ``encoding`` parameter
-    :param password array of bytes containing the passphrase to be used
-           with the supplied private key. None if unencrypted.
-           Defaults to None.
-    :param encoding ``cryptography.hazmat.primitives.serialization.Encoding``
-            details the encoding method used on the ``cert_bytes``  and
-            ``pk_bytes`` parameters. Can be either PEM or DER.
-            Defaults to PEM.
-    """
-    backend = default_backend()
-
-    cert = None
-    key = None
-    if encoding == Encoding.PEM:
-        cert = x509.load_pem_x509_certificate(cert_byes, backend)
-        key = load_pem_private_key(pk_bytes, password, backend)
-    elif encoding == Encoding.DER:
-        cert = x509.load_der_x509_certificate(cert_byes, backend)
-        key = load_der_private_key(pk_bytes, password, backend)
-    else:
-        raise ValueError('Invalid encoding provided: Must be PEM or DER')
-
-    if not (cert and key):
-        raise ValueError('Cert and key could not be parsed from '
-                         'provided data')
-    check_cert_dates(cert)
-    ssl_context = PyOpenSSLContext(PROTOCOL)
-    ssl_context._ctx.use_certificate(X509.from_cryptography(cert))
-    ssl_context._ctx.use_privatekey(PKey.from_cryptography_key(key))
-    return ssl_context