diff env/lib/python3.7/site-packages/boto/kinesis/layer1.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author   | shellac
date     | Mon, 01 Jun 2020 08:59:25 -0400
parents  | 79f47841a781
children |
--- a/env/lib/python3.7/site-packages/boto/kinesis/layer1.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,879 +0,0 @@
-# Copyright (c) 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish, dis-
-# tribute, sublicense, and/or sell copies of the Software, and to permit
-# persons to whom the Software is furnished to do so, subject to the fol-
-# lowing conditions:
-#
-# The above copyright notice and this permission notice shall be included
-# in all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-#
-
-import base64
-import boto
-
-from boto.connection import AWSQueryConnection
-from boto.regioninfo import RegionInfo
-from boto.exception import JSONResponseError
-from boto.kinesis import exceptions
-from boto.compat import json
-from boto.compat import six
-
-
-class KinesisConnection(AWSQueryConnection):
-    """
-    Amazon Kinesis Service API Reference
-    Amazon Kinesis is a managed service that scales elastically for
-    real time processing of streaming big data.
-    """
-    APIVersion = "2013-12-02"
-    DefaultRegionName = "us-east-1"
-    DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com"
-    ServiceName = "Kinesis"
-    TargetPrefix = "Kinesis_20131202"
-    ResponseError = JSONResponseError
-
-    _faults = {
-        "ProvisionedThroughputExceededException": exceptions.ProvisionedThroughputExceededException,
-        "LimitExceededException": exceptions.LimitExceededException,
-        "ExpiredIteratorException": exceptions.ExpiredIteratorException,
-        "ResourceInUseException": exceptions.ResourceInUseException,
-        "ResourceNotFoundException": exceptions.ResourceNotFoundException,
-        "InvalidArgumentException": exceptions.InvalidArgumentException,
-        "SubscriptionRequiredException": exceptions.SubscriptionRequiredException
-    }
-
-
-    def __init__(self, **kwargs):
-        region = kwargs.pop('region', None)
-        if not region:
-            region = RegionInfo(self, self.DefaultRegionName,
-                                self.DefaultRegionEndpoint)
-        if 'host' not in kwargs:
-            kwargs['host'] = region.endpoint
-        super(KinesisConnection, self).__init__(**kwargs)
-        self.region = region
-
-    def _required_auth_capability(self):
-        return ['hmac-v4']
-
-    def add_tags_to_stream(self, stream_name, tags):
-        """
-        Adds or updates tags for the specified Amazon Kinesis stream.
-        Each stream can have up to 10 tags.
-
-        If tags have already been assigned to the stream,
-        `AddTagsToStream` overwrites any existing tags that correspond
-        to the specified tag keys.
-
-        :type stream_name: string
-        :param stream_name: The name of the stream.
-
-        :type tags: map
-        :param tags: The set of key-value pairs to use to create the tags.
-
-        """
-        params = {'StreamName': stream_name, 'Tags': tags, }
-        return self.make_request(action='AddTagsToStream',
-                                 body=json.dumps(params))
-
-    def create_stream(self, stream_name, shard_count):
-        """
-        Creates a Amazon Kinesis stream. A stream captures and
-        transports data records that are continuously emitted from
-        different data sources or producers . Scale-out within an
-        Amazon Kinesis stream is explicitly supported by means of
-        shards, which are uniquely identified groups of data records
-        in an Amazon Kinesis stream.
-
-        You specify and control the number of shards that a stream is
-        composed of. Each open shard can support up to 5 read
-        transactions per second, up to a maximum total of 2 MB of data
-        read per second. Each shard can support up to 1000 records
-        written per second, up to a maximum total of 1 MB data written
-        per second. You can add shards to a stream if the amount of
-        data input increases and you can remove shards if the amount
-        of data input decreases.
-
-        The stream name identifies the stream. The name is scoped to
-        the AWS account used by the application. It is also scoped by
-        region. That is, two streams in two different accounts can
-        have the same name, and two streams in the same account, but
-        in two different regions, can have the same name.
-
-        `CreateStream` is an asynchronous operation. Upon receiving a
-        `CreateStream` request, Amazon Kinesis immediately returns and
-        sets the stream status to `CREATING`. After the stream is
-        created, Amazon Kinesis sets the stream status to `ACTIVE`.
-        You should perform read and write operations only on an
-        `ACTIVE` stream.
-
-        You receive a `LimitExceededException` when making a
-        `CreateStream` request if you try to do one of the following:
-
-
-        + Have more than five streams in the `CREATING` state at any
-          point in time.
-        + Create more shards than are authorized for your account.
-
-
-        The default limit for an AWS account is 10 shards per stream.
-        If you need to create a stream with more than 10 shards,
-        `contact AWS Support`_ to increase the limit on your account.
-
-        You can use `DescribeStream` to check the stream status, which
-        is returned in `StreamStatus`.
-
-        `CreateStream` has a limit of 5 transactions per second per
-        account.
-
-        :type stream_name: string
-        :param stream_name: A name to identify the stream. The stream name is
-            scoped to the AWS account used by the application that creates the
-            stream. It is also scoped by region. That is, two streams in two
-            different AWS accounts can have the same name, and two streams in
-            the same AWS account, but in two different regions, can have the
-            same name.
-
-        :type shard_count: integer
-        :param shard_count: The number of shards that the stream will use. The
-            throughput of the stream is a function of the number of shards;
-            more shards are required for greater provisioned throughput.
-            **Note:** The default limit for an AWS account is 10 shards per stream.
-            If you need to create a stream with more than 10 shards, `contact
-            AWS Support`_ to increase the limit on your account.
-
-        """
-        params = {
-            'StreamName': stream_name,
-            'ShardCount': shard_count,
-        }
-        return self.make_request(action='CreateStream',
-                                 body=json.dumps(params))
-
-    def delete_stream(self, stream_name):
-        """
-        Deletes a stream and all its shards and data. You must shut
-        down any applications that are operating on the stream before
-        you delete the stream. If an application attempts to operate
-        on a deleted stream, it will receive the exception
-        `ResourceNotFoundException`.
-
-        If the stream is in the `ACTIVE` state, you can delete it.
-        After a `DeleteStream` request, the specified stream is in the
-        `DELETING` state until Amazon Kinesis completes the deletion.
-
-        **Note:** Amazon Kinesis might continue to accept data read
-        and write operations, such as PutRecord, PutRecords, and
-        GetRecords, on a stream in the `DELETING` state until the
-        stream deletion is complete.
-
-        When you delete a stream, any shards in that stream are also
-        deleted, and any tags are dissociated from the stream.
-
-        You can use the DescribeStream operation to check the state of
-        the stream, which is returned in `StreamStatus`.
-
-        `DeleteStream` has a limit of 5 transactions per second per
-        account.
-
-        :type stream_name: string
-        :param stream_name: The name of the stream to delete.
-
-        """
-        params = {'StreamName': stream_name, }
-        return self.make_request(action='DeleteStream',
-                                 body=json.dumps(params))
-
-    def describe_stream(self, stream_name, limit=None,
-                        exclusive_start_shard_id=None):
-        """
-        Describes the specified stream.
-
-        The information about the stream includes its current status,
-        its Amazon Resource Name (ARN), and an array of shard objects.
-        For each shard object, there is information about the hash key
-        and sequence number ranges that the shard spans, and the IDs
-        of any earlier shards that played in a role in creating the
-        shard. A sequence number is the identifier associated with
-        every record ingested in the Amazon Kinesis stream. The
-        sequence number is assigned when a record is put into the
-        stream.
-
-        You can limit the number of returned shards using the `Limit`
-        parameter. The number of shards in a stream may be too large
-        to return from a single call to `DescribeStream`. You can
-        detect this by using the `HasMoreShards` flag in the returned
-        output. `HasMoreShards` is set to `True` when there is more
-        data available.
-
-        `DescribeStream` is a paginated operation. If there are more
-        shards available, you can request them using the shard ID of
-        the last shard returned. Specify this ID in the
-        `ExclusiveStartShardId` parameter in a subsequent request to
-        `DescribeStream`.
-
-        `DescribeStream` has a limit of 10 transactions per second per
-        account.
-
-        :type stream_name: string
-        :param stream_name: The name of the stream to describe.
-
-        :type limit: integer
-        :param limit: The maximum number of shards to return.
-
-        :type exclusive_start_shard_id: string
-        :param exclusive_start_shard_id: The shard ID of the shard to start
-            with.
-
-        """
-        params = {'StreamName': stream_name, }
-        if limit is not None:
-            params['Limit'] = limit
-        if exclusive_start_shard_id is not None:
-            params['ExclusiveStartShardId'] = exclusive_start_shard_id
-        return self.make_request(action='DescribeStream',
-                                 body=json.dumps(params))
-
-    def get_records(self, shard_iterator, limit=None, b64_decode=True):
-        """
-        Gets data records from a shard.
-
-        Specify a shard iterator using the `ShardIterator` parameter.
-        The shard iterator specifies the position in the shard from
-        which you want to start reading data records sequentially. If
-        there are no records available in the portion of the shard
-        that the iterator points to, `GetRecords` returns an empty
-        list. Note that it might take multiple calls to get to a
-        portion of the shard that contains records.
-
-        You can scale by provisioning multiple shards. Your
-        application should have one thread per shard, each reading
-        continuously from its stream. To read from a stream
-        continually, call `GetRecords` in a loop. Use GetShardIterator
-        to get the shard iterator to specify in the first `GetRecords`
-        call. `GetRecords` returns a new shard iterator in
-        `NextShardIterator`. Specify the shard iterator returned in
-        `NextShardIterator` in subsequent calls to `GetRecords`. Note
-        that if the shard has been closed, the shard iterator can't
-        return more data and `GetRecords` returns `null` in
-        `NextShardIterator`. You can terminate the loop when the shard
-        is closed, or when the shard iterator reaches the record with
-        the sequence number or other attribute that marks it as the
-        last record to process.
-
-        Each data record can be up to 50 KB in size, and each shard
-        can read up to 2 MB per second. You can ensure that your calls
-        don't exceed the maximum supported size or throughput by using
-        the `Limit` parameter to specify the maximum number of records
-        that `GetRecords` can return. Consider your average record
-        size when determining this limit. For example, if your average
-        record size is 40 KB, you can limit the data returned to about
-        1 MB per call by specifying 25 as the limit.
-
-        The size of the data returned by `GetRecords` will vary
-        depending on the utilization of the shard. The maximum size of
-        data that `GetRecords` can return is 10 MB. If a call returns
-        10 MB of data, subsequent calls made within the next 5 seconds
-        throw `ProvisionedThroughputExceededException`. If there is
-        insufficient provisioned throughput on the shard, subsequent
-        calls made within the next 1 second throw
-        `ProvisionedThroughputExceededException`. Note that
-        `GetRecords` won't return any data when it throws an
-        exception. For this reason, we recommend that you wait one
-        second between calls to `GetRecords`; however, it's possible
-        that the application will get exceptions for longer than 1
-        second.
-
-        To detect whether the application is falling behind in
-        processing, add a timestamp to your records and note how long
-        it takes to process them. You can also monitor how much data
-        is in a stream using the CloudWatch metrics for write
-        operations ( `PutRecord` and `PutRecords`). For more
-        information, see `Monitoring Amazon Kinesis with Amazon
-        CloudWatch`_ in the Amazon Kinesis Developer Guide .
-
-        :type shard_iterator: string
-        :param shard_iterator: The position in the shard from which you want to
-            start sequentially reading data records. A shard iterator specifies
-            this position using the sequence number of a data record in the
-            shard.
-
-        :type limit: integer
-        :param limit: The maximum number of records to return. Specify a value
-            of up to 10,000. If you specify a value that is greater than
-            10,000, `GetRecords` throws `InvalidArgumentException`.
-
-        :type b64_decode: boolean
-        :param b64_decode: Decode the Base64-encoded ``Data`` field of records.
-
-        """
-        params = {'ShardIterator': shard_iterator, }
-        if limit is not None:
-            params['Limit'] = limit
-
-        response = self.make_request(action='GetRecords',
-                                     body=json.dumps(params))
-
-        # Base64 decode the data
-        if b64_decode:
-            for record in response.get('Records', []):
-                record['Data'] = base64.b64decode(
-                    record['Data'].encode('utf-8')).decode('utf-8')
-
-        return response
-
-    def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type,
-                           starting_sequence_number=None):
-        """
-        Gets a shard iterator. A shard iterator expires five minutes
-        after it is returned to the requester.
-
-        A shard iterator specifies the position in the shard from
-        which to start reading data records sequentially. A shard
-        iterator specifies this position using the sequence number of
-        a data record in a shard. A sequence number is the identifier
-        associated with every record ingested in the Amazon Kinesis
-        stream. The sequence number is assigned when a record is put
-        into the stream.
-
-        You must specify the shard iterator type. For example, you can
-        set the `ShardIteratorType` parameter to read exactly from the
-        position denoted by a specific sequence number by using the
-        `AT_SEQUENCE_NUMBER` shard iterator type, or right after the
-        sequence number by using the `AFTER_SEQUENCE_NUMBER` shard
-        iterator type, using sequence numbers returned by earlier
-        calls to PutRecord, PutRecords, GetRecords, or DescribeStream.
-        You can specify the shard iterator type `TRIM_HORIZON` in the
-        request to cause `ShardIterator` to point to the last
-        untrimmed record in the shard in the system, which is the
-        oldest data record in the shard. Or you can point to just
-        after the most recent record in the shard, by using the shard
-        iterator type `LATEST`, so that you always read the most
-        recent data in the shard.
-
-        When you repeatedly read from an Amazon Kinesis stream use a
-        GetShardIterator request to get the first shard iterator to to
-        use in your first `GetRecords` request and then use the shard
-        iterator returned by the `GetRecords` request in
-        `NextShardIterator` for subsequent reads. A new shard iterator
-        is returned by every `GetRecords` request in
-        `NextShardIterator`, which you use in the `ShardIterator`
-        parameter of the next `GetRecords` request.
-
-        If a `GetShardIterator` request is made too often, you receive
-        a `ProvisionedThroughputExceededException`. For more
-        information about throughput limits, see GetRecords.
-
-        If the shard is closed, the iterator can't return more data,
-        and `GetShardIterator` returns `null` for its `ShardIterator`.
-        A shard can be closed using SplitShard or MergeShards.
-
-        `GetShardIterator` has a limit of 5 transactions per second
-        per account per open shard.
-
-        :type stream_name: string
-        :param stream_name: The name of the stream.
-
-        :type shard_id: string
-        :param shard_id: The shard ID of the shard to get the iterator for.
-
-        :type shard_iterator_type: string
-        :param shard_iterator_type:
-        Determines how the shard iterator is used to start reading data records
-            from the shard.
-
-        The following are the valid shard iterator types:
-
-
-        + AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted
-          by a specific sequence number.
-        + AFTER_SEQUENCE_NUMBER - Start reading right after the position
-          denoted by a specific sequence number.
-        + TRIM_HORIZON - Start reading at the last untrimmed record in the
-          shard in the system, which is the oldest data record in the shard.
-        + LATEST - Start reading just after the most recent record in the
-          shard, so that you always read the most recent data in the shard.
-
-        :type starting_sequence_number: string
-        :param starting_sequence_number: The sequence number of the data record
-            in the shard from which to start reading from.
-
-        :returns: A dictionary containing:
-
-        1) a `ShardIterator` with the value being the shard-iterator object
-        """
-
-        params = {
-            'StreamName': stream_name,
-            'ShardId': shard_id,
-            'ShardIteratorType': shard_iterator_type,
-        }
-        if starting_sequence_number is not None:
-            params['StartingSequenceNumber'] = starting_sequence_number
-        return self.make_request(action='GetShardIterator',
-                                 body=json.dumps(params))
-
-    def list_streams(self, limit=None, exclusive_start_stream_name=None):
-        """
-        Lists your streams.
-
-        The number of streams may be too large to return from a single
-        call to `ListStreams`. You can limit the number of returned
-        streams using the `Limit` parameter. If you do not specify a
-        value for the `Limit` parameter, Amazon Kinesis uses the
-        default limit, which is currently 10.
-
-        You can detect if there are more streams available to list by
-        using the `HasMoreStreams` flag from the returned output. If
-        there are more streams available, you can request more streams
-        by using the name of the last stream returned by the
-        `ListStreams` request in the `ExclusiveStartStreamName`
-        parameter in a subsequent request to `ListStreams`. The group
-        of stream names returned by the subsequent request is then
-        added to the list. You can continue this process until all the
-        stream names have been collected in the list.
-
-        `ListStreams` has a limit of 5 transactions per second per
-        account.
-
-        :type limit: integer
-        :param limit: The maximum number of streams to list.
-
-        :type exclusive_start_stream_name: string
-        :param exclusive_start_stream_name: The name of the stream to start the
-            list with.
-
-        """
-        params = {}
-        if limit is not None:
-            params['Limit'] = limit
-        if exclusive_start_stream_name is not None:
-            params['ExclusiveStartStreamName'] = exclusive_start_stream_name
-        return self.make_request(action='ListStreams',
-                                 body=json.dumps(params))
-
-    def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None,
-                             limit=None):
-        """
-        Lists the tags for the specified Amazon Kinesis stream.
-
-        :type stream_name: string
-        :param stream_name: The name of the stream.
-
-        :type exclusive_start_tag_key: string
-        :param exclusive_start_tag_key: The key to use as the starting point
-            for the list of tags. If this parameter is set, `ListTagsForStream`
-            gets all tags that occur after `ExclusiveStartTagKey`.
-
-        :type limit: integer
-        :param limit: The number of tags to return. If this number is less than
            the total number of tags associated with the stream, `HasMoreTags`
-            is set to `True`. To list additional tags, set
-            `ExclusiveStartTagKey` to the last key in the response.
-
-        """
-        params = {'StreamName': stream_name, }
-        if exclusive_start_tag_key is not None:
-            params['ExclusiveStartTagKey'] = exclusive_start_tag_key
-        if limit is not None:
-            params['Limit'] = limit
-        return self.make_request(action='ListTagsForStream',
-                                 body=json.dumps(params))
-
-    def merge_shards(self, stream_name, shard_to_merge,
-                     adjacent_shard_to_merge):
-        """
-        Merges two adjacent shards in a stream and combines them into
-        a single shard to reduce the stream's capacity to ingest and
-        transport data. Two shards are considered adjacent if the
-        union of the hash key ranges for the two shards form a
-        contiguous set with no gaps. For example, if you have two
-        shards, one with a hash key range of 276...381 and the other
-        with a hash key range of 382...454, then you could merge these
-        two shards into a single shard that would have a hash key
-        range of 276...454. After the merge, the single child shard
-        receives data for all hash key values covered by the two
-        parent shards.
-
-        `MergeShards` is called when there is a need to reduce the
-        overall capacity of a stream because of excess capacity that
-        is not being used. You must specify the shard to be merged and
-        the adjacent shard for a stream. For more information about
-        merging shards, see `Merge Two Shards`_ in the Amazon Kinesis
-        Developer Guide .
-
-        If the stream is in the `ACTIVE` state, you can call
-        `MergeShards`. If a stream is in the `CREATING`, `UPDATING`,
-        or `DELETING` state, `MergeShards` returns a
-        `ResourceInUseException`. If the specified stream does not
-        exist, `MergeShards` returns a `ResourceNotFoundException`.
-
-        You can use DescribeStream to check the state of the stream,
-        which is returned in `StreamStatus`.
-
-        `MergeShards` is an asynchronous operation. Upon receiving a
-        `MergeShards` request, Amazon Kinesis immediately returns a
-        response and sets the `StreamStatus` to `UPDATING`. After the
-        operation is completed, Amazon Kinesis sets the `StreamStatus`
-        to `ACTIVE`. Read and write operations continue to work while
-        the stream is in the `UPDATING` state.
-
-        You use DescribeStream to determine the shard IDs that are
-        specified in the `MergeShards` request.
-
-        If you try to operate on too many streams in parallel using
-        CreateStream, DeleteStream, `MergeShards` or SplitShard, you
-        will receive a `LimitExceededException`.
-
-        `MergeShards` has limit of 5 transactions per second per
-        account.
-
-        :type stream_name: string
-        :param stream_name: The name of the stream for the merge.
-
-        :type shard_to_merge: string
-        :param shard_to_merge: The shard ID of the shard to combine with the
-            adjacent shard for the merge.
-
-        :type adjacent_shard_to_merge: string
-        :param adjacent_shard_to_merge: The shard ID of the adjacent shard for
-            the merge.
-
-        """
-        params = {
-            'StreamName': stream_name,
-            'ShardToMerge': shard_to_merge,
-            'AdjacentShardToMerge': adjacent_shard_to_merge,
-        }
-        return self.make_request(action='MergeShards',
-                                 body=json.dumps(params))
-
-    def put_record(self, stream_name, data, partition_key,
-                   explicit_hash_key=None,
-                   sequence_number_for_ordering=None,
-                   exclusive_minimum_sequence_number=None,
-                   b64_encode=True):
-        """
-        This operation puts a data record into an Amazon Kinesis
-        stream from a producer. This operation must be called to send
-        data from the producer into the Amazon Kinesis stream for
-        real-time ingestion and subsequent processing. The `PutRecord`
-        operation requires the name of the stream that captures,
-        stores, and transports the data; a partition key; and the data
-        blob itself. The data blob could be a segment from a log file,
-        geographic/location data, website clickstream data, or any
-        other data type.
-
-        The partition key is used to distribute data across shards.
-        Amazon Kinesis segregates the data records that belong to a
-        data stream into multiple shards, using the partition key
-        associated with each data record to determine which shard a
-        given data record belongs to.
-
-        Partition keys are Unicode strings, with a maximum length
-        limit of 256 bytes. An MD5 hash function is used to map
-        partition keys to 128-bit integer values and to map associated
-        data records to shards using the hash key ranges of the
-        shards. You can override hashing the partition key to
-        determine the shard by explicitly specifying a hash value
-        using the `ExplicitHashKey` parameter. For more information,
-        see the `Amazon Kinesis Developer Guide`_.
-
-        `PutRecord` returns the shard ID of where the data record was
-        placed and the sequence number that was assigned to the data
-        record.
-
-        Sequence numbers generally increase over time. To guarantee
-        strictly increasing ordering, use the
-        `SequenceNumberForOrdering` parameter. For more information,
-        see the `Amazon Kinesis Developer Guide`_.
-
-        If a `PutRecord` request cannot be processed because of
-        insufficient provisioned throughput on the shard involved in
-        the request, `PutRecord` throws
-        `ProvisionedThroughputExceededException`.
-
-        Data records are accessible for only 24 hours from the time
-        that they are added to an Amazon Kinesis stream.
-
-        :type stream_name: string
-        :param stream_name: The name of the stream to put the data record into.
-
-        :type data: blob
-        :param data: The data blob to put into the record, which is
-            Base64-encoded when the blob is serialized.
-            The maximum size of the data blob (the payload after
-            Base64-decoding) is 50 kilobytes (KB)
-            Set `b64_encode` to disable automatic Base64 encoding.
-
-        :type partition_key: string
-        :param partition_key: Determines which shard in the stream the data
-            record is assigned to. Partition keys are Unicode strings with a
-            maximum length limit of 256 bytes. Amazon Kinesis uses the
-            partition key as input to a hash function that maps the partition
-            key and associated data to a specific shard. Specifically, an MD5
-            hash function is used to map partition keys to 128-bit integer
-            values and to map associated data records to shards. As a result of
-            this hashing mechanism, all data records with the same partition
-            key will map to the same shard within the stream.
-
-        :type explicit_hash_key: string
-        :param explicit_hash_key: The hash value used to explicitly determine
-            the shard the data record is assigned to by overriding the
-            partition key hash.
-
-        :type sequence_number_for_ordering: string
-        :param sequence_number_for_ordering: Guarantees strictly increasing
-            sequence numbers, for puts from the same client and to the same
-            partition key. Usage: set the `SequenceNumberForOrdering` of record
-            n to the sequence number of record n-1 (as returned in the
-            PutRecordResult when putting record n-1 ). If this parameter is not
-            set, records will be coarsely ordered based on arrival time.
-
-        :type b64_encode: boolean
-        :param b64_encode: Whether to Base64 encode `data`. Can be set to
-            ``False`` if `data` is already encoded to prevent double encoding.
-
-        """
-        params = {
-            'StreamName': stream_name,
-            'Data': data,
-            'PartitionKey': partition_key,
-        }
-        if explicit_hash_key is not None:
-            params['ExplicitHashKey'] = explicit_hash_key
-        if sequence_number_for_ordering is not None:
-            params['SequenceNumberForOrdering'] = sequence_number_for_ordering
-        if b64_encode:
-            if not isinstance(params['Data'], six.binary_type):
-                params['Data'] = params['Data'].encode('utf-8')
-            params['Data'] = base64.b64encode(params['Data']).decode('utf-8')
-        return self.make_request(action='PutRecord',
-                                 body=json.dumps(params))
-
-    def put_records(self, records, stream_name, b64_encode=True):
-        """
-        Puts (writes) multiple data records from a producer into an
-        Amazon Kinesis stream in a single call (also referred to as a
-        `PutRecords` request). Use this operation to send data from a
-        data producer into the Amazon Kinesis stream for real-time
-        ingestion and processing. Each shard can support up to 1000
-        records written per second, up to a maximum total of 1 MB data
-        written per second.
-
-        You must specify the name of the stream that captures, stores,
-        and transports the data; and an array of request `Records`,
-        with each record in the array requiring a partition key and
-        data blob.
-
-        The data blob can be any type of data; for example, a segment
-        from a log file, geographic/location data, website clickstream
-        data, and so on.
-
-        The partition key is used by Amazon Kinesis as input to a hash
-        function that maps the partition key and associated data to a
-        specific shard. An MD5 hash function is used to map partition
-        keys to 128-bit integer values and to map associated data
-        records to shards. As a result of this hashing mechanism, all
-        data records with the same partition key map to the same shard
-        within the stream. For more information, see `Partition Key`_
-        in the Amazon Kinesis Developer Guide .
-
-        Each record in the `Records` array may include an optional
-        parameter, `ExplicitHashKey`, which overrides the partition
-        key to shard mapping. This parameter allows a data producer to
-        determine explicitly the shard where the record is stored. For
-        more information, see `Adding Multiple Records with
-        PutRecords`_ in the Amazon Kinesis Developer Guide .
-
-        The `PutRecords` response includes an array of response
-        `Records`. Each record in the response array directly
-        correlates with a record in the request array using natural
-        ordering, from the top to the bottom of the request and
-        response. The response `Records` array always includes the
-        same number of records as the request array.
-
-        The response `Records` array includes both successfully and
-        unsuccessfully processed records. Amazon Kinesis attempts to
-        process all records in each `PutRecords` request. A single
-        record failure does not stop the processing of subsequent
-        records.
-
-        A successfully-processed record includes `ShardId` and
-        `SequenceNumber` values. The `ShardId` parameter identifies
-        the shard in the stream where the record is stored. The
-        `SequenceNumber` parameter is an identifier assigned to the
-        put record, unique to all records in the stream.
-
-        An unsuccessfully-processed record includes `ErrorCode` and
-        `ErrorMessage` values. `ErrorCode` reflects the type of error
-        and can be one of the following values:
-        `ProvisionedThroughputExceededException` or `InternalFailure`.
-        `ErrorMessage` provides more detailed information about the
-        `ProvisionedThroughputExceededException` exception including
-        the account ID, stream name, and shard ID of the record that
-        was throttled.
-
-        Data records are accessible for only 24 hours from the time
-        that they are added to an Amazon Kinesis stream.
-
-        :type records: list
-        :param records: The records associated with the request.
-
-        :type stream_name: string
-        :param stream_name: The stream name associated with the request.
-
-        :type b64_encode: boolean
-        :param b64_encode: Whether to Base64 encode `data`. Can be set to
-            ``False`` if `data` is already encoded to prevent double encoding.
-
-        """
-        params = {'Records': records, 'StreamName': stream_name, }
-        if b64_encode:
-            for i in range(len(params['Records'])):
-                data = params['Records'][i]['Data']
-                if not isinstance(data, six.binary_type):
-                    data = data.encode('utf-8')
-                params['Records'][i]['Data'] = base64.b64encode(
-                    data).decode('utf-8')
-        return self.make_request(action='PutRecords',
-                                 body=json.dumps(params))
-
-    def remove_tags_from_stream(self, stream_name, tag_keys):
-        """
-        Deletes tags from the specified Amazon Kinesis stream.
-
-        If you specify a tag that does not exist, it is ignored.
-
-        :type stream_name: string
-        :param stream_name: The name of the stream.
-
-        :type tag_keys: list
-        :param tag_keys: A list of tag keys. Each corresponding tag is removed
-            from the stream.
-
-        """
-        params = {'StreamName': stream_name, 'TagKeys': tag_keys, }
-        return self.make_request(action='RemoveTagsFromStream',
-                                 body=json.dumps(params))
-
-    def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
-        """
-        Splits a shard into two new shards in the stream, to increase
-        the stream's capacity to ingest and transport data.
-        `SplitShard` is called when there is a need to increase the
-        overall capacity of stream because of an expected increase in
-        the volume of data records being ingested.
-
-        You can also use `SplitShard` when a shard appears to be
-        approaching its maximum utilization, for example, when the set
-        of producers sending data into the specific shard are suddenly
-        sending more than previously anticipated. You can also call
-        `SplitShard` to increase stream capacity, so that more Amazon
-        Kinesis applications can simultaneously read data from the
-        stream for real-time processing.
-
-        You must specify the shard to be split and the new hash key,
-        which is the position in the shard where the shard gets split
-        in two. In many cases, the new hash key might simply be the
-        average of the beginning and ending hash key, but it can be
-        any hash key value in the range being mapped into the shard.
-        For more information about splitting shards, see `Split a
-        Shard`_ in the Amazon Kinesis Developer Guide .
-
-        You can use DescribeStream to determine the shard ID and hash
-        key values for the `ShardToSplit` and `NewStartingHashKey`
-        parameters that are specified in the `SplitShard` request.
-
-        `SplitShard` is an asynchronous operation. Upon receiving a
-        `SplitShard` request, Amazon Kinesis immediately returns a
-        response and sets the stream status to `UPDATING`. After the
-        operation is completed, Amazon Kinesis sets the stream status
-        to `ACTIVE`. Read and write operations continue to work while
-        the stream is in the `UPDATING` state.
-
-        You can use `DescribeStream` to check the status of the
-        stream, which is returned in `StreamStatus`. If the stream is
-        in the `ACTIVE` state, you can call `SplitShard`. If a stream
-        is in `CREATING` or `UPDATING` or `DELETING` states,
-        `DescribeStream` returns a `ResourceInUseException`.
-
-        If the specified stream does not exist, `DescribeStream`
-        returns a `ResourceNotFoundException`. If you try to create
-        more shards than are authorized for your account, you receive
-        a `LimitExceededException`.
-
-        The default limit for an AWS account is 10 shards per stream.
-        If you need to create a stream with more than 10 shards,
-        `contact AWS Support`_ to increase the limit on your account.
-
-        If you try to operate on too many streams in parallel using
-        CreateStream, DeleteStream, MergeShards or SplitShard, you
-        receive a `LimitExceededException`.
-
-        `SplitShard` has limit of 5 transactions per second per
-        account.
-
-        :type stream_name: string
-        :param stream_name: The name of the stream for the shard split.
-
-        :type shard_to_split: string
-        :param shard_to_split: The shard ID of the shard to split.
-
-        :type new_starting_hash_key: string
-        :param new_starting_hash_key: A hash key value for the starting hash
-            key of one of the child shards created by the split. The hash key
-            range for a given shard constitutes a set of ordered contiguous
-            positive integers. The value for `NewStartingHashKey` must be in
-            the range of hash keys being mapped into the shard. The
-            `NewStartingHashKey` hash key value and all higher hash key values
-            in hash key range are distributed to one of the child shards. All
-            the lower hash key values in the range are distributed to the other
-            child shard.
-
-        """
-        params = {
-            'StreamName': stream_name,
-            'ShardToSplit': shard_to_split,
-            'NewStartingHashKey': new_starting_hash_key,
-        }
-        return self.make_request(action='SplitShard',
-                                 body=json.dumps(params))
-
-    def make_request(self, action, body):
-        headers = {
-            'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action),
-            'Host': self.region.endpoint,
-            'Content-Type': 'application/x-amz-json-1.1',
-            'Content-Length': str(len(body)),
-        }
-        http_request = self.build_base_http_request(
-            method='POST', path='/', auth_path='/', params={},
-            headers=headers, data=body)
-        response = self._mexe(http_request, sender=None,
-                              override_num_retries=10)
-        response_body = response.read().decode('utf-8')
-        boto.log.debug(response.getheaders())
-        boto.log.debug(response_body)
-        if response.status == 200:
-            if response_body:
-                return json.loads(response_body)
-        else:
-            json_body = json.loads(response_body)
-            fault_name = json_body.get('__type', None)
-            exception_class = self._faults.get(fault_name, self.ResponseError)
-            raise exception_class(response.status, response.reason,
-                                  body=json_body)
-
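For reference, the removed module's docstrings above describe the intended call pattern for this low-level client: create a stream, wait for it to become `ACTIVE`, write with `put_record`, then read by looping `get_records` with the iterator returned in `NextShardIterator`. The snippet below is a minimal sketch of that pattern, not part of this changeset; it assumes boto 2.x is installed with AWS credentials configured, and `example-stream` is a placeholder name.

    # Illustrative sketch only (not part of the diff above); follows the
    # usage described in the removed module's own docstrings.
    import time

    from boto.kinesis.layer1 import KinesisConnection

    conn = KinesisConnection()  # defaults to kinesis.us-east-1.amazonaws.com

    conn.create_stream(stream_name='example-stream', shard_count=1)

    # CreateStream is asynchronous: poll DescribeStream until the stream is ACTIVE.
    while True:
        description = conn.describe_stream('example-stream')
        if description['StreamDescription']['StreamStatus'] == 'ACTIVE':
            break
        time.sleep(1)

    # Write one record; the client Base64-encodes the data blob by default.
    conn.put_record(stream_name='example-stream',
                    data='hello kinesis',
                    partition_key='example-key')

    # Read it back: get an iterator for the first shard, then loop on GetRecords,
    # feeding NextShardIterator back in, as the docstrings recommend.
    shard_id = description['StreamDescription']['Shards'][0]['ShardId']
    iterator = conn.get_shard_iterator('example-stream', shard_id,
                                       'TRIM_HORIZON')['ShardIterator']
    while iterator is not None:
        result = conn.get_records(iterator, limit=100)  # Data is Base64-decoded
        for record in result['Records']:
            print(record['PartitionKey'], record['Data'])
        iterator = result['NextShardIterator']
        time.sleep(1)  # the docstrings advise waiting ~1 second between calls

The loop runs until the shard is closed (when `NextShardIterator` comes back as null), which for an open stream means it polls indefinitely; a real consumer would add its own stopping condition.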