import boto
from boto.dynamodb2 import exceptions
from boto.dynamodb2.fields import (HashKey, RangeKey,
                                   AllIndex, KeysOnlyIndex, IncludeIndex,
                                   GlobalAllIndex, GlobalKeysOnlyIndex,
                                   GlobalIncludeIndex)
from boto.dynamodb2.items import Item
from boto.dynamodb2.layer1 import DynamoDBConnection
from boto.dynamodb2.results import ResultSet, BatchGetResultSet
from boto.dynamodb2.types import (NonBooleanDynamizer, Dynamizer, FILTER_OPERATORS,
                                  QUERY_OPERATORS, STRING)
from boto.exception import JSONResponseError


class Table(object):
    """
    Interacts & models the behavior of a DynamoDB table.

    The ``Table`` object represents a set (or rough categorization) of
    records within DynamoDB. The important part is that all records within the
    table, while largely-schema-free, share the same schema & are essentially
    namespaced for use in your application. For example, you might have a
    ``users`` table or a ``forums`` table.
    """
    max_batch_get = 100

    _PROJECTION_TYPE_TO_INDEX = dict(
        global_indexes=dict(
            ALL=GlobalAllIndex,
            KEYS_ONLY=GlobalKeysOnlyIndex,
            INCLUDE=GlobalIncludeIndex,
        ), local_indexes=dict(
            ALL=AllIndex,
            KEYS_ONLY=KeysOnlyIndex,
            INCLUDE=IncludeIndex,
        )
    )

    def __init__(self, table_name, schema=None, throughput=None, indexes=None,
                 global_indexes=None, connection=None):
        """
        Sets up a new in-memory ``Table``.

        This is useful if the table already exists within DynamoDB & you simply
        want to use it for additional interactions. The only required parameter
        is the ``table_name``. However, under the hood, the object will call
        ``describe_table`` to determine the schema/indexes/throughput. You
        can avoid this extra call by passing in ``schema`` & ``indexes``.

        **IMPORTANT** - If you're creating a new ``Table`` for the first time,
        you should use the ``Table.create`` method instead, as it will
        persist the table structure to DynamoDB.

        Requires a ``table_name`` parameter, which should be a simple string
        of the name of the table.

        Optionally accepts a ``schema`` parameter, which should be a list of
        ``BaseSchemaField`` subclasses representing the desired schema.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts an ``indexes`` parameter, which should be a list of
        ``BaseIndexField`` subclasses representing the desired indexes.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        list of ``GlobalBaseIndexField`` subclasses representing the desired
        indexes.

        Optionally accepts a ``connection`` parameter, which should be a
        ``DynamoDBConnection`` instance (or subclass). This is primarily useful
        for specifying alternate connection parameters.

        Example::

            # The simple, it-already-exists case.
            >>> users = Table('users')

            # The full, minimum-extra-calls case.
            >>> from boto import dynamodb2
            >>> users = Table('users', schema=[
            ...     HashKey('username'),
            ...     RangeKey('date_joined', data_type=NUMBER)
            ... ], throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... }, indexes=[
            ...     KeysOnlyIndex('MostRecentlyJoined', parts=[
            ...         HashKey('username'),
            ...         RangeKey('date_joined')
            ...     ]),
            ... ], global_indexes=[
            ...     GlobalAllIndex('UsersByZipcode', parts=[
            ...         HashKey('zipcode'),
            ...         RangeKey('username'),
            ...     ],
            ...     throughput={
            ...         'read': 10,
            ...         'write': 10,
            ...     }),
            ... ], connection=dynamodb2.connect_to_region('us-west-2',
            ...     aws_access_key_id='key',
            ...     aws_secret_access_key='key',
            ... ))

        """
        self.table_name = table_name
        self.connection = connection
        self.throughput = {
            'read': 5,
            'write': 5,
        }
        self.schema = schema
        self.indexes = indexes
        self.global_indexes = global_indexes

        if self.connection is None:
            self.connection = DynamoDBConnection()

        if throughput is not None:
            self.throughput = throughput

        self._dynamizer = NonBooleanDynamizer()

    def use_boolean(self):
        """
        Switches the ``Table`` to the boolean-aware ``Dynamizer``, so that
        Python ``True``/``False`` values are stored as native DynamoDB
        booleans rather than as numbers.
        """
        self._dynamizer = Dynamizer()

    @classmethod
    def create(cls, table_name, schema, throughput=None, indexes=None,
               global_indexes=None, connection=None):
        """
        Creates a new table in DynamoDB & returns an in-memory ``Table`` object.

        This will set up a brand new table within DynamoDB. The ``table_name``
        must be unique for your AWS account. The ``schema`` is also required
        to define the key structure of the table.

        **IMPORTANT** - You should consider the usage pattern of your table
        up-front, as the schema can **NOT** be modified once the table is
        created, requiring the creation of a new table & migrating the data
        should you wish to revise it.

        **IMPORTANT** - If the table already exists in DynamoDB, additional
        calls to this method will result in an error. If you just need
        a ``Table`` object to interact with the existing table, you should
        just initialize a new ``Table`` object, which requires only the
        ``table_name``.

        Requires a ``table_name`` parameter, which should be a simple string
        of the name of the table.

        Requires a ``schema`` parameter, which should be a list of
        ``BaseSchemaField`` subclasses representing the desired schema.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts an ``indexes`` parameter, which should be a list of
        ``BaseIndexField`` subclasses representing the desired indexes.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        list of ``GlobalBaseIndexField`` subclasses representing the desired
        indexes.

        Optionally accepts a ``connection`` parameter, which should be a
        ``DynamoDBConnection`` instance (or subclass). This is primarily useful
        for specifying alternate connection parameters.

        Example::

            >>> users = Table.create('users', schema=[
            ...     HashKey('username'),
            ...     RangeKey('date_joined', data_type=NUMBER)
            ... ], throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... }, indexes=[
            ...     KeysOnlyIndex('MostRecentlyJoined', parts=[
            ...         HashKey('username'),
            ...         RangeKey('date_joined'),
            ...     ]),
            ... ], global_indexes=[
            ...     GlobalAllIndex('UsersByZipcode', parts=[
            ...         HashKey('zipcode'),
            ...         RangeKey('username'),
            ...     ],
            ...     throughput={
            ...         'read': 10,
            ...         'write': 10,
            ...     }),
            ... ])

        """
        table = cls(table_name=table_name, connection=connection)
        table.schema = schema

        if throughput is not None:
            table.throughput = throughput

        if indexes is not None:
            table.indexes = indexes

        if global_indexes is not None:
            table.global_indexes = global_indexes

        # Prep the schema.
        raw_schema = []
        attr_defs = []
        seen_attrs = set()

        for field in table.schema:
            raw_schema.append(field.schema())
            # Build the attributes off what we know.
            seen_attrs.add(field.name)
            attr_defs.append(field.definition())

        raw_throughput = {
            'ReadCapacityUnits': int(table.throughput['read']),
            'WriteCapacityUnits': int(table.throughput['write']),
        }
        kwargs = {}

        kwarg_map = {
            'indexes': 'local_secondary_indexes',
            'global_indexes': 'global_secondary_indexes',
        }
        for index_attr in ('indexes', 'global_indexes'):
            table_indexes = getattr(table, index_attr)
            if table_indexes:
                raw_indexes = []
                for index_field in table_indexes:
                    raw_indexes.append(index_field.schema())
                    # Make sure all attributes specified in the indexes are
                    # added to the definition
                    for field in index_field.parts:
                        if field.name not in seen_attrs:
                            seen_attrs.add(field.name)
                            attr_defs.append(field.definition())

                kwargs[kwarg_map[index_attr]] = raw_indexes

        table.connection.create_table(
            table_name=table.table_name,
            attribute_definitions=attr_defs,
            key_schema=raw_schema,
            provisioned_throughput=raw_throughput,
            **kwargs
        )
        return table

    def _introspect_schema(self, raw_schema, raw_attributes=None):
        """
        Given a raw schema structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
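
        A rough sketch of the conversion this performs (the attribute names
        are purely illustrative)::

            [{'AttributeName': 'username', 'KeyType': 'HASH'},
             {'AttributeName': 'date_joined', 'KeyType': 'RANGE'}]

        ...becomes (falling back to ``STRING`` when no matching entry is
        found in ``raw_attributes``)::

            [HashKey('username', data_type=STRING),
             RangeKey('date_joined', data_type=STRING)]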
        """
        schema = []
        sane_attributes = {}

        if raw_attributes:
            for field in raw_attributes:
                sane_attributes[field['AttributeName']] = field['AttributeType']

        for field in raw_schema:
            data_type = sane_attributes.get(field['AttributeName'], STRING)

            if field['KeyType'] == 'HASH':
                schema.append(
                    HashKey(field['AttributeName'], data_type=data_type)
                )
            elif field['KeyType'] == 'RANGE':
                schema.append(
                    RangeKey(field['AttributeName'], data_type=data_type)
                )
            else:
                raise exceptions.UnknownSchemaFieldError(
                    "%s was seen, but is unknown. Please report this at "
                    "https://github.com/boto/boto/issues." % field['KeyType']
                )

        return schema

    def _introspect_all_indexes(self, raw_indexes, map_indexes_projection):
        """
        Given a raw index/global index structure back from a DynamoDB response,
        parse out & build the high-level Python objects that represent them.
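
        As a rough, illustrative sketch (the index name is made up), a raw
        description like::

            {
                'IndexName': 'MostRecentlyJoinedIndex',
                'Projection': {'ProjectionType': 'KEYS_ONLY'},
                'KeySchema': [...],
            }

        ...is turned into the matching index class from
        ``map_indexes_projection`` (e.g. ``KeysOnlyIndex`` or
        ``GlobalKeysOnlyIndex``), with ``parts`` rebuilt via
        ``_introspect_schema``.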
        """
        indexes = []

        for field in raw_indexes:
            index_klass = map_indexes_projection.get('ALL')
            kwargs = {
                'parts': []
            }

            if field['Projection']['ProjectionType'] == 'ALL':
                index_klass = map_indexes_projection.get('ALL')
            elif field['Projection']['ProjectionType'] == 'KEYS_ONLY':
                index_klass = map_indexes_projection.get('KEYS_ONLY')
            elif field['Projection']['ProjectionType'] == 'INCLUDE':
                index_klass = map_indexes_projection.get('INCLUDE')
                kwargs['includes'] = field['Projection']['NonKeyAttributes']
            else:
                raise exceptions.UnknownIndexFieldError(
                    "%s was seen, but is unknown. Please report this at "
                    "https://github.com/boto/boto/issues." % \
                    field['Projection']['ProjectionType']
                )

            name = field['IndexName']
            kwargs['parts'] = self._introspect_schema(field['KeySchema'], None)
            indexes.append(index_klass(name, **kwargs))

        return indexes

    def _introspect_indexes(self, raw_indexes):
        """
        Given a raw index structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        return self._introspect_all_indexes(
            raw_indexes, self._PROJECTION_TYPE_TO_INDEX.get('local_indexes'))

    def _introspect_global_indexes(self, raw_global_indexes):
        """
        Given a raw global index structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        return self._introspect_all_indexes(
            raw_global_indexes,
            self._PROJECTION_TYPE_TO_INDEX.get('global_indexes'))

    def describe(self):
        """
        Describes the current structure of the table in DynamoDB.

        This information will be used to update the ``schema``, ``indexes``,
        ``global_indexes`` and ``throughput`` information on the ``Table``. Some
        calls, such as those involving creating keys or querying, will require
        this information to be populated.

        It also returns the full raw data structure from DynamoDB, in the
        event you'd like to parse out additional information (such as the
        ``ItemCount`` or usage information).

        Example::

            >>> users.describe()
            {
                # Lots of keys here...
            }
            >>> len(users.schema)
            2

        """
        result = self.connection.describe_table(self.table_name)

        # Blindly update throughput, since what's on DynamoDB's end is likely
        # more correct.
        raw_throughput = result['Table']['ProvisionedThroughput']
        self.throughput['read'] = int(raw_throughput['ReadCapacityUnits'])
        self.throughput['write'] = int(raw_throughput['WriteCapacityUnits'])

        if not self.schema:
            # Since we have the data, build the schema.
            raw_schema = result['Table'].get('KeySchema', [])
            raw_attributes = result['Table'].get('AttributeDefinitions', [])
            self.schema = self._introspect_schema(raw_schema, raw_attributes)

        if not self.indexes:
            # Build the index information as well.
            raw_indexes = result['Table'].get('LocalSecondaryIndexes', [])
            self.indexes = self._introspect_indexes(raw_indexes)

        # Build the global index information as well.
        raw_global_indexes = result['Table'].get('GlobalSecondaryIndexes', [])
        self.global_indexes = self._introspect_global_indexes(raw_global_indexes)

        # This is leaky.
        return result

    def update(self, throughput=None, global_indexes=None):
        """
        Updates table attributes and global indexes in DynamoDB.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        dictionary. If provided, it should specify the index name, which is also
        a dict containing a ``read`` & ``write`` key, both of which
        should have an integer value associated with them. If you are writing
        new code, please use ``Table.update_global_secondary_index``.

        Returns ``True`` on success.

        Example::

            # For a read-heavier application...
            >>> users.update(throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... })
            True

            # To also update the global index(es) throughput.
            >>> users.update(throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... },
            ... global_indexes={
            ...     'TheIndexNameHere': {
            ...         'read': 15,
            ...         'write': 5,
            ...     }
            ... })
            True
        """

        data = None

        if throughput:
            self.throughput = throughput
            data = {
                'ReadCapacityUnits': int(self.throughput['read']),
                'WriteCapacityUnits': int(self.throughput['write']),
            }

        gsi_data = None

        if global_indexes:
            gsi_data = []

            for gsi_name, gsi_throughput in global_indexes.items():
                gsi_data.append({
                    "Update": {
                        "IndexName": gsi_name,
                        "ProvisionedThroughput": {
                            "ReadCapacityUnits": int(gsi_throughput['read']),
                            "WriteCapacityUnits": int(gsi_throughput['write']),
                        },
                    },
                })

        if throughput or global_indexes:
            self.connection.update_table(
                self.table_name,
                provisioned_throughput=data,
                global_secondary_index_updates=gsi_data,
            )

            return True
        else:
            msg = 'You need to provide either the throughput or the ' \
                  'global_indexes to update method'
            boto.log.error(msg)

            return False

    def create_global_secondary_index(self, global_index):
        """
        Creates a global index in DynamoDB after the table has been created.

        Requires a ``global_index`` parameter, which should be a
        ``GlobalBaseIndexField`` subclass representing the desired index.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To create a global index
            >>> users.create_global_secondary_index(
            ...     global_index=GlobalAllIndex(
            ...         'TheIndexNameHere', parts=[
            ...             HashKey('requiredHashkey', data_type=STRING),
            ...             RangeKey('optionalRangeKey', data_type=STRING)
            ...         ],
            ...         throughput={
            ...             'read': 2,
            ...             'write': 1,
            ...         })
            ... )
            True

        """

        if global_index:
            gsi_data = []
            gsi_data_attr_def = []

            gsi_data.append({
                "Create": global_index.schema()
            })

            for attr_def in global_index.parts:
                gsi_data_attr_def.append(attr_def.definition())

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
                attribute_definitions=gsi_data_attr_def
            )

            return True
        else:
            msg = 'You need to provide the global_index to ' \
                  'create_global_secondary_index method'
            boto.log.error(msg)

            return False

    def delete_global_secondary_index(self, global_index_name):
        """
        Deletes a global index in DynamoDB after the table has been created.

        Requires a ``global_index_name`` parameter, which should be a simple
        string of the name of the global secondary index.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To delete a global index
            >>> users.delete_global_secondary_index('TheIndexNameHere')
            True

        """

        if global_index_name:
            gsi_data = [
                {
                    "Delete": {
                        "IndexName": global_index_name
                    }
                }
            ]

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
            )

            return True
        else:
            msg = 'You need to provide the global index name to ' \
                  'delete_global_secondary_index method'
            boto.log.error(msg)

            return False

    def update_global_secondary_index(self, global_indexes):
        """
        Updates one or more global indexes in DynamoDB after the table has
        been created.

        Requires a ``global_indexes`` parameter, which should be a
        dictionary. It should specify the index name, which is also
        a dict containing a ``read`` & ``write`` key, both of which
        should have an integer value associated with them.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To update a global index
            >>> users.update_global_secondary_index(global_indexes={
            ...     'TheIndexNameHere': {
            ...         'read': 15,
            ...         'write': 5,
            ...     }
            ... })
            True

        """

        if global_indexes:
            gsi_data = []

            for gsi_name, gsi_throughput in global_indexes.items():
                gsi_data.append({
                    "Update": {
                        "IndexName": gsi_name,
                        "ProvisionedThroughput": {
                            "ReadCapacityUnits": int(gsi_throughput['read']),
                            "WriteCapacityUnits": int(gsi_throughput['write']),
                        },
                    },
                })

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
            )
            return True
        else:
            msg = 'You need to provide the global indexes to ' \
                  'update_global_secondary_index method'
            boto.log.error(msg)

            return False

    def delete(self):
        """
        Deletes a table in DynamoDB.

        **IMPORTANT** - Be careful when using this method, there is no undo.

        Returns ``True`` on success.

        Example::

            >>> users.delete()
            True

        """
        self.connection.delete_table(self.table_name)
        return True

    def _encode_keys(self, keys):
        """
        Given a flat Python dictionary of keys/values, converts it into the
        nested dictionary DynamoDB expects.

        Converts::

            {
                'username': 'john',
                'tags': [1, 2, 5],
            }

        ...to...::

            {
                'username': {'S': 'john'},
                'tags': {'NS': ['1', '2', '5']},
            }

        """
        raw_key = {}

        for key, value in keys.items():
            raw_key[key] = self._dynamizer.encode(value)

        return raw_key

    def get_item(self, consistent=False, attributes=None, **kwargs):
        """
        Fetches an item (record) from a table in DynamoDB.

        To specify the key of the item you'd like to get, you can specify the
        key attributes as kwargs.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will perform
        a consistent (but more expensive) read from DynamoDB.
        (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        list of fieldnames to fetch. (Default: ``None``, which means all fields
        should be fetched)

        Returns an ``Item`` instance containing all the data for that record.

        Raises an ``ItemNotFound`` exception if the item is not found.

        Example::

            # A simple hash key.
            >>> john = users.get_item(username='johndoe')
            >>> john['first_name']
            'John'

            # A complex hash+range key.
            >>> john = users.get_item(username='johndoe', last_name='Doe')
            >>> john['first_name']
            'John'

            # A consistent read (assuming the data might have just changed).
            >>> john = users.get_item(username='johndoe', consistent=True)
            >>> john['first_name']
            'Johann'

            # With a key that is an invalid variable name in Python.
            # Also, assumes a different schema than previous examples.
            >>> john = users.get_item(**{
            ...     'date-joined': 127549192,
            ... })
            >>> john['first_name']
            'John'

        """
        raw_key = self._encode_keys(kwargs)
        item_data = self.connection.get_item(
            self.table_name,
            raw_key,
            attributes_to_get=attributes,
            consistent_read=consistent
        )
        if 'Item' not in item_data:
            raise exceptions.ItemNotFound("Item %s couldn't be found." % kwargs)
        item = Item(self)
        item.load(item_data)
        return item

    def has_item(self, **kwargs):
        """
        Return whether an item (record) exists within a table in DynamoDB.

        To specify the key of the item you'd like to get, you can specify the
        key attributes as kwargs.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will perform
        a consistent (but more expensive) read from DynamoDB.
        (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        list of fieldnames to fetch. (Default: ``None``, which means all fields
        should be fetched)

        Returns ``True`` if an ``Item`` is present, ``False`` if not.

        Example::

            # Simple, just hash-key schema.
            >>> users.has_item(username='johndoe')
            True

            # Complex schema, item not present.
            >>> users.has_item(
            ...     username='johndoe',
            ...     date_joined='2014-01-07'
            ... )
            False

        """
        try:
            self.get_item(**kwargs)
        except (JSONResponseError, exceptions.ItemNotFound):
            return False

        return True

    def lookup(self, *args, **kwargs):
        """
        Look up an entry in DynamoDB. This is mostly backwards compatible
        with boto.dynamodb. Unlike get_item, it takes hash_key and range_key
        first, although you may still specify keyword arguments instead.

        Also unlike the get_item command, if the returned item has no keys
        (i.e., it does not exist in DynamoDB), a None result is returned, instead
        of an empty key object.

        Example::

            >>> user = users.lookup(username)
            >>> user = users.lookup(username, consistent=True)
            >>> app = apps.lookup('my_customer_id', 'my_app_id')

        """
        if not self.schema:
            self.describe()
        for x, arg in enumerate(args):
            kwargs[self.schema[x].name] = arg
        ret = self.get_item(**kwargs)
        if not ret.keys():
            return None
        return ret

    def new_item(self, *args):
        """
        Returns a new, blank item.

        This is mostly for consistency with boto.dynamodb.
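
        A rough sketch of typical use (this assumes a table whose only key is
        a ``username`` hash key; the field names are illustrative)::

            >>> john = users.new_item('johndoe')
            >>> john['first_name'] = 'John'
            >>> john.save()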
        """
        if not self.schema:
            self.describe()
        data = {}
        for x, arg in enumerate(args):
            data[self.schema[x].name] = arg
        return Item(self, data=data)

    def put_item(self, data, overwrite=False):
        """
        Saves an entire item to DynamoDB.

        By default, if any part of the ``Item``'s original data doesn't match
        what's currently in DynamoDB, this request will fail. This prevents
        other processes from updating the data in between when you read the
        item & when your request to update the item's data is processed, which
        would typically result in some data loss.

        Requires a ``data`` parameter, which should be a dictionary of the data
        you'd like to store in DynamoDB.

        Optionally accepts an ``overwrite`` parameter, which should be a
        boolean. If you provide ``True``, this will tell DynamoDB to blindly
        overwrite whatever data is present, if any.

        Returns ``True`` on success.

        Example::

            >>> users.put_item(data={
            ...     'username': 'jane',
            ...     'first_name': 'Jane',
            ...     'last_name': 'Doe',
            ...     'date_joined': 126478915,
            ... })
            True

        """
        item = Item(self, data=data)
        return item.save(overwrite=overwrite)

    def _put_item(self, item_data, expects=None):
        """
        The internal variant of ``put_item`` (full data). This is used by the
        ``Item`` objects, since that operation is represented at the
        table-level by the API, but conceptually maps better to telling an
        individual ``Item`` to save itself.
        """
        kwargs = {}

        if expects is not None:
            kwargs['expected'] = expects

        self.connection.put_item(self.table_name, item_data, **kwargs)
        return True

    def _update_item(self, key, item_data, expects=None):
        """
        The internal variant of ``put_item`` (partial data). This is used by the
        ``Item`` objects, since that operation is represented at the
        table-level by the API, but conceptually maps better to telling an
        individual ``Item`` to save itself.
        """
        raw_key = self._encode_keys(key)
        kwargs = {}

        if expects is not None:
            kwargs['expected'] = expects

        self.connection.update_item(self.table_name, raw_key, item_data, **kwargs)
        return True

    def delete_item(self, expected=None, conditional_operator=None, **kwargs):
        """
        Deletes a single item. You can perform a conditional delete operation
        that deletes the item if it exists, or if it has an expected attribute
        value.

        Conditional deletes are useful for only deleting items if specific
        conditions are met. If those conditions are met, DynamoDB performs
        the delete. Otherwise, the item is not deleted.

        To specify the expected attribute values of the item, you can pass a
        dictionary of conditions to ``expected``. Each condition should follow
        the pattern ``<attributename>__<comparison_operator>=<value_to_expect>``.

        **IMPORTANT** - Be careful when using this method, there is no undo.

        To specify the key of the item you'd like to get, you can specify the
        key attributes as kwargs.

        Optionally accepts an ``expected`` parameter which is a dictionary of
        expected attribute value conditions.

        Optionally accepts a ``conditional_operator`` which applies to the
        expected attribute value conditions:

        + `AND` - If all of the conditions evaluate to true (default)
        + `OR` - True if at least one condition evaluates to true

        Returns ``True`` on success, ``False`` on failed conditional delete.

        Example::

            # A simple hash key.
            >>> users.delete_item(username='johndoe')
            True

            # A complex hash+range key.
            >>> users.delete_item(username='jane', last_name='Doe')
            True

            # With a key that is an invalid variable name in Python.
            # Also, assumes a different schema than previous examples.
            >>> users.delete_item(**{
            ...     'date-joined': 127549192,
            ... })
            True

            # Conditional delete
            >>> users.delete_item(username='johndoe',
            ...                   expected={'balance__eq': 0})
            True
        """
        expected = self._build_filters(expected, using=FILTER_OPERATORS)
        raw_key = self._encode_keys(kwargs)

        try:
            self.connection.delete_item(self.table_name, raw_key,
                                        expected=expected,
                                        conditional_operator=conditional_operator)
        except exceptions.ConditionalCheckFailedException:
            return False

        return True

    def get_key_fields(self):
        """
        Returns the fields necessary to make a key for a table.

        If the ``Table`` does not already have a populated ``schema``,
        this will request it via a ``Table.describe`` call.

        Returns a list of fieldnames (strings).

        Example::

            # A simple hash key.
            >>> users.get_key_fields()
            ['username']

            # A complex hash+range key.
            >>> users.get_key_fields()
            ['username', 'last_name']

        """
        if not self.schema:
            # We don't know the structure of the table. Get a description to
            # populate the schema.
            self.describe()

        return [field.name for field in self.schema]

    def batch_write(self):
        """
        Allows the batching of writes to DynamoDB.

        Since each write/delete call to DynamoDB has a cost associated with it,
        when loading lots of data, it makes sense to batch them, creating as
        few calls as possible.

        This returns a context manager that will transparently handle creating
        these batches. The object you get back lightly-resembles a ``Table``
        object, sharing just the ``put_item`` & ``delete_item`` methods
        (which are all that DynamoDB can batch in terms of writing data).

        DynamoDB's maximum batch size is 25 items per request. If you attempt
        to put/delete more than that, the context manager will batch as many
        as it can up to that number, then flush them to DynamoDB & continue
        batching as more calls come in.

        Example::

            # Assuming a table with one record...
            >>> with users.batch_write() as batch:
            ...     batch.put_item(data={
            ...         'username': 'johndoe',
            ...         'first_name': 'John',
            ...         'last_name': 'Doe',
            ...         'owner': 1,
            ...     })
            ...     # Nothing across the wire yet.
            ...     batch.delete_item(username='bob')
            ...     # Still no requests sent.
            ...     batch.put_item(data={
            ...         'username': 'jane',
            ...         'first_name': 'Jane',
            ...         'last_name': 'Doe',
            ...         'date_joined': 127436192,
            ...     })
            ...     # Nothing yet, but once we leave the context, the
            ...     # put/deletes will be sent.

        """
        # PHENOMENAL COSMIC DOCS!!! itty-bitty code.
        return BatchTable(self)

    def _build_filters(self, filter_kwargs, using=QUERY_OPERATORS):
        """
        An internal method for taking query/scan-style ``**kwargs`` & turning
        them into the raw structure DynamoDB expects for filtering.
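
        A rough, illustrative sketch (the attribute name is made up, and the
        exact encoding depends on the active dynamizer)::

            >>> users._build_filters({'username__eq': 'johndoe'},
            ...                      using=QUERY_OPERATORS)
            {'username': {'AttributeValueList': [{'S': 'johndoe'}],
                          'ComparisonOperator': 'EQ'}}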
        """
        if filter_kwargs is None:
            return

        filters = {}

        for field_and_op, value in filter_kwargs.items():
            field_bits = field_and_op.split('__')
            fieldname = '__'.join(field_bits[:-1])

            try:
                op = using[field_bits[-1]]
            except KeyError:
                raise exceptions.UnknownFilterTypeError(
                    "Operator '%s' from '%s' is not recognized." % (
                        field_bits[-1],
                        field_and_op
                    )
                )

            lookup = {
                'AttributeValueList': [],
                'ComparisonOperator': op,
            }

            # Special-case the ``NULL/NOT_NULL`` case.
            if field_bits[-1] == 'null':
                del lookup['AttributeValueList']

                if value is False:
                    lookup['ComparisonOperator'] = 'NOT_NULL'
                else:
                    lookup['ComparisonOperator'] = 'NULL'
            # Special-case the ``BETWEEN`` case.
            elif field_bits[-1] == 'between':
                if len(value) == 2 and isinstance(value, (list, tuple)):
                    lookup['AttributeValueList'].append(
                        self._dynamizer.encode(value[0])
                    )
                    lookup['AttributeValueList'].append(
                        self._dynamizer.encode(value[1])
                    )
            # Special-case the ``IN`` case
            elif field_bits[-1] == 'in':
                for val in value:
                    lookup['AttributeValueList'].append(self._dynamizer.encode(val))
            else:
                # Fix up the value for encoding, because it was built to only work
                # with ``set``s.
                if isinstance(value, (list, tuple)):
                    value = set(value)
                lookup['AttributeValueList'].append(
                    self._dynamizer.encode(value)
                )

            # Finally, insert it into the filters.
            filters[fieldname] = lookup

        return filters

    def query(self, limit=None, index=None, reverse=False, consistent=False,
              attributes=None, max_page_size=None, **filter_kwargs):
        """
        **WARNING:** This method is provided **strictly** for
        backward-compatibility. It returns results in an incorrect order.

        If you are writing new code, please use ``Table.query_2``.
        """
        reverse = not reverse
        return self.query_2(limit=limit, index=index, reverse=reverse,
                            consistent=consistent, attributes=attributes,
                            max_page_size=max_page_size, **filter_kwargs)

    def query_2(self, limit=None, index=None, reverse=False,
                consistent=False, attributes=None, max_page_size=None,
                query_filter=None, conditional_operator=None,
                **filter_kwargs):
        """
        Queries for a set of matching items in a DynamoDB table.

        Queries can be performed against a hash key, a hash+range key or
        against any data stored in your local secondary indexes. Query filters
        can be used to filter on arbitrary fields.

        **Note** - You cannot query against arbitrary fields within the data
        stored in DynamoDB unless you specify ``query_filter`` values.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
        are specified in the same way.

        Optionally accepts a ``limit`` parameter, which should be an integer
        count of the total number of items to return. (Default: ``None`` -
        all results)

        Optionally accepts an ``index`` parameter, which should be a string of
        the name of the local secondary index you want to query against.
        (Default: ``None``)

        Optionally accepts a ``reverse`` parameter, which will present the
        results in reverse order. (Default: ``False`` - normal order)

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will force a consistent read of
        the data (more expensive). (Default: ``False`` - use eventually
        consistent reads)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB. This uses ``AttributesToGet`` and sets ``Select`` to
        ``SPECIFIC_ATTRIBUTES``.

        Optionally accepts a ``max_page_size`` parameter, which should be an
        integer count of the maximum number of items to retrieve
        **per-request**. This is useful in making faster requests & preventing
        the query from drowning out other queries. (Default: ``None`` -
        fetch as many as DynamoDB will return)

        Optionally accepts a ``query_filter`` which is a dictionary of filter
        conditions against any arbitrary field in the returned data.

        Optionally accepts a ``conditional_operator`` which applies to the
        query filter conditions:

        + `AND` - True if all filter conditions evaluate to true (default)
        + `OR` - True if at least one filter condition evaluates to true

        Returns a ``ResultSet`` containing ``Item``s, which transparently
        handles the pagination of results you get back.

        Example::

            # Look for last names equal to "Doe".
            >>> results = users.query_2(last_name__eq='Doe')
            >>> for res in results:
            ...     print(res['first_name'])
            'John'
            'Jane'

            # Look for last names beginning with "D", in reverse order, limit 3.
            >>> results = users.query_2(
            ...     last_name__beginswith='D',
            ...     reverse=True,
            ...     limit=3
            ... )
            >>> for res in results:
            ...     print(res['first_name'])
            'Alice'
            'Jane'
            'John'

            # Use an LSI & a consistent read.
            >>> results = users.query_2(
            ...     date_joined__gte=1236451000,
            ...     owner__eq=1,
            ...     index='DateJoinedIndex',
            ...     consistent=True
            ... )
            >>> for res in results:
            ...     print(res['first_name'])
            'Alice'
            'Bob'
            'John'
            'Fred'

            # Filter by non-indexed field(s)
            >>> results = users.query_2(
            ...     last_name__eq='Doe',
            ...     reverse=True,
            ...     query_filter={
            ...         'first_name__beginswith': 'A'
            ...     }
            ... )
            >>> for res in results:
            ...     print(res['first_name'] + ' ' + res['last_name'])
            'Alice Doe'

        """
        if self.schema:
            if len(self.schema) == 1:
                if len(filter_kwargs) <= 1:
                    if not self.global_indexes or not len(self.global_indexes):
                        # If the schema only has one field, there's <= 1 filter
                        # param & no Global Secondary Indexes, this is user
                        # error. Bail early.
                        raise exceptions.QueryError(
                            "You must specify more than one key to filter on."
                        )

        if attributes is not None:
            select = 'SPECIFIC_ATTRIBUTES'
        else:
            select = None

        results = ResultSet(
            max_page_size=max_page_size
        )
        kwargs = filter_kwargs.copy()
        kwargs.update({
            'limit': limit,
            'index': index,
            'reverse': reverse,
            'consistent': consistent,
            'select': select,
            'attributes_to_get': attributes,
            'query_filter': query_filter,
            'conditional_operator': conditional_operator,
        })
        results.to_call(self._query, **kwargs)
        return results

    def query_count(self, index=None, consistent=False, conditional_operator=None,
                    query_filter=None, scan_index_forward=True, limit=None,
                    exclusive_start_key=None, **filter_kwargs):
        """
        Queries the exact count of matching items in a DynamoDB table.

        Queries can be performed against a hash key, a hash+range key or
        against any data stored in your local secondary indexes. Query filters
        can be used to filter on arbitrary fields.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
        are specified in the same way.

        Optionally accepts an ``index`` parameter, which should be a string of
        the name of the local secondary index you want to query against.
        (Default: ``None``)

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will force a consistent read of
        the data (more expensive). (Default: ``False`` - use eventually
        consistent reads)

        Optionally accepts a ``query_filter`` which is a dictionary of filter
        conditions against any arbitrary field in the returned data.

        Optionally accepts a ``conditional_operator`` which applies to the
        query filter conditions:

        + `AND` - True if all filter conditions evaluate to true (default)
        + `OR` - True if at least one filter condition evaluates to true

        Optionally accepts an ``exclusive_start_key`` which is used to get
        the remaining items when a query cannot return the complete count.

        Returns an integer which represents the exact number of matched
        items.

        :type scan_index_forward: boolean
        :param scan_index_forward: Specifies ascending (true) or descending
            (false) traversal of the index. DynamoDB returns results reflecting
            the requested order determined by the range key. If the data type
            is Number, the results are returned in numeric order. For String,
            the results are returned in order of ASCII character code values.
            For Binary, DynamoDB treats each byte of the binary data as
            unsigned when it compares binary values.

            If ScanIndexForward is not specified, the results are returned in
            ascending order.

        :type limit: integer
        :param limit: The maximum number of items to evaluate (not necessarily
            the number of matching items).

        Example::

            # Look for last names equal to "Doe".
            >>> users.query_count(last_name__eq='Doe')
            5

            # Use an LSI & a consistent read.
            >>> users.query_count(
            ...     date_joined__gte=1236451000,
            ...     owner__eq=1,
            ...     index='DateJoinedIndex',
            ...     consistent=True
            ... )
            2

        """
        key_conditions = self._build_filters(
            filter_kwargs,
            using=QUERY_OPERATORS
        )

        built_query_filter = self._build_filters(
            query_filter,
            using=FILTER_OPERATORS
        )

        count_buffer = 0
        last_evaluated_key = exclusive_start_key

        while True:
            raw_results = self.connection.query(
                self.table_name,
                index_name=index,
                consistent_read=consistent,
                select='COUNT',
                key_conditions=key_conditions,
                query_filter=built_query_filter,
                conditional_operator=conditional_operator,
                limit=limit,
                scan_index_forward=scan_index_forward,
                exclusive_start_key=last_evaluated_key
            )

            count_buffer += int(raw_results.get('Count', 0))
            last_evaluated_key = raw_results.get('LastEvaluatedKey')
            if not last_evaluated_key or count_buffer < 1:
                break

        return count_buffer

    def _query(self, limit=None, index=None, reverse=False, consistent=False,
               exclusive_start_key=None, select=None, attributes_to_get=None,
               query_filter=None, conditional_operator=None, **filter_kwargs):
        """
        The internal method that performs the actual queries. Used extensively
        by ``ResultSet`` to perform each (paginated) request.
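
        The return value is a plain dict of the shape (sketched here for
        reference; the actual items depend on the table)::

            {
                'results': [<Item>, <Item>, ...],
                'last_key': {...} or None,
            }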
        """
        kwargs = {
            'limit': limit,
            'index_name': index,
            'consistent_read': consistent,
            'select': select,
            'attributes_to_get': attributes_to_get,
            'conditional_operator': conditional_operator,
        }

        if reverse:
            kwargs['scan_index_forward'] = False

        if exclusive_start_key:
            kwargs['exclusive_start_key'] = {}

            for key, value in exclusive_start_key.items():
                kwargs['exclusive_start_key'][key] = \
                    self._dynamizer.encode(value)

        # Convert the filters into something we can actually use.
        kwargs['key_conditions'] = self._build_filters(
            filter_kwargs,
            using=QUERY_OPERATORS
        )

        kwargs['query_filter'] = self._build_filters(
            query_filter,
            using=FILTER_OPERATORS
        )

        raw_results = self.connection.query(
            self.table_name,
            **kwargs
        )
        results = []
        last_key = None

        for raw_item in raw_results.get('Items', []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        if raw_results.get('LastEvaluatedKey', None):
            last_key = {}

            for key, value in raw_results['LastEvaluatedKey'].items():
                last_key[key] = self._dynamizer.decode(value)

        return {
            'results': results,
            'last_key': last_key,
        }

    def scan(self, limit=None, segment=None, total_segments=None,
             max_page_size=None, attributes=None, conditional_operator=None,
             **filter_kwargs):
        """
        Scans across all items within a DynamoDB table.

        Scans can be performed against a hash key or a hash+range key. You can
        additionally filter the results after the table has been read but
        before the response is returned by using query filters.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``.

        Optionally accepts a ``limit`` parameter, which should be an integer
        count of the total number of items to return. (Default: ``None`` -
        all results)

        Optionally accepts a ``segment`` parameter, which should be an integer
        of the segment to retrieve. Please see the documentation about
        Parallel Scans. (Default: ``None`` - no segments)

        Optionally accepts a ``total_segments`` parameter, which should be an
        integer count of the number of segments to divide the table into.
        Please see the documentation about Parallel Scans. (Default: ``None`` -
        no segments)

        Optionally accepts a ``max_page_size`` parameter, which should be an
        integer count of the maximum number of items to retrieve
        **per-request**. This is useful in making faster requests & preventing
        the scan from drowning out other queries. (Default: ``None`` -
        fetch as many as DynamoDB will return)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB. This uses ``AttributesToGet`` and sets ``Select`` to
        ``SPECIFIC_ATTRIBUTES``.

        Returns a ``ResultSet``, which transparently handles the pagination of
        results you get back.

        Example::

            # All results.
            >>> everything = users.scan()

            # Look for last names beginning with "D".
            >>> results = users.scan(last_name__beginswith='D')
            >>> for res in results:
            ...     print(res['first_name'])
            'Alice'
            'John'
            'Jane'

            # Use an ``IN`` filter & limit.
            >>> results = users.scan(
            ...     age__in=[25, 26, 27, 28, 29],
            ...     limit=1
            ... )
            >>> for res in results:
            ...     print(res['first_name'])
            'Alice'

        """
        results = ResultSet(
            max_page_size=max_page_size
        )
        kwargs = filter_kwargs.copy()
        kwargs.update({
            'limit': limit,
            'segment': segment,
            'total_segments': total_segments,
            'attributes': attributes,
            'conditional_operator': conditional_operator,
        })
        results.to_call(self._scan, **kwargs)
        return results

    def _scan(self, limit=None, exclusive_start_key=None, segment=None,
              total_segments=None, attributes=None, conditional_operator=None,
              **filter_kwargs):
        """
        The internal method that performs the actual scan. Used extensively
        by ``ResultSet`` to perform each (paginated) request.
        """
        kwargs = {
            'limit': limit,
            'segment': segment,
            'total_segments': total_segments,
            'attributes_to_get': attributes,
            'conditional_operator': conditional_operator,
        }

        if exclusive_start_key:
            kwargs['exclusive_start_key'] = {}

            for key, value in exclusive_start_key.items():
                kwargs['exclusive_start_key'][key] = \
                    self._dynamizer.encode(value)

        # Convert the filters into something we can actually use.
        kwargs['scan_filter'] = self._build_filters(
            filter_kwargs,
            using=FILTER_OPERATORS
        )

        raw_results = self.connection.scan(
            self.table_name,
            **kwargs
        )
        results = []
        last_key = None

        for raw_item in raw_results.get('Items', []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        if raw_results.get('LastEvaluatedKey', None):
            last_key = {}

            for key, value in raw_results['LastEvaluatedKey'].items():
                last_key[key] = self._dynamizer.decode(value)

        return {
            'results': results,
            'last_key': last_key,
        }

    def batch_get(self, keys, consistent=False, attributes=None):
        """
        Fetches many specific items in batch from a table.

        Requires a ``keys`` parameter, which should be a list of dictionaries.
        Each dictionary should consist of the key values to specify.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, a strongly consistent read will be
        used. (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB.

        Returns a ``ResultSet``, which transparently handles the pagination of
        results you get back.

        Example::

            >>> results = users.batch_get(keys=[
            ...     {
            ...         'username': 'johndoe',
            ...     },
            ...     {
            ...         'username': 'jane',
            ...     },
            ...     {
            ...         'username': 'fred',
            ...     },
            ... ])
            >>> for res in results:
            ...     print(res['first_name'])
            'John'
            'Jane'
            'Fred'

        """
        # We pass the keys to the constructor instead, so it can maintain its
        # own internal state as to what keys have been processed.
        results = BatchGetResultSet(keys=keys, max_batch_get=self.max_batch_get)
        results.to_call(self._batch_get, consistent=consistent, attributes=attributes)
        return results

    def _batch_get(self, keys, consistent=False, attributes=None):
        """
        The internal method that performs the actual batch get. Used extensively
        by ``BatchGetResultSet`` to perform each (paginated) request.
        """
        items = {
            self.table_name: {
                'Keys': [],
            },
        }

        if consistent:
            items[self.table_name]['ConsistentRead'] = True

        if attributes is not None:
            items[self.table_name]['AttributesToGet'] = attributes

        for key_data in keys:
            raw_key = {}

            for key, value in key_data.items():
                raw_key[key] = self._dynamizer.encode(value)

            items[self.table_name]['Keys'].append(raw_key)

        raw_results = self.connection.batch_get_item(request_items=items)
        results = []
        unprocessed_keys = []

        for raw_item in raw_results['Responses'].get(self.table_name, []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        raw_unprocessed = raw_results.get('UnprocessedKeys', {}).get(self.table_name, {})

        for raw_key in raw_unprocessed.get('Keys', []):
            py_key = {}

            for key, value in raw_key.items():
                py_key[key] = self._dynamizer.decode(value)

            unprocessed_keys.append(py_key)

        return {
            'results': results,
            # NEVER return a ``last_key``. Just in-case any part of
            # ``ResultSet`` peeks through, since much of the
            # original underlying implementation is based on this key.
            'last_key': None,
            'unprocessed_keys': unprocessed_keys,
        }

    def count(self):
        """
        Returns a (very) eventually consistent count of the number of items
        in a table.

        Lag time is about 6 hours, so don't expect a high degree of accuracy.

        Example::

            >>> users.count()
            6

        """
        info = self.describe()
        return info['Table'].get('ItemCount', 0)


class BatchTable(object):
    """
    Used by ``Table`` as the context manager for batch writes.

    You likely don't want to try to use this object directly.
    """
    def __init__(self, table):
        self.table = table
        self._to_put = []
        self._to_delete = []
        self._unprocessed = []

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        if self._to_put or self._to_delete:
            # Flush anything that's left.
            self.flush()

        if self._unprocessed:
            # Finally, handle anything that wasn't processed.
            self.resend_unprocessed()

    def put_item(self, data, overwrite=False):
        self._to_put.append(data)

        if self.should_flush():
            self.flush()

    def delete_item(self, **kwargs):
        self._to_delete.append(kwargs)

        if self.should_flush():
            self.flush()

    def should_flush(self):
        if len(self._to_put) + len(self._to_delete) == 25:
            return True

        return False

    def flush(self):
        batch_data = {
            self.table.table_name: [
                # We'll insert data here shortly.
            ],
        }

        for put in self._to_put:
            item = Item(self.table, data=put)
            batch_data[self.table.table_name].append({
                'PutRequest': {
                    'Item': item.prepare_full(),
                }
            })

        for delete in self._to_delete:
            batch_data[self.table.table_name].append({
                'DeleteRequest': {
                    'Key': self.table._encode_keys(delete),
                }
            })

        resp = self.table.connection.batch_write_item(batch_data)
        self.handle_unprocessed(resp)

        self._to_put = []
        self._to_delete = []
        return True

    def handle_unprocessed(self, resp):
        if len(resp.get('UnprocessedItems', [])):
            table_name = self.table.table_name
            unprocessed = resp['UnprocessedItems'].get(table_name, [])

            # Some items have not been processed. Stow them for now &
            # re-attempt processing on ``__exit__``.
            msg = "%s items were unprocessed. Storing for later."
            boto.log.info(msg % len(unprocessed))
            self._unprocessed.extend(unprocessed)

    def resend_unprocessed(self):
        # If there are unprocessed records (for instance, the user was over
        # their throughput limitations), iterate over them & send until they're
        # all there.
        boto.log.info(
            "Re-sending %s unprocessed items." % len(self._unprocessed)
        )

        while len(self._unprocessed):
            # Again, do 25 at a time.
            to_resend = self._unprocessed[:25]
            # Remove them from the list.
            self._unprocessed = self._unprocessed[25:]
            batch_data = {
                self.table.table_name: to_resend
            }
            boto.log.info("Sending %s items" % len(to_resend))
            resp = self.table.connection.batch_write_item(batch_data)
            self.handle_unprocessed(resp)
            boto.log.info(
                "%s unprocessed items left" % len(self._unprocessed)
            )