Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/requests_toolbelt/adapters/x509.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/requests_toolbelt/adapters/x509.py Sat May 02 07:14:21 2020 -0400 @@ -0,0 +1,178 @@ +# -*- coding: utf-8 -*- +"""A X509Adapter for use with the requests library. + +This file contains an implementation of the X509Adapter that will +allow users to authenticate a request using an arbitrary +X.509 certificate without needing to convert it to a .pem file + +""" + +from OpenSSL.crypto import PKey, X509 +from cryptography import x509 +from cryptography.hazmat.primitives.serialization import (load_pem_private_key, + load_der_private_key) +from cryptography.hazmat.primitives.serialization import Encoding +from cryptography.hazmat.backends import default_backend + +from datetime import datetime +from requests.adapters import HTTPAdapter +import requests + +from .._compat import PyOpenSSLContext +from .. import exceptions as exc + +""" +importing the protocol constants from _ssl instead of ssl because only the +constants are needed and to handle issues caused by importing from ssl on +the 2.7.x line. +""" +try: + from _ssl import PROTOCOL_TLS as PROTOCOL +except ImportError: + from _ssl import PROTOCOL_SSLv23 as PROTOCOL + + +class X509Adapter(HTTPAdapter): + r"""Adapter for use with X.509 certificates. + + Provides an interface for Requests sessions to contact HTTPS urls and + authenticate with an X.509 cert by implementing the Transport Adapter + interface. This class will need to be manually instantiated and mounted + to the session + + :param pool_connections: The number of urllib3 connection pools to + cache. + :param pool_maxsize: The maximum number of connections to save in the + pool. + :param max_retries: The maximum number of retries each connection + should attempt. Note, this applies only to failed DNS lookups, + socket connections and connection timeouts, never to requests where + data has made it to the server. By default, Requests does not retry + failed connections. If you need granular control over the + conditions under which we retry a request, import urllib3's + ``Retry`` class and pass that instead. + :param pool_block: Whether the connection pool should block for + connections. + + :param bytes cert_bytes: + bytes object containing contents of a cryptography.x509Certificate + object using the encoding specified by the ``encoding`` parameter. + :param bytes pk_bytes: + bytes object containing contents of a object that implements + ``cryptography.hazmat.primitives.serialization.PrivateFormat`` + using the encoding specified by the ``encoding`` parameter. + :param password: + string or utf8 encoded bytes containing the passphrase used for the + private key. None if unencrypted. Defaults to None. + :param encoding: + Enumeration detailing the encoding method used on the ``cert_bytes`` + parameter. Can be either PEM or DER. Defaults to PEM. + :type encoding: + :class: `cryptography.hazmat.primitives.serialization.Encoding` + + Usage:: + + >>> import requests + >>> from requests_toolbelt.adapters.x509 import X509Adapter + >>> s = requests.Session() + >>> a = X509Adapter(max_retries=3, + cert_bytes=b'...', pk_bytes=b'...', encoding='...' + >>> s.mount('https://', a) + """ + + def __init__(self, *args, **kwargs): + self._check_version() + cert_bytes = kwargs.pop('cert_bytes', None) + pk_bytes = kwargs.pop('pk_bytes', None) + password = kwargs.pop('password', None) + encoding = kwargs.pop('encoding', Encoding.PEM) + + password_bytes = None + + if cert_bytes is None or not isinstance(cert_bytes, bytes): + raise ValueError('Invalid cert content provided. ' + 'You must provide an X.509 cert ' + 'formatted as a byte array.') + if pk_bytes is None or not isinstance(pk_bytes, bytes): + raise ValueError('Invalid private key content provided. ' + 'You must provide a private key ' + 'formatted as a byte array.') + + if isinstance(password, bytes): + password_bytes = password + elif password: + password_bytes = password.encode('utf8') + + self.ssl_context = create_ssl_context(cert_bytes, pk_bytes, + password_bytes, encoding) + + super(X509Adapter, self).__init__(*args, **kwargs) + + def init_poolmanager(self, *args, **kwargs): + if self.ssl_context: + kwargs['ssl_context'] = self.ssl_context + return super(X509Adapter, self).init_poolmanager(*args, **kwargs) + + def proxy_manager_for(self, *args, **kwargs): + if self.ssl_context: + kwargs['ssl_context'] = self.ssl_context + return super(X509Adapter, self).proxy_manager_for(*args, **kwargs) + + def _check_version(self): + if PyOpenSSLContext is None: + raise exc.VersionMismatchError( + "The X509Adapter requires at least Requests 2.12.0 to be " + "installed. Version {0} was found instead.".format( + requests.__version__ + ) + ) + + +def check_cert_dates(cert): + """Verify that the supplied client cert is not invalid.""" + + now = datetime.utcnow() + if cert.not_valid_after < now or cert.not_valid_before > now: + raise ValueError('Client certificate expired: Not After: ' + '{0:%Y-%m-%d %H:%M:%SZ} ' + 'Not Before: {1:%Y-%m-%d %H:%M:%SZ}' + .format(cert.not_valid_after, cert.not_valid_before)) + + +def create_ssl_context(cert_byes, pk_bytes, password=None, + encoding=Encoding.PEM): + """Create an SSL Context with the supplied cert/password. + + :param cert_bytes array of bytes containing the cert encoded + using the method supplied in the ``encoding`` parameter + :param pk_bytes array of bytes containing the private key encoded + using the method supplied in the ``encoding`` parameter + :param password array of bytes containing the passphrase to be used + with the supplied private key. None if unencrypted. + Defaults to None. + :param encoding ``cryptography.hazmat.primitives.serialization.Encoding`` + details the encoding method used on the ``cert_bytes`` and + ``pk_bytes`` parameters. Can be either PEM or DER. + Defaults to PEM. + """ + backend = default_backend() + + cert = None + key = None + if encoding == Encoding.PEM: + cert = x509.load_pem_x509_certificate(cert_byes, backend) + key = load_pem_private_key(pk_bytes, password, backend) + elif encoding == Encoding.DER: + cert = x509.load_der_x509_certificate(cert_byes, backend) + key = load_der_private_key(pk_bytes, password, backend) + else: + raise ValueError('Invalid encoding provided: Must be PEM or DER') + + if not (cert and key): + raise ValueError('Cert and key could not be parsed from ' + 'provided data') + check_cert_dates(cert) + ssl_context = PyOpenSSLContext(PROTOCOL) + ssl_context._ctx.use_certificate(X509.from_cryptography(cert)) + ssl_context._ctx.use_privatekey(PKey.from_cryptography_key(key)) + return ssl_context