Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/requests_toolbelt/adapters/x509.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26e78fe6e8c4 |
---|---|
1 # -*- coding: utf-8 -*- | |
2 """A X509Adapter for use with the requests library. | |
3 | |
4 This file contains an implementation of the X509Adapter that will | |
5 allow users to authenticate a request using an arbitrary | |
6 X.509 certificate without needing to convert it to a .pem file | |
7 | |
8 """ | |
9 | |
10 from OpenSSL.crypto import PKey, X509 | |
11 from cryptography import x509 | |
12 from cryptography.hazmat.primitives.serialization import (load_pem_private_key, | |
13 load_der_private_key) | |
14 from cryptography.hazmat.primitives.serialization import Encoding | |
15 from cryptography.hazmat.backends import default_backend | |
16 | |
17 from datetime import datetime | |
18 from requests.adapters import HTTPAdapter | |
19 import requests | |
20 | |
21 from .._compat import PyOpenSSLContext | |
22 from .. import exceptions as exc | |
23 | |
24 """ | |
25 Import the protocol constants from _ssl rather than from ssl: only the | |
26 constants are needed, and importing from ssl causes problems on the | |
27 2.7.x line. | |
28 """ | |
29 try: | |
30 from _ssl import PROTOCOL_TLS as PROTOCOL | |
31 except ImportError: | |
32 from _ssl import PROTOCOL_SSLv23 as PROTOCOL | |
33 | |
34 | |
class X509Adapter(HTTPAdapter):
    r"""Transport adapter that authenticates with an in-memory X.509 cert.

    Lets a Requests session talk to HTTPS urls while presenting a client
    certificate and private key supplied as bytes. The adapter is not
    installed automatically: instantiate it yourself and mount it on the
    session.

    The standard ``HTTPAdapter`` options are passed straight through:

    :param pool_connections: The number of urllib3 connection pools to
        cache.
    :param pool_maxsize: The maximum number of connections to save in the
        pool.
    :param max_retries: The maximum number of retries each connection
        should attempt. Note, this applies only to failed DNS lookups,
        socket connections and connection timeouts, never to requests where
        data has made it to the server. By default, Requests does not retry
        failed connections. If you need granular control over the
        conditions under which we retry a request, import urllib3's
        ``Retry`` class and pass that instead.
    :param pool_block: Whether the connection pool should block for
        connections.

    The following keyword arguments are consumed by this adapter:

    :param bytes cert_bytes:
        The certificate, serialized with the encoding named by
        ``encoding``. Required; anything other than ``bytes`` raises
        ``ValueError``.
    :param bytes pk_bytes:
        The private key, serialized with the encoding named by
        ``encoding``. Required; anything other than ``bytes`` raises
        ``ValueError``.
    :param password:
        Passphrase protecting the private key, as ``bytes`` or as a string
        (which is encoded as UTF-8). ``None`` if the key is unencrypted.
        Defaults to ``None``.
    :param encoding:
        Serialization format of ``cert_bytes`` and ``pk_bytes``; either
        PEM or DER. Defaults to PEM.
    :type encoding:
        :class:`cryptography.hazmat.primitives.serialization.Encoding`

    Usage::

      >>> import requests
      >>> from cryptography.hazmat.primitives.serialization import Encoding
      >>> from requests_toolbelt.adapters.x509 import X509Adapter
      >>> s = requests.Session()
      >>> a = X509Adapter(max_retries=3,
      ...                 cert_bytes=b'...', pk_bytes=b'...',
      ...                 encoding=Encoding.PEM)
      >>> s.mount('https://', a)
    """

    def __init__(self, *args, **kwargs):
        self._check_version()

        # Strip the adapter-specific options so that only options the
        # parent class understands are forwarded to it.
        cert_bytes = kwargs.pop('cert_bytes', None)
        pk_bytes = kwargs.pop('pk_bytes', None)
        password = kwargs.pop('password', None)
        encoding = kwargs.pop('encoding', Encoding.PEM)

        # isinstance() is already False for None, so one check covers both
        # the "missing" and the "wrong type" cases.
        if not isinstance(cert_bytes, bytes):
            raise ValueError('Invalid cert content provided. '
                             'You must provide an X.509 cert '
                             'formatted as a byte array.')
        if not isinstance(pk_bytes, bytes):
            raise ValueError('Invalid private key content provided. '
                             'You must provide a private key '
                             'formatted as a byte array.')

        # Normalise the passphrase to bytes (or None when absent/empty).
        if isinstance(password, bytes):
            passphrase = password
        else:
            passphrase = password.encode('utf8') if password else None

        # init_poolmanager() reads self.ssl_context, so the context is
        # assigned before the parent initialiser runs.
        self.ssl_context = create_ssl_context(cert_bytes, pk_bytes,
                                              passphrase, encoding)

        super(X509Adapter, self).__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **kwargs):
        """Build the pool manager, injecting the client-cert SSL context."""
        context = self.ssl_context
        if context:
            kwargs['ssl_context'] = context
        return super(X509Adapter, self).init_poolmanager(*args, **kwargs)

    def proxy_manager_for(self, *args, **kwargs):
        """Build a proxy manager, injecting the client-cert SSL context."""
        context = self.ssl_context
        if context:
            kwargs['ssl_context'] = context
        return super(X509Adapter, self).proxy_manager_for(*args, **kwargs)

    def _check_version(self):
        """Raise ``VersionMismatchError`` if PyOpenSSLContext is missing."""
        if PyOpenSSLContext is not None:
            return
        raise exc.VersionMismatchError(
            "The X509Adapter requires at least Requests 2.12.0 to be "
            "installed. Version {0} was found instead.".format(
                requests.__version__
            )
        )
129 | |
130 | |
def check_cert_dates(cert):
    """Verify that the supplied client cert is inside its validity window.

    :param cert: object exposing ``not_valid_before`` and
        ``not_valid_after`` datetimes. They are compared against
        ``datetime.utcnow()``, so they must be naive datetimes
        (presumably expressed in UTC -- confirm against the cert source).
    :raises ValueError: if the certificate has already expired, or if its
        validity period has not started yet. The message says which of the
        two applies and includes both boundary dates.
    """
    now = datetime.utcnow()

    # Tell the two failure modes apart: a certificate that is not valid
    # *yet* must not be reported as "expired".
    if cert.not_valid_after < now:
        problem = 'expired'
    elif cert.not_valid_before > now:
        problem = 'not yet valid'
    else:
        return

    raise ValueError('Client certificate {0}: Not After: '
                     '{1:%Y-%m-%d %H:%M:%SZ} '
                     'Not Before: {2:%Y-%m-%d %H:%M:%SZ}'
                     .format(problem, cert.not_valid_after,
                             cert.not_valid_before))
140 | |
141 | |
def create_ssl_context(cert_byes, pk_bytes, password=None,
                       encoding=Encoding.PEM):
    """Build a PyOpenSSL-backed SSL context carrying a client cert and key.

    :param cert_byes: bytes of the certificate, serialized with the format
        named by ``encoding``. (The parameter name is spelled this way in
        the public signature and is kept for compatibility.)
    :param pk_bytes: bytes of the private key, serialized with the format
        named by ``encoding``.
    :param password: bytes of the passphrase protecting the private key.
        None if the key is unencrypted. Defaults to None.
    :param encoding:
        ``cryptography.hazmat.primitives.serialization.Encoding`` member
        describing how ``cert_byes`` and ``pk_bytes`` are serialized.
        Can be either PEM or DER. Defaults to PEM.
    :returns: a ``PyOpenSSLContext`` with the certificate and private key
        installed.
    :raises ValueError: if ``encoding`` is neither PEM nor DER, if the
        loaders yield a falsy cert or key, or if ``check_cert_dates``
        rejects the certificate.
    """
    backend = default_backend()

    # Choose the matching pair of loaders first, then run them.
    if encoding == Encoding.PEM:
        load_certificate = x509.load_pem_x509_certificate
        load_private_key = load_pem_private_key
    elif encoding == Encoding.DER:
        load_certificate = x509.load_der_x509_certificate
        load_private_key = load_der_private_key
    else:
        raise ValueError('Invalid encoding provided: Must be PEM or DER')

    certificate = load_certificate(cert_byes, backend)
    private_key = load_private_key(pk_bytes, password, backend)

    if not certificate or not private_key:
        raise ValueError('Cert and key could not be parsed from '
                         'provided data')

    check_cert_dates(certificate)

    context = PyOpenSSLContext(PROTOCOL)
    # The wrapper's underlying pyOpenSSL context is reached via ``_ctx``;
    # the cryptography objects are converted to pyOpenSSL ones first.
    openssl_ctx = context._ctx
    openssl_ctx.use_certificate(X509.from_cryptography(certificate))
    openssl_ctx.use_privatekey(PKey.from_cryptography_key(private_key))
    return context