Mercurial > repos > guerler > hhblits
comparison lib/python3.8/site-packages/pip/_internal/network/auth.py @ 0:9e54283cc701 draft
"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
| author | guerler |
|---|---|
| date | Mon, 27 Jul 2020 03:47:31 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:9e54283cc701 |
|---|---|
| 1 """Network Authentication Helpers | |
| 2 | |
| 3 Contains interface (MultiDomainBasicAuth) and associated glue code for | |
| 4 providing credentials in the context of network requests. | |
| 5 """ | |
| 6 | |
| 7 # The following comment should be removed at some point in the future. | |
| 8 # mypy: disallow-untyped-defs=False | |
| 9 | |
| 10 import logging | |
| 11 | |
| 12 from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth | |
| 13 from pip._vendor.requests.utils import get_netrc_auth | |
| 14 from pip._vendor.six.moves.urllib import parse as urllib_parse | |
| 15 | |
| 16 from pip._internal.utils.misc import ( | |
| 17 ask, | |
| 18 ask_input, | |
| 19 ask_password, | |
| 20 remove_auth_from_url, | |
| 21 split_auth_netloc_from_url, | |
| 22 ) | |
| 23 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
| 24 | |
| 25 if MYPY_CHECK_RUNNING: | |
| 26 from optparse import Values | |
| 27 from typing import Dict, Optional, Tuple | |
| 28 | |
| 29 from pip._internal.vcs.versioncontrol import AuthInfo | |
| 30 | |
| 31 Credentials = Tuple[str, str, str] | |
| 32 | |
| 33 logger = logging.getLogger(__name__) | |
| 34 | |
| 35 try: | |
| 36 import keyring # noqa | |
| 37 except ImportError: | |
| 38 keyring = None | |
| 39 except Exception as exc: | |
| 40 logger.warning( | |
| 41 "Keyring is skipped due to an exception: %s", str(exc), | |
| 42 ) | |
| 43 keyring = None | |
| 44 | |
| 45 | |
| 46 def get_keyring_auth(url, username): | |
| 47 """Return the tuple auth for a given url from keyring.""" | |
| 48 if not url or not keyring: | |
| 49 return None | |
| 50 | |
| 51 try: | |
| 52 try: | |
| 53 get_credential = keyring.get_credential | |
| 54 except AttributeError: | |
| 55 pass | |
| 56 else: | |
| 57 logger.debug("Getting credentials from keyring for %s", url) | |
| 58 cred = get_credential(url, username) | |
| 59 if cred is not None: | |
| 60 return cred.username, cred.password | |
| 61 return None | |
| 62 | |
| 63 if username: | |
| 64 logger.debug("Getting password from keyring for %s", url) | |
| 65 password = keyring.get_password(url, username) | |
| 66 if password: | |
| 67 return username, password | |
| 68 | |
| 69 except Exception as exc: | |
| 70 logger.warning( | |
| 71 "Keyring is skipped due to an exception: %s", str(exc), | |
| 72 ) | |
| 73 | |
| 74 | |
| 75 class MultiDomainBasicAuth(AuthBase): | |
| 76 | |
| 77 def __init__(self, prompting=True, index_urls=None): | |
| 78 # type: (bool, Optional[Values]) -> None | |
| 79 self.prompting = prompting | |
| 80 self.index_urls = index_urls | |
| 81 self.passwords = {} # type: Dict[str, AuthInfo] | |
| 82 # When the user is prompted to enter credentials and keyring is | |
| 83 # available, we will offer to save them. If the user accepts, | |
| 84 # this value is set to the credentials they entered. After the | |
| 85 # request authenticates, the caller should call | |
| 86 # ``save_credentials`` to save these. | |
| 87 self._credentials_to_save = None # type: Optional[Credentials] | |
| 88 | |
| 89 def _get_index_url(self, url): | |
| 90 """Return the original index URL matching the requested URL. | |
| 91 | |
| 92 Cached or dynamically generated credentials may work against | |
| 93 the original index URL rather than just the netloc. | |
| 94 | |
| 95 The provided url should have had its username and password | |
| 96 removed already. If the original index url had credentials then | |
| 97 they will be included in the return value. | |
| 98 | |
| 99 Returns None if no matching index was found, or if --no-index | |
| 100 was specified by the user. | |
| 101 """ | |
| 102 if not url or not self.index_urls: | |
| 103 return None | |
| 104 | |
| 105 for u in self.index_urls: | |
| 106 prefix = remove_auth_from_url(u).rstrip("/") + "/" | |
| 107 if url.startswith(prefix): | |
| 108 return u | |
| 109 | |
| 110 def _get_new_credentials(self, original_url, allow_netrc=True, | |
| 111 allow_keyring=True): | |
| 112 """Find and return credentials for the specified URL.""" | |
| 113 # Split the credentials and netloc from the url. | |
| 114 url, netloc, url_user_password = split_auth_netloc_from_url( | |
| 115 original_url, | |
| 116 ) | |
| 117 | |
| 118 # Start with the credentials embedded in the url | |
| 119 username, password = url_user_password | |
| 120 if username is not None and password is not None: | |
| 121 logger.debug("Found credentials in url for %s", netloc) | |
| 122 return url_user_password | |
| 123 | |
| 124 # Find a matching index url for this request | |
| 125 index_url = self._get_index_url(url) | |
| 126 if index_url: | |
| 127 # Split the credentials from the url. | |
| 128 index_info = split_auth_netloc_from_url(index_url) | |
| 129 if index_info: | |
| 130 index_url, _, index_url_user_password = index_info | |
| 131 logger.debug("Found index url %s", index_url) | |
| 132 | |
| 133 # If an index URL was found, try its embedded credentials | |
| 134 if index_url and index_url_user_password[0] is not None: | |
| 135 username, password = index_url_user_password | |
| 136 if username is not None and password is not None: | |
| 137 logger.debug("Found credentials in index url for %s", netloc) | |
| 138 return index_url_user_password | |
| 139 | |
| 140 # Get creds from netrc if we still don't have them | |
| 141 if allow_netrc: | |
| 142 netrc_auth = get_netrc_auth(original_url) | |
| 143 if netrc_auth: | |
| 144 logger.debug("Found credentials in netrc for %s", netloc) | |
| 145 return netrc_auth | |
| 146 | |
| 147 # If we don't have a password and keyring is available, use it. | |
| 148 if allow_keyring: | |
| 149 # The index url is more specific than the netloc, so try it first | |
| 150 kr_auth = ( | |
| 151 get_keyring_auth(index_url, username) or | |
| 152 get_keyring_auth(netloc, username) | |
| 153 ) | |
| 154 if kr_auth: | |
| 155 logger.debug("Found credentials in keyring for %s", netloc) | |
| 156 return kr_auth | |
| 157 | |
| 158 return username, password | |
| 159 | |
| 160 def _get_url_and_credentials(self, original_url): | |
| 161 """Return the credentials to use for the provided URL. | |
| 162 | |
| 163 If allowed, netrc and keyring may be used to obtain the | |
| 164 correct credentials. | |
| 165 | |
| 166 Returns (url_without_credentials, username, password). Note | |
| 167 that even if the original URL contains credentials, this | |
| 168 function may return a different username and password. | |
| 169 """ | |
| 170 url, netloc, _ = split_auth_netloc_from_url(original_url) | |
| 171 | |
| 172 # Use any stored credentials that we have for this netloc | |
| 173 username, password = self.passwords.get(netloc, (None, None)) | |
| 174 | |
| 175 if username is None and password is None: | |
| 176 # No stored credentials. Acquire new credentials without prompting | |
| 177 # the user. (e.g. from netrc, keyring, or the URL itself) | |
| 178 username, password = self._get_new_credentials(original_url) | |
| 179 | |
| 180 if username is not None or password is not None: | |
| 181 # Convert the username and password if they're None, so that | |
| 182 # this netloc will show up as "cached" in the conditional above. | |
| 183 # Further, HTTPBasicAuth doesn't accept None, so it makes sense to | |
| 184 # cache the value that is going to be used. | |
| 185 username = username or "" | |
| 186 password = password or "" | |
| 187 | |
| 188 # Store any acquired credentials. | |
| 189 self.passwords[netloc] = (username, password) | |
| 190 | |
| 191 assert ( | |
| 192 # Credentials were found | |
| 193 (username is not None and password is not None) or | |
| 194 # Credentials were not found | |
| 195 (username is None and password is None) | |
| 196 ), "Could not load credentials from url: {}".format(original_url) | |
| 197 | |
| 198 return url, username, password | |
| 199 | |
| 200 def __call__(self, req): | |
| 201 # Get credentials for this request | |
| 202 url, username, password = self._get_url_and_credentials(req.url) | |
| 203 | |
| 204 # Set the url of the request to the url without any credentials | |
| 205 req.url = url | |
| 206 | |
| 207 if username is not None and password is not None: | |
| 208 # Send the basic auth with this request | |
| 209 req = HTTPBasicAuth(username, password)(req) | |
| 210 | |
| 211 # Attach a hook to handle 401 responses | |
| 212 req.register_hook("response", self.handle_401) | |
| 213 | |
| 214 return req | |
| 215 | |
| 216 # Factored out to allow for easy patching in tests | |
| 217 def _prompt_for_password(self, netloc): | |
| 218 username = ask_input("User for %s: " % netloc) | |
| 219 if not username: | |
| 220 return None, None | |
| 221 auth = get_keyring_auth(netloc, username) | |
| 222 if auth: | |
| 223 return auth[0], auth[1], False | |
| 224 password = ask_password("Password: ") | |
| 225 return username, password, True | |
| 226 | |
| 227 # Factored out to allow for easy patching in tests | |
| 228 def _should_save_password_to_keyring(self): | |
| 229 if not keyring: | |
| 230 return False | |
| 231 return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y" | |
| 232 | |
| 233 def handle_401(self, resp, **kwargs): | |
| 234 # We only care about 401 responses, anything else we want to just | |
| 235 # pass through the actual response | |
| 236 if resp.status_code != 401: | |
| 237 return resp | |
| 238 | |
| 239 # We are not able to prompt the user so simply return the response | |
| 240 if not self.prompting: | |
| 241 return resp | |
| 242 | |
| 243 parsed = urllib_parse.urlparse(resp.url) | |
| 244 | |
| 245 # Prompt the user for a new username and password | |
| 246 username, password, save = self._prompt_for_password(parsed.netloc) | |
| 247 | |
| 248 # Store the new username and password to use for future requests | |
| 249 self._credentials_to_save = None | |
| 250 if username is not None and password is not None: | |
| 251 self.passwords[parsed.netloc] = (username, password) | |
| 252 | |
| 253 # Prompt to save the password to keyring | |
| 254 if save and self._should_save_password_to_keyring(): | |
| 255 self._credentials_to_save = (parsed.netloc, username, password) | |
| 256 | |
| 257 # Consume content and release the original connection to allow our new | |
| 258 # request to reuse the same one. | |
| 259 resp.content | |
| 260 resp.raw.release_conn() | |
| 261 | |
| 262 # Add our new username and password to the request | |
| 263 req = HTTPBasicAuth(username or "", password or "")(resp.request) | |
| 264 req.register_hook("response", self.warn_on_401) | |
| 265 | |
| 266 # On successful request, save the credentials that were used to | |
| 267 # keyring. (Note that if the user responded "no" above, this member | |
| 268 # is not set and nothing will be saved.) | |
| 269 if self._credentials_to_save: | |
| 270 req.register_hook("response", self.save_credentials) | |
| 271 | |
| 272 # Send our new request | |
| 273 new_resp = resp.connection.send(req, **kwargs) | |
| 274 new_resp.history.append(resp) | |
| 275 | |
| 276 return new_resp | |
| 277 | |
| 278 def warn_on_401(self, resp, **kwargs): | |
| 279 """Response callback to warn about incorrect credentials.""" | |
| 280 if resp.status_code == 401: | |
| 281 logger.warning( | |
| 282 '401 Error, Credentials not correct for %s', resp.request.url, | |
| 283 ) | |
| 284 | |
| 285 def save_credentials(self, resp, **kwargs): | |
| 286 """Response callback to save credentials on success.""" | |
| 287 assert keyring is not None, "should never reach here without keyring" | |
| 288 if not keyring: | |
| 289 return | |
| 290 | |
| 291 creds = self._credentials_to_save | |
| 292 self._credentials_to_save = None | |
| 293 if creds and resp.status_code < 400: | |
| 294 try: | |
| 295 logger.info('Saving credentials to keyring') | |
| 296 keyring.set_password(*creds) | |
| 297 except Exception: | |
| 298 logger.exception('Failed to save credentials') |
