Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/requests_toolbelt/threaded/pool.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/requests_toolbelt/threaded/pool.py Sat May 02 07:14:21 2020 -0400 @@ -0,0 +1,211 @@ +"""Module implementing the Pool for :mod:``requests_toolbelt.threaded``.""" +import multiprocessing +import requests + +from . import thread +from .._compat import queue + + +class Pool(object): + """Pool that manages the threads containing sessions. + + :param queue: + The queue you're expected to use to which you should add items. + :type queue: queue.Queue + :param initializer: + Function used to initialize an instance of ``session``. + :type initializer: collections.Callable + :param auth_generator: + Function used to generate new auth credentials for the session. + :type auth_generator: collections.Callable + :param int num_process: + Number of threads to create. + :param session: + :type session: requests.Session + """ + + def __init__(self, job_queue, initializer=None, auth_generator=None, + num_processes=None, session=requests.Session): + if num_processes is None: + num_processes = multiprocessing.cpu_count() or 1 + + if num_processes < 1: + raise ValueError("Number of processes should at least be 1.") + + self._job_queue = job_queue + self._response_queue = queue.Queue() + self._exc_queue = queue.Queue() + self._processes = num_processes + self._initializer = initializer or _identity + self._auth = auth_generator or _identity + self._session = session + self._pool = [ + thread.SessionThread(self._new_session(), self._job_queue, + self._response_queue, self._exc_queue) + for _ in range(self._processes) + ] + + def _new_session(self): + return self._auth(self._initializer(self._session())) + + @classmethod + def from_exceptions(cls, exceptions, **kwargs): + r"""Create a :class:`~Pool` from an :class:`~ThreadException`\ s. + + Provided an iterable that provides :class:`~ThreadException` objects, + this classmethod will generate a new pool to retry the requests that + caused the exceptions. + + :param exceptions: + Iterable that returns :class:`~ThreadException` + :type exceptions: iterable + :param kwargs: + Keyword arguments passed to the :class:`~Pool` initializer. + :returns: An initialized :class:`~Pool` object. + :rtype: :class:`~Pool` + """ + job_queue = queue.Queue() + for exc in exceptions: + job_queue.put(exc.request_kwargs) + + return cls(job_queue=job_queue, **kwargs) + + @classmethod + def from_urls(cls, urls, request_kwargs=None, **kwargs): + """Create a :class:`~Pool` from an iterable of URLs. + + :param urls: + Iterable that returns URLs with which we create a pool. + :type urls: iterable + :param dict request_kwargs: + Dictionary of other keyword arguments to provide to the request + method. + :param kwargs: + Keyword arguments passed to the :class:`~Pool` initializer. + :returns: An initialized :class:`~Pool` object. + :rtype: :class:`~Pool` + """ + request_dict = {'method': 'GET'} + request_dict.update(request_kwargs or {}) + job_queue = queue.Queue() + for url in urls: + job = request_dict.copy() + job.update({'url': url}) + job_queue.put(job) + + return cls(job_queue=job_queue, **kwargs) + + def exceptions(self): + """Iterate over all the exceptions in the pool. + + :returns: Generator of :class:`~ThreadException` + """ + while True: + exc = self.get_exception() + if exc is None: + break + yield exc + + def get_exception(self): + """Get an exception from the pool. + + :rtype: :class:`~ThreadException` + """ + try: + (request, exc) = self._exc_queue.get_nowait() + except queue.Empty: + return None + else: + return ThreadException(request, exc) + + def get_response(self): + """Get a response from the pool. + + :rtype: :class:`~ThreadResponse` + """ + try: + (request, response) = self._response_queue.get_nowait() + except queue.Empty: + return None + else: + return ThreadResponse(request, response) + + def responses(self): + """Iterate over all the responses in the pool. + + :returns: Generator of :class:`~ThreadResponse` + """ + while True: + resp = self.get_response() + if resp is None: + break + yield resp + + def join_all(self): + """Join all the threads to the master thread.""" + for session_thread in self._pool: + session_thread.join() + + +class ThreadProxy(object): + proxied_attr = None + + def __getattr__(self, attr): + """Proxy attribute accesses to the proxied object.""" + get = object.__getattribute__ + if attr not in self.attrs: + response = get(self, self.proxied_attr) + return getattr(response, attr) + else: + return get(self, attr) + + +class ThreadResponse(ThreadProxy): + """A wrapper around a requests Response object. + + This will proxy most attribute access actions to the Response object. For + example, if you wanted the parsed JSON from the response, you might do: + + .. code-block:: python + + thread_response = pool.get_response() + json = thread_response.json() + + """ + proxied_attr = 'response' + attrs = frozenset(['request_kwargs', 'response']) + + def __init__(self, request_kwargs, response): + #: The original keyword arguments provided to the queue + self.request_kwargs = request_kwargs + #: The wrapped response + self.response = response + + +class ThreadException(ThreadProxy): + """A wrapper around an exception raised during a request. + + This will proxy most attribute access actions to the exception object. For + example, if you wanted the message from the exception, you might do: + + .. code-block:: python + + thread_exc = pool.get_exception() + msg = thread_exc.message + + """ + proxied_attr = 'exception' + attrs = frozenset(['request_kwargs', 'exception']) + + def __init__(self, request_kwargs, exception): + #: The original keyword arguments provided to the queue + self.request_kwargs = request_kwargs + #: The captured and wrapped exception + self.exception = exception + + +def _identity(session_obj): + return session_obj + + +__all__ = ['ThreadException', 'ThreadResponse', 'Pool']