comparison env/lib/python3.7/site-packages/requests_toolbelt/adapters/x509.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
comparison
equal deleted inserted replaced
4:79f47841a781 5:9b1c78e6ba9c
1 # -*- coding: utf-8 -*-
2 """A X509Adapter for use with the requests library.
3
4 This file contains an implementation of the X509Adapter that will
5 allow users to authenticate a request using an arbitrary
6 X.509 certificate without needing to convert it to a .pem file
7
8 """
9
10 from OpenSSL.crypto import PKey, X509
11 from cryptography import x509
12 from cryptography.hazmat.primitives.serialization import (load_pem_private_key,
13 load_der_private_key)
14 from cryptography.hazmat.primitives.serialization import Encoding
15 from cryptography.hazmat.backends import default_backend
16
17 from datetime import datetime
18 from requests.adapters import HTTPAdapter
19 import requests
20
21 from .._compat import PyOpenSSLContext
22 from .. import exceptions as exc
23
24 """
25 importing the protocol constants from _ssl instead of ssl because only the
26 constants are needed and to handle issues caused by importing from ssl on
27 the 2.7.x line.
28 """
29 try:
30 from _ssl import PROTOCOL_TLS as PROTOCOL
31 except ImportError:
32 from _ssl import PROTOCOL_SSLv23 as PROTOCOL
33
34
class X509Adapter(HTTPAdapter):
    r"""Transport adapter that authenticates with an X.509 certificate.

    Implements the Requests Transport Adapter interface so a session can
    reach HTTPS urls while presenting a client certificate supplied as raw
    bytes (no intermediate .pem file required). The adapter has to be
    instantiated by hand and mounted on the session.

    :param pool_connections: The number of urllib3 connection pools to
        cache.
    :param pool_maxsize: The maximum number of connections to save in the
        pool.
    :param max_retries: The maximum number of retries each connection
        should attempt. Note, this applies only to failed DNS lookups,
        socket connections and connection timeouts, never to requests where
        data has made it to the server. By default, Requests does not retry
        failed connections. If you need granular control over the
        conditions under which we retry a request, import urllib3's
        ``Retry`` class and pass that instead.
    :param pool_block: Whether the connection pool should block for
        connections.

    :param bytes cert_bytes:
        bytes object holding the certificate, encoded as described by the
        ``encoding`` parameter. Required.
    :param bytes pk_bytes:
        bytes object holding the private key, encoded as described by the
        ``encoding`` parameter. Required.
    :param password:
        string or utf8 encoded bytes containing the passphrase used for the
        private key. None if unencrypted. Defaults to None.
    :param encoding:
        Enumeration detailing the encoding method used on the ``cert_bytes``
        parameter. Can be either PEM or DER. Defaults to PEM.
    :type encoding:
        :class: `cryptography.hazmat.primitives.serialization.Encoding`

    :raises ValueError: if ``cert_bytes`` or ``pk_bytes`` is not a ``bytes``
        object.

    Usage::

        >>> import requests
        >>> from requests_toolbelt.adapters.x509 import X509Adapter
        >>> s = requests.Session()
        >>> a = X509Adapter(max_retries=3,
        ...                 cert_bytes=b'...', pk_bytes=b'...',
        ...                 encoding='...')
        >>> s.mount('https://', a)
    """

    def __init__(self, *args, **kwargs):
        self._check_version()

        # Strip the adapter-specific options so only the remaining keyword
        # arguments are forwarded to the base class initializer.
        encoding = kwargs.pop('encoding', Encoding.PEM)
        password = kwargs.pop('password', None)
        pk_bytes = kwargs.pop('pk_bytes', None)
        cert_bytes = kwargs.pop('cert_bytes', None)

        # A missing value (None) fails the isinstance test as well.
        if not isinstance(cert_bytes, bytes):
            raise ValueError('Invalid cert content provided. '
                             'You must provide an X.509 cert '
                             'formatted as a byte array.')
        if not isinstance(pk_bytes, bytes):
            raise ValueError('Invalid private key content provided. '
                             'You must provide a private key '
                             'formatted as a byte array.')

        # Bytes are used untouched; any other truthy value is utf8-encoded;
        # falsy non-bytes values mean "no passphrase".
        if isinstance(password, bytes):
            passphrase = password
        else:
            passphrase = password.encode('utf8') if password else None

        # NOTE(review): the context is stored before calling the base
        # initializer, presumably because that initializer ends up invoking
        # init_poolmanager() -- confirm before reordering.
        self.ssl_context = create_ssl_context(cert_bytes, pk_bytes,
                                              passphrase, encoding)

        super(X509Adapter, self).__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **kwargs):
        """Build the pool manager, injecting the client-cert SSL context."""
        context = self.ssl_context
        if context:
            kwargs['ssl_context'] = context
        return super(X509Adapter, self).init_poolmanager(*args, **kwargs)

    def proxy_manager_for(self, *args, **kwargs):
        """Build a proxy manager, injecting the client-cert SSL context."""
        context = self.ssl_context
        if context:
            kwargs['ssl_context'] = context
        return super(X509Adapter, self).proxy_manager_for(*args, **kwargs)

    def _check_version(self):
        """Raise ``VersionMismatchError`` when PyOpenSSLContext is missing."""
        if PyOpenSSLContext is not None:
            return

        found = requests.__version__
        raise exc.VersionMismatchError(
            "The X509Adapter requires at least Requests 2.12.0 to be "
            "installed. Version {0} was found instead.".format(found)
        )
129
130
def check_cert_dates(cert):
    """Verify that the supplied client cert is inside its validity window.

    :param cert: certificate object exposing ``not_valid_before`` and
        ``not_valid_after`` attributes; both are compared against
        ``datetime.utcnow()`` (so they are assumed to be naive UTC
        datetimes -- confirm against the certificate library in use).
    :raises ValueError: if the current time is later than
        ``not_valid_after`` (certificate expired) or earlier than
        ``not_valid_before`` (certificate not yet valid). The message
        names which of the two conditions failed.
    """
    now = datetime.utcnow()

    # Report the actual problem: a certificate whose validity period has
    # not started yet is not "expired".
    if cert.not_valid_after < now:
        problem = 'expired'
    elif cert.not_valid_before > now:
        problem = 'not yet valid'
    else:
        return

    raise ValueError('Client certificate {0}: Not After: '
                     '{1:%Y-%m-%d %H:%M:%SZ} '
                     'Not Before: {2:%Y-%m-%d %H:%M:%SZ}'
                     .format(problem, cert.not_valid_after,
                             cert.not_valid_before))
140
141
def create_ssl_context(cert_byes, pk_bytes, password=None,
                       encoding=Encoding.PEM):
    """Build a PyOpenSSL-backed SSL context carrying a client cert and key.

    :param cert_byes: bytes containing the certificate, encoded with the
        method given by ``encoding``. (The parameter name is spelled
        ``cert_byes`` in the signature.)
    :param pk_bytes: bytes containing the private key, encoded with the
        method given by ``encoding``.
    :param password: bytes containing the passphrase for the private key.
        None if unencrypted. Defaults to None.
    :param encoding: ``cryptography.hazmat.primitives.serialization.Encoding``
        member describing how ``cert_byes`` and ``pk_bytes`` are encoded.
        Can be either PEM or DER. Defaults to PEM.
    :returns: a ``PyOpenSSLContext`` with the certificate and private key
        installed on its underlying context.
    :raises ValueError: if ``encoding`` is neither PEM nor DER, if the
        loaded certificate or key is falsy, or if ``check_cert_dates``
        rejects the certificate.
    """
    backend = default_backend()

    # Choose the certificate/key parsers that match the declared encoding.
    if encoding == Encoding.PEM:
        parse_cert = x509.load_pem_x509_certificate
        parse_key = load_pem_private_key
    elif encoding == Encoding.DER:
        parse_cert = x509.load_der_x509_certificate
        parse_key = load_der_private_key
    else:
        raise ValueError('Invalid encoding provided: Must be PEM or DER')

    cert = parse_cert(cert_byes, backend)
    key = parse_key(pk_bytes, password, backend)

    if not cert or not key:
        raise ValueError('Cert and key could not be parsed from '
                         'provided data')

    check_cert_dates(cert)

    # Install the pyOpenSSL equivalents of the parsed objects on the
    # wrapper's underlying context.
    ssl_context = PyOpenSSLContext(PROTOCOL)
    openssl_ctx = ssl_context._ctx
    openssl_ctx.use_certificate(X509.from_cryptography(cert))
    openssl_ctx.use_privatekey(PKey.from_cryptography_key(key))
    return ssl_context