Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/requests_toolbelt/adapters/x509.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:26e78fe6e8c4 |
|---|---|
| 1 # -*- coding: utf-8 -*- | |
| 2 """An X509Adapter for use with the requests library. | |
| 3 | |
| 4 This file contains an implementation of the X509Adapter that will | |
| 5 allow users to authenticate a request using an arbitrary | |
| 6 X.509 certificate without needing to convert it to a .pem file. | |
| 7 | |
| 8 """ | |
| 9 | |
| 10 from OpenSSL.crypto import PKey, X509 | |
| 11 from cryptography import x509 | |
| 12 from cryptography.hazmat.primitives.serialization import (load_pem_private_key, | |
| 13 load_der_private_key) | |
| 14 from cryptography.hazmat.primitives.serialization import Encoding | |
| 15 from cryptography.hazmat.backends import default_backend | |
| 16 | |
| 17 from datetime import datetime | |
| 18 from requests.adapters import HTTPAdapter | |
| 19 import requests | |
| 20 | |
| 21 from .._compat import PyOpenSSLContext | |
| 22 from .. import exceptions as exc | |
| 23 | |
| 24 """ | |
| 25 importing the protocol constants from _ssl instead of ssl because only the | |
| 26 constants are needed and to handle issues caused by importing from ssl on | |
| 27 the 2.7.x line. | |
| 28 """ | |
| 29 try: | |
| 30 from _ssl import PROTOCOL_TLS as PROTOCOL | |
| 31 except ImportError: | |
| 32 from _ssl import PROTOCOL_SSLv23 as PROTOCOL | |
| 33 | |
| 34 | |
class X509Adapter(HTTPAdapter):
    r"""Transport adapter that authenticates with an X.509 client cert.

    Lets a Requests session talk to HTTPS urls while presenting an X.509
    certificate supplied as raw bytes. The adapter is not installed
    automatically: instantiate it yourself and mount it on the session.

    :param pool_connections: The number of urllib3 connection pools to
        cache.
    :param pool_maxsize: The maximum number of connections to save in the
        pool.
    :param max_retries: The maximum number of retries each connection
        should attempt. Note, this applies only to failed DNS lookups,
        socket connections and connection timeouts, never to requests where
        data has made it to the server. By default, Requests does not retry
        failed connections. If you need granular control over the
        conditions under which we retry a request, import urllib3's
        ``Retry`` class and pass that instead.
    :param pool_block: Whether the connection pool should block for
        connections.

    :param bytes cert_bytes:
        bytes object holding the certificate, serialized with the encoding
        named by the ``encoding`` parameter.
    :param bytes pk_bytes:
        bytes object holding the private key, serialized with the encoding
        named by the ``encoding`` parameter.
    :param password:
        string or utf8 encoded bytes containing the passphrase used for the
        private key. None if unencrypted. Defaults to None.
    :param encoding:
        Enumeration member describing how ``cert_bytes`` and ``pk_bytes``
        are encoded. Can be either PEM or DER. Defaults to PEM.
    :type encoding:
        :class: `cryptography.hazmat.primitives.serialization.Encoding`
    :raises ValueError: if ``cert_bytes`` or ``pk_bytes`` is not a
        ``bytes`` instance.

    Usage::

        >>> import requests
        >>> from cryptography.hazmat.primitives.serialization import Encoding
        >>> from requests_toolbelt.adapters.x509 import X509Adapter
        >>> s = requests.Session()
        >>> a = X509Adapter(max_retries=3, cert_bytes=b'...',
        ...                 pk_bytes=b'...', encoding=Encoding.PEM)
        >>> s.mount('https://', a)
    """

    def __init__(self, *args, **kwargs):
        self._check_version()

        # Strip the adapter-specific keywords so that only the arguments
        # the base class understands are forwarded to it.
        cert_bytes = kwargs.pop('cert_bytes', None)
        pk_bytes = kwargs.pop('pk_bytes', None)
        password = kwargs.pop('password', None)
        encoding = kwargs.pop('encoding', Encoding.PEM)

        # ``None`` is not a ``bytes`` instance, so one isinstance test
        # rejects both a missing value and a value of the wrong type.
        if not isinstance(cert_bytes, bytes):
            raise ValueError(
                'Invalid cert content provided. '
                'You must provide an X.509 cert '
                'formatted as a byte array.'
            )
        if not isinstance(pk_bytes, bytes):
            raise ValueError(
                'Invalid private key content provided. '
                'You must provide a private key '
                'formatted as a byte array.'
            )

        # Normalise the passphrase: bytes pass through untouched, any other
        # truthy value is utf8-encoded, anything falsy means "no passphrase".
        if isinstance(password, bytes):
            passphrase = password
        elif password:
            passphrase = password.encode('utf8')
        else:
            passphrase = None

        # NOTE(review): the context is stored before the base initializer
        # runs, presumably because that initializer ends up calling
        # init_poolmanager(), which reads self.ssl_context -- confirm
        # against the installed requests version before reordering.
        self.ssl_context = create_ssl_context(
            cert_bytes, pk_bytes, passphrase, encoding
        )

        super(X509Adapter, self).__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **kwargs):
        """Build the pool manager, injecting the client-cert SSL context."""
        context = self.ssl_context
        if context:
            kwargs['ssl_context'] = context
        return super(X509Adapter, self).init_poolmanager(*args, **kwargs)

    def proxy_manager_for(self, *args, **kwargs):
        """Build a proxy manager, injecting the client-cert SSL context."""
        context = self.ssl_context
        if context:
            kwargs['ssl_context'] = context
        return super(X509Adapter, self).proxy_manager_for(*args, **kwargs)

    def _check_version(self):
        """Raise VersionMismatchError when PyOpenSSLContext is unavailable."""
        if PyOpenSSLContext is not None:
            return
        message = (
            "The X509Adapter requires at least Requests 2.12.0 to be "
            "installed. Version {0} was found instead."
        )
        raise exc.VersionMismatchError(message.format(requests.__version__))
| 129 | |
| 130 | |
def check_cert_dates(cert):
    """Verify that the supplied client cert is inside its validity window.

    :param cert: object exposing ``not_valid_before`` and
        ``not_valid_after`` attributes. Both are compared against the naive
        value returned by ``datetime.utcnow()``.
    :raises ValueError: if the current UTC time is later than
        ``not_valid_after`` (the cert has expired) or earlier than
        ``not_valid_before`` (the cert is not yet valid).
    """
    now = datetime.utcnow()

    # Tell the two failure modes apart so a cert whose validity window has
    # not opened yet is no longer reported as "expired".
    if cert.not_valid_after < now:
        problem = 'expired'
    elif cert.not_valid_before > now:
        problem = 'not yet valid'
    else:
        return

    raise ValueError('Client certificate {0}: Not After: '
                     '{1:%Y-%m-%d %H:%M:%SZ} '
                     'Not Before: {2:%Y-%m-%d %H:%M:%SZ}'
                     .format(problem, cert.not_valid_after,
                             cert.not_valid_before))
| 140 | |
| 141 | |
def create_ssl_context(cert_byes, pk_bytes, password=None,
                       encoding=Encoding.PEM):
    """Create an SSL Context with the supplied cert/password.

    :param cert_byes: array of bytes containing the cert encoded
        using the method supplied in the ``encoding`` parameter. (The
        parameter name is spelled ``cert_byes`` in the signature; it is
        kept as-is so existing keyword callers continue to work.)
    :param pk_bytes: array of bytes containing the private key encoded
        using the method supplied in the ``encoding`` parameter
    :param password: array of bytes containing the passphrase to be used
        with the supplied private key. None if unencrypted.
        Defaults to None.
    :param encoding: ``cryptography.hazmat.primitives.serialization.Encoding``
        member naming the encoding used on the ``cert_byes`` and
        ``pk_bytes`` parameters. Can be either PEM or DER.
        Defaults to PEM.
    :returns: a ``PyOpenSSLContext`` with the certificate and private key
        installed on its underlying context.
    :raises ValueError: if ``encoding`` is neither PEM nor DER, if the
        loaders return a falsy cert or key, or if ``check_cert_dates``
        rejects the certificate.
    """
    backend = default_backend()

    # Choose the certificate/key loader pair that matches the encoding.
    if encoding == Encoding.PEM:
        loaders = (x509.load_pem_x509_certificate, load_pem_private_key)
    elif encoding == Encoding.DER:
        loaders = (x509.load_der_x509_certificate, load_der_private_key)
    else:
        raise ValueError('Invalid encoding provided: Must be PEM or DER')

    load_certificate, load_private_key = loaders
    certificate = load_certificate(cert_byes, backend)
    private_key = load_private_key(pk_bytes, password, backend)

    if not certificate or not private_key:
        raise ValueError('Cert and key could not be parsed from '
                         'provided data')

    check_cert_dates(certificate)

    # The wrapper's private ``_ctx`` attribute is used to install the
    # cert and key after converting both to their pyOpenSSL equivalents.
    context = PyOpenSSLContext(PROTOCOL)
    context._ctx.use_certificate(X509.from_cryptography(certificate))
    context._ctx.use_privatekey(PKey.from_cryptography_key(private_key))
    return context
