planemo/lib/python3.7/site-packages/boto/dynamodb2/table.py @ 0:d30785e31577 (draft)
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"

| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 |
| parents | |
| children | |
import boto
from boto.dynamodb2 import exceptions
from boto.dynamodb2.fields import (HashKey, RangeKey,
                                   AllIndex, KeysOnlyIndex, IncludeIndex,
                                   GlobalAllIndex, GlobalKeysOnlyIndex,
                                   GlobalIncludeIndex)
from boto.dynamodb2.items import Item
from boto.dynamodb2.layer1 import DynamoDBConnection
from boto.dynamodb2.results import ResultSet, BatchGetResultSet
from boto.dynamodb2.types import (NonBooleanDynamizer, Dynamizer, FILTER_OPERATORS,
                                  QUERY_OPERATORS, STRING)
from boto.exception import JSONResponseError


class Table(object):
    """
    Interacts & models the behavior of a DynamoDB table.

    The ``Table`` object represents a set (or rough categorization) of
    records within DynamoDB. The important part is that all records within the
    table, while largely-schema-free, share the same schema & are essentially
    namespaced for use in your application. For example, you might have a
    ``users`` table or a ``forums`` table.
    """
    max_batch_get = 100

    _PROJECTION_TYPE_TO_INDEX = dict(
        global_indexes=dict(
            ALL=GlobalAllIndex,
            KEYS_ONLY=GlobalKeysOnlyIndex,
            INCLUDE=GlobalIncludeIndex,
        ), local_indexes=dict(
            ALL=AllIndex,
            KEYS_ONLY=KeysOnlyIndex,
            INCLUDE=IncludeIndex,
        )
    )

    def __init__(self, table_name, schema=None, throughput=None, indexes=None,
                 global_indexes=None, connection=None):
        """
        Sets up a new in-memory ``Table``.

        This is useful if the table already exists within DynamoDB & you simply
        want to use it for additional interactions. The only required parameter
        is the ``table_name``. However, under the hood, the object will call
        ``describe_table`` to determine the schema/indexes/throughput. You
        can avoid this extra call by passing in ``schema`` & ``indexes``.

        **IMPORTANT** - If you're creating a new ``Table`` for the first time,
        you should use the ``Table.create`` method instead, as it will
        persist the table structure to DynamoDB.

        Requires a ``table_name`` parameter, which should be a simple string
        of the name of the table.

        Optionally accepts a ``schema`` parameter, which should be a list of
        ``BaseSchemaField`` subclasses representing the desired schema.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts an ``indexes`` parameter, which should be a list of
        ``BaseIndexField`` subclasses representing the desired indexes.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        list of ``GlobalBaseIndexField`` subclasses representing the desired
        indexes.

        Optionally accepts a ``connection`` parameter, which should be a
        ``DynamoDBConnection`` instance (or subclass). This is primarily useful
        for specifying alternate connection parameters.

        Example::

            # The simple, it-already-exists case.
            >>> users = Table('users')

            # The full, minimum-extra-calls case.
            >>> from boto import dynamodb2
            >>> users = Table('users', schema=[
            ...     HashKey('username'),
            ...     RangeKey('date_joined', data_type=NUMBER)
            ... ], throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... }, indexes=[
            ...     KeysOnlyIndex('MostRecentlyJoined', parts=[
            ...         HashKey('username'),
            ...         RangeKey('date_joined')
            ...     ]),
            ... ], global_indexes=[
            ...     GlobalAllIndex('UsersByZipcode', parts=[
            ...         HashKey('zipcode'),
            ...         RangeKey('username'),
            ...     ],
            ...     throughput={
            ...         'read': 10,
            ...         'write': 10,
            ...     }),
            ... ], connection=dynamodb2.connect_to_region('us-west-2',
            ...     aws_access_key_id='key',
            ...     aws_secret_access_key='key',
            ... ))

        """
        self.table_name = table_name
        self.connection = connection
        self.throughput = {
            'read': 5,
            'write': 5,
        }
        self.schema = schema
        self.indexes = indexes
        self.global_indexes = global_indexes

        if self.connection is None:
            self.connection = DynamoDBConnection()

        if throughput is not None:
            self.throughput = throughput

        self._dynamizer = NonBooleanDynamizer()

    def use_boolean(self):
        self._dynamizer = Dynamizer()

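    # A minimal usage sketch for ``use_boolean`` (hypothetical table &
    # attribute names). The default ``NonBooleanDynamizer`` round-trips stored
    # booleans as 0/1 numbers; opting in to the full ``Dynamizer`` yields
    # native Python ``bool`` values instead:
    #
    #   users = Table('users')
    #   users.use_boolean()
    #   johndoe = users.get_item(username='johndoe')
    #   johndoe['is_active']  # -> True, rather than a 0/1 number
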
    @classmethod
    def create(cls, table_name, schema, throughput=None, indexes=None,
               global_indexes=None, connection=None):
        """
        Creates a new table in DynamoDB & returns an in-memory ``Table`` object.

        This will set up a brand new table within DynamoDB. The ``table_name``
        must be unique for your AWS account. The ``schema`` is also required
        to define the key structure of the table.

        **IMPORTANT** - You should consider the usage pattern of your table
        up-front, as the schema can **NOT** be modified once the table is
        created, requiring the creation of a new table & migrating the data
        should you wish to revise it.

        **IMPORTANT** - If the table already exists in DynamoDB, additional
        calls to this method will result in an error. If you just need
        a ``Table`` object to interact with the existing table, you should
        just initialize a new ``Table`` object, which requires only the
        ``table_name``.

        Requires a ``table_name`` parameter, which should be a simple string
        of the name of the table.

        Requires a ``schema`` parameter, which should be a list of
        ``BaseSchemaField`` subclasses representing the desired schema.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts an ``indexes`` parameter, which should be a list of
        ``BaseIndexField`` subclasses representing the desired indexes.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        list of ``GlobalBaseIndexField`` subclasses representing the desired
        indexes.

        Optionally accepts a ``connection`` parameter, which should be a
        ``DynamoDBConnection`` instance (or subclass). This is primarily useful
        for specifying alternate connection parameters.

        Example::

            >>> users = Table.create('users', schema=[
            ...     HashKey('username'),
            ...     RangeKey('date_joined', data_type=NUMBER)
            ... ], throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... }, indexes=[
            ...     KeysOnlyIndex('MostRecentlyJoined', parts=[
            ...         HashKey('username'),
            ...         RangeKey('date_joined'),
            ...     ]),
            ... ], global_indexes=[
            ...     GlobalAllIndex('UsersByZipcode', parts=[
            ...         HashKey('zipcode'),
            ...         RangeKey('username'),
            ...     ],
            ...     throughput={
            ...         'read': 10,
            ...         'write': 10,
            ...     }),
            ... ])

        """
        table = cls(table_name=table_name, connection=connection)
        table.schema = schema

        if throughput is not None:
            table.throughput = throughput

        if indexes is not None:
            table.indexes = indexes

        if global_indexes is not None:
            table.global_indexes = global_indexes

        # Prep the schema.
        raw_schema = []
        attr_defs = []
        seen_attrs = set()

        for field in table.schema:
            raw_schema.append(field.schema())
            # Build the attributes off what we know.
            seen_attrs.add(field.name)
            attr_defs.append(field.definition())

        raw_throughput = {
            'ReadCapacityUnits': int(table.throughput['read']),
            'WriteCapacityUnits': int(table.throughput['write']),
        }
        kwargs = {}

        kwarg_map = {
            'indexes': 'local_secondary_indexes',
            'global_indexes': 'global_secondary_indexes',
        }
        for index_attr in ('indexes', 'global_indexes'):
            table_indexes = getattr(table, index_attr)
            if table_indexes:
                raw_indexes = []
                for index_field in table_indexes:
                    raw_indexes.append(index_field.schema())
                    # Make sure all attributes specified in the indexes are
                    # added to the definition
                    for field in index_field.parts:
                        if field.name not in seen_attrs:
                            seen_attrs.add(field.name)
                            attr_defs.append(field.definition())

                kwargs[kwarg_map[index_attr]] = raw_indexes

        table.connection.create_table(
            table_name=table.table_name,
            attribute_definitions=attr_defs,
            key_schema=raw_schema,
            provisioned_throughput=raw_throughput,
            **kwargs
        )
        return table

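    # For illustration only (assuming the two-field ``users`` schema from the
    # docstring above), the prepped structures handed to
    # ``DynamoDBConnection.create_table`` look roughly like:
    #
    #   raw_schema = [
    #       {'AttributeName': 'username', 'KeyType': 'HASH'},
    #       {'AttributeName': 'date_joined', 'KeyType': 'RANGE'},
    #   ]
    #   attr_defs = [
    #       {'AttributeName': 'username', 'AttributeType': 'S'},
    #       {'AttributeName': 'date_joined', 'AttributeType': 'N'},
    #   ]
    #   raw_throughput = {'ReadCapacityUnits': 20, 'WriteCapacityUnits': 10}
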
    def _introspect_schema(self, raw_schema, raw_attributes=None):
        """
        Given a raw schema structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        schema = []
        sane_attributes = {}

        if raw_attributes:
            for field in raw_attributes:
                sane_attributes[field['AttributeName']] = field['AttributeType']

        for field in raw_schema:
            data_type = sane_attributes.get(field['AttributeName'], STRING)

            if field['KeyType'] == 'HASH':
                schema.append(
                    HashKey(field['AttributeName'], data_type=data_type)
                )
            elif field['KeyType'] == 'RANGE':
                schema.append(
                    RangeKey(field['AttributeName'], data_type=data_type)
                )
            else:
                raise exceptions.UnknownSchemaFieldError(
                    "%s was seen, but is unknown. Please report this at "
                    "https://github.com/boto/boto/issues." % field['KeyType']
                )

        return schema

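    # A sketch of the translation ``_introspect_schema`` performs, with sample
    # data shaped like DynamoDB's ``DescribeTable`` response:
    #
    #   raw_schema = [{'AttributeName': 'username', 'KeyType': 'HASH'}]
    #   raw_attributes = [{'AttributeName': 'username', 'AttributeType': 'S'}]
    #   # -> [HashKey('username', data_type=STRING)]
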
    def _introspect_all_indexes(self, raw_indexes, map_indexes_projection):
        """
        Given a raw index/global index structure back from a DynamoDB response,
        parse out & build the high-level Python objects that represent them.
        """
        indexes = []

        for field in raw_indexes:
            index_klass = map_indexes_projection.get('ALL')
            kwargs = {
                'parts': []
            }

            if field['Projection']['ProjectionType'] == 'ALL':
                index_klass = map_indexes_projection.get('ALL')
            elif field['Projection']['ProjectionType'] == 'KEYS_ONLY':
                index_klass = map_indexes_projection.get('KEYS_ONLY')
            elif field['Projection']['ProjectionType'] == 'INCLUDE':
                index_klass = map_indexes_projection.get('INCLUDE')
                kwargs['includes'] = field['Projection']['NonKeyAttributes']
            else:
                raise exceptions.UnknownIndexFieldError(
                    "%s was seen, but is unknown. Please report this at "
                    "https://github.com/boto/boto/issues." % \
                    field['Projection']['ProjectionType']
                )

            name = field['IndexName']
            kwargs['parts'] = self._introspect_schema(field['KeySchema'], None)
            indexes.append(index_klass(name, **kwargs))

        return indexes

    def _introspect_indexes(self, raw_indexes):
        """
        Given a raw index structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        return self._introspect_all_indexes(
            raw_indexes, self._PROJECTION_TYPE_TO_INDEX.get('local_indexes'))

    def _introspect_global_indexes(self, raw_global_indexes):
        """
        Given a raw global index structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        return self._introspect_all_indexes(
            raw_global_indexes,
            self._PROJECTION_TYPE_TO_INDEX.get('global_indexes'))

    def describe(self):
        """
        Describes the current structure of the table in DynamoDB.

        This information will be used to update the ``schema``, ``indexes``,
        ``global_indexes`` and ``throughput`` information on the ``Table``. Some
        calls, such as those involving creating keys or querying, will require
        this information to be populated.

        It also returns the full raw data structure from DynamoDB, in the
        event you'd like to parse out additional information (such as the
        ``ItemCount`` or usage information).

        Example::

            >>> users.describe()
            {
                # Lots of keys here...
            }
            >>> len(users.schema)
            2

        """
        result = self.connection.describe_table(self.table_name)

        # Blindly update throughput, since what's on DynamoDB's end is likely
        # more correct.
        raw_throughput = result['Table']['ProvisionedThroughput']
        self.throughput['read'] = int(raw_throughput['ReadCapacityUnits'])
        self.throughput['write'] = int(raw_throughput['WriteCapacityUnits'])

        if not self.schema:
            # Since we have the data, build the schema.
            raw_schema = result['Table'].get('KeySchema', [])
            raw_attributes = result['Table'].get('AttributeDefinitions', [])
            self.schema = self._introspect_schema(raw_schema, raw_attributes)

        if not self.indexes:
            # Build the index information as well.
            raw_indexes = result['Table'].get('LocalSecondaryIndexes', [])
            self.indexes = self._introspect_indexes(raw_indexes)

        # Build the global index information as well.
        raw_global_indexes = result['Table'].get('GlobalSecondaryIndexes', [])
        self.global_indexes = self._introspect_global_indexes(raw_global_indexes)

        # This is leaky.
        return result

    def update(self, throughput=None, global_indexes=None):
        """
        Updates table attributes and global indexes in DynamoDB.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        dictionary. If provided, it should map each index name to a dict
        containing a ``read`` & ``write`` key, both of which should have an
        integer value associated with them. If you are writing new code,
        please use ``Table.update_global_secondary_index``.

        Returns ``True`` on success.

        Example::

            # For a read-heavier application...
            >>> users.update(throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... })
            True

            # To also update the global index(es) throughput.
            >>> users.update(throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... },
            ... global_indexes={
            ...     'TheIndexNameHere': {
            ...         'read': 15,
            ...         'write': 5,
            ...     }
            ... })
            True
        """

        data = None

        if throughput:
            self.throughput = throughput
            data = {
                'ReadCapacityUnits': int(self.throughput['read']),
                'WriteCapacityUnits': int(self.throughput['write']),
            }

        gsi_data = None

        if global_indexes:
            gsi_data = []

            for gsi_name, gsi_throughput in global_indexes.items():
                gsi_data.append({
                    "Update": {
                        "IndexName": gsi_name,
                        "ProvisionedThroughput": {
                            "ReadCapacityUnits": int(gsi_throughput['read']),
                            "WriteCapacityUnits": int(gsi_throughput['write']),
                        },
                    },
                })

        if throughput or global_indexes:
            self.connection.update_table(
                self.table_name,
                provisioned_throughput=data,
                global_secondary_index_updates=gsi_data,
            )

            return True
        else:
            msg = 'You need to provide either the throughput or the ' \
                  'global_indexes to update method'
            boto.log.error(msg)

            return False

    def create_global_secondary_index(self, global_index):
        """
        Creates a global index in DynamoDB after the table has been created.

        Requires a ``global_index`` parameter, which should be a
        ``GlobalBaseIndexField`` subclass representing the desired index.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To create a global index
            >>> users.create_global_secondary_index(
            ...     global_index=GlobalAllIndex(
            ...         'TheIndexNameHere', parts=[
            ...             HashKey('requiredHashkey', data_type=STRING),
            ...             RangeKey('optionalRangeKey', data_type=STRING)
            ...         ],
            ...         throughput={
            ...             'read': 2,
            ...             'write': 1,
            ...         })
            ... )
            True

        """

        if global_index:
            gsi_data = []
            gsi_data_attr_def = []

            gsi_data.append({
                "Create": global_index.schema()
            })

            for attr_def in global_index.parts:
                gsi_data_attr_def.append(attr_def.definition())

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
                attribute_definitions=gsi_data_attr_def
            )

            return True
        else:
            msg = 'You need to provide the global_index to ' \
                  'create_global_secondary_index method'
            boto.log.error(msg)

            return False

    def delete_global_secondary_index(self, global_index_name):
        """
        Deletes a global index in DynamoDB after the table has been created.

        Requires a ``global_index_name`` parameter, which should be a simple
        string of the name of the global secondary index.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To delete a global index
            >>> users.delete_global_secondary_index('TheIndexNameHere')
            True

        """

        if global_index_name:
            gsi_data = [
                {
                    "Delete": {
                        "IndexName": global_index_name
                    }
                }
            ]

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
            )

            return True
        else:
            msg = 'You need to provide the global index name to ' \
                  'delete_global_secondary_index method'
            boto.log.error(msg)

            return False

    def update_global_secondary_index(self, global_indexes):
        """
        Updates one or more global indexes in DynamoDB after the table has
        been created.

        Requires a ``global_indexes`` parameter, which should be a dictionary
        mapping each index name to a dict containing a ``read`` & ``write``
        key, both of which should have an integer value associated with them.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To update a global index
            >>> users.update_global_secondary_index(global_indexes={
            ...     'TheIndexNameHere': {
            ...         'read': 15,
            ...         'write': 5,
            ...     }
            ... })
            True

        """

        if global_indexes:
            gsi_data = []

            for gsi_name, gsi_throughput in global_indexes.items():
                gsi_data.append({
                    "Update": {
                        "IndexName": gsi_name,
                        "ProvisionedThroughput": {
                            "ReadCapacityUnits": int(gsi_throughput['read']),
                            "WriteCapacityUnits": int(gsi_throughput['write']),
                        },
                    },
                })

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
            )
            return True
        else:
            msg = 'You need to provide the global indexes to ' \
                  'update_global_secondary_index method'
            boto.log.error(msg)

            return False

    def delete(self):
        """
        Deletes a table in DynamoDB.

        **IMPORTANT** - Be careful when using this method, there is no undo.

        Returns ``True`` on success.

        Example::

            >>> users.delete()
            True

        """
        self.connection.delete_table(self.table_name)
        return True

    def _encode_keys(self, keys):
        """
        Given a flat Python dictionary of keys/values, converts it into the
        nested dictionary DynamoDB expects.

        Converts::

            {
                'username': 'john',
                'tags': [1, 2, 5],
            }

        ...to...::

            {
                'username': {'S': 'john'},
                'tags': {'NS': ['1', '2', '5']},
            }

        """
        raw_key = {}

        for key, value in keys.items():
            raw_key[key] = self._dynamizer.encode(value)

        return raw_key

    def get_item(self, consistent=False, attributes=None, **kwargs):
        """
        Fetches an item (record) from a table in DynamoDB.

        To specify the key of the item you'd like to get, you can specify the
        key attributes as kwargs.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will perform
        a consistent (but more expensive) read from DynamoDB.
        (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        list of fieldnames to fetch. (Default: ``None``, which means all fields
        should be fetched)

        Returns an ``Item`` instance containing all the data for that record.

        Raises an ``ItemNotFound`` exception if the item is not found.

        Example::

            # A simple hash key.
            >>> john = users.get_item(username='johndoe')
            >>> john['first_name']
            'John'

            # A complex hash+range key.
            >>> john = users.get_item(username='johndoe', last_name='Doe')
            >>> john['first_name']
            'John'

            # A consistent read (assuming the data might have just changed).
            >>> john = users.get_item(username='johndoe', consistent=True)
            >>> john['first_name']
            'Johann'

            # With a key that is an invalid variable name in Python.
            # Also, assumes a different schema than previous examples.
            >>> john = users.get_item(**{
            ...     'date-joined': 127549192,
            ... })
            >>> john['first_name']
            'John'

        """
        raw_key = self._encode_keys(kwargs)
        item_data = self.connection.get_item(
            self.table_name,
            raw_key,
            attributes_to_get=attributes,
            consistent_read=consistent
        )
        if 'Item' not in item_data:
            raise exceptions.ItemNotFound("Item %s couldn't be found." % kwargs)
        item = Item(self)
        item.load(item_data)
        return item

    def has_item(self, **kwargs):
        """
        Return whether an item (record) exists within a table in DynamoDB.

        To specify the key of the item you'd like to get, you can specify the
        key attributes as kwargs.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will perform
        a consistent (but more expensive) read from DynamoDB.
        (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        list of fieldnames to fetch. (Default: ``None``, which means all fields
        should be fetched)

        Returns ``True`` if an ``Item`` is present, ``False`` if not.

        Example::

            # Simple, just hash-key schema.
            >>> users.has_item(username='johndoe')
            True

            # Complex schema, item not present.
            >>> users.has_item(
            ...     username='johndoe',
            ...     date_joined='2014-01-07'
            ... )
            False

        """
        try:
            self.get_item(**kwargs)
        except (JSONResponseError, exceptions.ItemNotFound):
            return False

        return True

    def lookup(self, *args, **kwargs):
        """
        Look up an entry in DynamoDB. This is mostly backwards compatible
        with boto.dynamodb. Unlike get_item, it takes hash_key and range_key first,
        although you may still specify keyword arguments instead.

        Also unlike the get_item command, if the returned item has no keys
        (i.e., it does not exist in DynamoDB), a None result is returned, instead
        of an empty key object.

        Example::

            >>> user = users.lookup(username)
            >>> user = users.lookup(username, consistent=True)
            >>> app = apps.lookup('my_customer_id', 'my_app_id')

        """
        if not self.schema:
            self.describe()
        for x, arg in enumerate(args):
            kwargs[self.schema[x].name] = arg
        ret = self.get_item(**kwargs)
        if not ret.keys():
            return None
        return ret

    def new_item(self, *args):
        """
        Returns a new, blank item.

        This is mostly for consistency with boto.dynamodb.
        """
        if not self.schema:
            self.describe()
        data = {}
        for x, arg in enumerate(args):
            data[self.schema[x].name] = arg
        return Item(self, data=data)

    def put_item(self, data, overwrite=False):
        """
        Saves an entire item to DynamoDB.

        By default, if any part of the ``Item``'s original data doesn't match
        what's currently in DynamoDB, this request will fail. This prevents
        other processes from updating the data in between when you read the
        item & when your request to update the item's data is processed, which
        would typically result in some data loss.

        Requires a ``data`` parameter, which should be a dictionary of the data
        you'd like to store in DynamoDB.

        Optionally accepts an ``overwrite`` parameter, which should be a
        boolean. If you provide ``True``, this will tell DynamoDB to blindly
        overwrite whatever data is present, if any.

        Returns ``True`` on success.

        Example::

            >>> users.put_item(data={
            ...     'username': 'jane',
            ...     'first_name': 'Jane',
            ...     'last_name': 'Doe',
            ...     'date_joined': 126478915,
            ... })
            True

        """
        item = Item(self, data=data)
        return item.save(overwrite=overwrite)

    def _put_item(self, item_data, expects=None):
        """
        The internal variant of ``put_item`` (full data). This is used by the
        ``Item`` objects, since that operation is represented at the
        table-level by the API, but conceptually maps better to telling an
        individual ``Item`` to save itself.
        """
        kwargs = {}

        if expects is not None:
            kwargs['expected'] = expects

        self.connection.put_item(self.table_name, item_data, **kwargs)
        return True

    def _update_item(self, key, item_data, expects=None):
        """
        The internal variant of ``put_item`` (partial data). This is used by the
        ``Item`` objects, since that operation is represented at the
        table-level by the API, but conceptually maps better to telling an
        individual ``Item`` to save itself.
        """
        raw_key = self._encode_keys(key)
        kwargs = {}

        if expects is not None:
            kwargs['expected'] = expects

        self.connection.update_item(self.table_name, raw_key, item_data, **kwargs)
        return True

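    # Both internal variants forward ``expects`` as the ``expected`` kwarg,
    # which the connection layer serializes into DynamoDB's ``Expected``
    # structure. A sketch (hypothetical attribute & value):
    #
    #   table._put_item(item_data, expects={
    #       'username': {'Exists': True, 'Value': {'S': 'johndoe'}},
    #   })
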
    def delete_item(self, expected=None, conditional_operator=None, **kwargs):
        """
        Deletes a single item. You can perform a conditional delete operation
        that deletes the item if it exists, or if it has an expected attribute
        value.

        Conditional deletes are useful for only deleting items if specific
        conditions are met. If those conditions are met, DynamoDB performs
        the delete. Otherwise, the item is not deleted.

        To specify the expected attribute values of the item, you can pass a
        dictionary of conditions to ``expected``. Each condition should follow
        the pattern ``<attributename>__<comparison_operator>=<value_to_expect>``.

        **IMPORTANT** - Be careful when using this method, there is no undo.

        To specify the key of the item you'd like to get, you can specify the
        key attributes as kwargs.

        Optionally accepts an ``expected`` parameter which is a dictionary of
        expected attribute value conditions.

        Optionally accepts a ``conditional_operator`` which applies to the
        expected attribute value conditions:

        + `AND` - True if all of the conditions evaluate to true (default)
        + `OR` - True if at least one condition evaluates to true

        Returns ``True`` on success, ``False`` on failed conditional delete.

        Example::

            # A simple hash key.
            >>> users.delete_item(username='johndoe')
            True

            # A complex hash+range key.
            >>> users.delete_item(username='jane', last_name='Doe')
            True

            # With a key that is an invalid variable name in Python.
            # Also, assumes a different schema than previous examples.
            >>> users.delete_item(**{
            ...     'date-joined': 127549192,
            ... })
            True

            # Conditional delete
            >>> users.delete_item(username='johndoe',
            ...                   expected={'balance__eq': 0})
            True
        """
        expected = self._build_filters(expected, using=FILTER_OPERATORS)
        raw_key = self._encode_keys(kwargs)

        try:
            self.connection.delete_item(self.table_name, raw_key,
                                        expected=expected,
                                        conditional_operator=conditional_operator)
        except exceptions.ConditionalCheckFailedException:
            return False

        return True

    def get_key_fields(self):
        """
        Returns the fields necessary to make a key for a table.

        If the ``Table`` does not already have a populated ``schema``,
        this will request it via a ``Table.describe`` call.

        Returns a list of fieldnames (strings).

        Example::

            # A simple hash key.
            >>> users.get_key_fields()
            ['username']

            # A complex hash+range key.
            >>> users.get_key_fields()
            ['username', 'last_name']

        """
        if not self.schema:
            # We don't know the structure of the table. Get a description to
            # populate the schema.
            self.describe()

        return [field.name for field in self.schema]

    def batch_write(self):
        """
        Allows the batching of writes to DynamoDB.

        Since each write/delete call to DynamoDB has a cost associated with it,
        when loading lots of data, it makes sense to batch them, creating as
        few calls as possible.

        This returns a context manager that will transparently handle creating
        these batches. The object you get back lightly resembles a ``Table``
        object, sharing just the ``put_item`` & ``delete_item`` methods
        (which are all that DynamoDB can batch in terms of writing data).

        DynamoDB's maximum batch size is 25 items per request. If you attempt
        to put/delete more than that, the context manager will batch as many
        as it can up to that number, then flush them to DynamoDB & continue
        batching as more calls come in.

        Example::

            # Assuming a table with one record...
            >>> with users.batch_write() as batch:
            ...     batch.put_item(data={
            ...         'username': 'johndoe',
            ...         'first_name': 'John',
            ...         'last_name': 'Doe',
            ...         'owner': 1,
            ...     })
            ...     # Nothing across the wire yet.
            ...     batch.delete_item(username='bob')
            ...     # Still no requests sent.
            ...     batch.put_item(data={
            ...         'username': 'jane',
            ...         'first_name': 'Jane',
            ...         'last_name': 'Doe',
            ...         'date_joined': 127436192,
            ...     })
            ...     # Nothing yet, but once we leave the context, the
            ...     # put/deletes will be sent.

        """
        # PHENOMENAL COSMIC DOCS!!! itty-bitty code.
        return BatchTable(self)

    def _build_filters(self, filter_kwargs, using=QUERY_OPERATORS):
        """
        An internal method for taking query/scan-style ``**kwargs`` & turning
        them into the raw structure DynamoDB expects for filtering.
        """
        if filter_kwargs is None:
            return

        filters = {}

        for field_and_op, value in filter_kwargs.items():
            field_bits = field_and_op.split('__')
            fieldname = '__'.join(field_bits[:-1])

            try:
                op = using[field_bits[-1]]
            except KeyError:
                raise exceptions.UnknownFilterTypeError(
                    "Operator '%s' from '%s' is not recognized." % (
                        field_bits[-1],
                        field_and_op
                    )
                )

            lookup = {
                'AttributeValueList': [],
                'ComparisonOperator': op,
            }

            # Special-case the ``NULL/NOT_NULL`` case.
            if field_bits[-1] == 'null':
                del lookup['AttributeValueList']

                if value is False:
                    lookup['ComparisonOperator'] = 'NOT_NULL'
                else:
                    lookup['ComparisonOperator'] = 'NULL'
            # Special-case the ``BETWEEN`` case.
            elif field_bits[-1] == 'between':
                if len(value) == 2 and isinstance(value, (list, tuple)):
                    lookup['AttributeValueList'].append(
                        self._dynamizer.encode(value[0])
                    )
                    lookup['AttributeValueList'].append(
                        self._dynamizer.encode(value[1])
                    )
            # Special-case the ``IN`` case
            elif field_bits[-1] == 'in':
                for val in value:
                    lookup['AttributeValueList'].append(self._dynamizer.encode(val))
            else:
                # Fix up the value for encoding, because it was built to only work
                # with ``set``s.
                if isinstance(value, (list, tuple)):
                    value = set(value)
                lookup['AttributeValueList'].append(
                    self._dynamizer.encode(value)
                )

            # Finally, insert it into the filters.
            filters[fieldname] = lookup

        return filters

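    # A sketch of what ``_build_filters`` produces (sample kwargs; the output
    # follows DynamoDB's ``ComparisonOperator`` condition format):
    #
    #   table._build_filters({'username__eq': 'johndoe'}, using=QUERY_OPERATORS)
    #   # -> {'username': {'AttributeValueList': [{'S': 'johndoe'}],
    #   #                  'ComparisonOperator': 'EQ'}}
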
    def query(self, limit=None, index=None, reverse=False, consistent=False,
              attributes=None, max_page_size=None, **filter_kwargs):
        """
        **WARNING:** This method is provided **strictly** for
        backward-compatibility. It returns results in an incorrect order.

        If you are writing new code, please use ``Table.query_2``.
        """
        reverse = not reverse
        return self.query_2(limit=limit, index=index, reverse=reverse,
                            consistent=consistent, attributes=attributes,
                            max_page_size=max_page_size, **filter_kwargs)

    def query_2(self, limit=None, index=None, reverse=False,
                consistent=False, attributes=None, max_page_size=None,
                query_filter=None, conditional_operator=None,
                **filter_kwargs):
        """
        Queries for a set of matching items in a DynamoDB table.

        Queries can be performed against a hash key, a hash+range key or
        against any data stored in your local secondary indexes. Query filters
        can be used to filter on arbitrary fields.

        **Note** - You can not query against arbitrary fields within the data
        stored in DynamoDB unless you specify ``query_filter`` values.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
        are specified in the same way.

        Optionally accepts a ``limit`` parameter, which should be an integer
        count of the total number of items to return. (Default: ``None`` -
        all results)

        Optionally accepts an ``index`` parameter, which should be a string of
        the name of the local secondary index you want to query against.
        (Default: ``None``)

        Optionally accepts a ``reverse`` parameter, which will present the
        results in reverse order. (Default: ``False`` - normal order)

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will force a consistent read of
        the data (more expensive). (Default: ``False`` - use eventually
        consistent reads)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB. This uses the ``AttributesToGet`` API parameter & sets
        ``Select`` to ``SPECIFIC_ATTRIBUTES``.

        Optionally accepts a ``max_page_size`` parameter, which should be an
        integer count of the maximum number of items to retrieve
        **per-request**. This is useful in making faster requests & preventing
        the scan from drowning out other queries. (Default: ``None`` -
        fetch as many as DynamoDB will return)

        Optionally accepts a ``query_filter`` which is a dictionary of filter
        conditions against any arbitrary field in the returned data.

        Optionally accepts a ``conditional_operator`` which applies to the
        query filter conditions:

        + `AND` - True if all filter conditions evaluate to true (default)
        + `OR` - True if at least one filter condition evaluates to true

        Returns a ``ResultSet`` containing ``Item``s, which transparently
        handles the pagination of results you get back.

        Example::

            # Look for last names equal to "Doe".
            >>> results = users.query_2(last_name__eq='Doe')
            >>> for res in results:
            ...     print(res['first_name'])
            'John'
            'Jane'

            # Look for last names beginning with "D", in reverse order, limit 3.
            >>> results = users.query_2(
            ...     last_name__beginswith='D',
            ...     reverse=True,
            ...     limit=3
            ... )
            >>> for res in results:
            ...     print(res['first_name'])
            'Alice'
            'Jane'
            'John'

            # Use an LSI & a consistent read.
            >>> results = users.query_2(
            ...     date_joined__gte=1236451000,
            ...     owner__eq=1,
            ...     index='DateJoinedIndex',
            ...     consistent=True
            ... )
            >>> for res in results:
            ...     print(res['first_name'])
            'Alice'
            'Bob'
            'John'
            'Fred'

            # Filter by non-indexed field(s)
            >>> results = users.query_2(
            ...     last_name__eq='Doe',
            ...     reverse=True,
            ...     query_filter={
            ...         'first_name__beginswith': 'A'
            ...     }
            ... )
            >>> for res in results:
            ...     print(res['first_name'] + ' ' + res['last_name'])
            'Alice Doe'

        """
        if self.schema:
            if len(self.schema) == 1:
                if len(filter_kwargs) <= 1:
                    if not self.global_indexes or not len(self.global_indexes):
                        # If the schema only has one field, there's <= 1 filter
                        # param & no Global Secondary Indexes, this is user
                        # error. Bail early.
                        raise exceptions.QueryError(
                            "You must specify more than one key to filter on."
                        )

        if attributes is not None:
            select = 'SPECIFIC_ATTRIBUTES'
        else:
            select = None

        results = ResultSet(
            max_page_size=max_page_size
        )
        kwargs = filter_kwargs.copy()
        kwargs.update({
            'limit': limit,
            'index': index,
            'reverse': reverse,
            'consistent': consistent,
            'select': select,
            'attributes_to_get': attributes,
            'query_filter': query_filter,
            'conditional_operator': conditional_operator,
        })
        results.to_call(self._query, **kwargs)
        return results

    def query_count(self, index=None, consistent=False, conditional_operator=None,
                    query_filter=None, scan_index_forward=True, limit=None,
                    exclusive_start_key=None, **filter_kwargs):
        """
        Queries the exact count of matching items in a DynamoDB table.

        Queries can be performed against a hash key, a hash+range key or
        against any data stored in your local secondary indexes. Query filters
        can be used to filter on arbitrary fields.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
        are specified in the same way.

        Optionally accepts an ``index`` parameter, which should be a string of
        the name of the local secondary index you want to query against.
        (Default: ``None``)

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will force a consistent read of
        the data (more expensive). (Default: ``False`` - use eventually
        consistent reads)

        Optionally accepts a ``query_filter`` which is a dictionary of filter
        conditions against any arbitrary field in the returned data.

        Optionally accepts a ``conditional_operator`` which applies to the
        query filter conditions:

        + `AND` - True if all filter conditions evaluate to true (default)
        + `OR` - True if at least one filter condition evaluates to true

        Optionally accepts an ``exclusive_start_key`` which is used to get
        the remaining items when a query cannot return the complete count.

        Returns an integer representing the exact number of matched items.

        :type scan_index_forward: boolean
        :param scan_index_forward: Specifies ascending (true) or descending
            (false) traversal of the index. DynamoDB returns results reflecting
            the requested order determined by the range key. If the data type
            is Number, the results are returned in numeric order. For String,
            the results are returned in order of ASCII character code values.
            For Binary, DynamoDB treats each byte of the binary data as
            unsigned when it compares binary values.

            If ScanIndexForward is not specified, the results are returned in
            ascending order.

        :type limit: integer
        :param limit: The maximum number of items to evaluate (not necessarily
            the number of matching items).

        Example::

            # Look for last names equal to "Doe".
            >>> users.query_count(last_name__eq='Doe')
            5

            # Use an LSI & a consistent read.
            >>> users.query_count(
            ...     date_joined__gte=1236451000,
            ...     owner__eq=1,
            ...     index='DateJoinedIndex',
            ...     consistent=True
            ... )
            2

        """
        key_conditions = self._build_filters(
            filter_kwargs,
            using=QUERY_OPERATORS
        )

        built_query_filter = self._build_filters(
            query_filter,
            using=FILTER_OPERATORS
        )

        count_buffer = 0
        last_evaluated_key = exclusive_start_key

        while True:
            raw_results = self.connection.query(
                self.table_name,
                index_name=index,
                consistent_read=consistent,
                select='COUNT',
                key_conditions=key_conditions,
                query_filter=built_query_filter,
                conditional_operator=conditional_operator,
                limit=limit,
                scan_index_forward=scan_index_forward,
                exclusive_start_key=last_evaluated_key
            )

            count_buffer += int(raw_results.get('Count', 0))
            last_evaluated_key = raw_results.get('LastEvaluatedKey')
            if not last_evaluated_key or count_buffer < 1:
                break

        return count_buffer

    def _query(self, limit=None, index=None, reverse=False, consistent=False,
               exclusive_start_key=None, select=None, attributes_to_get=None,
               query_filter=None, conditional_operator=None, **filter_kwargs):
        """
        The internal method that performs the actual queries. Used extensively
        by ``ResultSet`` to perform each (paginated) request.
        """
        kwargs = {
            'limit': limit,
            'index_name': index,
            'consistent_read': consistent,
            'select': select,
            'attributes_to_get': attributes_to_get,
            'conditional_operator': conditional_operator,
        }

        if reverse:
            kwargs['scan_index_forward'] = False

        if exclusive_start_key:
            kwargs['exclusive_start_key'] = {}

            for key, value in exclusive_start_key.items():
                kwargs['exclusive_start_key'][key] = \
                    self._dynamizer.encode(value)

        # Convert the filters into something we can actually use.
        kwargs['key_conditions'] = self._build_filters(
            filter_kwargs,
            using=QUERY_OPERATORS
        )

        kwargs['query_filter'] = self._build_filters(
            query_filter,
            using=FILTER_OPERATORS
        )

        raw_results = self.connection.query(
            self.table_name,
            **kwargs
        )
        results = []
        last_key = None

        for raw_item in raw_results.get('Items', []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        if raw_results.get('LastEvaluatedKey', None):
            last_key = {}

            for key, value in raw_results['LastEvaluatedKey'].items():
                last_key[key] = self._dynamizer.decode(value)

        return {
            'results': results,
            'last_key': last_key,
        }

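    # The dict ``_query`` returns drives ``ResultSet``'s pagination: ``results``
    # is consumed as the current page & ``last_key`` (if present) is fed back
    # in as ``exclusive_start_key`` on the next call. One page might look
    # roughly like (illustrative values only):
    #
    #   {'results': [<Item: johndoe>], 'last_key': {'username': 'johndoe'}}
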
    def scan(self, limit=None, segment=None, total_segments=None,
             max_page_size=None, attributes=None, conditional_operator=None,
             **filter_kwargs):
        """
        Scans across all items within a DynamoDB table.

        Scans can be performed against a hash key or a hash+range key. You can
        additionally filter the results after the table has been read but
        before the response is returned by using query filters.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``.

        Optionally accepts a ``limit`` parameter, which should be an integer
        count of the total number of items to return. (Default: ``None`` -
        all results)

        Optionally accepts a ``segment`` parameter, which should be an integer
        of the segment to retrieve on. Please see the documentation about
        Parallel Scans (Default: ``None`` - no segments)

        Optionally accepts a ``total_segments`` parameter, which should be an
        integer count of number of segments to divide the table into.
        Please see the documentation about Parallel Scans (Default: ``None`` -
        no segments)

        Optionally accepts a ``max_page_size`` parameter, which should be an
        integer count of the maximum number of items to retrieve
        **per-request**. This is useful in making faster requests & preventing
        the scan from drowning out other queries. (Default: ``None`` -
        fetch as many as DynamoDB will return)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB. This uses the ``AttributesToGet`` API parameter & sets
        ``Select`` to ``SPECIFIC_ATTRIBUTES``.

        Returns a ``ResultSet``, which transparently handles the pagination of
        results you get back.

        Example::

            # All results.
            >>> everything = users.scan()

            # Look for last names beginning with "D".
            >>> results = users.scan(last_name__beginswith='D')
            >>> for res in results:
            ...     print(res['first_name'])
            'Alice'
            'John'
            'Jane'

            # Use an ``IN`` filter & limit.
            >>> results = users.scan(
            ...     age__in=[25, 26, 27, 28, 29],
            ...     limit=1
            ... )
            >>> for res in results:
            ...     print(res['first_name'])
            'Alice'

        """
        results = ResultSet(
            max_page_size=max_page_size
        )
        kwargs = filter_kwargs.copy()
        kwargs.update({
            'limit': limit,
            'segment': segment,
            'total_segments': total_segments,
            'attributes': attributes,
            'conditional_operator': conditional_operator,
        })
        results.to_call(self._scan, **kwargs)
        return results

    def _scan(self, limit=None, exclusive_start_key=None, segment=None,
              total_segments=None, attributes=None, conditional_operator=None,
              **filter_kwargs):
        """
        The internal method that performs the actual scan. Used extensively
        by ``ResultSet`` to perform each (paginated) request.
        """
        kwargs = {
            'limit': limit,
            'segment': segment,
            'total_segments': total_segments,
            'attributes_to_get': attributes,
            'conditional_operator': conditional_operator,
        }

        if exclusive_start_key:
            kwargs['exclusive_start_key'] = {}

            for key, value in exclusive_start_key.items():
                kwargs['exclusive_start_key'][key] = \
                    self._dynamizer.encode(value)

        # Convert the filters into something we can actually use.
        kwargs['scan_filter'] = self._build_filters(
            filter_kwargs,
            using=FILTER_OPERATORS
        )

        raw_results = self.connection.scan(
            self.table_name,
            **kwargs
        )
        results = []
        last_key = None

        for raw_item in raw_results.get('Items', []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        if raw_results.get('LastEvaluatedKey', None):
            last_key = {}

            for key, value in raw_results['LastEvaluatedKey'].items():
                last_key[key] = self._dynamizer.decode(value)

        return {
            'results': results,
            'last_key': last_key,
        }

| 1504 def batch_get(self, keys, consistent=False, attributes=None): | |
| 1505 """ | |
| 1506 Fetches many specific items in batch from a table. | |
| 1507 | |
| 1508 Requires a ``keys`` parameter, which should be a list of dictionaries. | |
| 1509 Each dictionary should consist of the keys values to specify. | |
| 1510 | |
| 1511 Optionally accepts a ``consistent`` parameter, which should be a | |
| 1512 boolean. If you provide ``True``, a strongly consistent read will be | |
| 1513 used. (Default: False) | |
| 1514 | |
| 1515 Optionally accepts an ``attributes`` parameter, which should be a | |
| 1516 tuple. If you provide any attributes, only these will be fetched | |
| 1517 from DynamoDB. | |
| 1518 | |
| 1519 Returns a ``ResultSet``, which transparently handles the pagination of | |
| 1520 results you get back. | |
| 1521 | |
| 1522 Example:: | |
| 1523 | |
| 1524 >>> results = users.batch_get(keys=[ | |
| 1525 ... { | |
| 1526 ... 'username': 'johndoe', | |
| 1527 ... }, | |
| 1528 ... { | |
| 1529 ... 'username': 'jane', | |
| 1530 ... }, | |
| 1531 ... { | |
| 1532 ... 'username': 'fred', | |
| 1533 ... }, | |
| 1534 ... ]) | |
| 1535 >>> for res in results: | |
| 1536 ... print(res['first_name']) | |
| 1537 John | |
| 1538 Jane | |
| 1539 Fred | |
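| | |
| # A hedged sketch: fetch only the ``first_name`` attribute for | |
| # each requested key (same hypothetical table as above). | |
| >>> results = users.batch_get( | |
| ... keys=[{'username': 'johndoe'}], | |
| ... attributes=('first_name',) | |
| ... ) | |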
| 1540 | |
| 1541 """ | |
| 1542 # We pass the keys to the constructor instead, so it can maintain its | |
| 1543 # own internal state as to what keys have been processed. | |
| 1544 results = BatchGetResultSet(keys=keys, max_batch_get=self.max_batch_get) | |
| 1545 results.to_call(self._batch_get, consistent=consistent, attributes=attributes) | |
| 1546 return results | |
| 1547 | |
| 1548 def _batch_get(self, keys, consistent=False, attributes=None): | |
| 1549 """ | |
| 1550 The internal method that performs the actual batch get. Used extensively | |
| 1551 by ``BatchGetResultSet`` to perform each (paginated) request. | |
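| | |
| Any keys DynamoDB reports back as unprocessed (e.g. due to | |
| throttling) are decoded & returned under ``unprocessed_keys`` so | |
| the result set can retry them on a later request. | |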
| 1552 """ | |
| 1553 items = { | |
| 1554 self.table_name: { | |
| 1555 'Keys': [], | |
| 1556 }, | |
| 1557 } | |
| 1558 | |
| 1559 if consistent: | |
| 1560 items[self.table_name]['ConsistentRead'] = True | |
| 1561 | |
| 1562 if attributes is not None: | |
| 1563 items[self.table_name]['AttributesToGet'] = attributes | |
| 1564 | |
| 1565 for key_data in keys: | |
| 1566 raw_key = {} | |
| 1567 | |
| 1568 for key, value in key_data.items(): | |
| 1569 raw_key[key] = self._dynamizer.encode(value) | |
| 1570 | |
| 1571 items[self.table_name]['Keys'].append(raw_key) | |
| 1572 | |
| 1573 raw_results = self.connection.batch_get_item(request_items=items) | |
| 1574 results = [] | |
| 1575 unprocessed_keys = [] | |
| 1576 | |
| 1577 for raw_item in raw_results['Responses'].get(self.table_name, []): | |
| 1578 item = Item(self) | |
| 1579 item.load({ | |
| 1580 'Item': raw_item, | |
| 1581 }) | |
| 1582 results.append(item) | |
| 1583 | |
| 1584 raw_unprocessed = raw_results.get('UnprocessedKeys', {}).get(self.table_name, {}) | |
| 1585 | |
| 1586 for raw_key in raw_unprocessed.get('Keys', []): | |
| 1587 py_key = {} | |
| 1588 | |
| 1589 for key, value in raw_key.items(): | |
| 1590 py_key[key] = self._dynamizer.decode(value) | |
| 1591 | |
| 1592 unprocessed_keys.append(py_key) | |
| 1593 | |
| 1594 return { | |
| 1595 'results': results, | |
| 1596 # NEVER return a ``last_key``. Just in case any part of | |
| 1597 # ``ResultSet`` peeks through, since much of the | |
| 1598 # original underlying implementation is based on this key. | |
| 1599 'last_key': None, | |
| 1600 'unprocessed_keys': unprocessed_keys, | |
| 1601 } | |
| 1602 | |
| 1603 def count(self): | |
| 1604 """ | |
| 1605 Returns a (very) eventually consistent count of the number of items | |
| 1606 in a table. | |
| 1607 | |
| 1608 Lag time is about 6 hours, so don't expect a high degree of accuracy. | |
| 1609 | |
| 1610 Example:: | |
| 1611 | |
| 1612 >>> users.count() | |
| 1613 6 | |
| 1614 | |
| 1615 """ | |
| 1616 info = self.describe() | |
| 1617 return info['Table'].get('ItemCount', 0) | |
| 1618 | |
| 1619 | |
| 1620 class BatchTable(object): | |
| 1621 """ | |
| 1622 Used by ``Table`` as the context manager for batch writes. | |
| 1623 | |
| 1624 You likely don't want to try to use this object directly. | |
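| | |
| Instead, obtain one via ``Table.batch_write``. A minimal sketch, | |
| assuming a hypothetical ``users`` table:: | |
| | |
| >>> with users.batch_write() as batch: | |
| ... batch.put_item(data={'username': 'jane'}) | |
| ... batch.delete_item(username='johndoe') | |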
| 1625 """ | |
| 1626 def __init__(self, table): | |
| 1627 self.table = table | |
| 1628 self._to_put = [] | |
| 1629 self._to_delete = [] | |
| 1630 self._unprocessed = [] | |
| 1631 | |
| 1632 def __enter__(self): | |
| 1633 return self | |
| 1634 | |
| 1635 def __exit__(self, exc_type, exc_value, traceback): | |
| 1636 if self._to_put or self._to_delete: | |
| 1637 # Flush anything that's left. | |
| 1638 self.flush() | |
| 1639 | |
| 1640 if self._unprocessed: | |
| 1641 # Finally, handle anything that wasn't processed. | |
| 1642 self.resend_unprocessed() | |
| 1643 | |
| 1644 def put_item(self, data, overwrite=False): | |
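| # Note: ``overwrite`` is unused here; DynamoDB's batch writes | |
| # always replace an existing item with the same key. | |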
| 1645 self._to_put.append(data) | |
| 1646 | |
| 1647 if self.should_flush(): | |
| 1648 self.flush() | |
| 1649 | |
| 1650 def delete_item(self, **kwargs): | |
| 1651 self._to_delete.append(kwargs) | |
| 1652 | |
| 1653 if self.should_flush(): | |
| 1654 self.flush() | |
| 1655 | |
| 1656 def should_flush(self): | |
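| # DynamoDB's BatchWriteItem API accepts at most 25 requests per | |
| # call, so flush once the pending puts & deletes reach that limit. | |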
| 1657 if len(self._to_put) + len(self._to_delete) == 25: | |
| 1658 return True | |
| 1659 | |
| 1660 return False | |
| 1661 | |
| 1662 def flush(self): | |
| 1663 batch_data = { | |
| 1664 self.table.table_name: [ | |
| 1665 # We'll insert data here shortly. | |
| 1666 ], | |
| 1667 } | |
| 1668 | |
| 1669 for put in self._to_put: | |
| 1670 item = Item(self.table, data=put) | |
| 1671 batch_data[self.table.table_name].append({ | |
| 1672 'PutRequest': { | |
| 1673 'Item': item.prepare_full(), | |
| 1674 } | |
| 1675 }) | |
| 1676 | |
| 1677 for delete in self._to_delete: | |
| 1678 batch_data[self.table.table_name].append({ | |
| 1679 'DeleteRequest': { | |
| 1680 'Key': self.table._encode_keys(delete), | |
| 1681 } | |
| 1682 }) | |
| 1683 | |
| 1684 resp = self.table.connection.batch_write_item(batch_data) | |
| 1685 self.handle_unprocessed(resp) | |
| 1686 | |
| 1687 self._to_put = [] | |
| 1688 self._to_delete = [] | |
| 1689 return True | |
| 1690 | |
| 1691 def handle_unprocessed(self, resp): | |
| 1692 if len(resp.get('UnprocessedItems', {})): | |
| 1693 table_name = self.table.table_name | |
| 1694 unprocessed = resp['UnprocessedItems'].get(table_name, []) | |
| 1695 | |
| 1696 # Some items have not been processed. Stow them for now & | |
| 1697 # re-attempt processing on ``__exit__``. | |
| 1698 msg = "%s items were unprocessed. Storing for later." | |
| 1699 boto.log.info(msg % len(unprocessed)) | |
| 1700 self._unprocessed.extend(unprocessed) | |
| 1701 | |
| 1702 def resend_unprocessed(self): | |
| 1703 # If there are unprocessed records (for instance, the user was over | |
| 1704 # their throughput limits), iterate over them & re-send batches | |
| 1705 # until they've all been processed. | |
| 1706 boto.log.info( | |
| 1707 "Re-sending %s unprocessed items." % len(self._unprocessed) | |
| 1708 ) | |
| 1709 | |
| 1710 while len(self._unprocessed): | |
| 1711 # Again, do 25 at a time. | |
| 1712 to_resend = self._unprocessed[:25] | |
| 1713 # Remove them from the list. | |
| 1714 self._unprocessed = self._unprocessed[25:] | |
| 1715 batch_data = { | |
| 1716 self.table.table_name: to_resend | |
| 1717 } | |
| 1718 boto.log.info("Sending %s items" % len(to_resend)) | |
| 1719 resp = self.table.connection.batch_write_item(batch_data) | |
| 1720 self.handle_unprocessed(resp) | |
| 1721 boto.log.info( | |
| 1722 "%s unprocessed items left" % len(self._unprocessed) | |
| 1723 ) |
