comparison env/lib/python3.7/site-packages/requests_toolbelt/auth/handler.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 # -*- coding: utf-8 -*-
2 """
3
4 requests_toolbelt.auth.handler
5 ==============================
6
7 This holds all of the implementation details of the Authentication Handler.
8
9 """
10
11 from requests.auth import AuthBase, HTTPBasicAuth
12 from requests.compat import urlparse, urlunparse
13
14
class AuthHandler(AuthBase):

    """Dispatch each request to the authentication strategy for its domain.

    The ``AuthHandler`` object takes a dictionary of domains paired with
    authentication strategies and will use this to determine which credentials
    to use when making a request. Domains are matched on their lower-cased
    scheme and network location only, so they must include the scheme
    (``'https://example.com'``, not ``'example.com'``). For example, you could
    do the following:

    .. code-block:: python

        from requests.auth import HTTPDigestAuth
        from requests_toolbelt.auth.handler import AuthHandler

        import requests

        auth = AuthHandler({
            'https://api.github.com': ('sigmavirus24', 'fakepassword'),
            'https://example.com': HTTPDigestAuth('username', 'password')
        })

        r = requests.get('https://api.github.com/user', auth=auth)
        # => <Response [200]>
        r = requests.get('https://example.com/some/path', auth=auth)
        # => <Response [200]>

        s = requests.Session()
        s.auth = auth
        r = s.get('https://api.github.com/user')
        # => <Response [200]>

    .. warning::

        :class:`requests.auth.HTTPDigestAuth` is not yet thread-safe. If you
        use :class:`AuthHandler` across multiple threads you should
        instantiate a new AuthHandler for each thread with a new
        HTTPDigestAuth instance for each thread.

    """

    def __init__(self, strategies):
        """Store a normalised copy of ``strategies``.

        :param strategies: Mapping (or iterable of pairs) of domain to
            authentication strategy; see :meth:`add_strategy` for the
            accepted strategy forms.
        """
        # Copy so the caller's mapping is never mutated by normalisation.
        self.strategies = dict(strategies)
        self._make_uniform()

    def __call__(self, request):
        """Apply the strategy registered for ``request.url`` to ``request``."""
        auth = self.get_strategy_for(request.url)
        return auth(request)

    def __repr__(self):
        return '<AuthHandler({0!r})>'.format(self.strategies)

    def _make_uniform(self):
        """Re-register every strategy so keys and values are normalised."""
        existing_strategies = self.strategies
        # Rebind to a fresh dict before re-adding; the old dict is only read.
        self.strategies = {}

        for domain, strategy in existing_strategies.items():
            self.add_strategy(domain, strategy)

    @staticmethod
    def _key_from_url(url):
        """Return the lower-cased ``scheme://netloc`` portion of ``url``.

        Path, params, query and fragment are discarded so that every URL on
        the same scheme and host maps to the same key.
        """
        parsed = urlparse(url)
        return urlunparse((parsed.scheme.lower(),
                           parsed.netloc.lower(),
                           '', '', '', ''))

    def add_strategy(self, domain, strategy):
        """Add a new domain and authentication strategy.

        :param str domain: The domain you wish to match against, including
            the scheme. For example: ``'https://api.github.com'``
        :param strategy: The authentication strategy you wish to use for
            that domain: either a ``('username', 'password')`` tuple, which
            is converted to Basic Authentication, or an auth callable such as
            ``requests.auth.HTTPDigestAuth('username', 'password')``

        .. code-block:: python

            a = AuthHandler({})
            a.add_strategy('https://api.github.com', ('username', 'password'))

        """
        # Turn tuples into Basic Authentication objects
        if isinstance(strategy, tuple):
            strategy = HTTPBasicAuth(*strategy)

        key = self._key_from_url(domain)
        self.strategies[key] = strategy

    def get_strategy_for(self, url):
        """Retrieve the authentication strategy for a specified URL.

        :param str url: The full URL you will be making a request against. For
            example, ``'https://api.github.com/user'``
        :returns: Callable that adds authentication to a request. When no
            strategy is registered for the URL's domain, a
            :class:`NullAuthStrategy` that leaves the request untouched.

        .. code-block:: python

            import requests
            a = AuthHandler({'http://example.com': ('foo', 'bar')})
            strategy = a.get_strategy_for('http://example.com/example')
            assert isinstance(strategy, requests.auth.HTTPBasicAuth)

        """
        key = self._key_from_url(url)
        try:
            return self.strategies[key]
        except KeyError:
            # Only build the no-op fallback when it is actually needed.
            return NullAuthStrategy()

    def remove_strategy(self, domain):
        """Remove the domain and strategy from the collection of strategies.

        Removing a domain that was never registered is a no-op.

        :param str domain: The domain you wish remove, including the scheme.
            For example, ``'https://api.github.com'``.

        .. code-block:: python

            a = AuthHandler({'http://example.com': ('foo', 'bar')})
            a.remove_strategy('http://example.com')
            assert a.strategies == {}

        """
        key = self._key_from_url(domain)
        self.strategies.pop(key, None)
135
136
class NullAuthStrategy(AuthBase):
    """Authentication strategy that performs no authentication at all."""

    def __call__(self, r):
        """Return the request ``r`` unchanged."""
        return r

    def __repr__(self):
        """Return a fixed, argument-free representation."""
        return '<NullAuthStrategy>'