Mercurial > repos > guerler > hhblits
comparison lib/python3.8/site-packages/pip/_internal/network/auth.py @ 0:9e54283cc701 draft
"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author | guerler |
---|---|
date | Mon, 27 Jul 2020 03:47:31 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:9e54283cc701 |
---|---|
1 """Network Authentication Helpers | |
2 | |
3 Contains interface (MultiDomainBasicAuth) and associated glue code for | |
4 providing credentials in the context of network requests. | |
5 """ | |
6 | |
7 # The following comment should be removed at some point in the future. | |
8 # mypy: disallow-untyped-defs=False | |
9 | |
10 import logging | |
11 | |
12 from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth | |
13 from pip._vendor.requests.utils import get_netrc_auth | |
14 from pip._vendor.six.moves.urllib import parse as urllib_parse | |
15 | |
16 from pip._internal.utils.misc import ( | |
17 ask, | |
18 ask_input, | |
19 ask_password, | |
20 remove_auth_from_url, | |
21 split_auth_netloc_from_url, | |
22 ) | |
23 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
24 | |
25 if MYPY_CHECK_RUNNING: | |
26 from optparse import Values | |
27 from typing import Dict, Optional, Tuple | |
28 | |
29 from pip._internal.vcs.versioncontrol import AuthInfo | |
30 | |
31 Credentials = Tuple[str, str, str] | |
32 | |
33 logger = logging.getLogger(__name__) | |
34 | |
35 try: | |
36 import keyring # noqa | |
37 except ImportError: | |
38 keyring = None | |
39 except Exception as exc: | |
40 logger.warning( | |
41 "Keyring is skipped due to an exception: %s", str(exc), | |
42 ) | |
43 keyring = None | |
44 | |
45 | |
46 def get_keyring_auth(url, username): | |
47 """Return the tuple auth for a given url from keyring.""" | |
48 if not url or not keyring: | |
49 return None | |
50 | |
51 try: | |
52 try: | |
53 get_credential = keyring.get_credential | |
54 except AttributeError: | |
55 pass | |
56 else: | |
57 logger.debug("Getting credentials from keyring for %s", url) | |
58 cred = get_credential(url, username) | |
59 if cred is not None: | |
60 return cred.username, cred.password | |
61 return None | |
62 | |
63 if username: | |
64 logger.debug("Getting password from keyring for %s", url) | |
65 password = keyring.get_password(url, username) | |
66 if password: | |
67 return username, password | |
68 | |
69 except Exception as exc: | |
70 logger.warning( | |
71 "Keyring is skipped due to an exception: %s", str(exc), | |
72 ) | |
73 | |
74 | |
75 class MultiDomainBasicAuth(AuthBase): | |
76 | |
77 def __init__(self, prompting=True, index_urls=None): | |
78 # type: (bool, Optional[Values]) -> None | |
79 self.prompting = prompting | |
80 self.index_urls = index_urls | |
81 self.passwords = {} # type: Dict[str, AuthInfo] | |
82 # When the user is prompted to enter credentials and keyring is | |
83 # available, we will offer to save them. If the user accepts, | |
84 # this value is set to the credentials they entered. After the | |
85 # request authenticates, the caller should call | |
86 # ``save_credentials`` to save these. | |
87 self._credentials_to_save = None # type: Optional[Credentials] | |
88 | |
89 def _get_index_url(self, url): | |
90 """Return the original index URL matching the requested URL. | |
91 | |
92 Cached or dynamically generated credentials may work against | |
93 the original index URL rather than just the netloc. | |
94 | |
95 The provided url should have had its username and password | |
96 removed already. If the original index url had credentials then | |
97 they will be included in the return value. | |
98 | |
99 Returns None if no matching index was found, or if --no-index | |
100 was specified by the user. | |
101 """ | |
102 if not url or not self.index_urls: | |
103 return None | |
104 | |
105 for u in self.index_urls: | |
106 prefix = remove_auth_from_url(u).rstrip("/") + "/" | |
107 if url.startswith(prefix): | |
108 return u | |
109 | |
110 def _get_new_credentials(self, original_url, allow_netrc=True, | |
111 allow_keyring=True): | |
112 """Find and return credentials for the specified URL.""" | |
113 # Split the credentials and netloc from the url. | |
114 url, netloc, url_user_password = split_auth_netloc_from_url( | |
115 original_url, | |
116 ) | |
117 | |
118 # Start with the credentials embedded in the url | |
119 username, password = url_user_password | |
120 if username is not None and password is not None: | |
121 logger.debug("Found credentials in url for %s", netloc) | |
122 return url_user_password | |
123 | |
124 # Find a matching index url for this request | |
125 index_url = self._get_index_url(url) | |
126 if index_url: | |
127 # Split the credentials from the url. | |
128 index_info = split_auth_netloc_from_url(index_url) | |
129 if index_info: | |
130 index_url, _, index_url_user_password = index_info | |
131 logger.debug("Found index url %s", index_url) | |
132 | |
133 # If an index URL was found, try its embedded credentials | |
134 if index_url and index_url_user_password[0] is not None: | |
135 username, password = index_url_user_password | |
136 if username is not None and password is not None: | |
137 logger.debug("Found credentials in index url for %s", netloc) | |
138 return index_url_user_password | |
139 | |
140 # Get creds from netrc if we still don't have them | |
141 if allow_netrc: | |
142 netrc_auth = get_netrc_auth(original_url) | |
143 if netrc_auth: | |
144 logger.debug("Found credentials in netrc for %s", netloc) | |
145 return netrc_auth | |
146 | |
147 # If we don't have a password and keyring is available, use it. | |
148 if allow_keyring: | |
149 # The index url is more specific than the netloc, so try it first | |
150 kr_auth = ( | |
151 get_keyring_auth(index_url, username) or | |
152 get_keyring_auth(netloc, username) | |
153 ) | |
154 if kr_auth: | |
155 logger.debug("Found credentials in keyring for %s", netloc) | |
156 return kr_auth | |
157 | |
158 return username, password | |
159 | |
160 def _get_url_and_credentials(self, original_url): | |
161 """Return the credentials to use for the provided URL. | |
162 | |
163 If allowed, netrc and keyring may be used to obtain the | |
164 correct credentials. | |
165 | |
166 Returns (url_without_credentials, username, password). Note | |
167 that even if the original URL contains credentials, this | |
168 function may return a different username and password. | |
169 """ | |
170 url, netloc, _ = split_auth_netloc_from_url(original_url) | |
171 | |
172 # Use any stored credentials that we have for this netloc | |
173 username, password = self.passwords.get(netloc, (None, None)) | |
174 | |
175 if username is None and password is None: | |
176 # No stored credentials. Acquire new credentials without prompting | |
177 # the user. (e.g. from netrc, keyring, or the URL itself) | |
178 username, password = self._get_new_credentials(original_url) | |
179 | |
180 if username is not None or password is not None: | |
181 # Convert the username and password if they're None, so that | |
182 # this netloc will show up as "cached" in the conditional above. | |
183 # Further, HTTPBasicAuth doesn't accept None, so it makes sense to | |
184 # cache the value that is going to be used. | |
185 username = username or "" | |
186 password = password or "" | |
187 | |
188 # Store any acquired credentials. | |
189 self.passwords[netloc] = (username, password) | |
190 | |
191 assert ( | |
192 # Credentials were found | |
193 (username is not None and password is not None) or | |
194 # Credentials were not found | |
195 (username is None and password is None) | |
196 ), "Could not load credentials from url: {}".format(original_url) | |
197 | |
198 return url, username, password | |
199 | |
200 def __call__(self, req): | |
201 # Get credentials for this request | |
202 url, username, password = self._get_url_and_credentials(req.url) | |
203 | |
204 # Set the url of the request to the url without any credentials | |
205 req.url = url | |
206 | |
207 if username is not None and password is not None: | |
208 # Send the basic auth with this request | |
209 req = HTTPBasicAuth(username, password)(req) | |
210 | |
211 # Attach a hook to handle 401 responses | |
212 req.register_hook("response", self.handle_401) | |
213 | |
214 return req | |
215 | |
216 # Factored out to allow for easy patching in tests | |
217 def _prompt_for_password(self, netloc): | |
218 username = ask_input("User for %s: " % netloc) | |
219 if not username: | |
220 return None, None | |
221 auth = get_keyring_auth(netloc, username) | |
222 if auth: | |
223 return auth[0], auth[1], False | |
224 password = ask_password("Password: ") | |
225 return username, password, True | |
226 | |
227 # Factored out to allow for easy patching in tests | |
228 def _should_save_password_to_keyring(self): | |
229 if not keyring: | |
230 return False | |
231 return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y" | |
232 | |
233 def handle_401(self, resp, **kwargs): | |
234 # We only care about 401 responses, anything else we want to just | |
235 # pass through the actual response | |
236 if resp.status_code != 401: | |
237 return resp | |
238 | |
239 # We are not able to prompt the user so simply return the response | |
240 if not self.prompting: | |
241 return resp | |
242 | |
243 parsed = urllib_parse.urlparse(resp.url) | |
244 | |
245 # Prompt the user for a new username and password | |
246 username, password, save = self._prompt_for_password(parsed.netloc) | |
247 | |
248 # Store the new username and password to use for future requests | |
249 self._credentials_to_save = None | |
250 if username is not None and password is not None: | |
251 self.passwords[parsed.netloc] = (username, password) | |
252 | |
253 # Prompt to save the password to keyring | |
254 if save and self._should_save_password_to_keyring(): | |
255 self._credentials_to_save = (parsed.netloc, username, password) | |
256 | |
257 # Consume content and release the original connection to allow our new | |
258 # request to reuse the same one. | |
259 resp.content | |
260 resp.raw.release_conn() | |
261 | |
262 # Add our new username and password to the request | |
263 req = HTTPBasicAuth(username or "", password or "")(resp.request) | |
264 req.register_hook("response", self.warn_on_401) | |
265 | |
266 # On successful request, save the credentials that were used to | |
267 # keyring. (Note that if the user responded "no" above, this member | |
268 # is not set and nothing will be saved.) | |
269 if self._credentials_to_save: | |
270 req.register_hook("response", self.save_credentials) | |
271 | |
272 # Send our new request | |
273 new_resp = resp.connection.send(req, **kwargs) | |
274 new_resp.history.append(resp) | |
275 | |
276 return new_resp | |
277 | |
278 def warn_on_401(self, resp, **kwargs): | |
279 """Response callback to warn about incorrect credentials.""" | |
280 if resp.status_code == 401: | |
281 logger.warning( | |
282 '401 Error, Credentials not correct for %s', resp.request.url, | |
283 ) | |
284 | |
285 def save_credentials(self, resp, **kwargs): | |
286 """Response callback to save credentials on success.""" | |
287 assert keyring is not None, "should never reach here without keyring" | |
288 if not keyring: | |
289 return | |
290 | |
291 creds = self._credentials_to_save | |
292 self._credentials_to_save = None | |
293 if creds and resp.status_code < 400: | |
294 try: | |
295 logger.info('Saving credentials to keyring') | |
296 keyring.set_password(*creds) | |
297 except Exception: | |
298 logger.exception('Failed to save credentials') |