Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/boto/dynamodb2/results.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author: guerler
date: Fri, 31 Jul 2020 00:18:57 -0400
parents: (none)
children: (none)
line wrap: on
line diff
class ResultSet(object):
    """
    A class used to lazily handle page-to-page navigation through a set of
    results.

    It presents a transparent iterator interface, so that all the user has
    to do is use it in a typical ``for`` loop (or list comprehension, etc.)
    to fetch results, even if they weren't present in the current page of
    results.

    This is used by the ``Table.query`` & ``Table.scan`` methods.

    Example::

        >>> users = Table('users')
        >>> results = ResultSet()
        >>> results.to_call(users.query, username__gte='johndoe')
        # Now iterate. When it runs out of results, it'll fetch the next page.
        >>> for res in results:
        ...     print res['username']

    """
    def __init__(self, max_page_size=None):
        super(ResultSet, self).__init__()
        # The low-level callable (& its arguments) used to fetch each page.
        self.the_callable = None
        self.call_args = []
        self.call_kwargs = {}
        # The current page of results & the iterator's position within it.
        self._results = []
        self._offset = -1
        # Whether the service may still have further pages for us.
        self._results_left = True
        # Pagination cursor returned by the last fetch (``last_key``).
        self._last_key_seen = None
        self._fetches = 0
        self._max_page_size = max_page_size
        # User-facing cap on the *total* number of results returned. This is
        # distinct from the per-page ``limit`` passed to the low-level calls.
        self._limit = None

    @property
    def first_key(self):
        # Name of the kwarg used to resume pagination. Subclasses with a
        # different continuation mechanism override this.
        return 'exclusive_start_key'

    def _reset(self):
        """
        Resets the internal state of the ``ResultSet``.

        This prevents results from being cached long-term & consuming
        excess memory.

        Largely internal.
        """
        self._results = []
        self._offset = 0

    def __iter__(self):
        return self

    def __next__(self):
        self._offset += 1

        if self._offset >= len(self._results):
            if self._results_left is False:
                raise StopIteration()

            self.fetch_more()

        # It's possible that a previous call to ``fetch_more`` may not return
        # anything useful but there may be more results. Loop until we get
        # something back, making sure we guard for no results left.
        while not len(self._results) and self._results_left:
            self.fetch_more()

        if self._offset < len(self._results):
            if self._limit is not None:
                self._limit -= 1

                # The user-supplied limit has been exhausted.
                if self._limit < 0:
                    raise StopIteration()

            return self._results[self._offset]
        else:
            raise StopIteration()

    # Python 2 iterator protocol compatibility.
    next = __next__

    def to_call(self, the_callable, *args, **kwargs):
        """
        Sets up the callable & any arguments to run it with.

        This is stored for subsequent calls so that those queries can be
        run without requiring user intervention.

        Example::

            # Just an example callable.
            >>> def squares_to(y):
            ...     for x in range(1, y):
            ...         yield x**2
            >>> rs = ResultSet()
            # Set up what to call & arguments.
            >>> rs.to_call(squares_to, y=3)

        """
        if not callable(the_callable):
            raise ValueError(
                'You must supply an object or function to be called.'
            )

        # We pop the ``limit``, if present, to track how many we should return
        # to the user. This isn't the same as the ``limit`` that the low-level
        # DDB api calls use (which limit page size, not the overall result set).
        self._limit = kwargs.pop('limit', None)

        # A negative limit makes no sense; treat it as "no limit".
        if self._limit is not None and self._limit < 0:
            self._limit = None

        self.the_callable = the_callable
        self.call_args = args
        self.call_kwargs = kwargs

    def fetch_more(self):
        """
        When the iterator runs out of results, this method is run to re-execute
        the callable (& arguments) to fetch the next page.

        Largely internal.
        """
        self._reset()

        args = self.call_args[:]
        kwargs = self.call_kwargs.copy()

        # Resume pagination from wherever the previous page left off.
        if self._last_key_seen is not None:
            kwargs[self.first_key] = self._last_key_seen

        # If the page size is greater than limit set them
        # to the same value
        if self._limit and self._max_page_size and self._max_page_size > self._limit:
            self._max_page_size = self._limit

        # Put in the max page size.
        if self._max_page_size is not None:
            kwargs['limit'] = self._max_page_size
        elif self._limit is not None:
            # If max_page_size is not set and limit is available
            # use it as the page size
            kwargs['limit'] = self._limit

        results = self.the_callable(*args, **kwargs)
        self._fetches += 1
        new_results = results.get('results', [])
        self._last_key_seen = results.get('last_key', None)

        if len(new_results):
            self._results.extend(new_results)

            # Check the limit, if it's present.
            if self._limit is not None and self._limit >= 0:
                limit = self._limit
                limit -= len(new_results)
                # If we've exceeded the limit, we don't have any more
                # results to look for.
                if limit <= 0:
                    self._results_left = False

        # No continuation key means the service has no further pages.
        if self._last_key_seen is None:
            self._results_left = False


class BatchGetResultSet(ResultSet):
    """
    A ``ResultSet`` subclass for ``batch_get``, which paginates over an
    explicit list of keys rather than a ``last_key`` cursor.
    """
    def __init__(self, *args, **kwargs):
        # Keys not yet requested from the service.
        self._keys_left = kwargs.pop('keys', [])
        # DynamoDB's BatchGetItem accepts at most 100 keys per request.
        self._max_batch_get = kwargs.pop('max_batch_get', 100)
        super(BatchGetResultSet, self).__init__(*args, **kwargs)

    def fetch_more(self):
        self._reset()

        args = self.call_args[:]
        kwargs = self.call_kwargs.copy()

        # Slice off the max we can fetch.
        kwargs['keys'] = self._keys_left[:self._max_batch_get]
        self._keys_left = self._keys_left[self._max_batch_get:]

        if len(self._keys_left) <= 0:
            self._results_left = False

        results = self.the_callable(*args, **kwargs)

        # Reinsert any unprocessed keys *before* the empty-results check.
        # Previously the code returned early on an empty page, silently
        # dropping throttled (unprocessed) keys & losing their results —
        # DynamoDB may legitimately return zero results with every key in
        # ``unprocessed_keys``.
        for offset, key_data in enumerate(results.get('unprocessed_keys', [])):
            # We've got an unprocessed key. Reinsert it into the list.
            # DynamoDB only returns valid keys, so there should be no risk of
            # missing keys ever making it here.
            self._keys_left.insert(offset, key_data)

        if len(self._keys_left) > 0:
            self._results_left = True

        new_results = results.get('results', [])

        if not len(new_results):
            return

        self._results.extend(new_results)

        # Decrease the limit, if it's present.
        if self.call_kwargs.get('limit'):
            self.call_kwargs['limit'] -= len(new_results)