comparison env/lib/python3.7/site-packages/boto/kinesis/layer1.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author    shellac
date      Mon, 01 Jun 2020 08:59:25 -0400
parents   79f47841a781
children  (none)
comparison  4:79f47841a781 -> 5:9b1c78e6ba9c
# Copyright (c) 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#

import base64
import boto

from boto.connection import AWSQueryConnection
from boto.regioninfo import RegionInfo
from boto.exception import JSONResponseError
from boto.kinesis import exceptions
from boto.compat import json
from boto.compat import six


class KinesisConnection(AWSQueryConnection):
    """
    Amazon Kinesis Service API Reference
    Amazon Kinesis is a managed service that scales elastically for
    real-time processing of streaming big data.
    """
    APIVersion = "2013-12-02"
    DefaultRegionName = "us-east-1"
    DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com"
    ServiceName = "Kinesis"
    TargetPrefix = "Kinesis_20131202"
    ResponseError = JSONResponseError

    _faults = {
        "ProvisionedThroughputExceededException": exceptions.ProvisionedThroughputExceededException,
        "LimitExceededException": exceptions.LimitExceededException,
        "ExpiredIteratorException": exceptions.ExpiredIteratorException,
        "ResourceInUseException": exceptions.ResourceInUseException,
        "ResourceNotFoundException": exceptions.ResourceNotFoundException,
        "InvalidArgumentException": exceptions.InvalidArgumentException,
        "SubscriptionRequiredException": exceptions.SubscriptionRequiredException
    }

    def __init__(self, **kwargs):
        region = kwargs.pop('region', None)
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        if 'host' not in kwargs:
            kwargs['host'] = region.endpoint
        super(KinesisConnection, self).__init__(**kwargs)
        self.region = region

    def _required_auth_capability(self):
        return ['hmac-v4']

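    # Usage sketch (illustrative; not part of the original boto module). It
    # assumes credentials are available through the usual boto mechanisms
    # (environment variables or a boto config file) and uses the
    # `boto.kinesis.connect_to_region` helper from this package; kept as
    # comments so the listing itself stays importable.
    #
    #     import boto.kinesis
    #
    #     # Preferred: let the package pick the endpoint for the region.
    #     conn = boto.kinesis.connect_to_region('us-east-1')
    #
    #     # Equivalent: instantiate the class directly with its defaults
    #     # (see DefaultRegionName / DefaultRegionEndpoint above).
    #     conn = KinesisConnection()
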
    def add_tags_to_stream(self, stream_name, tags):
        """
        Adds or updates tags for the specified Amazon Kinesis stream.
        Each stream can have up to 10 tags.

        If tags have already been assigned to the stream,
        `AddTagsToStream` overwrites any existing tags that correspond
        to the specified tag keys.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type tags: map
        :param tags: The set of key-value pairs to use to create the tags.

        """
        params = {'StreamName': stream_name, 'Tags': tags, }
        return self.make_request(action='AddTagsToStream',
                                 body=json.dumps(params))

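    # Tagging sketch (illustrative; `conn` and the stream name are assumed
    # placeholders; response fields follow the Kinesis API). Tags are passed
    # as a plain dict of key-value pairs, and ListTagsForStream pages with
    # `ExclusiveStartTagKey` as described in the method below.
    #
    #     conn.add_tags_to_stream('my-stream', {'environment': 'production',
    #                                           'team': 'data-eng'})
    #     tags = conn.list_tags_for_stream('my-stream')['Tags']
    #     conn.remove_tags_from_stream('my-stream', ['team'])
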
    def create_stream(self, stream_name, shard_count):
        """
        Creates an Amazon Kinesis stream. A stream captures and
        transports data records that are continuously emitted from
        different data sources or producers. Scale-out within an
        Amazon Kinesis stream is explicitly supported by means of
        shards, which are uniquely identified groups of data records
        in an Amazon Kinesis stream.

        You specify and control the number of shards that a stream is
        composed of. Each open shard can support up to 5 read
        transactions per second, up to a maximum total of 2 MB of data
        read per second. Each shard can support up to 1000 records
        written per second, up to a maximum total of 1 MB data written
        per second. You can add shards to a stream if the amount of
        data input increases and you can remove shards if the amount
        of data input decreases.

        The stream name identifies the stream. The name is scoped to
        the AWS account used by the application. It is also scoped by
        region. That is, two streams in two different accounts can
        have the same name, and two streams in the same account, but
        in two different regions, can have the same name.

        `CreateStream` is an asynchronous operation. Upon receiving a
        `CreateStream` request, Amazon Kinesis immediately returns and
        sets the stream status to `CREATING`. After the stream is
        created, Amazon Kinesis sets the stream status to `ACTIVE`.
        You should perform read and write operations only on an
        `ACTIVE` stream.

        You receive a `LimitExceededException` when making a
        `CreateStream` request if you try to do one of the following:

        + Have more than five streams in the `CREATING` state at any
          point in time.
        + Create more shards than are authorized for your account.

        The default limit for an AWS account is 10 shards per stream.
        If you need to create a stream with more than 10 shards,
        `contact AWS Support`_ to increase the limit on your account.

        You can use `DescribeStream` to check the stream status, which
        is returned in `StreamStatus`.

        `CreateStream` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: A name to identify the stream. The stream name is
            scoped to the AWS account used by the application that creates the
            stream. It is also scoped by region. That is, two streams in two
            different AWS accounts can have the same name, and two streams in
            the same AWS account, but in two different regions, can have the
            same name.

        :type shard_count: integer
        :param shard_count: The number of shards that the stream will use. The
            throughput of the stream is a function of the number of shards;
            more shards are required for greater provisioned throughput.
            **Note:** The default limit for an AWS account is 10 shards per
            stream. If you need to create a stream with more than 10 shards,
            `contact AWS Support`_ to increase the limit on your account.

        """
        params = {
            'StreamName': stream_name,
            'ShardCount': shard_count,
        }
        return self.make_request(action='CreateStream',
                                 body=json.dumps(params))

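    # Creation sketch (illustrative; `conn` and the stream name are assumed
    # placeholders, and the response fields follow the Kinesis API).
    # CreateStream is asynchronous, so the example polls DescribeStream until
    # the stream reaches ACTIVE, as the docstring above recommends.
    #
    #     import time
    #
    #     conn.create_stream('my-stream', shard_count=2)
    #     while True:
    #         desc = conn.describe_stream('my-stream')['StreamDescription']
    #         if desc['StreamStatus'] == 'ACTIVE':
    #             break
    #         time.sleep(5)
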
    def delete_stream(self, stream_name):
        """
        Deletes a stream and all its shards and data. You must shut
        down any applications that are operating on the stream before
        you delete the stream. If an application attempts to operate
        on a deleted stream, it will receive the exception
        `ResourceNotFoundException`.

        If the stream is in the `ACTIVE` state, you can delete it.
        After a `DeleteStream` request, the specified stream is in the
        `DELETING` state until Amazon Kinesis completes the deletion.

        **Note:** Amazon Kinesis might continue to accept data read
        and write operations, such as PutRecord, PutRecords, and
        GetRecords, on a stream in the `DELETING` state until the
        stream deletion is complete.

        When you delete a stream, any shards in that stream are also
        deleted, and any tags are dissociated from the stream.

        You can use the DescribeStream operation to check the state of
        the stream, which is returned in `StreamStatus`.

        `DeleteStream` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream to delete.

        """
        params = {'StreamName': stream_name, }
        return self.make_request(action='DeleteStream',
                                 body=json.dumps(params))

    def describe_stream(self, stream_name, limit=None,
                        exclusive_start_shard_id=None):
        """
        Describes the specified stream.

        The information about the stream includes its current status,
        its Amazon Resource Name (ARN), and an array of shard objects.
        For each shard object, there is information about the hash key
        and sequence number ranges that the shard spans, and the IDs
        of any earlier shards that played a role in creating the
        shard. A sequence number is the identifier associated with
        every record ingested in the Amazon Kinesis stream. The
        sequence number is assigned when a record is put into the
        stream.

        You can limit the number of returned shards using the `Limit`
        parameter. The number of shards in a stream may be too large
        to return from a single call to `DescribeStream`. You can
        detect this by using the `HasMoreShards` flag in the returned
        output. `HasMoreShards` is set to `True` when there is more
        data available.

        `DescribeStream` is a paginated operation. If there are more
        shards available, you can request them using the shard ID of
        the last shard returned. Specify this ID in the
        `ExclusiveStartShardId` parameter in a subsequent request to
        `DescribeStream`.

        `DescribeStream` has a limit of 10 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream to describe.

        :type limit: integer
        :param limit: The maximum number of shards to return.

        :type exclusive_start_shard_id: string
        :param exclusive_start_shard_id: The shard ID of the shard to start
            with.

        """
        params = {'StreamName': stream_name, }
        if limit is not None:
            params['Limit'] = limit
        if exclusive_start_shard_id is not None:
            params['ExclusiveStartShardId'] = exclusive_start_shard_id
        return self.make_request(action='DescribeStream',
                                 body=json.dumps(params))

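    # Pagination sketch (illustrative; `conn` and the stream name are assumed
    # placeholders; response fields follow the Kinesis API). When
    # `HasMoreShards` is True, the ID of the last shard already collected is
    # fed back through `exclusive_start_shard_id`, per the docstring above.
    #
    #     shards = []
    #     last_shard_id = None
    #     while True:
    #         desc = conn.describe_stream(
    #             'my-stream',
    #             exclusive_start_shard_id=last_shard_id)['StreamDescription']
    #         shards.extend(desc['Shards'])
    #         if not desc['HasMoreShards']:
    #             break
    #         last_shard_id = shards[-1]['ShardId']
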
    def get_records(self, shard_iterator, limit=None, b64_decode=True):
        """
        Gets data records from a shard.

        Specify a shard iterator using the `ShardIterator` parameter.
        The shard iterator specifies the position in the shard from
        which you want to start reading data records sequentially. If
        there are no records available in the portion of the shard
        that the iterator points to, `GetRecords` returns an empty
        list. Note that it might take multiple calls to get to a
        portion of the shard that contains records.

        You can scale by provisioning multiple shards. Your
        application should have one thread per shard, each reading
        continuously from its stream. To read from a stream
        continually, call `GetRecords` in a loop. Use GetShardIterator
        to get the shard iterator to specify in the first `GetRecords`
        call. `GetRecords` returns a new shard iterator in
        `NextShardIterator`. Specify the shard iterator returned in
        `NextShardIterator` in subsequent calls to `GetRecords`. Note
        that if the shard has been closed, the shard iterator can't
        return more data and `GetRecords` returns `null` in
        `NextShardIterator`. You can terminate the loop when the shard
        is closed, or when the shard iterator reaches the record with
        the sequence number or other attribute that marks it as the
        last record to process.

        Each data record can be up to 50 KB in size, and each shard
        can read up to 2 MB per second. You can ensure that your calls
        don't exceed the maximum supported size or throughput by using
        the `Limit` parameter to specify the maximum number of records
        that `GetRecords` can return. Consider your average record
        size when determining this limit. For example, if your average
        record size is 40 KB, you can limit the data returned to about
        1 MB per call by specifying 25 as the limit.

        The size of the data returned by `GetRecords` will vary
        depending on the utilization of the shard. The maximum size of
        data that `GetRecords` can return is 10 MB. If a call returns
        10 MB of data, subsequent calls made within the next 5 seconds
        throw `ProvisionedThroughputExceededException`. If there is
        insufficient provisioned throughput on the shard, subsequent
        calls made within the next 1 second throw
        `ProvisionedThroughputExceededException`. Note that
        `GetRecords` won't return any data when it throws an
        exception. For this reason, we recommend that you wait one
        second between calls to `GetRecords`; however, it's possible
        that the application will get exceptions for longer than 1
        second.

        To detect whether the application is falling behind in
        processing, add a timestamp to your records and note how long
        it takes to process them. You can also monitor how much data
        is in a stream using the CloudWatch metrics for write
        operations (`PutRecord` and `PutRecords`). For more
        information, see `Monitoring Amazon Kinesis with Amazon
        CloudWatch`_ in the Amazon Kinesis Developer Guide.

        :type shard_iterator: string
        :param shard_iterator: The position in the shard from which you want
            to start sequentially reading data records. A shard iterator
            specifies this position using the sequence number of a data
            record in the shard.

        :type limit: integer
        :param limit: The maximum number of records to return. Specify a value
            of up to 10,000. If you specify a value that is greater than
            10,000, `GetRecords` throws `InvalidArgumentException`.

        :type b64_decode: boolean
        :param b64_decode: Decode the Base64-encoded ``Data`` field of records.

        """
        params = {'ShardIterator': shard_iterator, }
        if limit is not None:
            params['Limit'] = limit

        response = self.make_request(action='GetRecords',
                                     body=json.dumps(params))

        # Base64 decode the data
        if b64_decode:
            for record in response.get('Records', []):
                record['Data'] = base64.b64decode(
                    record['Data'].encode('utf-8')).decode('utf-8')

        return response

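    # Sizing sketch (illustrative): the 40 KB / 25-record guidance from the
    # docstring above, written out as arithmetic. `conn` and `shard_iterator`
    # are assumed placeholders, and the average record size is something you
    # would measure for your own producers.
    #
    #     avg_record_kb = 40                           # measured average size
    #     target_kb_per_call = 1024                    # aim for ~1 MB per call
    #     limit = target_kb_per_call // avg_record_kb  # -> 25 records per call
    #     response = conn.get_records(shard_iterator, limit=limit)
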
    def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type,
                           starting_sequence_number=None):
        """
        Gets a shard iterator. A shard iterator expires five minutes
        after it is returned to the requester.

        A shard iterator specifies the position in the shard from
        which to start reading data records sequentially. A shard
        iterator specifies this position using the sequence number of
        a data record in a shard. A sequence number is the identifier
        associated with every record ingested in the Amazon Kinesis
        stream. The sequence number is assigned when a record is put
        into the stream.

        You must specify the shard iterator type. For example, you can
        set the `ShardIteratorType` parameter to read exactly from the
        position denoted by a specific sequence number by using the
        `AT_SEQUENCE_NUMBER` shard iterator type, or right after the
        sequence number by using the `AFTER_SEQUENCE_NUMBER` shard
        iterator type, using sequence numbers returned by earlier
        calls to PutRecord, PutRecords, GetRecords, or DescribeStream.
        You can specify the shard iterator type `TRIM_HORIZON` in the
        request to cause `ShardIterator` to point to the last
        untrimmed record in the shard in the system, which is the
        oldest data record in the shard. Or you can point to just
        after the most recent record in the shard, by using the shard
        iterator type `LATEST`, so that you always read the most
        recent data in the shard.

        When you repeatedly read from an Amazon Kinesis stream, use a
        GetShardIterator request to get the first shard iterator to
        use in your first `GetRecords` request and then use the shard
        iterator returned by the `GetRecords` request in
        `NextShardIterator` for subsequent reads. A new shard iterator
        is returned by every `GetRecords` request in
        `NextShardIterator`, which you use in the `ShardIterator`
        parameter of the next `GetRecords` request.

        If a `GetShardIterator` request is made too often, you receive
        a `ProvisionedThroughputExceededException`. For more
        information about throughput limits, see GetRecords.

        If the shard is closed, the iterator can't return more data,
        and `GetShardIterator` returns `null` for its `ShardIterator`.
        A shard can be closed using SplitShard or MergeShards.

        `GetShardIterator` has a limit of 5 transactions per second
        per account per open shard.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type shard_id: string
        :param shard_id: The shard ID of the shard to get the iterator for.

        :type shard_iterator_type: string
        :param shard_iterator_type:
            Determines how the shard iterator is used to start reading data
            records from the shard.

            The following are the valid shard iterator types:

            + AT_SEQUENCE_NUMBER - Start reading exactly from the position
              denoted by a specific sequence number.
            + AFTER_SEQUENCE_NUMBER - Start reading right after the position
              denoted by a specific sequence number.
            + TRIM_HORIZON - Start reading at the last untrimmed record in
              the shard in the system, which is the oldest data record in the
              shard.
            + LATEST - Start reading just after the most recent record in the
              shard, so that you always read the most recent data in the
              shard.

        :type starting_sequence_number: string
        :param starting_sequence_number: The sequence number of the data
            record in the shard from which to start reading.

        :returns: A dictionary containing:

            1) a `ShardIterator` with the value being the shard-iterator
               object
        """
        params = {
            'StreamName': stream_name,
            'ShardId': shard_id,
            'ShardIteratorType': shard_iterator_type,
        }
        if starting_sequence_number is not None:
            params['StartingSequenceNumber'] = starting_sequence_number
        return self.make_request(action='GetShardIterator',
                                 body=json.dumps(params))

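    # Read-loop sketch (illustrative; `conn`, the stream and shard IDs, and
    # `process` are assumed placeholders; response fields follow the Kinesis
    # API). It follows the pattern in the GetRecords/GetShardIterator
    # docstrings: seed the loop with one iterator, keep reading from
    # `NextShardIterator`, and pause about a second between calls.
    #
    #     import time
    #
    #     iterator = conn.get_shard_iterator(
    #         'my-stream', 'shardId-000000000000',
    #         'TRIM_HORIZON')['ShardIterator']
    #     while iterator is not None:
    #         result = conn.get_records(iterator)
    #         for record in result['Records']:
    #             process(record['Data'], record['PartitionKey'])
    #         iterator = result['NextShardIterator']
    #         time.sleep(1)
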
    def list_streams(self, limit=None, exclusive_start_stream_name=None):
        """
        Lists your streams.

        The number of streams may be too large to return from a single
        call to `ListStreams`. You can limit the number of returned
        streams using the `Limit` parameter. If you do not specify a
        value for the `Limit` parameter, Amazon Kinesis uses the
        default limit, which is currently 10.

        You can detect if there are more streams available to list by
        using the `HasMoreStreams` flag from the returned output. If
        there are more streams available, you can request more streams
        by using the name of the last stream returned by the
        `ListStreams` request in the `ExclusiveStartStreamName`
        parameter in a subsequent request to `ListStreams`. The group
        of stream names returned by the subsequent request is then
        added to the list. You can continue this process until all the
        stream names have been collected in the list.

        `ListStreams` has a limit of 5 transactions per second per
        account.

        :type limit: integer
        :param limit: The maximum number of streams to list.

        :type exclusive_start_stream_name: string
        :param exclusive_start_stream_name: The name of the stream to start
            the list with.

        """
        params = {}
        if limit is not None:
            params['Limit'] = limit
        if exclusive_start_stream_name is not None:
            params['ExclusiveStartStreamName'] = exclusive_start_stream_name
        return self.make_request(action='ListStreams',
                                 body=json.dumps(params))

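    # Listing sketch (illustrative; `conn` is an assumed placeholder; response
    # fields follow the Kinesis API). The docstring's pagination scheme: keep
    # asking for more streams, starting each request after the last name
    # already collected.
    #
    #     stream_names = []
    #     while True:
    #         result = conn.list_streams(
    #             exclusive_start_stream_name=(stream_names[-1]
    #                                          if stream_names else None))
    #         stream_names.extend(result['StreamNames'])
    #         if not result['HasMoreStreams']:
    #             break
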
    def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None,
                             limit=None):
        """
        Lists the tags for the specified Amazon Kinesis stream.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type exclusive_start_tag_key: string
        :param exclusive_start_tag_key: The key to use as the starting point
            for the list of tags. If this parameter is set,
            `ListTagsForStream` gets all tags that occur after
            `ExclusiveStartTagKey`.

        :type limit: integer
        :param limit: The number of tags to return. If this number is less
            than the total number of tags associated with the stream,
            `HasMoreTags` is set to `True`. To list additional tags, set
            `ExclusiveStartTagKey` to the last key in the response.

        """
        params = {'StreamName': stream_name, }
        if exclusive_start_tag_key is not None:
            params['ExclusiveStartTagKey'] = exclusive_start_tag_key
        if limit is not None:
            params['Limit'] = limit
        return self.make_request(action='ListTagsForStream',
                                 body=json.dumps(params))

    def merge_shards(self, stream_name, shard_to_merge,
                     adjacent_shard_to_merge):
        """
        Merges two adjacent shards in a stream and combines them into
        a single shard to reduce the stream's capacity to ingest and
        transport data. Two shards are considered adjacent if the
        union of the hash key ranges for the two shards forms a
        contiguous set with no gaps. For example, if you have two
        shards, one with a hash key range of 276...381 and the other
        with a hash key range of 382...454, then you could merge these
        two shards into a single shard that would have a hash key
        range of 276...454. After the merge, the single child shard
        receives data for all hash key values covered by the two
        parent shards.

        `MergeShards` is called when there is a need to reduce the
        overall capacity of a stream because of excess capacity that
        is not being used. You must specify the shard to be merged and
        the adjacent shard for a stream. For more information about
        merging shards, see `Merge Two Shards`_ in the Amazon Kinesis
        Developer Guide.

        If the stream is in the `ACTIVE` state, you can call
        `MergeShards`. If a stream is in the `CREATING`, `UPDATING`,
        or `DELETING` state, `MergeShards` returns a
        `ResourceInUseException`. If the specified stream does not
        exist, `MergeShards` returns a `ResourceNotFoundException`.

        You can use DescribeStream to check the state of the stream,
        which is returned in `StreamStatus`.

        `MergeShards` is an asynchronous operation. Upon receiving a
        `MergeShards` request, Amazon Kinesis immediately returns a
        response and sets the `StreamStatus` to `UPDATING`. After the
        operation is completed, Amazon Kinesis sets the `StreamStatus`
        to `ACTIVE`. Read and write operations continue to work while
        the stream is in the `UPDATING` state.

        You use DescribeStream to determine the shard IDs that are
        specified in the `MergeShards` request.

        If you try to operate on too many streams in parallel using
        CreateStream, DeleteStream, `MergeShards` or SplitShard, you
        will receive a `LimitExceededException`.

        `MergeShards` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream for the merge.

        :type shard_to_merge: string
        :param shard_to_merge: The shard ID of the shard to combine with the
            adjacent shard for the merge.

        :type adjacent_shard_to_merge: string
        :param adjacent_shard_to_merge: The shard ID of the adjacent shard for
            the merge.

        """
        params = {
            'StreamName': stream_name,
            'ShardToMerge': shard_to_merge,
            'AdjacentShardToMerge': adjacent_shard_to_merge,
        }
        return self.make_request(action='MergeShards',
                                 body=json.dumps(params))

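    # Merge sketch (illustrative; `conn`, the stream name, and the shard IDs
    # are assumed placeholders). The two shard IDs would normally be taken
    # from a DescribeStream response and must cover adjacent hash key ranges;
    # the stream then sits in UPDATING until the merge completes.
    #
    #     conn.merge_shards('my-stream',
    #                       shard_to_merge='shardId-000000000000',
    #                       adjacent_shard_to_merge='shardId-000000000001')
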
    def put_record(self, stream_name, data, partition_key,
                   explicit_hash_key=None,
                   sequence_number_for_ordering=None,
                   exclusive_minimum_sequence_number=None,
                   b64_encode=True):
        """
        This operation puts a data record into an Amazon Kinesis
        stream from a producer. This operation must be called to send
        data from the producer into the Amazon Kinesis stream for
        real-time ingestion and subsequent processing. The `PutRecord`
        operation requires the name of the stream that captures,
        stores, and transports the data; a partition key; and the data
        blob itself. The data blob could be a segment from a log file,
        geographic/location data, website clickstream data, or any
        other data type.

        The partition key is used to distribute data across shards.
        Amazon Kinesis segregates the data records that belong to a
        data stream into multiple shards, using the partition key
        associated with each data record to determine which shard a
        given data record belongs to.

        Partition keys are Unicode strings, with a maximum length
        limit of 256 bytes. An MD5 hash function is used to map
        partition keys to 128-bit integer values and to map associated
        data records to shards using the hash key ranges of the
        shards. You can override hashing the partition key to
        determine the shard by explicitly specifying a hash value
        using the `ExplicitHashKey` parameter. For more information,
        see the `Amazon Kinesis Developer Guide`_.

        `PutRecord` returns the shard ID of where the data record was
        placed and the sequence number that was assigned to the data
        record.

        Sequence numbers generally increase over time. To guarantee
        strictly increasing ordering, use the
        `SequenceNumberForOrdering` parameter. For more information,
        see the `Amazon Kinesis Developer Guide`_.

        If a `PutRecord` request cannot be processed because of
        insufficient provisioned throughput on the shard involved in
        the request, `PutRecord` throws
        `ProvisionedThroughputExceededException`.

        Data records are accessible for only 24 hours from the time
        that they are added to an Amazon Kinesis stream.

        :type stream_name: string
        :param stream_name: The name of the stream to put the data record
            into.

        :type data: blob
        :param data: The data blob to put into the record, which is
            Base64-encoded when the blob is serialized. The maximum size of
            the data blob (the payload after Base64-decoding) is 50 kilobytes
            (KB). Set `b64_encode` to ``False`` to disable automatic Base64
            encoding.

        :type partition_key: string
        :param partition_key: Determines which shard in the stream the data
            record is assigned to. Partition keys are Unicode strings with a
            maximum length limit of 256 bytes. Amazon Kinesis uses the
            partition key as input to a hash function that maps the partition
            key and associated data to a specific shard. Specifically, an MD5
            hash function is used to map partition keys to 128-bit integer
            values and to map associated data records to shards. As a result
            of this hashing mechanism, all data records with the same
            partition key will map to the same shard within the stream.

        :type explicit_hash_key: string
        :param explicit_hash_key: The hash value used to explicitly determine
            the shard the data record is assigned to by overriding the
            partition key hash.

        :type sequence_number_for_ordering: string
        :param sequence_number_for_ordering: Guarantees strictly increasing
            sequence numbers, for puts from the same client and to the same
            partition key. Usage: set the `SequenceNumberForOrdering` of
            record n to the sequence number of record n-1 (as returned in the
            PutRecordResult when putting record n-1). If this parameter is
            not set, records will be coarsely ordered based on arrival time.

        :type b64_encode: boolean
        :param b64_encode: Whether to Base64 encode `data`. Can be set to
            ``False`` if `data` is already encoded to prevent double encoding.

        """
        # Note: `exclusive_minimum_sequence_number` is accepted by this
        # method's signature but is not sent to the service below.
        params = {
            'StreamName': stream_name,
            'Data': data,
            'PartitionKey': partition_key,
        }
        if explicit_hash_key is not None:
            params['ExplicitHashKey'] = explicit_hash_key
        if sequence_number_for_ordering is not None:
            params['SequenceNumberForOrdering'] = sequence_number_for_ordering
        if b64_encode:
            if not isinstance(params['Data'], six.binary_type):
                params['Data'] = params['Data'].encode('utf-8')
            params['Data'] = base64.b64encode(params['Data']).decode('utf-8')
        return self.make_request(action='PutRecord',
                                 body=json.dumps(params))

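    # Write sketch (illustrative; `conn` and the names are assumed
    # placeholders; response fields follow the Kinesis API). The second put
    # passes the sequence number returned by the first one through
    # `sequence_number_for_ordering`, which is the docstring's recipe for
    # strictly increasing sequence numbers per partition key.
    #
    #     first = conn.put_record('my-stream', 'payload-1', 'sensor-42')
    #     second = conn.put_record(
    #         'my-stream', 'payload-2', 'sensor-42',
    #         sequence_number_for_ordering=first['SequenceNumber'])
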
    def put_records(self, records, stream_name, b64_encode=True):
        """
        Puts (writes) multiple data records from a producer into an
        Amazon Kinesis stream in a single call (also referred to as a
        `PutRecords` request). Use this operation to send data from a
        data producer into the Amazon Kinesis stream for real-time
        ingestion and processing. Each shard can support up to 1000
        records written per second, up to a maximum total of 1 MB data
        written per second.

        You must specify the name of the stream that captures, stores,
        and transports the data; and an array of request `Records`,
        with each record in the array requiring a partition key and
        data blob.

        The data blob can be any type of data; for example, a segment
        from a log file, geographic/location data, website clickstream
        data, and so on.

        The partition key is used by Amazon Kinesis as input to a hash
        function that maps the partition key and associated data to a
        specific shard. An MD5 hash function is used to map partition
        keys to 128-bit integer values and to map associated data
        records to shards. As a result of this hashing mechanism, all
        data records with the same partition key map to the same shard
        within the stream. For more information, see `Partition Key`_
        in the Amazon Kinesis Developer Guide.

        Each record in the `Records` array may include an optional
        parameter, `ExplicitHashKey`, which overrides the partition
        key to shard mapping. This parameter allows a data producer to
        determine explicitly the shard where the record is stored. For
        more information, see `Adding Multiple Records with
        PutRecords`_ in the Amazon Kinesis Developer Guide.

        The `PutRecords` response includes an array of response
        `Records`. Each record in the response array directly
        correlates with a record in the request array using natural
        ordering, from the top to the bottom of the request and
        response. The response `Records` array always includes the
        same number of records as the request array.

        The response `Records` array includes both successfully and
        unsuccessfully processed records. Amazon Kinesis attempts to
        process all records in each `PutRecords` request. A single
        record failure does not stop the processing of subsequent
        records.

        A successfully processed record includes `ShardId` and
        `SequenceNumber` values. The `ShardId` parameter identifies
        the shard in the stream where the record is stored. The
        `SequenceNumber` parameter is an identifier assigned to the
        put record, unique to all records in the stream.

        An unsuccessfully processed record includes `ErrorCode` and
        `ErrorMessage` values. `ErrorCode` reflects the type of error
        and can be one of the following values:
        `ProvisionedThroughputExceededException` or `InternalFailure`.
        `ErrorMessage` provides more detailed information about the
        `ProvisionedThroughputExceededException` exception including
        the account ID, stream name, and shard ID of the record that
        was throttled.

        Data records are accessible for only 24 hours from the time
        that they are added to an Amazon Kinesis stream.

        :type records: list
        :param records: The records associated with the request.

        :type stream_name: string
        :param stream_name: The stream name associated with the request.

        :type b64_encode: boolean
        :param b64_encode: Whether to Base64 encode the ``Data`` field of
            each record. Can be set to ``False`` if the data is already
            encoded to prevent double encoding.

        """
        params = {'Records': records, 'StreamName': stream_name, }
        if b64_encode:
            for i in range(len(params['Records'])):
                data = params['Records'][i]['Data']
                if not isinstance(data, six.binary_type):
                    data = data.encode('utf-8')
                params['Records'][i]['Data'] = base64.b64encode(
                    data).decode('utf-8')
        return self.make_request(action='PutRecords',
                                 body=json.dumps(params))

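    # Batch-write sketch (illustrative; `conn` and the stream name are assumed
    # placeholders; response fields follow the Kinesis API). Failed entries
    # are matched to their requests by position and retried once.
    #
    #     records = [{'Data': 'payload-%d' % i, 'PartitionKey': 'sensor-42'}
    #                for i in range(10)]
    #     result = conn.put_records(records, 'my-stream')
    #     if result['FailedRecordCount']:
    #         # put_records already Base64-encoded the 'Data' fields in place,
    #         # so skip re-encoding on the retry.
    #         retries = [rec for rec, res in zip(records, result['Records'])
    #                    if 'ErrorCode' in res]
    #         conn.put_records(retries, 'my-stream', b64_encode=False)
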
    def remove_tags_from_stream(self, stream_name, tag_keys):
        """
        Deletes tags from the specified Amazon Kinesis stream.

        If you specify a tag that does not exist, it is ignored.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type tag_keys: list
        :param tag_keys: A list of tag keys. Each corresponding tag is removed
            from the stream.

        """
        params = {'StreamName': stream_name, 'TagKeys': tag_keys, }
        return self.make_request(action='RemoveTagsFromStream',
                                 body=json.dumps(params))

    def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
        """
        Splits a shard into two new shards in the stream, to increase
        the stream's capacity to ingest and transport data.
        `SplitShard` is called when there is a need to increase the
        overall capacity of a stream because of an expected increase
        in the volume of data records being ingested.

        You can also use `SplitShard` when a shard appears to be
        approaching its maximum utilization, for example, when the set
        of producers sending data into the specific shard are suddenly
        sending more than previously anticipated. You can also call
        `SplitShard` to increase stream capacity, so that more Amazon
        Kinesis applications can simultaneously read data from the
        stream for real-time processing.

        You must specify the shard to be split and the new hash key,
        which is the position in the shard where the shard gets split
        in two. In many cases, the new hash key might simply be the
        average of the beginning and ending hash key, but it can be
        any hash key value in the range being mapped into the shard.
        For more information about splitting shards, see `Split a
        Shard`_ in the Amazon Kinesis Developer Guide.

        You can use DescribeStream to determine the shard ID and hash
        key values for the `ShardToSplit` and `NewStartingHashKey`
        parameters that are specified in the `SplitShard` request.

        `SplitShard` is an asynchronous operation. Upon receiving a
        `SplitShard` request, Amazon Kinesis immediately returns a
        response and sets the stream status to `UPDATING`. After the
        operation is completed, Amazon Kinesis sets the stream status
        to `ACTIVE`. Read and write operations continue to work while
        the stream is in the `UPDATING` state.

        You can use `DescribeStream` to check the status of the
        stream, which is returned in `StreamStatus`. If the stream is
        in the `ACTIVE` state, you can call `SplitShard`. If a stream
        is in the `CREATING`, `UPDATING`, or `DELETING` state,
        `SplitShard` returns a `ResourceInUseException`.

        If the specified stream does not exist, `SplitShard` returns a
        `ResourceNotFoundException`. If you try to create more shards
        than are authorized for your account, you receive a
        `LimitExceededException`.

        The default limit for an AWS account is 10 shards per stream.
        If you need to create a stream with more than 10 shards,
        `contact AWS Support`_ to increase the limit on your account.

        If you try to operate on too many streams in parallel using
        CreateStream, DeleteStream, MergeShards or SplitShard, you
        receive a `LimitExceededException`.

        `SplitShard` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream for the shard split.

        :type shard_to_split: string
        :param shard_to_split: The shard ID of the shard to split.

        :type new_starting_hash_key: string
        :param new_starting_hash_key: A hash key value for the starting hash
            key of one of the child shards created by the split. The hash key
            range for a given shard constitutes a set of ordered contiguous
            positive integers. The value for `NewStartingHashKey` must be in
            the range of hash keys being mapped into the shard. The
            `NewStartingHashKey` hash key value and all higher hash key
            values in the hash key range are distributed to one of the child
            shards. All the lower hash key values in the range are
            distributed to the other child shard.

        """
        params = {
            'StreamName': stream_name,
            'ShardToSplit': shard_to_split,
            'NewStartingHashKey': new_starting_hash_key,
        }
        return self.make_request(action='SplitShard',
                                 body=json.dumps(params))

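    # Split sketch (illustrative; `conn` and the stream name are assumed
    # placeholders; response fields follow the Kinesis API). Following the
    # docstring, the new starting hash key is taken as the midpoint of the
    # parent shard's hash key range, which DescribeStream reports as decimal
    # strings.
    #
    #     desc = conn.describe_stream('my-stream')['StreamDescription']
    #     shard = desc['Shards'][0]
    #     lo = int(shard['HashKeyRange']['StartingHashKey'])
    #     hi = int(shard['HashKeyRange']['EndingHashKey'])
    #     midpoint = str((lo + hi) // 2)
    #     conn.split_shard('my-stream', shard['ShardId'], midpoint)
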
    def make_request(self, action, body):
        # Kinesis is a JSON-over-HTTPS API: every action is POSTed to '/'
        # and identified by the X-Amz-Target header.
        headers = {
            'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action),
            'Host': self.region.endpoint,
            'Content-Type': 'application/x-amz-json-1.1',
            'Content-Length': str(len(body)),
        }
        http_request = self.build_base_http_request(
            method='POST', path='/', auth_path='/', params={},
            headers=headers, data=body)
        response = self._mexe(http_request, sender=None,
                              override_num_retries=10)
        response_body = response.read().decode('utf-8')
        boto.log.debug(response.getheaders())
        boto.log.debug(response_body)
        if response.status == 200:
            if response_body:
                return json.loads(response_body)
        else:
            # Service errors arrive as JSON with a '__type' field naming the
            # fault; map it to the matching exception class when possible.
            json_body = json.loads(response_body)
            fault_name = json_body.get('__type', None)
            exception_class = self._faults.get(fault_name, self.ResponseError)
            raise exception_class(response.status, response.reason,
                                  body=json_body)