Mercurial > repos > guerler > hhblits
diff lib/python3.8/site-packages/pip/_vendor/requests/auth.py @ 1:64071f2a4cf0 draft default tip
Deleted selected files
author | guerler |
---|---|
date | Mon, 27 Jul 2020 03:55:49 -0400 |
parents | 9e54283cc701 |
children |
line wrap: on
line diff
--- a/lib/python3.8/site-packages/pip/_vendor/requests/auth.py Mon Jul 27 03:47:31 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,305 +0,0 @@ -# -*- coding: utf-8 -*- - -""" -requests.auth -~~~~~~~~~~~~~ - -This module contains the authentication handlers for Requests. -""" - -import os -import re -import time -import hashlib -import threading -import warnings - -from base64 import b64encode - -from .compat import urlparse, str, basestring -from .cookies import extract_cookies_to_jar -from ._internal_utils import to_native_string -from .utils import parse_dict_header - -CONTENT_TYPE_FORM_URLENCODED = 'application/x-www-form-urlencoded' -CONTENT_TYPE_MULTI_PART = 'multipart/form-data' - - -def _basic_auth_str(username, password): - """Returns a Basic Auth string.""" - - # "I want us to put a big-ol' comment on top of it that - # says that this behaviour is dumb but we need to preserve - # it because people are relying on it." - # - Lukasa - # - # These are here solely to maintain backwards compatibility - # for things like ints. This will be removed in 3.0.0. - if not isinstance(username, basestring): - warnings.warn( - "Non-string usernames will no longer be supported in Requests " - "3.0.0. Please convert the object you've passed in ({!r}) to " - "a string or bytes object in the near future to avoid " - "problems.".format(username), - category=DeprecationWarning, - ) - username = str(username) - - if not isinstance(password, basestring): - warnings.warn( - "Non-string passwords will no longer be supported in Requests " - "3.0.0. Please convert the object you've passed in ({!r}) to " - "a string or bytes object in the near future to avoid " - "problems.".format(password), - category=DeprecationWarning, - ) - password = str(password) - # -- End Removal -- - - if isinstance(username, str): - username = username.encode('latin1') - - if isinstance(password, str): - password = password.encode('latin1') - - authstr = 'Basic ' + to_native_string( - b64encode(b':'.join((username, password))).strip() - ) - - return authstr - - -class AuthBase(object): - """Base class that all auth implementations derive from""" - - def __call__(self, r): - raise NotImplementedError('Auth hooks must be callable.') - - -class HTTPBasicAuth(AuthBase): - """Attaches HTTP Basic Authentication to the given Request object.""" - - def __init__(self, username, password): - self.username = username - self.password = password - - def __eq__(self, other): - return all([ - self.username == getattr(other, 'username', None), - self.password == getattr(other, 'password', None) - ]) - - def __ne__(self, other): - return not self == other - - def __call__(self, r): - r.headers['Authorization'] = _basic_auth_str(self.username, self.password) - return r - - -class HTTPProxyAuth(HTTPBasicAuth): - """Attaches HTTP Proxy Authentication to a given Request object.""" - - def __call__(self, r): - r.headers['Proxy-Authorization'] = _basic_auth_str(self.username, self.password) - return r - - -class HTTPDigestAuth(AuthBase): - """Attaches HTTP Digest Authentication to the given Request object.""" - - def __init__(self, username, password): - self.username = username - self.password = password - # Keep state in per-thread local storage - self._thread_local = threading.local() - - def init_per_thread_state(self): - # Ensure state is initialized just once per-thread - if not hasattr(self._thread_local, 'init'): - self._thread_local.init = True - self._thread_local.last_nonce = '' - self._thread_local.nonce_count = 0 - self._thread_local.chal = {} - self._thread_local.pos = None - self._thread_local.num_401_calls = None - - def build_digest_header(self, method, url): - """ - :rtype: str - """ - - realm = self._thread_local.chal['realm'] - nonce = self._thread_local.chal['nonce'] - qop = self._thread_local.chal.get('qop') - algorithm = self._thread_local.chal.get('algorithm') - opaque = self._thread_local.chal.get('opaque') - hash_utf8 = None - - if algorithm is None: - _algorithm = 'MD5' - else: - _algorithm = algorithm.upper() - # lambdas assume digest modules are imported at the top level - if _algorithm == 'MD5' or _algorithm == 'MD5-SESS': - def md5_utf8(x): - if isinstance(x, str): - x = x.encode('utf-8') - return hashlib.md5(x).hexdigest() - hash_utf8 = md5_utf8 - elif _algorithm == 'SHA': - def sha_utf8(x): - if isinstance(x, str): - x = x.encode('utf-8') - return hashlib.sha1(x).hexdigest() - hash_utf8 = sha_utf8 - elif _algorithm == 'SHA-256': - def sha256_utf8(x): - if isinstance(x, str): - x = x.encode('utf-8') - return hashlib.sha256(x).hexdigest() - hash_utf8 = sha256_utf8 - elif _algorithm == 'SHA-512': - def sha512_utf8(x): - if isinstance(x, str): - x = x.encode('utf-8') - return hashlib.sha512(x).hexdigest() - hash_utf8 = sha512_utf8 - - KD = lambda s, d: hash_utf8("%s:%s" % (s, d)) - - if hash_utf8 is None: - return None - - # XXX not implemented yet - entdig = None - p_parsed = urlparse(url) - #: path is request-uri defined in RFC 2616 which should not be empty - path = p_parsed.path or "/" - if p_parsed.query: - path += '?' + p_parsed.query - - A1 = '%s:%s:%s' % (self.username, realm, self.password) - A2 = '%s:%s' % (method, path) - - HA1 = hash_utf8(A1) - HA2 = hash_utf8(A2) - - if nonce == self._thread_local.last_nonce: - self._thread_local.nonce_count += 1 - else: - self._thread_local.nonce_count = 1 - ncvalue = '%08x' % self._thread_local.nonce_count - s = str(self._thread_local.nonce_count).encode('utf-8') - s += nonce.encode('utf-8') - s += time.ctime().encode('utf-8') - s += os.urandom(8) - - cnonce = (hashlib.sha1(s).hexdigest()[:16]) - if _algorithm == 'MD5-SESS': - HA1 = hash_utf8('%s:%s:%s' % (HA1, nonce, cnonce)) - - if not qop: - respdig = KD(HA1, "%s:%s" % (nonce, HA2)) - elif qop == 'auth' or 'auth' in qop.split(','): - noncebit = "%s:%s:%s:%s:%s" % ( - nonce, ncvalue, cnonce, 'auth', HA2 - ) - respdig = KD(HA1, noncebit) - else: - # XXX handle auth-int. - return None - - self._thread_local.last_nonce = nonce - - # XXX should the partial digests be encoded too? - base = 'username="%s", realm="%s", nonce="%s", uri="%s", ' \ - 'response="%s"' % (self.username, realm, nonce, path, respdig) - if opaque: - base += ', opaque="%s"' % opaque - if algorithm: - base += ', algorithm="%s"' % algorithm - if entdig: - base += ', digest="%s"' % entdig - if qop: - base += ', qop="auth", nc=%s, cnonce="%s"' % (ncvalue, cnonce) - - return 'Digest %s' % (base) - - def handle_redirect(self, r, **kwargs): - """Reset num_401_calls counter on redirects.""" - if r.is_redirect: - self._thread_local.num_401_calls = 1 - - def handle_401(self, r, **kwargs): - """ - Takes the given response and tries digest-auth, if needed. - - :rtype: requests.Response - """ - - # If response is not 4xx, do not auth - # See https://github.com/requests/requests/issues/3772 - if not 400 <= r.status_code < 500: - self._thread_local.num_401_calls = 1 - return r - - if self._thread_local.pos is not None: - # Rewind the file position indicator of the body to where - # it was to resend the request. - r.request.body.seek(self._thread_local.pos) - s_auth = r.headers.get('www-authenticate', '') - - if 'digest' in s_auth.lower() and self._thread_local.num_401_calls < 2: - - self._thread_local.num_401_calls += 1 - pat = re.compile(r'digest ', flags=re.IGNORECASE) - self._thread_local.chal = parse_dict_header(pat.sub('', s_auth, count=1)) - - # Consume content and release the original connection - # to allow our new request to reuse the same one. - r.content - r.close() - prep = r.request.copy() - extract_cookies_to_jar(prep._cookies, r.request, r.raw) - prep.prepare_cookies(prep._cookies) - - prep.headers['Authorization'] = self.build_digest_header( - prep.method, prep.url) - _r = r.connection.send(prep, **kwargs) - _r.history.append(r) - _r.request = prep - - return _r - - self._thread_local.num_401_calls = 1 - return r - - def __call__(self, r): - # Initialize per-thread state, if needed - self.init_per_thread_state() - # If we have a saved nonce, skip the 401 - if self._thread_local.last_nonce: - r.headers['Authorization'] = self.build_digest_header(r.method, r.url) - try: - self._thread_local.pos = r.body.tell() - except AttributeError: - # In the case of HTTPDigestAuth being reused and the body of - # the previous request was a file-like object, pos has the - # file position of the previous body. Ensure it's set to - # None. - self._thread_local.pos = None - r.register_hook('response', self.handle_401) - r.register_hook('response', self.handle_redirect) - self._thread_local.num_401_calls = 1 - - return r - - def __eq__(self, other): - return all([ - self.username == getattr(other, 'username', None), - self.password == getattr(other, 'password', None) - ]) - - def __ne__(self, other): - return not self == other