diff planemo/lib/python3.7/site-packages/boto/kinesis/layer1.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author    guerler
date      Fri, 31 Jul 2020 00:18:57 -0400
parents   (none)
children  (none)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/boto/kinesis/layer1.py	Fri Jul 31 00:18:57 2020 -0400
@@ -0,0 +1,879 @@
+# Copyright (c) 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+
+import base64
+import boto
+
+from boto.connection import AWSQueryConnection
+from boto.regioninfo import RegionInfo
+from boto.exception import JSONResponseError
+from boto.kinesis import exceptions
+from boto.compat import json
+from boto.compat import six
+
+
+class KinesisConnection(AWSQueryConnection):
+    """
+    Amazon Kinesis Service API Reference
+    Amazon Kinesis is a managed service that scales elastically for
+    real time processing of streaming big data.
+    """
+    APIVersion = "2013-12-02"
+    DefaultRegionName = "us-east-1"
+    DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com"
+    ServiceName = "Kinesis"
+    TargetPrefix = "Kinesis_20131202"
+    ResponseError = JSONResponseError
+
+    _faults = {
+        "ProvisionedThroughputExceededException": exceptions.ProvisionedThroughputExceededException,
+        "LimitExceededException": exceptions.LimitExceededException,
+        "ExpiredIteratorException": exceptions.ExpiredIteratorException,
+        "ResourceInUseException": exceptions.ResourceInUseException,
+        "ResourceNotFoundException": exceptions.ResourceNotFoundException,
+        "InvalidArgumentException": exceptions.InvalidArgumentException,
+        "SubscriptionRequiredException": exceptions.SubscriptionRequiredException
+    }
+
+    def __init__(self, **kwargs):
+        region = kwargs.pop('region', None)
+        if not region:
+            region = RegionInfo(self, self.DefaultRegionName,
+                                self.DefaultRegionEndpoint)
+        if 'host' not in kwargs:
+            kwargs['host'] = region.endpoint
+        super(KinesisConnection, self).__init__(**kwargs)
+        self.region = region
+
+    def _required_auth_capability(self):
+        return ['hmac-v4']
+
+    def add_tags_to_stream(self, stream_name, tags):
+        """
+        Adds or updates tags for the specified Amazon Kinesis stream.
+        Each stream can have up to 10 tags.
+
+        If tags have already been assigned to the stream,
+        `AddTagsToStream` overwrites any existing tags that correspond
+        to the specified tag keys.
+
+        :type stream_name: string
+        :param stream_name: The name of the stream.
+
+        :type tags: map
+        :param tags: The set of key-value pairs to use to create the tags.
+
+        """
+        params = {'StreamName': stream_name, 'Tags': tags, }
+        return self.make_request(action='AddTagsToStream',
+                                 body=json.dumps(params))
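A minimal usage sketch (editorial, not part of the diff): `boto.kinesis.connect_to_region` is the usual way to obtain this connection class; credentials are assumed to come from the standard boto configuration, and `my-stream` is a hypothetical stream name.

    from boto import kinesis

    # connect_to_region returns a KinesisConnection bound to the
    # region's endpoint; credentials come from the usual boto config.
    conn = kinesis.connect_to_region('us-east-1')

    # Tags are passed as a plain dict; existing tags with the same
    # keys are overwritten, per the docstring above.
    conn.add_tags_to_stream('my-stream', {'environment': 'staging'})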
+ + """ + params = {'StreamName': stream_name, 'Tags': tags, } + return self.make_request(action='AddTagsToStream', + body=json.dumps(params)) + + def create_stream(self, stream_name, shard_count): + """ + Creates a Amazon Kinesis stream. A stream captures and + transports data records that are continuously emitted from + different data sources or producers . Scale-out within an + Amazon Kinesis stream is explicitly supported by means of + shards, which are uniquely identified groups of data records + in an Amazon Kinesis stream. + + You specify and control the number of shards that a stream is + composed of. Each open shard can support up to 5 read + transactions per second, up to a maximum total of 2 MB of data + read per second. Each shard can support up to 1000 records + written per second, up to a maximum total of 1 MB data written + per second. You can add shards to a stream if the amount of + data input increases and you can remove shards if the amount + of data input decreases. + + The stream name identifies the stream. The name is scoped to + the AWS account used by the application. It is also scoped by + region. That is, two streams in two different accounts can + have the same name, and two streams in the same account, but + in two different regions, can have the same name. + + `CreateStream` is an asynchronous operation. Upon receiving a + `CreateStream` request, Amazon Kinesis immediately returns and + sets the stream status to `CREATING`. After the stream is + created, Amazon Kinesis sets the stream status to `ACTIVE`. + You should perform read and write operations only on an + `ACTIVE` stream. + + You receive a `LimitExceededException` when making a + `CreateStream` request if you try to do one of the following: + + + + Have more than five streams in the `CREATING` state at any + point in time. + + Create more shards than are authorized for your account. + + + The default limit for an AWS account is 10 shards per stream. + If you need to create a stream with more than 10 shards, + `contact AWS Support`_ to increase the limit on your account. + + You can use `DescribeStream` to check the stream status, which + is returned in `StreamStatus`. + + `CreateStream` has a limit of 5 transactions per second per + account. + + :type stream_name: string + :param stream_name: A name to identify the stream. The stream name is + scoped to the AWS account used by the application that creates the + stream. It is also scoped by region. That is, two streams in two + different AWS accounts can have the same name, and two streams in + the same AWS account, but in two different regions, can have the + same name. + + :type shard_count: integer + :param shard_count: The number of shards that the stream will use. The + throughput of the stream is a function of the number of shards; + more shards are required for greater provisioned throughput. + **Note:** The default limit for an AWS account is 10 shards per stream. + If you need to create a stream with more than 10 shards, `contact + AWS Support`_ to increase the limit on your account. + + """ + params = { + 'StreamName': stream_name, + 'ShardCount': shard_count, + } + return self.make_request(action='CreateStream', + body=json.dumps(params)) + + def delete_stream(self, stream_name): + """ + Deletes a stream and all its shards and data. You must shut + down any applications that are operating on the stream before + you delete the stream. 
+
+    def delete_stream(self, stream_name):
+        """
+        Deletes a stream and all its shards and data. You must shut
+        down any applications that are operating on the stream before
+        you delete the stream. If an application attempts to operate
+        on a deleted stream, it will receive the exception
+        `ResourceNotFoundException`.
+
+        If the stream is in the `ACTIVE` state, you can delete it.
+        After a `DeleteStream` request, the specified stream is in the
+        `DELETING` state until Amazon Kinesis completes the deletion.
+
+        **Note:** Amazon Kinesis might continue to accept data read
+        and write operations, such as PutRecord, PutRecords, and
+        GetRecords, on a stream in the `DELETING` state until the
+        stream deletion is complete.
+
+        When you delete a stream, any shards in that stream are also
+        deleted, and any tags are dissociated from the stream.
+
+        You can use the DescribeStream operation to check the state of
+        the stream, which is returned in `StreamStatus`.
+
+        `DeleteStream` has a limit of 5 transactions per second per
+        account.
+
+        :type stream_name: string
+        :param stream_name: The name of the stream to delete.
+
+        """
+        params = {'StreamName': stream_name, }
+        return self.make_request(action='DeleteStream',
+                                 body=json.dumps(params))
+
+    def describe_stream(self, stream_name, limit=None,
+                        exclusive_start_shard_id=None):
+        """
+        Describes the specified stream.
+
+        The information about the stream includes its current status,
+        its Amazon Resource Name (ARN), and an array of shard objects.
+        For each shard object, there is information about the hash key
+        and sequence number ranges that the shard spans, and the IDs
+        of any earlier shards that played a role in creating the
+        shard. A sequence number is the identifier associated with
+        every record ingested in the Amazon Kinesis stream. The
+        sequence number is assigned when a record is put into the
+        stream.
+
+        You can limit the number of returned shards using the `Limit`
+        parameter. The number of shards in a stream may be too large
+        to return from a single call to `DescribeStream`. You can
+        detect this by using the `HasMoreShards` flag in the returned
+        output. `HasMoreShards` is set to `True` when there is more
+        data available.
+
+        `DescribeStream` is a paginated operation. If there are more
+        shards available, you can request them using the shard ID of
+        the last shard returned. Specify this ID in the
+        `ExclusiveStartShardId` parameter in a subsequent request to
+        `DescribeStream`.
+
+        `DescribeStream` has a limit of 10 transactions per second per
+        account.
+
+        :type stream_name: string
+        :param stream_name: The name of the stream to describe.
+
+        :type limit: integer
+        :param limit: The maximum number of shards to return.
+
+        :type exclusive_start_shard_id: string
+        :param exclusive_start_shard_id: The shard ID of the shard to start
+            with.
+
+        """
+        params = {'StreamName': stream_name, }
+        if limit is not None:
+            params['Limit'] = limit
+        if exclusive_start_shard_id is not None:
+            params['ExclusiveStartShardId'] = exclusive_start_shard_id
+        return self.make_request(action='DescribeStream',
+                                 body=json.dumps(params))
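A sketch of the shard pagination loop this docstring describes, driven by `HasMoreShards` and `ExclusiveStartShardId` (stream name hypothetical, `conn` as above):

    shards = []
    start_id = None
    while True:
        desc = conn.describe_stream(
            'my-stream', exclusive_start_shard_id=start_id)['StreamDescription']
        shards.extend(desc['Shards'])
        if not desc['HasMoreShards']:
            break
        start_id = shards[-1]['ShardId']   # resume after the last shard seen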
+
+    def get_records(self, shard_iterator, limit=None, b64_decode=True):
+        """
+        Gets data records from a shard.
+
+        Specify a shard iterator using the `ShardIterator` parameter.
+        The shard iterator specifies the position in the shard from
+        which you want to start reading data records sequentially. If
+        there are no records available in the portion of the shard
+        that the iterator points to, `GetRecords` returns an empty
+        list. Note that it might take multiple calls to get to a
+        portion of the shard that contains records.
+
+        You can scale by provisioning multiple shards. Your
+        application should have one thread per shard, each reading
+        continuously from its stream. To read from a stream
+        continually, call `GetRecords` in a loop. Use GetShardIterator
+        to get the shard iterator to specify in the first `GetRecords`
+        call. `GetRecords` returns a new shard iterator in
+        `NextShardIterator`. Specify the shard iterator returned in
+        `NextShardIterator` in subsequent calls to `GetRecords`. Note
+        that if the shard has been closed, the shard iterator can't
+        return more data and `GetRecords` returns `null` in
+        `NextShardIterator`. You can terminate the loop when the shard
+        is closed, or when the shard iterator reaches the record with
+        the sequence number or other attribute that marks it as the
+        last record to process.
+
+        Each data record can be up to 50 KB in size, and each shard
+        can read up to 2 MB per second. You can ensure that your calls
+        don't exceed the maximum supported size or throughput by using
+        the `Limit` parameter to specify the maximum number of records
+        that `GetRecords` can return. Consider your average record
+        size when determining this limit. For example, if your average
+        record size is 40 KB, you can limit the data returned to about
+        1 MB per call by specifying 25 as the limit.
+
+        The size of the data returned by `GetRecords` will vary
+        depending on the utilization of the shard. The maximum size of
+        data that `GetRecords` can return is 10 MB. If a call returns
+        10 MB of data, subsequent calls made within the next 5 seconds
+        throw `ProvisionedThroughputExceededException`. If there is
+        insufficient provisioned throughput on the shard, subsequent
+        calls made within the next 1 second throw
+        `ProvisionedThroughputExceededException`. Note that
+        `GetRecords` won't return any data when it throws an
+        exception. For this reason, we recommend that you wait one
+        second between calls to `GetRecords`; however, it's possible
+        that the application will get exceptions for longer than 1
+        second.
+
+        To detect whether the application is falling behind in
+        processing, add a timestamp to your records and note how long
+        it takes to process them. You can also monitor how much data
+        is in a stream using the CloudWatch metrics for write
+        operations (`PutRecord` and `PutRecords`). For more
+        information, see `Monitoring Amazon Kinesis with Amazon
+        CloudWatch`_ in the Amazon Kinesis Developer Guide.
+
+        :type shard_iterator: string
+        :param shard_iterator: The position in the shard from which you want
+            to start sequentially reading data records. A shard iterator
+            specifies this position using the sequence number of a data
+            record in the shard.
+
+        :type limit: integer
+        :param limit: The maximum number of records to return. Specify a value
+            of up to 10,000. If you specify a value that is greater than
+            10,000, `GetRecords` throws `InvalidArgumentException`.
+
+        :type b64_decode: boolean
+        :param b64_decode: Decode the Base64-encoded ``Data`` field of
+            records.
+
+        """
+        params = {'ShardIterator': shard_iterator, }
+        if limit is not None:
+            params['Limit'] = limit
+
+        response = self.make_request(action='GetRecords',
+                                     body=json.dumps(params))
+
+        # Base64 decode the data
+        if b64_decode:
+            for record in response.get('Records', []):
+                record['Data'] = base64.b64decode(
+                    record['Data'].encode('utf-8')).decode('utf-8')
+
+        return response
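The docstring's `Limit` arithmetic, as a sketch; the 40 KB and 1 MB figures are its own example numbers, and `shard_iterator` is assumed to come from `get_shard_iterator`, described next:

    avg_record_size = 40 * 1024            # docstring's example: 40 KB records
    target_per_call = 1024 * 1024          # aim for about 1 MB per call
    limit = target_per_call // avg_record_size   # == 25
    response = conn.get_records(shard_iterator, limit=limit)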
+
+    def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type,
+                           starting_sequence_number=None):
+        """
+        Gets a shard iterator. A shard iterator expires five minutes
+        after it is returned to the requester.
+
+        A shard iterator specifies the position in the shard from
+        which to start reading data records sequentially. A shard
+        iterator specifies this position using the sequence number of
+        a data record in a shard. A sequence number is the identifier
+        associated with every record ingested in the Amazon Kinesis
+        stream. The sequence number is assigned when a record is put
+        into the stream.
+
+        You must specify the shard iterator type. For example, you can
+        set the `ShardIteratorType` parameter to read exactly from the
+        position denoted by a specific sequence number by using the
+        `AT_SEQUENCE_NUMBER` shard iterator type, or right after the
+        sequence number by using the `AFTER_SEQUENCE_NUMBER` shard
+        iterator type, using sequence numbers returned by earlier
+        calls to PutRecord, PutRecords, GetRecords, or DescribeStream.
+        You can specify the shard iterator type `TRIM_HORIZON` in the
+        request to cause `ShardIterator` to point to the last
+        untrimmed record in the shard in the system, which is the
+        oldest data record in the shard. Or you can point to just
+        after the most recent record in the shard, by using the shard
+        iterator type `LATEST`, so that you always read the most
+        recent data in the shard.
+
+        When you repeatedly read from an Amazon Kinesis stream, use a
+        GetShardIterator request to get the first shard iterator to
+        use in your first `GetRecords` request and then use the shard
+        iterator returned by the `GetRecords` request in
+        `NextShardIterator` for subsequent reads. A new shard iterator
+        is returned by every `GetRecords` request in
+        `NextShardIterator`, which you use in the `ShardIterator`
+        parameter of the next `GetRecords` request.
+
+        If a `GetShardIterator` request is made too often, you receive
+        a `ProvisionedThroughputExceededException`. For more
+        information about throughput limits, see GetRecords.
+
+        If the shard is closed, the iterator can't return more data,
+        and `GetShardIterator` returns `null` for its `ShardIterator`.
+        A shard can be closed using SplitShard or MergeShards.
+
+        `GetShardIterator` has a limit of 5 transactions per second
+        per account per open shard.
+
+        :type stream_name: string
+        :param stream_name: The name of the stream.
+
+        :type shard_id: string
+        :param shard_id: The shard ID of the shard to get the iterator for.
+
+        :type shard_iterator_type: string
+        :param shard_iterator_type:
+            Determines how the shard iterator is used to start reading data
+            records from the shard.
+
+            The following are the valid shard iterator types:
+
+            + AT_SEQUENCE_NUMBER - Start reading exactly from the position
+              denoted by a specific sequence number.
+            + AFTER_SEQUENCE_NUMBER - Start reading right after the position
+              denoted by a specific sequence number.
+            + TRIM_HORIZON - Start reading at the last untrimmed record in
+              the shard in the system, which is the oldest data record in
+              the shard.
+            + LATEST - Start reading just after the most recent record in
+              the shard, so that you always read the most recent data in
+              the shard.
+
+        :type starting_sequence_number: string
+        :param starting_sequence_number: The sequence number of the data
+            record in the shard from which to start reading.
+
+        :returns: A dictionary containing a `ShardIterator` key whose value
+            is the shard iterator to pass to `get_records`.
+        """
+        params = {
+            'StreamName': stream_name,
+            'ShardId': shard_id,
+            'ShardIteratorType': shard_iterator_type,
+        }
+        if starting_sequence_number is not None:
+            params['StartingSequenceNumber'] = starting_sequence_number
+        return self.make_request(action='GetShardIterator',
+                                 body=json.dumps(params))
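Putting `get_shard_iterator` and `get_records` together into the polling loop both docstrings describe — a sketch that assumes a single open shard with the conventional ID `shardId-000000000000`:

    import time

    it = conn.get_shard_iterator('my-stream', 'shardId-000000000000',
                                 'TRIM_HORIZON')['ShardIterator']
    while it is not None:
        out = conn.get_records(it)
        for record in out['Records']:
            print(record['PartitionKey'], record['Data'])
        it = out.get('NextShardIterator')   # None once the shard is closed
        time.sleep(1)                       # docstring suggests ~1 s between calls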
+
+    def list_streams(self, limit=None, exclusive_start_stream_name=None):
+        """
+        Lists your streams.
+
+        The number of streams may be too large to return from a single
+        call to `ListStreams`. You can limit the number of returned
+        streams using the `Limit` parameter. If you do not specify a
+        value for the `Limit` parameter, Amazon Kinesis uses the
+        default limit, which is currently 10.
+
+        You can detect if there are more streams available to list by
+        using the `HasMoreStreams` flag from the returned output. If
+        there are more streams available, you can request more streams
+        by using the name of the last stream returned by the
+        `ListStreams` request in the `ExclusiveStartStreamName`
+        parameter in a subsequent request to `ListStreams`. The group
+        of stream names returned by the subsequent request is then
+        added to the list. You can continue this process until all the
+        stream names have been collected in the list.
+
+        `ListStreams` has a limit of 5 transactions per second per
+        account.
+
+        :type limit: integer
+        :param limit: The maximum number of streams to list.
+
+        :type exclusive_start_stream_name: string
+        :param exclusive_start_stream_name: The name of the stream to start
+            the list with.
+
+        """
+        params = {}
+        if limit is not None:
+            params['Limit'] = limit
+        if exclusive_start_stream_name is not None:
+            params['ExclusiveStartStreamName'] = exclusive_start_stream_name
+        return self.make_request(action='ListStreams',
+                                 body=json.dumps(params))
+
+    def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None,
+                             limit=None):
+        """
+        Lists the tags for the specified Amazon Kinesis stream.
+
+        :type stream_name: string
+        :param stream_name: The name of the stream.
+
+        :type exclusive_start_tag_key: string
+        :param exclusive_start_tag_key: The key to use as the starting point
+            for the list of tags. If this parameter is set,
+            `ListTagsForStream` gets all tags that occur after
+            `ExclusiveStartTagKey`.
+
+        :type limit: integer
+        :param limit: The number of tags to return. If this number is less
+            than the total number of tags associated with the stream,
+            `HasMoreTags` is set to `True`. To list additional tags, set
+            `ExclusiveStartTagKey` to the last key in the response.
+
+        """
+        params = {'StreamName': stream_name, }
+        if exclusive_start_tag_key is not None:
+            params['ExclusiveStartTagKey'] = exclusive_start_tag_key
+        if limit is not None:
+            params['Limit'] = limit
+        return self.make_request(action='ListTagsForStream',
+                                 body=json.dumps(params))
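The `HasMoreStreams` pagination pattern from the `list_streams` docstring, sketched:

    names = []
    start_name = None
    while True:
        out = conn.list_streams(exclusive_start_stream_name=start_name)
        names.extend(out['StreamNames'])
        if not out['HasMoreStreams']:
            break
        start_name = names[-1]   # resume after the last stream listed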
+
+    def merge_shards(self, stream_name, shard_to_merge,
+                     adjacent_shard_to_merge):
+        """
+        Merges two adjacent shards in a stream and combines them into
+        a single shard to reduce the stream's capacity to ingest and
+        transport data. Two shards are considered adjacent if the
+        union of the hash key ranges for the two shards forms a
+        contiguous set with no gaps. For example, if you have two
+        shards, one with a hash key range of 276...381 and the other
+        with a hash key range of 382...454, then you could merge these
+        two shards into a single shard that would have a hash key
+        range of 276...454. After the merge, the single child shard
+        receives data for all hash key values covered by the two
+        parent shards.
+
+        `MergeShards` is called when there is a need to reduce the
+        overall capacity of a stream because of excess capacity that
+        is not being used. You must specify the shard to be merged and
+        the adjacent shard for a stream. For more information about
+        merging shards, see `Merge Two Shards`_ in the Amazon Kinesis
+        Developer Guide.
+
+        If the stream is in the `ACTIVE` state, you can call
+        `MergeShards`. If a stream is in the `CREATING`, `UPDATING`,
+        or `DELETING` state, `MergeShards` returns a
+        `ResourceInUseException`. If the specified stream does not
+        exist, `MergeShards` returns a `ResourceNotFoundException`.
+
+        You can use DescribeStream to check the state of the stream,
+        which is returned in `StreamStatus`.
+
+        `MergeShards` is an asynchronous operation. Upon receiving a
+        `MergeShards` request, Amazon Kinesis immediately returns a
+        response and sets the `StreamStatus` to `UPDATING`. After the
+        operation is completed, Amazon Kinesis sets the `StreamStatus`
+        to `ACTIVE`. Read and write operations continue to work while
+        the stream is in the `UPDATING` state.
+
+        You use DescribeStream to determine the shard IDs that are
+        specified in the `MergeShards` request.
+
+        If you try to operate on too many streams in parallel using
+        CreateStream, DeleteStream, `MergeShards` or SplitShard, you
+        will receive a `LimitExceededException`.
+
+        `MergeShards` has a limit of 5 transactions per second per
+        account.
+
+        :type stream_name: string
+        :param stream_name: The name of the stream for the merge.
+
+        :type shard_to_merge: string
+        :param shard_to_merge: The shard ID of the shard to combine with the
+            adjacent shard for the merge.
+
+        :type adjacent_shard_to_merge: string
+        :param adjacent_shard_to_merge: The shard ID of the adjacent shard
+            for the merge.
+
+        """
+        params = {
+            'StreamName': stream_name,
+            'ShardToMerge': shard_to_merge,
+            'AdjacentShardToMerge': adjacent_shard_to_merge,
+        }
+        return self.make_request(action='MergeShards',
+                                 body=json.dumps(params))
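A hedged merge sketch; the two shard IDs are hypothetical and would in practice come from `describe_stream`, which also exposes each shard's hash key range (e.g. the 276...381 and 382...454 ranges in the example above):

    conn.merge_shards('my-stream',
                      shard_to_merge='shardId-000000000000',
                      adjacent_shard_to_merge='shardId-000000000001')
    # The stream goes UPDATING during the merge; poll describe_stream
    # for ACTIVE as with create_stream above.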
+
+    def put_record(self, stream_name, data, partition_key,
+                   explicit_hash_key=None,
+                   sequence_number_for_ordering=None,
+                   exclusive_minimum_sequence_number=None,
+                   b64_encode=True):
+        """
+        This operation puts a data record into an Amazon Kinesis
+        stream from a producer. This operation must be called to send
+        data from the producer into the Amazon Kinesis stream for
+        real-time ingestion and subsequent processing. The `PutRecord`
+        operation requires the name of the stream that captures,
+        stores, and transports the data; a partition key; and the data
+        blob itself. The data blob could be a segment from a log file,
+        geographic/location data, website clickstream data, or any
+        other data type.
+
+        The partition key is used to distribute data across shards.
+        Amazon Kinesis segregates the data records that belong to a
+        data stream into multiple shards, using the partition key
+        associated with each data record to determine which shard a
+        given data record belongs to.
+
+        Partition keys are Unicode strings, with a maximum length
+        limit of 256 bytes. An MD5 hash function is used to map
+        partition keys to 128-bit integer values and to map associated
+        data records to shards using the hash key ranges of the
+        shards. You can override hashing the partition key to
+        determine the shard by explicitly specifying a hash value
+        using the `ExplicitHashKey` parameter. For more information,
+        see the `Amazon Kinesis Developer Guide`_.
+
+        `PutRecord` returns the shard ID of where the data record was
+        placed and the sequence number that was assigned to the data
+        record.
+
+        Sequence numbers generally increase over time. To guarantee
+        strictly increasing ordering, use the
+        `SequenceNumberForOrdering` parameter. For more information,
+        see the `Amazon Kinesis Developer Guide`_.
+
+        If a `PutRecord` request cannot be processed because of
+        insufficient provisioned throughput on the shard involved in
+        the request, `PutRecord` throws
+        `ProvisionedThroughputExceededException`.
+
+        Data records are accessible for only 24 hours from the time
+        that they are added to an Amazon Kinesis stream.
+
+        :type stream_name: string
+        :param stream_name: The name of the stream to put the data record
+            into.
+
+        :type data: blob
+        :param data: The data blob to put into the record, which is
+            Base64-encoded when the blob is serialized. The maximum size of
+            the data blob (the payload after Base64-decoding) is 50
+            kilobytes (KB). Set `b64_encode` to disable automatic Base64
+            encoding.
+
+        :type partition_key: string
+        :param partition_key: Determines which shard in the stream the data
+            record is assigned to. Partition keys are Unicode strings with a
+            maximum length limit of 256 bytes. Amazon Kinesis uses the
+            partition key as input to a hash function that maps the partition
+            key and associated data to a specific shard. Specifically, an MD5
+            hash function is used to map partition keys to 128-bit integer
+            values and to map associated data records to shards. As a result
+            of this hashing mechanism, all data records with the same
+            partition key will map to the same shard within the stream.
+
+        :type explicit_hash_key: string
+        :param explicit_hash_key: The hash value used to explicitly determine
+            the shard the data record is assigned to by overriding the
+            partition key hash.
+
+        :type sequence_number_for_ordering: string
+        :param sequence_number_for_ordering: Guarantees strictly increasing
+            sequence numbers, for puts from the same client and to the same
+            partition key. Usage: set the `SequenceNumberForOrdering` of
+            record n to the sequence number of record n-1 (as returned in the
+            PutRecordResult when putting record n-1). If this parameter is
+            not set, records will be coarsely ordered based on arrival time.
+
+        :type b64_encode: boolean
+        :param b64_encode: Whether to Base64 encode `data`. Can be set to
+            ``False`` if `data` is already encoded to prevent double
+            encoding.
+
+        """
+        params = {
+            'StreamName': stream_name,
+            'Data': data,
+            'PartitionKey': partition_key,
+        }
+        if explicit_hash_key is not None:
+            params['ExplicitHashKey'] = explicit_hash_key
+        if sequence_number_for_ordering is not None:
+            params['SequenceNumberForOrdering'] = sequence_number_for_ordering
+        if b64_encode:
+            if not isinstance(params['Data'], six.binary_type):
+                params['Data'] = params['Data'].encode('utf-8')
+            params['Data'] = base64.b64encode(params['Data']).decode('utf-8')
+        return self.make_request(action='PutRecord',
+                                 body=json.dumps(params))
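A minimal `put_record` sketch; the payload and partition key are invented, and Base64 encoding is left to the default `b64_encode=True`:

    result = conn.put_record('my-stream',
                             data='{"event": "click"}',  # Base64-encoded for us
                             partition_key='user-42')
    print(result['ShardId'], result['SequenceNumber'])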
+
+    def put_records(self, records, stream_name, b64_encode=True):
+        """
+        Puts (writes) multiple data records from a producer into an
+        Amazon Kinesis stream in a single call (also referred to as a
+        `PutRecords` request). Use this operation to send data from a
+        data producer into the Amazon Kinesis stream for real-time
+        ingestion and processing. Each shard can support up to 1000
+        records written per second, up to a maximum total of 1 MB data
+        written per second.
+
+        You must specify the name of the stream that captures, stores,
+        and transports the data; and an array of request `Records`,
+        with each record in the array requiring a partition key and
+        data blob.
+
+        The data blob can be any type of data; for example, a segment
+        from a log file, geographic/location data, website clickstream
+        data, and so on.
+
+        The partition key is used by Amazon Kinesis as input to a hash
+        function that maps the partition key and associated data to a
+        specific shard. An MD5 hash function is used to map partition
+        keys to 128-bit integer values and to map associated data
+        records to shards. As a result of this hashing mechanism, all
+        data records with the same partition key map to the same shard
+        within the stream. For more information, see `Partition Key`_
+        in the Amazon Kinesis Developer Guide.
+
+        Each record in the `Records` array may include an optional
+        parameter, `ExplicitHashKey`, which overrides the partition
+        key to shard mapping. This parameter allows a data producer to
+        determine explicitly the shard where the record is stored. For
+        more information, see `Adding Multiple Records with
+        PutRecords`_ in the Amazon Kinesis Developer Guide.
+
+        The `PutRecords` response includes an array of response
+        `Records`. Each record in the response array directly
+        correlates with a record in the request array using natural
+        ordering, from the top to the bottom of the request and
+        response. The response `Records` array always includes the
+        same number of records as the request array.
+
+        The response `Records` array includes both successfully and
+        unsuccessfully processed records. Amazon Kinesis attempts to
+        process all records in each `PutRecords` request. A single
+        record failure does not stop the processing of subsequent
+        records.
+
+        A successfully-processed record includes `ShardId` and
+        `SequenceNumber` values. The `ShardId` parameter identifies
+        the shard in the stream where the record is stored. The
+        `SequenceNumber` parameter is an identifier assigned to the
+        put record, unique to all records in the stream.
+
+        An unsuccessfully-processed record includes `ErrorCode` and
+        `ErrorMessage` values. `ErrorCode` reflects the type of error
+        and can be one of the following values:
+        `ProvisionedThroughputExceededException` or `InternalFailure`.
+        `ErrorMessage` provides more detailed information about the
+        `ProvisionedThroughputExceededException` exception including
+        the account ID, stream name, and shard ID of the record that
+        was throttled.
+
+        Data records are accessible for only 24 hours from the time
+        that they are added to an Amazon Kinesis stream.
+
+        :type records: list
+        :param records: The records associated with the request.
+
+        :type stream_name: string
+        :param stream_name: The stream name associated with the request.
+
+        :type b64_encode: boolean
+        :param b64_encode: Whether to Base64 encode the ``Data`` field of
+            each record. Can be set to ``False`` if the data is already
+            encoded to prevent double encoding.
+
+        """
+        params = {'Records': records, 'StreamName': stream_name, }
+        if b64_encode:
+            for i in range(len(params['Records'])):
+                data = params['Records'][i]['Data']
+                if not isinstance(data, six.binary_type):
+                    data = data.encode('utf-8')
+                params['Records'][i]['Data'] = base64.b64encode(
+                    data).decode('utf-8')
+        return self.make_request(action='PutRecords',
+                                 body=json.dumps(params))
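A sketch of a batched put with the per-record error checking the response format above calls for:

    records = [{'Data': 'payload-%d' % i, 'PartitionKey': 'user-%d' % i}
               for i in range(3)]
    out = conn.put_records(records, 'my-stream')
    for req, res in zip(records, out['Records']):
        if 'ErrorCode' in res:   # failed entries carry ErrorCode/ErrorMessage
            print('retry', req['PartitionKey'], res['ErrorCode'])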
+ + """ + params = {'Records': records, 'StreamName': stream_name, } + if b64_encode: + for i in range(len(params['Records'])): + data = params['Records'][i]['Data'] + if not isinstance(data, six.binary_type): + data = data.encode('utf-8') + params['Records'][i]['Data'] = base64.b64encode( + data).decode('utf-8') + return self.make_request(action='PutRecords', + body=json.dumps(params)) + + def remove_tags_from_stream(self, stream_name, tag_keys): + """ + Deletes tags from the specified Amazon Kinesis stream. + + If you specify a tag that does not exist, it is ignored. + + :type stream_name: string + :param stream_name: The name of the stream. + + :type tag_keys: list + :param tag_keys: A list of tag keys. Each corresponding tag is removed + from the stream. + + """ + params = {'StreamName': stream_name, 'TagKeys': tag_keys, } + return self.make_request(action='RemoveTagsFromStream', + body=json.dumps(params)) + + def split_shard(self, stream_name, shard_to_split, new_starting_hash_key): + """ + Splits a shard into two new shards in the stream, to increase + the stream's capacity to ingest and transport data. + `SplitShard` is called when there is a need to increase the + overall capacity of stream because of an expected increase in + the volume of data records being ingested. + + You can also use `SplitShard` when a shard appears to be + approaching its maximum utilization, for example, when the set + of producers sending data into the specific shard are suddenly + sending more than previously anticipated. You can also call + `SplitShard` to increase stream capacity, so that more Amazon + Kinesis applications can simultaneously read data from the + stream for real-time processing. + + You must specify the shard to be split and the new hash key, + which is the position in the shard where the shard gets split + in two. In many cases, the new hash key might simply be the + average of the beginning and ending hash key, but it can be + any hash key value in the range being mapped into the shard. + For more information about splitting shards, see `Split a + Shard`_ in the Amazon Kinesis Developer Guide . + + You can use DescribeStream to determine the shard ID and hash + key values for the `ShardToSplit` and `NewStartingHashKey` + parameters that are specified in the `SplitShard` request. + + `SplitShard` is an asynchronous operation. Upon receiving a + `SplitShard` request, Amazon Kinesis immediately returns a + response and sets the stream status to `UPDATING`. After the + operation is completed, Amazon Kinesis sets the stream status + to `ACTIVE`. Read and write operations continue to work while + the stream is in the `UPDATING` state. + + You can use `DescribeStream` to check the status of the + stream, which is returned in `StreamStatus`. If the stream is + in the `ACTIVE` state, you can call `SplitShard`. If a stream + is in `CREATING` or `UPDATING` or `DELETING` states, + `DescribeStream` returns a `ResourceInUseException`. + + If the specified stream does not exist, `DescribeStream` + returns a `ResourceNotFoundException`. If you try to create + more shards than are authorized for your account, you receive + a `LimitExceededException`. + + The default limit for an AWS account is 10 shards per stream. + If you need to create a stream with more than 10 shards, + `contact AWS Support`_ to increase the limit on your account. + + If you try to operate on too many streams in parallel using + CreateStream, DeleteStream, MergeShards or SplitShard, you + receive a `LimitExceededException`. 
+
+    def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
+        """
+        Splits a shard into two new shards in the stream, to increase
+        the stream's capacity to ingest and transport data.
+        `SplitShard` is called when there is a need to increase the
+        overall capacity of a stream because of an expected increase
+        in the volume of data records being ingested.
+
+        You can also use `SplitShard` when a shard appears to be
+        approaching its maximum utilization, for example, when the set
+        of producers sending data into the specific shard are suddenly
+        sending more than previously anticipated. You can also call
+        `SplitShard` to increase stream capacity, so that more Amazon
+        Kinesis applications can simultaneously read data from the
+        stream for real-time processing.
+
+        You must specify the shard to be split and the new hash key,
+        which is the position in the shard where the shard gets split
+        in two. In many cases, the new hash key might simply be the
+        average of the beginning and ending hash key, but it can be
+        any hash key value in the range being mapped into the shard.
+        For more information about splitting shards, see `Split a
+        Shard`_ in the Amazon Kinesis Developer Guide.
+
+        You can use DescribeStream to determine the shard ID and hash
+        key values for the `ShardToSplit` and `NewStartingHashKey`
+        parameters that are specified in the `SplitShard` request.
+
+        `SplitShard` is an asynchronous operation. Upon receiving a
+        `SplitShard` request, Amazon Kinesis immediately returns a
+        response and sets the stream status to `UPDATING`. After the
+        operation is completed, Amazon Kinesis sets the stream status
+        to `ACTIVE`. Read and write operations continue to work while
+        the stream is in the `UPDATING` state.
+
+        You can use `DescribeStream` to check the status of the
+        stream, which is returned in `StreamStatus`. If the stream is
+        in the `ACTIVE` state, you can call `SplitShard`. If a stream
+        is in the `CREATING`, `UPDATING`, or `DELETING` state,
+        `SplitShard` returns a `ResourceInUseException`.
+
+        If the specified stream does not exist, `SplitShard` returns a
+        `ResourceNotFoundException`. If you try to create more shards
+        than are authorized for your account, you receive a
+        `LimitExceededException`.
+
+        The default limit for an AWS account is 10 shards per stream.
+        If you need to create a stream with more than 10 shards,
+        `contact AWS Support`_ to increase the limit on your account.
+
+        If you try to operate on too many streams in parallel using
+        CreateStream, DeleteStream, MergeShards or SplitShard, you
+        receive a `LimitExceededException`.
+
+        `SplitShard` has a limit of 5 transactions per second per
+        account.
+
+        :type stream_name: string
+        :param stream_name: The name of the stream for the shard split.
+
+        :type shard_to_split: string
+        :param shard_to_split: The shard ID of the shard to split.
+
+        :type new_starting_hash_key: string
+        :param new_starting_hash_key: A hash key value for the starting hash
+            key of one of the child shards created by the split. The hash key
+            range for a given shard constitutes a set of ordered contiguous
+            positive integers. The value for `NewStartingHashKey` must be in
+            the range of hash keys being mapped into the shard. The
+            `NewStartingHashKey` hash key value and all higher hash key
+            values in the hash key range are distributed to one of the child
+            shards. All the lower hash key values in the range are
+            distributed to the other child shard.
+
+        """
+        params = {
+            'StreamName': stream_name,
+            'ShardToSplit': shard_to_split,
+            'NewStartingHashKey': new_starting_hash_key,
+        }
+        return self.make_request(action='SplitShard',
+                                 body=json.dumps(params))
+
+    def make_request(self, action, body):
+        headers = {
+            'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action),
+            'Host': self.region.endpoint,
+            'Content-Type': 'application/x-amz-json-1.1',
+            'Content-Length': str(len(body)),
+        }
+        http_request = self.build_base_http_request(
+            method='POST', path='/', auth_path='/', params={},
+            headers=headers, data=body)
+        response = self._mexe(http_request, sender=None,
+                              override_num_retries=10)
+        response_body = response.read().decode('utf-8')
+        boto.log.debug(response.getheaders())
+        boto.log.debug(response_body)
+        if response.status == 200:
+            if response_body:
+                return json.loads(response_body)
+        else:
+            json_body = json.loads(response_body)
+            fault_name = json_body.get('__type', None)
+            exception_class = self._faults.get(fault_name, self.ResponseError)
+            raise exception_class(response.status, response.reason,
+                                  body=json_body)
+
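Finally, a split sketch using the docstring's suggestion of the midpoint hash key; the `HashKeyRange` field names follow the DescribeStream response shape, and the stream name remains hypothetical:

    desc = conn.describe_stream('my-stream')['StreamDescription']
    shard = desc['Shards'][0]
    lo = int(shard['HashKeyRange']['StartingHashKey'])
    hi = int(shard['HashKeyRange']['EndingHashKey'])

    # The docstring suggests the midpoint of the range as a simple choice.
    conn.split_shard('my-stream', shard['ShardId'],
                     new_starting_hash_key=str((lo + hi) // 2))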