comparison env/lib/python3.7/site-packages/requests_toolbelt/auth/guess.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
comparison
equal deleted inserted replaced
4:79f47841a781 5:9b1c78e6ba9c
1 # -*- coding: utf-8 -*-
2 """The module containing the code for GuessAuth."""
3 from requests import auth
4 from requests import cookies
5
6 from . import _digest_auth_compat as auth_compat, http_proxy_digest
7
8
9 class GuessAuth(auth.AuthBase):
10 """Guesses the auth type by the WWW-Authentication header."""
11 def __init__(self, username, password):
12 self.username = username
13 self.password = password
14 self.auth = None
15 self.pos = None
16
17 def _handle_basic_auth_401(self, r, kwargs):
18 if self.pos is not None:
19 r.request.body.seek(self.pos)
20
21 # Consume content and release the original connection
22 # to allow our new request to reuse the same one.
23 r.content
24 r.raw.release_conn()
25 prep = r.request.copy()
26 if not hasattr(prep, '_cookies'):
27 prep._cookies = cookies.RequestsCookieJar()
28 cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
29 prep.prepare_cookies(prep._cookies)
30
31 self.auth = auth.HTTPBasicAuth(self.username, self.password)
32 prep = self.auth(prep)
33 _r = r.connection.send(prep, **kwargs)
34 _r.history.append(r)
35 _r.request = prep
36
37 return _r
38
39 def _handle_digest_auth_401(self, r, kwargs):
40 self.auth = auth_compat.HTTPDigestAuth(self.username, self.password)
41 try:
42 self.auth.init_per_thread_state()
43 except AttributeError:
44 # If we're not on requests 2.8.0+ this method does not exist and
45 # is not relevant.
46 pass
47
48 # Check that the attr exists because much older versions of requests
49 # set this attribute lazily. For example:
50 # https://github.com/kennethreitz/requests/blob/33735480f77891754304e7f13e3cdf83aaaa76aa/requests/auth.py#L59
51 if (hasattr(self.auth, 'num_401_calls') and
52 self.auth.num_401_calls is None):
53 self.auth.num_401_calls = 1
54 # Digest auth would resend the request by itself. We can take a
55 # shortcut here.
56 return self.auth.handle_401(r, **kwargs)
57
58 def handle_401(self, r, **kwargs):
59 """Resends a request with auth headers, if needed."""
60
61 www_authenticate = r.headers.get('www-authenticate', '').lower()
62
63 if 'basic' in www_authenticate:
64 return self._handle_basic_auth_401(r, kwargs)
65
66 if 'digest' in www_authenticate:
67 return self._handle_digest_auth_401(r, kwargs)
68
69 def __call__(self, request):
70 if self.auth is not None:
71 return self.auth(request)
72
73 try:
74 self.pos = request.body.tell()
75 except AttributeError:
76 pass
77
78 request.register_hook('response', self.handle_401)
79 return request
80
81
82 class GuessProxyAuth(GuessAuth):
83 """
84 Guesses the auth type by WWW-Authentication and Proxy-Authentication
85 headers
86 """
87 def __init__(self, username=None, password=None,
88 proxy_username=None, proxy_password=None):
89 super(GuessProxyAuth, self).__init__(username, password)
90 self.proxy_username = proxy_username
91 self.proxy_password = proxy_password
92 self.proxy_auth = None
93
94 def _handle_basic_auth_407(self, r, kwargs):
95 if self.pos is not None:
96 r.request.body.seek(self.pos)
97
98 r.content
99 r.raw.release_conn()
100 prep = r.request.copy()
101 if not hasattr(prep, '_cookies'):
102 prep._cookies = cookies.RequestsCookieJar()
103 cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
104 prep.prepare_cookies(prep._cookies)
105
106 self.proxy_auth = auth.HTTPProxyAuth(self.proxy_username,
107 self.proxy_password)
108 prep = self.proxy_auth(prep)
109 _r = r.connection.send(prep, **kwargs)
110 _r.history.append(r)
111 _r.request = prep
112
113 return _r
114
115 def _handle_digest_auth_407(self, r, kwargs):
116 self.proxy_auth = http_proxy_digest.HTTPProxyDigestAuth(
117 username=self.proxy_username,
118 password=self.proxy_password)
119
120 try:
121 self.auth.init_per_thread_state()
122 except AttributeError:
123 pass
124
125 return self.proxy_auth.handle_407(r, **kwargs)
126
127 def handle_407(self, r, **kwargs):
128 proxy_authenticate = r.headers.get('Proxy-Authenticate', '').lower()
129
130 if 'basic' in proxy_authenticate:
131 return self._handle_basic_auth_407(r, kwargs)
132
133 if 'digest' in proxy_authenticate:
134 return self._handle_digest_auth_407(r, kwargs)
135
136 def __call__(self, request):
137 if self.proxy_auth is not None:
138 request = self.proxy_auth(request)
139
140 try:
141 self.pos = request.body.tell()
142 except AttributeError:
143 pass
144
145 request.register_hook('response', self.handle_407)
146 return super(GuessProxyAuth, self).__call__(request)