Mercurial > repos > guerler > hhblits
diff lib/python3.8/site-packages/pip/_internal/network/auth.py @ 1:64071f2a4cf0 draft default tip
Deleted selected files
author | guerler |
---|---|
date | Mon, 27 Jul 2020 03:55:49 -0400 |
parents | 9e54283cc701 |
children |
line wrap: on
line diff
--- a/lib/python3.8/site-packages/pip/_internal/network/auth.py Mon Jul 27 03:47:31 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,298 +0,0 @@ -"""Network Authentication Helpers - -Contains interface (MultiDomainBasicAuth) and associated glue code for -providing credentials in the context of network requests. -""" - -# The following comment should be removed at some point in the future. -# mypy: disallow-untyped-defs=False - -import logging - -from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth -from pip._vendor.requests.utils import get_netrc_auth -from pip._vendor.six.moves.urllib import parse as urllib_parse - -from pip._internal.utils.misc import ( - ask, - ask_input, - ask_password, - remove_auth_from_url, - split_auth_netloc_from_url, -) -from pip._internal.utils.typing import MYPY_CHECK_RUNNING - -if MYPY_CHECK_RUNNING: - from optparse import Values - from typing import Dict, Optional, Tuple - - from pip._internal.vcs.versioncontrol import AuthInfo - - Credentials = Tuple[str, str, str] - -logger = logging.getLogger(__name__) - -try: - import keyring # noqa -except ImportError: - keyring = None -except Exception as exc: - logger.warning( - "Keyring is skipped due to an exception: %s", str(exc), - ) - keyring = None - - -def get_keyring_auth(url, username): - """Return the tuple auth for a given url from keyring.""" - if not url or not keyring: - return None - - try: - try: - get_credential = keyring.get_credential - except AttributeError: - pass - else: - logger.debug("Getting credentials from keyring for %s", url) - cred = get_credential(url, username) - if cred is not None: - return cred.username, cred.password - return None - - if username: - logger.debug("Getting password from keyring for %s", url) - password = keyring.get_password(url, username) - if password: - return username, password - - except Exception as exc: - logger.warning( - "Keyring is skipped due to an exception: %s", str(exc), - ) - - -class MultiDomainBasicAuth(AuthBase): - - def __init__(self, prompting=True, index_urls=None): - # type: (bool, Optional[Values]) -> None - self.prompting = prompting - self.index_urls = index_urls - self.passwords = {} # type: Dict[str, AuthInfo] - # When the user is prompted to enter credentials and keyring is - # available, we will offer to save them. If the user accepts, - # this value is set to the credentials they entered. After the - # request authenticates, the caller should call - # ``save_credentials`` to save these. - self._credentials_to_save = None # type: Optional[Credentials] - - def _get_index_url(self, url): - """Return the original index URL matching the requested URL. - - Cached or dynamically generated credentials may work against - the original index URL rather than just the netloc. - - The provided url should have had its username and password - removed already. If the original index url had credentials then - they will be included in the return value. - - Returns None if no matching index was found, or if --no-index - was specified by the user. - """ - if not url or not self.index_urls: - return None - - for u in self.index_urls: - prefix = remove_auth_from_url(u).rstrip("/") + "/" - if url.startswith(prefix): - return u - - def _get_new_credentials(self, original_url, allow_netrc=True, - allow_keyring=True): - """Find and return credentials for the specified URL.""" - # Split the credentials and netloc from the url. - url, netloc, url_user_password = split_auth_netloc_from_url( - original_url, - ) - - # Start with the credentials embedded in the url - username, password = url_user_password - if username is not None and password is not None: - logger.debug("Found credentials in url for %s", netloc) - return url_user_password - - # Find a matching index url for this request - index_url = self._get_index_url(url) - if index_url: - # Split the credentials from the url. - index_info = split_auth_netloc_from_url(index_url) - if index_info: - index_url, _, index_url_user_password = index_info - logger.debug("Found index url %s", index_url) - - # If an index URL was found, try its embedded credentials - if index_url and index_url_user_password[0] is not None: - username, password = index_url_user_password - if username is not None and password is not None: - logger.debug("Found credentials in index url for %s", netloc) - return index_url_user_password - - # Get creds from netrc if we still don't have them - if allow_netrc: - netrc_auth = get_netrc_auth(original_url) - if netrc_auth: - logger.debug("Found credentials in netrc for %s", netloc) - return netrc_auth - - # If we don't have a password and keyring is available, use it. - if allow_keyring: - # The index url is more specific than the netloc, so try it first - kr_auth = ( - get_keyring_auth(index_url, username) or - get_keyring_auth(netloc, username) - ) - if kr_auth: - logger.debug("Found credentials in keyring for %s", netloc) - return kr_auth - - return username, password - - def _get_url_and_credentials(self, original_url): - """Return the credentials to use for the provided URL. - - If allowed, netrc and keyring may be used to obtain the - correct credentials. - - Returns (url_without_credentials, username, password). Note - that even if the original URL contains credentials, this - function may return a different username and password. - """ - url, netloc, _ = split_auth_netloc_from_url(original_url) - - # Use any stored credentials that we have for this netloc - username, password = self.passwords.get(netloc, (None, None)) - - if username is None and password is None: - # No stored credentials. Acquire new credentials without prompting - # the user. (e.g. from netrc, keyring, or the URL itself) - username, password = self._get_new_credentials(original_url) - - if username is not None or password is not None: - # Convert the username and password if they're None, so that - # this netloc will show up as "cached" in the conditional above. - # Further, HTTPBasicAuth doesn't accept None, so it makes sense to - # cache the value that is going to be used. - username = username or "" - password = password or "" - - # Store any acquired credentials. - self.passwords[netloc] = (username, password) - - assert ( - # Credentials were found - (username is not None and password is not None) or - # Credentials were not found - (username is None and password is None) - ), "Could not load credentials from url: {}".format(original_url) - - return url, username, password - - def __call__(self, req): - # Get credentials for this request - url, username, password = self._get_url_and_credentials(req.url) - - # Set the url of the request to the url without any credentials - req.url = url - - if username is not None and password is not None: - # Send the basic auth with this request - req = HTTPBasicAuth(username, password)(req) - - # Attach a hook to handle 401 responses - req.register_hook("response", self.handle_401) - - return req - - # Factored out to allow for easy patching in tests - def _prompt_for_password(self, netloc): - username = ask_input("User for %s: " % netloc) - if not username: - return None, None - auth = get_keyring_auth(netloc, username) - if auth: - return auth[0], auth[1], False - password = ask_password("Password: ") - return username, password, True - - # Factored out to allow for easy patching in tests - def _should_save_password_to_keyring(self): - if not keyring: - return False - return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y" - - def handle_401(self, resp, **kwargs): - # We only care about 401 responses, anything else we want to just - # pass through the actual response - if resp.status_code != 401: - return resp - - # We are not able to prompt the user so simply return the response - if not self.prompting: - return resp - - parsed = urllib_parse.urlparse(resp.url) - - # Prompt the user for a new username and password - username, password, save = self._prompt_for_password(parsed.netloc) - - # Store the new username and password to use for future requests - self._credentials_to_save = None - if username is not None and password is not None: - self.passwords[parsed.netloc] = (username, password) - - # Prompt to save the password to keyring - if save and self._should_save_password_to_keyring(): - self._credentials_to_save = (parsed.netloc, username, password) - - # Consume content and release the original connection to allow our new - # request to reuse the same one. - resp.content - resp.raw.release_conn() - - # Add our new username and password to the request - req = HTTPBasicAuth(username or "", password or "")(resp.request) - req.register_hook("response", self.warn_on_401) - - # On successful request, save the credentials that were used to - # keyring. (Note that if the user responded "no" above, this member - # is not set and nothing will be saved.) - if self._credentials_to_save: - req.register_hook("response", self.save_credentials) - - # Send our new request - new_resp = resp.connection.send(req, **kwargs) - new_resp.history.append(resp) - - return new_resp - - def warn_on_401(self, resp, **kwargs): - """Response callback to warn about incorrect credentials.""" - if resp.status_code == 401: - logger.warning( - '401 Error, Credentials not correct for %s', resp.request.url, - ) - - def save_credentials(self, resp, **kwargs): - """Response callback to save credentials on success.""" - assert keyring is not None, "should never reach here without keyring" - if not keyring: - return - - creds = self._credentials_to_save - self._credentials_to_save = None - if creds and resp.status_code < 400: - try: - logger.info('Saving credentials to keyring') - keyring.set_password(*creds) - except Exception: - logger.exception('Failed to save credentials')