Mercurial > repos > guerler > hhblits
diff lib/python3.8/site-packages/pip/_internal/network/auth.py @ 0:9e54283cc701 draft
"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author | guerler |
---|---|
date | Mon, 27 Jul 2020 03:47:31 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/python3.8/site-packages/pip/_internal/network/auth.py Mon Jul 27 03:47:31 2020 -0400 @@ -0,0 +1,298 @@ +"""Network Authentication Helpers + +Contains interface (MultiDomainBasicAuth) and associated glue code for +providing credentials in the context of network requests. +""" + +# The following comment should be removed at some point in the future. +# mypy: disallow-untyped-defs=False + +import logging + +from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth +from pip._vendor.requests.utils import get_netrc_auth +from pip._vendor.six.moves.urllib import parse as urllib_parse + +from pip._internal.utils.misc import ( + ask, + ask_input, + ask_password, + remove_auth_from_url, + split_auth_netloc_from_url, +) +from pip._internal.utils.typing import MYPY_CHECK_RUNNING + +if MYPY_CHECK_RUNNING: + from optparse import Values + from typing import Dict, Optional, Tuple + + from pip._internal.vcs.versioncontrol import AuthInfo + + Credentials = Tuple[str, str, str] + +logger = logging.getLogger(__name__) + +try: + import keyring # noqa +except ImportError: + keyring = None +except Exception as exc: + logger.warning( + "Keyring is skipped due to an exception: %s", str(exc), + ) + keyring = None + + +def get_keyring_auth(url, username): + """Return the tuple auth for a given url from keyring.""" + if not url or not keyring: + return None + + try: + try: + get_credential = keyring.get_credential + except AttributeError: + pass + else: + logger.debug("Getting credentials from keyring for %s", url) + cred = get_credential(url, username) + if cred is not None: + return cred.username, cred.password + return None + + if username: + logger.debug("Getting password from keyring for %s", url) + password = keyring.get_password(url, username) + if password: + return username, password + + except Exception as exc: + logger.warning( + "Keyring is skipped due to an exception: %s", str(exc), + ) + + +class MultiDomainBasicAuth(AuthBase): + + def __init__(self, prompting=True, index_urls=None): + # type: (bool, Optional[Values]) -> None + self.prompting = prompting + self.index_urls = index_urls + self.passwords = {} # type: Dict[str, AuthInfo] + # When the user is prompted to enter credentials and keyring is + # available, we will offer to save them. If the user accepts, + # this value is set to the credentials they entered. After the + # request authenticates, the caller should call + # ``save_credentials`` to save these. + self._credentials_to_save = None # type: Optional[Credentials] + + def _get_index_url(self, url): + """Return the original index URL matching the requested URL. + + Cached or dynamically generated credentials may work against + the original index URL rather than just the netloc. + + The provided url should have had its username and password + removed already. If the original index url had credentials then + they will be included in the return value. + + Returns None if no matching index was found, or if --no-index + was specified by the user. + """ + if not url or not self.index_urls: + return None + + for u in self.index_urls: + prefix = remove_auth_from_url(u).rstrip("/") + "/" + if url.startswith(prefix): + return u + + def _get_new_credentials(self, original_url, allow_netrc=True, + allow_keyring=True): + """Find and return credentials for the specified URL.""" + # Split the credentials and netloc from the url. + url, netloc, url_user_password = split_auth_netloc_from_url( + original_url, + ) + + # Start with the credentials embedded in the url + username, password = url_user_password + if username is not None and password is not None: + logger.debug("Found credentials in url for %s", netloc) + return url_user_password + + # Find a matching index url for this request + index_url = self._get_index_url(url) + if index_url: + # Split the credentials from the url. + index_info = split_auth_netloc_from_url(index_url) + if index_info: + index_url, _, index_url_user_password = index_info + logger.debug("Found index url %s", index_url) + + # If an index URL was found, try its embedded credentials + if index_url and index_url_user_password[0] is not None: + username, password = index_url_user_password + if username is not None and password is not None: + logger.debug("Found credentials in index url for %s", netloc) + return index_url_user_password + + # Get creds from netrc if we still don't have them + if allow_netrc: + netrc_auth = get_netrc_auth(original_url) + if netrc_auth: + logger.debug("Found credentials in netrc for %s", netloc) + return netrc_auth + + # If we don't have a password and keyring is available, use it. + if allow_keyring: + # The index url is more specific than the netloc, so try it first + kr_auth = ( + get_keyring_auth(index_url, username) or + get_keyring_auth(netloc, username) + ) + if kr_auth: + logger.debug("Found credentials in keyring for %s", netloc) + return kr_auth + + return username, password + + def _get_url_and_credentials(self, original_url): + """Return the credentials to use for the provided URL. + + If allowed, netrc and keyring may be used to obtain the + correct credentials. + + Returns (url_without_credentials, username, password). Note + that even if the original URL contains credentials, this + function may return a different username and password. + """ + url, netloc, _ = split_auth_netloc_from_url(original_url) + + # Use any stored credentials that we have for this netloc + username, password = self.passwords.get(netloc, (None, None)) + + if username is None and password is None: + # No stored credentials. Acquire new credentials without prompting + # the user. (e.g. from netrc, keyring, or the URL itself) + username, password = self._get_new_credentials(original_url) + + if username is not None or password is not None: + # Convert the username and password if they're None, so that + # this netloc will show up as "cached" in the conditional above. + # Further, HTTPBasicAuth doesn't accept None, so it makes sense to + # cache the value that is going to be used. + username = username or "" + password = password or "" + + # Store any acquired credentials. + self.passwords[netloc] = (username, password) + + assert ( + # Credentials were found + (username is not None and password is not None) or + # Credentials were not found + (username is None and password is None) + ), "Could not load credentials from url: {}".format(original_url) + + return url, username, password + + def __call__(self, req): + # Get credentials for this request + url, username, password = self._get_url_and_credentials(req.url) + + # Set the url of the request to the url without any credentials + req.url = url + + if username is not None and password is not None: + # Send the basic auth with this request + req = HTTPBasicAuth(username, password)(req) + + # Attach a hook to handle 401 responses + req.register_hook("response", self.handle_401) + + return req + + # Factored out to allow for easy patching in tests + def _prompt_for_password(self, netloc): + username = ask_input("User for %s: " % netloc) + if not username: + return None, None + auth = get_keyring_auth(netloc, username) + if auth: + return auth[0], auth[1], False + password = ask_password("Password: ") + return username, password, True + + # Factored out to allow for easy patching in tests + def _should_save_password_to_keyring(self): + if not keyring: + return False + return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y" + + def handle_401(self, resp, **kwargs): + # We only care about 401 responses, anything else we want to just + # pass through the actual response + if resp.status_code != 401: + return resp + + # We are not able to prompt the user so simply return the response + if not self.prompting: + return resp + + parsed = urllib_parse.urlparse(resp.url) + + # Prompt the user for a new username and password + username, password, save = self._prompt_for_password(parsed.netloc) + + # Store the new username and password to use for future requests + self._credentials_to_save = None + if username is not None and password is not None: + self.passwords[parsed.netloc] = (username, password) + + # Prompt to save the password to keyring + if save and self._should_save_password_to_keyring(): + self._credentials_to_save = (parsed.netloc, username, password) + + # Consume content and release the original connection to allow our new + # request to reuse the same one. + resp.content + resp.raw.release_conn() + + # Add our new username and password to the request + req = HTTPBasicAuth(username or "", password or "")(resp.request) + req.register_hook("response", self.warn_on_401) + + # On successful request, save the credentials that were used to + # keyring. (Note that if the user responded "no" above, this member + # is not set and nothing will be saved.) + if self._credentials_to_save: + req.register_hook("response", self.save_credentials) + + # Send our new request + new_resp = resp.connection.send(req, **kwargs) + new_resp.history.append(resp) + + return new_resp + + def warn_on_401(self, resp, **kwargs): + """Response callback to warn about incorrect credentials.""" + if resp.status_code == 401: + logger.warning( + '401 Error, Credentials not correct for %s', resp.request.url, + ) + + def save_credentials(self, resp, **kwargs): + """Response callback to save credentials on success.""" + assert keyring is not None, "should never reach here without keyring" + if not keyring: + return + + creds = self._credentials_to_save + self._credentials_to_save = None + if creds and resp.status_code < 400: + try: + logger.info('Saving credentials to keyring') + keyring.set_password(*creds) + except Exception: + logger.exception('Failed to save credentials')