diff env/lib/python3.7/site-packages/boto/kinesis/layer1.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/boto/kinesis/layer1.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,879 @@
+# Copyright (c) 2014 Amazon.com, Inc. or its affiliates.  All Rights Reserved
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+import base64
+import boto
+from boto.connection import AWSQueryConnection
+from boto.regioninfo import RegionInfo
+from boto.exception import JSONResponseError
+from boto.kinesis import exceptions
+from boto.compat import json
+from boto.compat import six
+class KinesisConnection(AWSQueryConnection):
+    """
+    Amazon Kinesis Service API Reference
+    Amazon Kinesis is a managed service that scales elastically for
+    real time processing of streaming big data.
+    """
+    APIVersion = "2013-12-02"
+    DefaultRegionName = "us-east-1"
+    DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com"
+    ServiceName = "Kinesis"
+    TargetPrefix = "Kinesis_20131202"
+    ResponseError = JSONResponseError
+    _faults = {
+        "ProvisionedThroughputExceededException": exceptions.ProvisionedThroughputExceededException,
+        "LimitExceededException": exceptions.LimitExceededException,
+        "ExpiredIteratorException": exceptions.ExpiredIteratorException,
+        "ResourceInUseException": exceptions.ResourceInUseException,
+        "ResourceNotFoundException": exceptions.ResourceNotFoundException,
+        "InvalidArgumentException": exceptions.InvalidArgumentException,
+        "SubscriptionRequiredException": exceptions.SubscriptionRequiredException
+    }
+    def __init__(self, **kwargs):
+        region = kwargs.pop('region', None)
+        if not region:
+            region = RegionInfo(self, self.DefaultRegionName,
+                                self.DefaultRegionEndpoint)
+        if 'host' not in kwargs:
+            kwargs['host'] = region.endpoint
+        super(KinesisConnection, self).__init__(**kwargs)
+        self.region = region
+    def _required_auth_capability(self):
+        return ['hmac-v4']
+    def add_tags_to_stream(self, stream_name, tags):
+        """
+        Adds or updates tags for the specified Amazon Kinesis stream.
+        Each stream can have up to 10 tags.
+        If tags have already been assigned to the stream,
+        `AddTagsToStream` overwrites any existing tags that correspond
+        to the specified tag keys.
+        :type stream_name: string
+        :param stream_name: The name of the stream.
+        :type tags: map
+        :param tags: The set of key-value pairs to use to create the tags.
+        """
+        params = {'StreamName': stream_name, 'Tags': tags, }
+        return self.make_request(action='AddTagsToStream',
+                                 body=json.dumps(params))
+    def create_stream(self, stream_name, shard_count):
+        """
+        Creates a Amazon Kinesis stream. A stream captures and
+        transports data records that are continuously emitted from
+        different data sources or producers . Scale-out within an
+        Amazon Kinesis stream is explicitly supported by means of
+        shards, which are uniquely identified groups of data records
+        in an Amazon Kinesis stream.
+        You specify and control the number of shards that a stream is
+        composed of. Each open shard can support up to 5 read
+        transactions per second, up to a maximum total of 2 MB of data
+        read per second. Each shard can support up to 1000 records
+        written per second, up to a maximum total of 1 MB data written
+        per second. You can add shards to a stream if the amount of
+        data input increases and you can remove shards if the amount
+        of data input decreases.
+        The stream name identifies the stream. The name is scoped to
+        the AWS account used by the application. It is also scoped by
+        region. That is, two streams in two different accounts can
+        have the same name, and two streams in the same account, but
+        in two different regions, can have the same name.
+        `CreateStream` is an asynchronous operation. Upon receiving a
+        `CreateStream` request, Amazon Kinesis immediately returns and
+        sets the stream status to `CREATING`. After the stream is
+        created, Amazon Kinesis sets the stream status to `ACTIVE`.
+        You should perform read and write operations only on an
+        `ACTIVE` stream.
+        You receive a `LimitExceededException` when making a
+        `CreateStream` request if you try to do one of the following:
+        + Have more than five streams in the `CREATING` state at any
+          point in time.
+        + Create more shards than are authorized for your account.
+        The default limit for an AWS account is 10 shards per stream.
+        If you need to create a stream with more than 10 shards,
+        `contact AWS Support`_ to increase the limit on your account.
+        You can use `DescribeStream` to check the stream status, which
+        is returned in `StreamStatus`.
+        `CreateStream` has a limit of 5 transactions per second per
+        account.
+        :type stream_name: string
+        :param stream_name: A name to identify the stream. The stream name is
+            scoped to the AWS account used by the application that creates the
+            stream. It is also scoped by region. That is, two streams in two
+            different AWS accounts can have the same name, and two streams in
+            the same AWS account, but in two different regions, can have the
+            same name.
+        :type shard_count: integer
+        :param shard_count: The number of shards that the stream will use. The
+            throughput of the stream is a function of the number of shards;
+            more shards are required for greater provisioned throughput.
+        **Note:** The default limit for an AWS account is 10 shards per stream.
+            If you need to create a stream with more than 10 shards, `contact
+            AWS Support`_ to increase the limit on your account.
+        """
+        params = {
+            'StreamName': stream_name,
+            'ShardCount': shard_count,
+        }
+        return self.make_request(action='CreateStream',
+                                 body=json.dumps(params))
+    def delete_stream(self, stream_name):
+        """
+        Deletes a stream and all its shards and data. You must shut
+        down any applications that are operating on the stream before
+        you delete the stream. If an application attempts to operate
+        on a deleted stream, it will receive the exception
+        `ResourceNotFoundException`.
+        If the stream is in the `ACTIVE` state, you can delete it.
+        After a `DeleteStream` request, the specified stream is in the
+        `DELETING` state until Amazon Kinesis completes the deletion.
+        **Note:** Amazon Kinesis might continue to accept data read
+        and write operations, such as PutRecord, PutRecords, and
+        GetRecords, on a stream in the `DELETING` state until the
+        stream deletion is complete.
+        When you delete a stream, any shards in that stream are also
+        deleted, and any tags are dissociated from the stream.
+        You can use the DescribeStream operation to check the state of
+        the stream, which is returned in `StreamStatus`.
+        `DeleteStream` has a limit of 5 transactions per second per
+        account.
+        :type stream_name: string
+        :param stream_name: The name of the stream to delete.
+        """
+        params = {'StreamName': stream_name, }
+        return self.make_request(action='DeleteStream',
+                                 body=json.dumps(params))
+    def describe_stream(self, stream_name, limit=None,
+                        exclusive_start_shard_id=None):
+        """
+        Describes the specified stream.
+        The information about the stream includes its current status,
+        its Amazon Resource Name (ARN), and an array of shard objects.
+        For each shard object, there is information about the hash key
+        and sequence number ranges that the shard spans, and the IDs
+        of any earlier shards that played in a role in creating the
+        shard. A sequence number is the identifier associated with
+        every record ingested in the Amazon Kinesis stream. The
+        sequence number is assigned when a record is put into the
+        stream.
+        You can limit the number of returned shards using the `Limit`
+        parameter. The number of shards in a stream may be too large
+        to return from a single call to `DescribeStream`. You can
+        detect this by using the `HasMoreShards` flag in the returned
+        output. `HasMoreShards` is set to `True` when there is more
+        data available.
+        `DescribeStream` is a paginated operation. If there are more
+        shards available, you can request them using the shard ID of
+        the last shard returned. Specify this ID in the
+        `ExclusiveStartShardId` parameter in a subsequent request to
+        `DescribeStream`.
+        `DescribeStream` has a limit of 10 transactions per second per
+        account.
+        :type stream_name: string
+        :param stream_name: The name of the stream to describe.
+        :type limit: integer
+        :param limit: The maximum number of shards to return.
+        :type exclusive_start_shard_id: string
+        :param exclusive_start_shard_id: The shard ID of the shard to start
+            with.
+        """
+        params = {'StreamName': stream_name, }
+        if limit is not None:
+            params['Limit'] = limit
+        if exclusive_start_shard_id is not None:
+            params['ExclusiveStartShardId'] = exclusive_start_shard_id
+        return self.make_request(action='DescribeStream',
+                                 body=json.dumps(params))
+    def get_records(self, shard_iterator, limit=None, b64_decode=True):
+        """
+        Gets data records from a shard.
+        Specify a shard iterator using the `ShardIterator` parameter.
+        The shard iterator specifies the position in the shard from
+        which you want to start reading data records sequentially. If
+        there are no records available in the portion of the shard
+        that the iterator points to, `GetRecords` returns an empty
+        list. Note that it might take multiple calls to get to a
+        portion of the shard that contains records.
+        You can scale by provisioning multiple shards. Your
+        application should have one thread per shard, each reading
+        continuously from its stream. To read from a stream
+        continually, call `GetRecords` in a loop. Use GetShardIterator
+        to get the shard iterator to specify in the first `GetRecords`
+        call. `GetRecords` returns a new shard iterator in
+        `NextShardIterator`. Specify the shard iterator returned in
+        `NextShardIterator` in subsequent calls to `GetRecords`. Note
+        that if the shard has been closed, the shard iterator can't
+        return more data and `GetRecords` returns `null` in
+        `NextShardIterator`. You can terminate the loop when the shard
+        is closed, or when the shard iterator reaches the record with
+        the sequence number or other attribute that marks it as the
+        last record to process.
+        Each data record can be up to 50 KB in size, and each shard
+        can read up to 2 MB per second. You can ensure that your calls
+        don't exceed the maximum supported size or throughput by using
+        the `Limit` parameter to specify the maximum number of records
+        that `GetRecords` can return. Consider your average record
+        size when determining this limit. For example, if your average
+        record size is 40 KB, you can limit the data returned to about
+        1 MB per call by specifying 25 as the limit.
+        The size of the data returned by `GetRecords` will vary
+        depending on the utilization of the shard. The maximum size of
+        data that `GetRecords` can return is 10 MB. If a call returns
+        10 MB of data, subsequent calls made within the next 5 seconds
+        throw `ProvisionedThroughputExceededException`. If there is
+        insufficient provisioned throughput on the shard, subsequent
+        calls made within the next 1 second throw
+        `ProvisionedThroughputExceededException`. Note that
+        `GetRecords` won't return any data when it throws an
+        exception. For this reason, we recommend that you wait one
+        second between calls to `GetRecords`; however, it's possible
+        that the application will get exceptions for longer than 1
+        second.
+        To detect whether the application is falling behind in
+        processing, add a timestamp to your records and note how long
+        it takes to process them. You can also monitor how much data
+        is in a stream using the CloudWatch metrics for write
+        operations ( `PutRecord` and `PutRecords`). For more
+        information, see `Monitoring Amazon Kinesis with Amazon
+        CloudWatch`_ in the Amazon Kinesis Developer Guide .
+        :type shard_iterator: string
+        :param shard_iterator: The position in the shard from which you want to
+            start sequentially reading data records. A shard iterator specifies
+            this position using the sequence number of a data record in the
+            shard.
+        :type limit: integer
+        :param limit: The maximum number of records to return. Specify a value
+            of up to 10,000. If you specify a value that is greater than
+            10,000, `GetRecords` throws `InvalidArgumentException`.
+        :type b64_decode: boolean
+        :param b64_decode: Decode the Base64-encoded ``Data`` field of records.
+        """
+        params = {'ShardIterator': shard_iterator, }
+        if limit is not None:
+            params['Limit'] = limit
+        response = self.make_request(action='GetRecords',
+                                     body=json.dumps(params))
+        # Base64 decode the data
+        if b64_decode:
+            for record in response.get('Records', []):
+                record['Data'] = base64.b64decode(
+                    record['Data'].encode('utf-8')).decode('utf-8')
+        return response
+    def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type,
+                           starting_sequence_number=None):
+        """
+        Gets a shard iterator. A shard iterator expires five minutes
+        after it is returned to the requester.
+        A shard iterator specifies the position in the shard from
+        which to start reading data records sequentially. A shard
+        iterator specifies this position using the sequence number of
+        a data record in a shard. A sequence number is the identifier
+        associated with every record ingested in the Amazon Kinesis
+        stream. The sequence number is assigned when a record is put
+        into the stream.
+        You must specify the shard iterator type. For example, you can
+        set the `ShardIteratorType` parameter to read exactly from the
+        position denoted by a specific sequence number by using the
+        `AT_SEQUENCE_NUMBER` shard iterator type, or right after the
+        sequence number by using the `AFTER_SEQUENCE_NUMBER` shard
+        iterator type, using sequence numbers returned by earlier
+        calls to PutRecord, PutRecords, GetRecords, or DescribeStream.
+        You can specify the shard iterator type `TRIM_HORIZON` in the
+        request to cause `ShardIterator` to point to the last
+        untrimmed record in the shard in the system, which is the
+        oldest data record in the shard. Or you can point to just
+        after the most recent record in the shard, by using the shard
+        iterator type `LATEST`, so that you always read the most
+        recent data in the shard.
+        When you repeatedly read from an Amazon Kinesis stream use a
+        GetShardIterator request to get the first shard iterator to to
+        use in your first `GetRecords` request and then use the shard
+        iterator returned by the `GetRecords` request in
+        `NextShardIterator` for subsequent reads. A new shard iterator
+        is returned by every `GetRecords` request in
+        `NextShardIterator`, which you use in the `ShardIterator`
+        parameter of the next `GetRecords` request.
+        If a `GetShardIterator` request is made too often, you receive
+        a `ProvisionedThroughputExceededException`. For more
+        information about throughput limits, see GetRecords.
+        If the shard is closed, the iterator can't return more data,
+        and `GetShardIterator` returns `null` for its `ShardIterator`.
+        A shard can be closed using SplitShard or MergeShards.
+        `GetShardIterator` has a limit of 5 transactions per second
+        per account per open shard.
+        :type stream_name: string
+        :param stream_name: The name of the stream.
+        :type shard_id: string
+        :param shard_id: The shard ID of the shard to get the iterator for.
+        :type shard_iterator_type: string
+        :param shard_iterator_type:
+        Determines how the shard iterator is used to start reading data records
+            from the shard.
+        The following are the valid shard iterator types:
+        + AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted
+              by a specific sequence number.
+        + AFTER_SEQUENCE_NUMBER - Start reading right after the position
+              denoted by a specific sequence number.
+        + TRIM_HORIZON - Start reading at the last untrimmed record in the
+              shard in the system, which is the oldest data record in the shard.
+        + LATEST - Start reading just after the most recent record in the
+              shard, so that you always read the most recent data in the shard.
+        :type starting_sequence_number: string
+        :param starting_sequence_number: The sequence number of the data record
+            in the shard from which to start reading from.
+        :returns: A dictionary containing:
+            1) a `ShardIterator` with the value being the shard-iterator object
+        """
+        params = {
+            'StreamName': stream_name,
+            'ShardId': shard_id,
+            'ShardIteratorType': shard_iterator_type,
+        }
+        if starting_sequence_number is not None:
+            params['StartingSequenceNumber'] = starting_sequence_number
+        return self.make_request(action='GetShardIterator',
+                                 body=json.dumps(params))
+    def list_streams(self, limit=None, exclusive_start_stream_name=None):
+        """
+        Lists your streams.
+        The number of streams may be too large to return from a single
+        call to `ListStreams`. You can limit the number of returned
+        streams using the `Limit` parameter. If you do not specify a
+        value for the `Limit` parameter, Amazon Kinesis uses the
+        default limit, which is currently 10.
+        You can detect if there are more streams available to list by
+        using the `HasMoreStreams` flag from the returned output. If
+        there are more streams available, you can request more streams
+        by using the name of the last stream returned by the
+        `ListStreams` request in the `ExclusiveStartStreamName`
+        parameter in a subsequent request to `ListStreams`. The group
+        of stream names returned by the subsequent request is then
+        added to the list. You can continue this process until all the
+        stream names have been collected in the list.
+        `ListStreams` has a limit of 5 transactions per second per
+        account.
+        :type limit: integer
+        :param limit: The maximum number of streams to list.
+        :type exclusive_start_stream_name: string
+        :param exclusive_start_stream_name: The name of the stream to start the
+            list with.
+        """
+        params = {}
+        if limit is not None:
+            params['Limit'] = limit
+        if exclusive_start_stream_name is not None:
+            params['ExclusiveStartStreamName'] = exclusive_start_stream_name
+        return self.make_request(action='ListStreams',
+                                 body=json.dumps(params))
+    def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None,
+                             limit=None):
+        """
+        Lists the tags for the specified Amazon Kinesis stream.
+        :type stream_name: string
+        :param stream_name: The name of the stream.
+        :type exclusive_start_tag_key: string
+        :param exclusive_start_tag_key: The key to use as the starting point
+            for the list of tags. If this parameter is set, `ListTagsForStream`
+            gets all tags that occur after `ExclusiveStartTagKey`.
+        :type limit: integer
+        :param limit: The number of tags to return. If this number is less than
+            the total number of tags associated with the stream, `HasMoreTags`
+            is set to `True`. To list additional tags, set
+            `ExclusiveStartTagKey` to the last key in the response.
+        """
+        params = {'StreamName': stream_name, }
+        if exclusive_start_tag_key is not None:
+            params['ExclusiveStartTagKey'] = exclusive_start_tag_key
+        if limit is not None:
+            params['Limit'] = limit
+        return self.make_request(action='ListTagsForStream',
+                                 body=json.dumps(params))
+    def merge_shards(self, stream_name, shard_to_merge,
+                     adjacent_shard_to_merge):
+        """
+        Merges two adjacent shards in a stream and combines them into
+        a single shard to reduce the stream's capacity to ingest and
+        transport data. Two shards are considered adjacent if the
+        union of the hash key ranges for the two shards form a
+        contiguous set with no gaps. For example, if you have two
+        shards, one with a hash key range of 276...381 and the other
+        with a hash key range of 382...454, then you could merge these
+        two shards into a single shard that would have a hash key
+        range of 276...454. After the merge, the single child shard
+        receives data for all hash key values covered by the two
+        parent shards.
+        `MergeShards` is called when there is a need to reduce the
+        overall capacity of a stream because of excess capacity that
+        is not being used. You must specify the shard to be merged and
+        the adjacent shard for a stream. For more information about
+        merging shards, see `Merge Two Shards`_ in the Amazon Kinesis
+        Developer Guide .
+        If the stream is in the `ACTIVE` state, you can call
+        `MergeShards`. If a stream is in the `CREATING`, `UPDATING`,
+        or `DELETING` state, `MergeShards` returns a
+        `ResourceInUseException`. If the specified stream does not
+        exist, `MergeShards` returns a `ResourceNotFoundException`.
+        You can use DescribeStream to check the state of the stream,
+        which is returned in `StreamStatus`.
+        `MergeShards` is an asynchronous operation. Upon receiving a
+        `MergeShards` request, Amazon Kinesis immediately returns a
+        response and sets the `StreamStatus` to `UPDATING`. After the
+        operation is completed, Amazon Kinesis sets the `StreamStatus`
+        to `ACTIVE`. Read and write operations continue to work while
+        the stream is in the `UPDATING` state.
+        You use DescribeStream to determine the shard IDs that are
+        specified in the `MergeShards` request.
+        If you try to operate on too many streams in parallel using
+        CreateStream, DeleteStream, `MergeShards` or SplitShard, you
+        will receive a `LimitExceededException`.
+        `MergeShards` has limit of 5 transactions per second per
+        account.
+        :type stream_name: string
+        :param stream_name: The name of the stream for the merge.
+        :type shard_to_merge: string
+        :param shard_to_merge: The shard ID of the shard to combine with the
+            adjacent shard for the merge.
+        :type adjacent_shard_to_merge: string
+        :param adjacent_shard_to_merge: The shard ID of the adjacent shard for
+            the merge.
+        """
+        params = {
+            'StreamName': stream_name,
+            'ShardToMerge': shard_to_merge,
+            'AdjacentShardToMerge': adjacent_shard_to_merge,
+        }
+        return self.make_request(action='MergeShards',
+                                 body=json.dumps(params))
+    def put_record(self, stream_name, data, partition_key,
+                   explicit_hash_key=None,
+                   sequence_number_for_ordering=None,
+                   exclusive_minimum_sequence_number=None,
+                   b64_encode=True):
+        """
+        This operation puts a data record into an Amazon Kinesis
+        stream from a producer. This operation must be called to send
+        data from the producer into the Amazon Kinesis stream for
+        real-time ingestion and subsequent processing. The `PutRecord`
+        operation requires the name of the stream that captures,
+        stores, and transports the data; a partition key; and the data
+        blob itself. The data blob could be a segment from a log file,
+        geographic/location data, website clickstream data, or any
+        other data type.
+        The partition key is used to distribute data across shards.
+        Amazon Kinesis segregates the data records that belong to a
+        data stream into multiple shards, using the partition key
+        associated with each data record to determine which shard a
+        given data record belongs to.
+        Partition keys are Unicode strings, with a maximum length
+        limit of 256 bytes. An MD5 hash function is used to map
+        partition keys to 128-bit integer values and to map associated
+        data records to shards using the hash key ranges of the
+        shards. You can override hashing the partition key to
+        determine the shard by explicitly specifying a hash value
+        using the `ExplicitHashKey` parameter. For more information,
+        see the `Amazon Kinesis Developer Guide`_.
+        `PutRecord` returns the shard ID of where the data record was
+        placed and the sequence number that was assigned to the data
+        record.
+        Sequence numbers generally increase over time. To guarantee
+        strictly increasing ordering, use the
+        `SequenceNumberForOrdering` parameter. For more information,
+        see the `Amazon Kinesis Developer Guide`_.
+        If a `PutRecord` request cannot be processed because of
+        insufficient provisioned throughput on the shard involved in
+        the request, `PutRecord` throws
+        `ProvisionedThroughputExceededException`.
+        Data records are accessible for only 24 hours from the time
+        that they are added to an Amazon Kinesis stream.
+        :type stream_name: string
+        :param stream_name: The name of the stream to put the data record into.
+        :type data: blob
+        :param data: The data blob to put into the record, which is
+            Base64-encoded when the blob is serialized.
+            The maximum size of the data blob (the payload after
+            Base64-decoding) is 50 kilobytes (KB)
+            Set `b64_encode` to disable automatic Base64 encoding.
+        :type partition_key: string
+        :param partition_key: Determines which shard in the stream the data
+            record is assigned to. Partition keys are Unicode strings with a
+            maximum length limit of 256 bytes. Amazon Kinesis uses the
+            partition key as input to a hash function that maps the partition
+            key and associated data to a specific shard. Specifically, an MD5
+            hash function is used to map partition keys to 128-bit integer
+            values and to map associated data records to shards. As a result of
+            this hashing mechanism, all data records with the same partition
+            key will map to the same shard within the stream.
+        :type explicit_hash_key: string
+        :param explicit_hash_key: The hash value used to explicitly determine
+            the shard the data record is assigned to by overriding the
+            partition key hash.
+        :type sequence_number_for_ordering: string
+        :param sequence_number_for_ordering: Guarantees strictly increasing
+            sequence numbers, for puts from the same client and to the same
+            partition key. Usage: set the `SequenceNumberForOrdering` of record
+            n to the sequence number of record n-1 (as returned in the
+            PutRecordResult when putting record n-1 ). If this parameter is not
+            set, records will be coarsely ordered based on arrival time.
+        :type b64_encode: boolean
+        :param b64_encode: Whether to Base64 encode `data`. Can be set to
+            ``False`` if `data` is already encoded to prevent double encoding.
+        """
+        params = {
+            'StreamName': stream_name,
+            'Data': data,
+            'PartitionKey': partition_key,
+        }
+        if explicit_hash_key is not None:
+            params['ExplicitHashKey'] = explicit_hash_key
+        if sequence_number_for_ordering is not None:
+            params['SequenceNumberForOrdering'] = sequence_number_for_ordering
+        if b64_encode:
+            if not isinstance(params['Data'], six.binary_type):
+                params['Data'] = params['Data'].encode('utf-8')
+            params['Data'] = base64.b64encode(params['Data']).decode('utf-8')
+        return self.make_request(action='PutRecord',
+                                 body=json.dumps(params))
+    def put_records(self, records, stream_name, b64_encode=True):
+        """
+        Puts (writes) multiple data records from a producer into an
+        Amazon Kinesis stream in a single call (also referred to as a
+        `PutRecords` request). Use this operation to send data from a
+        data producer into the Amazon Kinesis stream for real-time
+        ingestion and processing. Each shard can support up to 1000
+        records written per second, up to a maximum total of 1 MB data
+        written per second.
+        You must specify the name of the stream that captures, stores,
+        and transports the data; and an array of request `Records`,
+        with each record in the array requiring a partition key and
+        data blob.
+        The data blob can be any type of data; for example, a segment
+        from a log file, geographic/location data, website clickstream
+        data, and so on.
+        The partition key is used by Amazon Kinesis as input to a hash
+        function that maps the partition key and associated data to a
+        specific shard. An MD5 hash function is used to map partition
+        keys to 128-bit integer values and to map associated data
+        records to shards. As a result of this hashing mechanism, all
+        data records with the same partition key map to the same shard
+        within the stream. For more information, see `Partition Key`_
+        in the Amazon Kinesis Developer Guide .
+        Each record in the `Records` array may include an optional
+        parameter, `ExplicitHashKey`, which overrides the partition
+        key to shard mapping. This parameter allows a data producer to
+        determine explicitly the shard where the record is stored. For
+        more information, see `Adding Multiple Records with
+        PutRecords`_ in the Amazon Kinesis Developer Guide .
+        The `PutRecords` response includes an array of response
+        `Records`. Each record in the response array directly
+        correlates with a record in the request array using natural
+        ordering, from the top to the bottom of the request and
+        response. The response `Records` array always includes the
+        same number of records as the request array.
+        The response `Records` array includes both successfully and
+        unsuccessfully processed records. Amazon Kinesis attempts to
+        process all records in each `PutRecords` request. A single
+        record failure does not stop the processing of subsequent
+        records.
+        A successfully-processed record includes `ShardId` and
+        `SequenceNumber` values. The `ShardId` parameter identifies
+        the shard in the stream where the record is stored. The
+        `SequenceNumber` parameter is an identifier assigned to the
+        put record, unique to all records in the stream.
+        An unsuccessfully-processed record includes `ErrorCode` and
+        `ErrorMessage` values. `ErrorCode` reflects the type of error
+        and can be one of the following values:
+        `ProvisionedThroughputExceededException` or `InternalFailure`.
+        `ErrorMessage` provides more detailed information about the
+        `ProvisionedThroughputExceededException` exception including
+        the account ID, stream name, and shard ID of the record that
+        was throttled.
+        Data records are accessible for only 24 hours from the time
+        that they are added to an Amazon Kinesis stream.
+        :type records: list
+        :param records: The records associated with the request.
+        :type stream_name: string
+        :param stream_name: The stream name associated with the request.
+        :type b64_encode: boolean
+        :param b64_encode: Whether to Base64 encode `data`. Can be set to
+            ``False`` if `data` is already encoded to prevent double encoding.
+        """
+        params = {'Records': records, 'StreamName': stream_name, }
+        if b64_encode:
+            for i in range(len(params['Records'])):
+                data = params['Records'][i]['Data']
+                if not isinstance(data, six.binary_type):
+                    data = data.encode('utf-8')
+                params['Records'][i]['Data'] = base64.b64encode(
+                    data).decode('utf-8')
+        return self.make_request(action='PutRecords',
+                                 body=json.dumps(params))
+    def remove_tags_from_stream(self, stream_name, tag_keys):
+        """
+        Deletes tags from the specified Amazon Kinesis stream.
+        If you specify a tag that does not exist, it is ignored.
+        :type stream_name: string
+        :param stream_name: The name of the stream.
+        :type tag_keys: list
+        :param tag_keys: A list of tag keys. Each corresponding tag is removed
+            from the stream.
+        """
+        params = {'StreamName': stream_name, 'TagKeys': tag_keys, }
+        return self.make_request(action='RemoveTagsFromStream',
+                                 body=json.dumps(params))
+    def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
+        """
+        Splits a shard into two new shards in the stream, to increase
+        the stream's capacity to ingest and transport data.
+        `SplitShard` is called when there is a need to increase the
+        overall capacity of stream because of an expected increase in
+        the volume of data records being ingested.
+        You can also use `SplitShard` when a shard appears to be
+        approaching its maximum utilization, for example, when the set
+        of producers sending data into the specific shard are suddenly
+        sending more than previously anticipated. You can also call
+        `SplitShard` to increase stream capacity, so that more Amazon
+        Kinesis applications can simultaneously read data from the
+        stream for real-time processing.
+        You must specify the shard to be split and the new hash key,
+        which is the position in the shard where the shard gets split
+        in two. In many cases, the new hash key might simply be the
+        average of the beginning and ending hash key, but it can be
+        any hash key value in the range being mapped into the shard.
+        For more information about splitting shards, see `Split a
+        Shard`_ in the Amazon Kinesis Developer Guide .
+        You can use DescribeStream to determine the shard ID and hash
+        key values for the `ShardToSplit` and `NewStartingHashKey`
+        parameters that are specified in the `SplitShard` request.
+        `SplitShard` is an asynchronous operation. Upon receiving a
+        `SplitShard` request, Amazon Kinesis immediately returns a
+        response and sets the stream status to `UPDATING`. After the
+        operation is completed, Amazon Kinesis sets the stream status
+        to `ACTIVE`. Read and write operations continue to work while
+        the stream is in the `UPDATING` state.
+        You can use `DescribeStream` to check the status of the
+        stream, which is returned in `StreamStatus`. If the stream is
+        in the `ACTIVE` state, you can call `SplitShard`. If a stream
+        is in `CREATING` or `UPDATING` or `DELETING` states,
+        `DescribeStream` returns a `ResourceInUseException`.
+        If the specified stream does not exist, `DescribeStream`
+        returns a `ResourceNotFoundException`. If you try to create
+        more shards than are authorized for your account, you receive
+        a `LimitExceededException`.
+        The default limit for an AWS account is 10 shards per stream.
+        If you need to create a stream with more than 10 shards,
+        `contact AWS Support`_ to increase the limit on your account.
+        If you try to operate on too many streams in parallel using
+        CreateStream, DeleteStream, MergeShards or SplitShard, you
+        receive a `LimitExceededException`.
+        `SplitShard` has limit of 5 transactions per second per
+        account.
+        :type stream_name: string
+        :param stream_name: The name of the stream for the shard split.
+        :type shard_to_split: string
+        :param shard_to_split: The shard ID of the shard to split.
+        :type new_starting_hash_key: string
+        :param new_starting_hash_key: A hash key value for the starting hash
+            key of one of the child shards created by the split. The hash key
+            range for a given shard constitutes a set of ordered contiguous
+            positive integers. The value for `NewStartingHashKey` must be in
+            the range of hash keys being mapped into the shard. The
+            `NewStartingHashKey` hash key value and all higher hash key values
+            in hash key range are distributed to one of the child shards. All
+            the lower hash key values in the range are distributed to the other
+            child shard.
+        """
+        params = {
+            'StreamName': stream_name,
+            'ShardToSplit': shard_to_split,
+            'NewStartingHashKey': new_starting_hash_key,
+        }
+        return self.make_request(action='SplitShard',
+                                 body=json.dumps(params))
+    def make_request(self, action, body):
+        headers = {
+            'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action),
+            'Host': self.region.endpoint,
+            'Content-Type': 'application/x-amz-json-1.1',
+            'Content-Length': str(len(body)),
+        }
+        http_request = self.build_base_http_request(
+            method='POST', path='/', auth_path='/', params={},
+            headers=headers, data=body)
+        response = self._mexe(http_request, sender=None,
+                              override_num_retries=10)
+        response_body = response.read().decode('utf-8')
+        boto.log.debug(response.getheaders())
+        boto.log.debug(response_body)
+        if response.status == 200:
+            if response_body:
+                return json.loads(response_body)
+        else:
+            json_body = json.loads(response_body)
+            fault_name = json_body.get('__type', None)
+            exception_class = self._faults.get(fault_name, self.ResponseError)
+            raise exception_class(response.status, response.reason,
+                                  body=json_body)