diff planemo/lib/python3.7/site-packages/boto/sqs/queue.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/boto/sqs/queue.py	Fri Jul 31 00:18:57 2020 -0400
@@ -0,0 +1,541 @@
+# Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+"""
+Represents an SQS Queue
+"""
+from boto.compat import urllib
+from boto.sqs.message import Message
+
+
+class Queue(object):
+
+    def __init__(self, connection=None, url=None, message_class=Message):
+        self.connection = connection
+        self.url = url
+        self.message_class = message_class
+        self.visibility_timeout = None
+
+    def __repr__(self):
+        return 'Queue(%s)' % self.url
+
+    def _id(self):
+        if self.url:
+            val = urllib.parse.urlparse(self.url)[2]
+        else:
+            val = self.url
+        return val
+    id = property(_id)
+
+    def _name(self):
+        if self.url:
+            val = urllib.parse.urlparse(self.url)[2].split('/')[2]
+        else:
+            val = self.url
+        return  val
+    name = property(_name)
+
+    def _arn(self):
+        parts = self.id.split('/')
+        if self.connection.region.name == 'cn-north-1':
+            partition = 'aws-cn'
+        else:
+            partition = 'aws'
+        return 'arn:%s:sqs:%s:%s:%s' % (
+            partition, self.connection.region.name, parts[1], parts[2])
+    arn = property(_arn)
+
+    def startElement(self, name, attrs, connection):
+        return None
+
+    def endElement(self, name, value, connection):
+        if name == 'QueueUrl':
+            self.url = value
+        elif name == 'VisibilityTimeout':
+            self.visibility_timeout = int(value)
+        else:
+            setattr(self, name, value)
+
+    def set_message_class(self, message_class):
+        """
+        Set the message class that should be used when instantiating
+        messages read from the queue.  By default, the class
+        :class:`boto.sqs.message.Message` is used but this can be overriden
+        with any class that behaves like a message.
+
+        :type message_class: Message-like class
+        :param message_class:  The new Message class
+        """
+        self.message_class = message_class
+
+    def get_attributes(self, attributes='All'):
+        """
+        Retrieves attributes about this queue object and returns
+        them in an Attribute instance (subclass of a Dictionary).
+
+        :type attributes: string
+        :param attributes: String containing one of:
+                           ApproximateNumberOfMessages,
+                           ApproximateNumberOfMessagesNotVisible,
+                           VisibilityTimeout,
+                           CreatedTimestamp,
+                           LastModifiedTimestamp,
+                           Policy
+                           ReceiveMessageWaitTimeSeconds
+        :rtype: Attribute object
+        :return: An Attribute object which is a mapping type holding the
+                 requested name/value pairs
+        """
+        return self.connection.get_queue_attributes(self, attributes)
+
+    def set_attribute(self, attribute, value):
+        """
+        Set a new value for an attribute of the Queue.
+
+        :type attribute: String
+        :param attribute: The name of the attribute you want to set.
+
+        :param value: The new value for the attribute must be:
+
+
+            * For `DelaySeconds` the value must be an integer number of
+            seconds from 0 to 900 (15 minutes).
+                >>> queue.set_attribute('DelaySeconds', 900)
+
+            * For `MaximumMessageSize` the value must be an integer number of
+            bytes from 1024 (1 KiB) to 262144 (256 KiB).
+                >>> queue.set_attribute('MaximumMessageSize', 262144)
+
+            * For `MessageRetentionPeriod` the value must be an integer number of
+            seconds from 60 (1 minute) to 1209600 (14 days).
+                >>> queue.set_attribute('MessageRetentionPeriod', 1209600)
+
+            * For `Policy` the value must be an string that contains JSON formatted
+            parameters and values.
+                >>> queue.set_attribute('Policy', json.dumps({
+                ...     'Version': '2008-10-17',
+                ...     'Id': '/123456789012/testQueue/SQSDefaultPolicy',
+                ...     'Statement': [
+                ...        {
+                ...            'Sid': 'Queue1ReceiveMessage',
+                ...            'Effect': 'Allow',
+                ...            'Principal': {
+                ...                'AWS': '*'
+                ...            },
+                ...            'Action': 'SQS:ReceiveMessage',
+                ...            'Resource': 'arn:aws:aws:sqs:us-east-1:123456789012:testQueue'
+                ...        }
+                ...    ]
+                ... }))
+
+            * For `ReceiveMessageWaitTimeSeconds` the value must be an integer number of
+            seconds from 0 to 20.
+                >>> queue.set_attribute('ReceiveMessageWaitTimeSeconds', 20)
+
+            * For `VisibilityTimeout` the value must be an integer number of
+            seconds from 0 to 43200 (12 hours).
+                >>> queue.set_attribute('VisibilityTimeout', 43200)
+
+            * For `RedrivePolicy` the value must be an string that contains JSON formatted
+            parameters and values. You can set maxReceiveCount to a value between 1 and 1000.
+            The deadLetterTargetArn value is the Amazon Resource Name (ARN) of the queue that
+            will receive the dead letter messages.
+                >>> queue.set_attribute('RedrivePolicy', json.dumps({
+                ...    'maxReceiveCount': 5,
+                ...    'deadLetterTargetArn': "arn:aws:aws:sqs:us-east-1:123456789012:testDeadLetterQueue"
+                ... }))
+
+        :rtype: bool
+        :return: True if successful, otherwise False.
+        """
+        return self.connection.set_queue_attribute(self, attribute, value)
+
+    def get_timeout(self):
+        """
+        Get the visibility timeout for the queue.
+
+        :rtype: int
+        :return: The number of seconds as an integer.
+        """
+        a = self.get_attributes('VisibilityTimeout')
+        return int(a['VisibilityTimeout'])
+
+    def set_timeout(self, visibility_timeout):
+        """
+        Set the visibility timeout for the queue.
+
+        :type visibility_timeout: int
+        :param visibility_timeout: The desired timeout in seconds
+        """
+        retval = self.set_attribute('VisibilityTimeout', visibility_timeout)
+        if retval:
+            self.visibility_timeout = visibility_timeout
+        return retval
+
+    def add_permission(self, label, aws_account_id, action_name):
+        """
+        Add a permission to a queue.
+
+        :type label: str or unicode
+        :param label: A unique identification of the permission you are setting.
+            Maximum of 80 characters ``[0-9a-zA-Z_-]``
+            Example, AliceSendMessage
+
+        :type aws_account_id: str or unicode
+        :param principal_id: The AWS account number of the principal who
+            will be given permission.  The principal must have an AWS account,
+            but does not need to be signed up for Amazon SQS. For information
+            about locating the AWS account identification.
+
+        :type action_name: str or unicode
+        :param action_name: The action.  Valid choices are:
+            SendMessage|ReceiveMessage|DeleteMessage|
+            ChangeMessageVisibility|GetQueueAttributes|*
+
+        :rtype: bool
+        :return: True if successful, False otherwise.
+
+        """
+        return self.connection.add_permission(self, label, aws_account_id,
+                                              action_name)
+
+    def remove_permission(self, label):
+        """
+        Remove a permission from a queue.
+
+        :type label: str or unicode
+        :param label: The unique label associated with the permission
+            being removed.
+
+        :rtype: bool
+        :return: True if successful, False otherwise.
+        """
+        return self.connection.remove_permission(self, label)
+
+    def read(self, visibility_timeout=None, wait_time_seconds=None,
+             message_attributes=None):
+        """
+        Read a single message from the queue.
+
+        :type visibility_timeout: int
+        :param visibility_timeout: The timeout for this message in seconds
+
+        :type wait_time_seconds: int
+        :param wait_time_seconds: The duration (in seconds) for which the call
+            will wait for a message to arrive in the queue before returning.
+            If a message is available, the call will return sooner than
+            wait_time_seconds.
+
+        :type message_attributes: list
+        :param message_attributes: The name(s) of additional message
+            attributes to return. The default is to return no additional
+            message attributes. Use ``['All']`` or ``['.*']`` to return all.
+
+        :rtype: :class:`boto.sqs.message.Message`
+        :return: A single message or None if queue is empty
+        """
+        rs = self.get_messages(1, visibility_timeout,
+                               wait_time_seconds=wait_time_seconds,
+                               message_attributes=message_attributes)
+        if len(rs) == 1:
+            return rs[0]
+        else:
+            return None
+
+    def write(self, message, delay_seconds=None):
+        """
+        Add a single message to the queue.
+
+        :type message: Message
+        :param message: The message to be written to the queue
+
+        :rtype: :class:`boto.sqs.message.Message`
+        :return: The :class:`boto.sqs.message.Message` object that was written.
+        """
+        new_msg = self.connection.send_message(self,
+            message.get_body_encoded(), delay_seconds=delay_seconds,
+            message_attributes=message.message_attributes)
+        message.id = new_msg.id
+        message.md5 = new_msg.md5
+        return message
+
+    def write_batch(self, messages):
+        """
+        Delivers up to 10 messages in a single request.
+
+        :type messages: List of lists.
+        :param messages: A list of lists or tuples.  Each inner
+            tuple represents a single message to be written
+            and consists of and ID (string) that must be unique
+            within the list of messages, the message body itself
+            which can be a maximum of 64K in length, an
+            integer which represents the delay time (in seconds)
+            for the message (0-900) before the message will
+            be delivered to the queue, and an optional dict of
+            message attributes like those passed to ``send_message``
+            in the connection class.
+        """
+        return self.connection.send_message_batch(self, messages)
+
+    def new_message(self, body='', **kwargs):
+        """
+        Create new message of appropriate class.
+
+        :type body: message body
+        :param body: The body of the newly created message (optional).
+
+        :rtype: :class:`boto.sqs.message.Message`
+        :return: A new Message object
+        """
+        m = self.message_class(self, body, **kwargs)
+        m.queue = self
+        return m
+
+    # get a variable number of messages, returns a list of messages
+    def get_messages(self, num_messages=1, visibility_timeout=None,
+                     attributes=None, wait_time_seconds=None,
+                     message_attributes=None):
+        """
+        Get a variable number of messages.
+
+        :type num_messages: int
+        :param num_messages: The maximum number of messages to read from
+            the queue.
+
+        :type visibility_timeout: int
+        :param visibility_timeout: The VisibilityTimeout for the messages read.
+
+        :type attributes: str
+        :param attributes: The name of additional attribute to return
+            with response or All if you want all attributes.  The
+            default is to return no additional attributes.  Valid
+            values: All SenderId SentTimestamp ApproximateReceiveCount
+            ApproximateFirstReceiveTimestamp
+
+        :type wait_time_seconds: int
+        :param wait_time_seconds: The duration (in seconds) for which the call
+            will wait for a message to arrive in the queue before returning.
+            If a message is available, the call will return sooner than
+            wait_time_seconds.
+
+        :type message_attributes: list
+        :param message_attributes: The name(s) of additional message
+            attributes to return. The default is to return no additional
+            message attributes. Use ``['All']`` or ``['.*']`` to return all.
+
+        :rtype: list
+        :return: A list of :class:`boto.sqs.message.Message` objects.
+        """
+        return self.connection.receive_message(
+            self, number_messages=num_messages,
+            visibility_timeout=visibility_timeout, attributes=attributes,
+            wait_time_seconds=wait_time_seconds,
+            message_attributes=message_attributes)
+
+    def delete_message(self, message):
+        """
+        Delete a message from the queue.
+
+        :type message: :class:`boto.sqs.message.Message`
+        :param message: The :class:`boto.sqs.message.Message` object to delete.
+
+        :rtype: bool
+        :return: True if successful, False otherwise
+        """
+        return self.connection.delete_message(self, message)
+
+    def delete_message_batch(self, messages):
+        """
+        Deletes a list of messages in a single request.
+
+        :type messages: List of :class:`boto.sqs.message.Message` objects.
+        :param messages: A list of message objects.
+        """
+        return self.connection.delete_message_batch(self, messages)
+
+    def change_message_visibility_batch(self, messages):
+        """
+        A batch version of change_message_visibility that can act
+        on up to 10 messages at a time.
+
+        :type messages: List of tuples.
+        :param messages: A list of tuples where each tuple consists
+            of a :class:`boto.sqs.message.Message` object and an integer
+            that represents the new visibility timeout for that message.
+        """
+        return self.connection.change_message_visibility_batch(self, messages)
+
+    def delete(self):
+        """
+        Delete the queue.
+        """
+        return self.connection.delete_queue(self)
+
+    def purge(self):
+        """
+        Purge all messages in the queue.
+        """
+        return self.connection.purge_queue(self)
+
+    def clear(self, page_size=10, vtimeout=10):
+        """Deprecated utility function to remove all messages from a queue"""
+        return self.purge()
+
+    def count(self, page_size=10, vtimeout=10):
+        """
+        Utility function to count the number of messages in a queue.
+        Note: This function now calls GetQueueAttributes to obtain
+        an 'approximate' count of the number of messages in a queue.
+        """
+        a = self.get_attributes('ApproximateNumberOfMessages')
+        return int(a['ApproximateNumberOfMessages'])
+
+    def count_slow(self, page_size=10, vtimeout=10):
+        """
+        Deprecated.  This is the old 'count' method that actually counts
+        the messages by reading them all.  This gives an accurate count but
+        is very slow for queues with non-trivial number of messasges.
+        Instead, use get_attributes('ApproximateNumberOfMessages') to take
+        advantage of the new SQS capability.  This is retained only for
+        the unit tests.
+        """
+        n = 0
+        l = self.get_messages(page_size, vtimeout)
+        while l:
+            for m in l:
+                n += 1
+            l = self.get_messages(page_size, vtimeout)
+        return n
+
+    def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'):
+        """Utility function to dump the messages in a queue to a file
+        NOTE: Page size must be < 10 else SQS errors"""
+        fp = open(file_name, 'wb')
+        n = 0
+        l = self.get_messages(page_size, vtimeout)
+        while l:
+            for m in l:
+                fp.write(m.get_body())
+                if sep:
+                    fp.write(sep)
+                n += 1
+            l = self.get_messages(page_size, vtimeout)
+        fp.close()
+        return n
+
+    def save_to_file(self, fp, sep='\n'):
+        """
+        Read all messages from the queue and persist them to file-like object.
+        Messages are written to the file and the 'sep' string is written
+        in between messages.  Messages are deleted from the queue after
+        being written to the file.
+        Returns the number of messages saved.
+        """
+        n = 0
+        m = self.read()
+        while m:
+            n += 1
+            fp.write(m.get_body())
+            if sep:
+                fp.write(sep)
+            self.delete_message(m)
+            m = self.read()
+        return n
+
+    def save_to_filename(self, file_name, sep='\n'):
+        """
+        Read all messages from the queue and persist them to local file.
+        Messages are written to the file and the 'sep' string is written
+        in between messages.  Messages are deleted from the queue after
+        being written to the file.
+        Returns the number of messages saved.
+        """
+        fp = open(file_name, 'wb')
+        n = self.save_to_file(fp, sep)
+        fp.close()
+        return n
+
+    # for backwards compatibility
+    save = save_to_filename
+
+    def save_to_s3(self, bucket):
+        """
+        Read all messages from the queue and persist them to S3.
+        Messages are stored in the S3 bucket using a naming scheme of::
+
+            <queue_id>/<message_id>
+
+        Messages are deleted from the queue after being saved to S3.
+        Returns the number of messages saved.
+        """
+        n = 0
+        m = self.read()
+        while m:
+            n += 1
+            key = bucket.new_key('%s/%s' % (self.id, m.id))
+            key.set_contents_from_string(m.get_body())
+            self.delete_message(m)
+            m = self.read()
+        return n
+
+    def load_from_s3(self, bucket, prefix=None):
+        """
+        Load messages previously saved to S3.
+        """
+        n = 0
+        if prefix:
+            prefix = '%s/' % prefix
+        else:
+            prefix = '%s/' % self.id[1:]
+        rs = bucket.list(prefix=prefix)
+        for key in rs:
+            n += 1
+            m = self.new_message(key.get_contents_as_string())
+            self.write(m)
+        return n
+
+    def load_from_file(self, fp, sep='\n'):
+        """Utility function to load messages from a file-like object to a queue"""
+        n = 0
+        body = ''
+        l = fp.readline()
+        while l:
+            if l == sep:
+                m = Message(self, body)
+                self.write(m)
+                n += 1
+                print('writing message %d' % n)
+                body = ''
+            else:
+                body = body + l
+            l = fp.readline()
+        return n
+
+    def load_from_filename(self, file_name, sep='\n'):
+        """Utility function to load messages from a local filename to a queue"""
+        fp = open(file_name, 'rb')
+        n = self.load_from_file(fp, sep)
+        fp.close()
+        return n
+
+    # for backward compatibility
+    load = load_from_filename
+