diff planemo/lib/python3.7/site-packages/boto/cloudsearch2/document.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
parents
children
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/boto/cloudsearch2/document.py	Fri Jul 31 00:18:57 2020 -0400
@@ -0,0 +1,315 @@
+# Copyright (c) 2012 Mitch Garnaat http://garnaat.org/
+# Copyright (c) 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+
+import boto.exception
+from boto.compat import json
+import requests
+import boto
+from boto.cloudsearchdomain.layer1 import CloudSearchDomainConnection
+
+
+class SearchServiceException(Exception):
+    pass
+
+
+class CommitMismatchError(Exception):
+    # Carry the per-document error messages from CloudSearch so the caller
+    # can handle them on their own via the ``errors`` attribute.
+
+    errors = None
+
+
+class EncodingError(Exception):
+    """
+    Content sent for CloudSearch indexing was incorrectly encoded.
+
+    This usually happens when a document is marked as unicode but non-unicode
+    characters are present.
+    """
+    pass
+
+
+class ContentTooLongError(Exception):
+    """
+    Content sent for CloudSearch indexing was too long.
+
+    This will usually happen when documents queued for indexing add up to more
+    than the limit allowed per upload batch (5 MB).
+
+    """
+    pass
+
+
+class DocumentServiceConnection(object):
+    """
+    A CloudSearch document service.
+
+    The DocumentServiceConnection is used to add, remove and update documents in
+    CloudSearch. Commands are uploaded to CloudSearch in SDF (Search Data
+    Format).
+
+    To generate an appropriate SDF, use :func:`add` to add or update documents
+    and :func:`delete` to remove documents.
+
+    Once the set of documents is ready to be indexed, use :func:`commit` to send
+    the commands to CloudSearch.
+
+    If there are a lot of documents to index, it may be preferable to split the
+    generation of SDF data from the actual uploading into CloudSearch. Retrieve
+    the current SDF with :func:`get_sdf`. If this data is then uploaded to S3,
+    it can be retrieved again later and uploaded into CloudSearch using
+    :func:`add_sdf_from_s3`.
+
+    The SDF is not cleared after a :func:`commit`. If you wish to continue
+    using the DocumentServiceConnection for another batch upload of commands,
+    you will need to :func:`clear_sdf` first to stop the previous batch of
+    commands from being uploaded again.
+
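+    A minimal sketch of a typical batch (illustrative only; assumes ``domain``
+    is an already-configured :class:`boto.cloudsearch2.domain.Domain`)::
+
+        conn = DocumentServiceConnection(domain=domain)
+        conn.add('doc-1', {'title': 'Hello', 'year': 2014})
+        conn.delete('doc-2')
+        response = conn.commit()   # returns a CommitResponse
+        print(response.adds, response.deletes)
+        conn.clear_sdf()           # reset before queueing the next batch
+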
+    """
+
+    def __init__(self, domain=None, endpoint=None):
+        self.domain = domain
+        self.endpoint = endpoint
+        if not self.endpoint:
+            self.endpoint = domain.doc_service_endpoint
+        self.documents_batch = []
+        self._sdf = None
+
+        # Copy proxy settings from connection and check if request should be signed
+        self.proxy = {}
+        self.sign_request = False
+        if self.domain and self.domain.layer1:
+            if self.domain.layer1.use_proxy:
+                self.proxy = {'http': self.domain.layer1.get_proxy_url_with_auth()}
+
+            self.sign_request = getattr(self.domain.layer1, 'sign_request', False)
+
+            if self.sign_request:
+                # Create a domain connection to send signed requests
+                layer1 = self.domain.layer1
+                self.domain_connection = CloudSearchDomainConnection(
+                    host=self.endpoint,
+                    aws_access_key_id=layer1.aws_access_key_id,
+                    aws_secret_access_key=layer1.aws_secret_access_key,
+                    region=layer1.region,
+                    provider=layer1.provider
+                )
+
+    def add(self, _id, fields):
+        """
+        Add a document to be processed by the DocumentService
+
+        The document will not actually be added until :func:`commit` is called
+
+        :type _id: string
+        :param _id: A unique ID used to refer to this document.
+
+        :type fields: dict
+        :param fields: A dictionary of key-value pairs to be uploaded with this
+            document.
+        """
+
+        d = {'type': 'add', 'id': _id, 'fields': fields}
+        self.documents_batch.append(d)
+
+    def delete(self, _id):
+        """
+        Schedule a document to be removed from the CloudSearch service
+
+        The document will not actually be scheduled for removal until
+        :func:`commit` is called
+
+        :type _id: string
+        :param _id: The unique ID of this document.
+        """
+
+        d = {'type': 'delete', 'id': _id}
+        self.documents_batch.append(d)
+
+    def get_sdf(self):
+        """
+        Generate the working set of documents in Search Data Format (SDF)
+
+        :rtype: string
+        :returns: JSON-formatted string of the documents in SDF
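+
+        For example, one queued add and one queued delete would serialize
+        roughly as (illustrative)::
+
+            [{"type": "add", "id": "doc-1", "fields": {"title": "Hello"}},
+             {"type": "delete", "id": "doc-2"}]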
+        """
+
+        return self._sdf if self._sdf else json.dumps(self.documents_batch)
+
+    def clear_sdf(self):
+        """
+        Clear the working documents from this DocumentServiceConnection
+
+        This should be used after :func:`commit` if the connection will be
+        reused for another set of documents.
+        """
+
+        self._sdf = None
+        self.documents_batch = []
+
+    def add_sdf_from_s3(self, key_obj):
+        """
+        Load an SDF from S3
+
+        Using this method will cause any operations queued through
+        :func:`add` and :func:`delete` to be ignored.
+
+        :type key_obj: :class:`boto.s3.key.Key`
+        :param key_obj: An S3 key which contains an SDF
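+
+        A rough sketch of the round trip (illustrative only; assumes
+        ``bucket`` is an existing :class:`boto.s3.bucket.Bucket` and ``domain``
+        is a configured domain)::
+
+            from boto.s3.key import Key
+
+            key = Key(bucket, 'batch.sdf')
+            key.set_contents_from_string(conn.get_sdf())
+            # ...later, possibly from another process...
+            conn2 = DocumentServiceConnection(domain=domain)
+            conn2.add_sdf_from_s3(key)
+            conn2.commit()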
+        """
+        # @todo: (lucas) would be nice if this could just take an s3://uri...
+
+        self._sdf = key_obj.get_contents_as_string()
+
+    def _commit_with_auth(self, sdf, api_version):
+        return self.domain_connection.upload_documents(sdf, 'application/json')
+
+    def _commit_without_auth(self, sdf, api_version):
+        url = "http://%s/%s/documents/batch" % (self.endpoint, api_version)
+
+        # Keep-alive is automatic in a post-1.0 requests world.
+        session = requests.Session()
+        session.proxies = self.proxy
+        adapter = requests.adapters.HTTPAdapter(
+            pool_connections=20,
+            pool_maxsize=50,
+            max_retries=5
+        )
+        session.mount('http://', adapter)
+        session.mount('https://', adapter)
+
+        resp = session.post(url, data=sdf, headers={'Content-Type': 'application/json'})
+        return resp
+
+    def commit(self):
+        """
+        Actually send an SDF to CloudSearch for processing
+
+        If an SDF file has been explicitly loaded it will be used. Otherwise,
+        documents added through :func:`add` and :func:`delete` will be used.
+
+        :rtype: :class:`CommitResponse`
+        :returns: A summary of documents added and deleted
+        """
+
+        sdf = self.get_sdf()
+
+        if ': null' in sdf:
+            boto.log.error('null value in SDF detected. This will probably '
+                           'raise a 500 error.')
+            index = sdf.index(': null')
+            boto.log.error(sdf[index - 100:index + 100])
+
+        api_version = '2013-01-01'
+        if self.domain and self.domain.layer1:
+            api_version = self.domain.layer1.APIVersion
+
+        if self.sign_request:
+            r = self._commit_with_auth(sdf, api_version)
+        else:
+            r = self._commit_without_auth(sdf, api_version)
+
+        return CommitResponse(r, self, sdf, signed_request=self.sign_request)
+
+
+class CommitResponse(object):
+    """Wrapper for response to Cloudsearch document batch commit.
+
+    :type response: :class:`requests.models.Response`
+    :param response: Response from Cloudsearch /documents/batch API
+
+    :type doc_service: :class:`boto.cloudsearch2.document.DocumentServiceConnection`
+    :param doc_service: Object containing the documents posted and methods to
+        retry
+
+    :raises: :class:`boto.exception.BotoServerError`
+    :raises: :class:`boto.cloudsearch2.document.SearchServiceException`
+    :raises: :class:`boto.cloudsearch2.document.EncodingError`
+    :raises: :class:`boto.cloudsearch2.document.ContentTooLongError`
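+
+    A sketch of inspecting a failed commit (illustrative)::
+
+        try:
+            response = conn.commit()
+        except CommitMismatchError as exc:
+            # Per-document error messages reported by CloudSearch, if any
+            for message in exc.errors:
+                print(message)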
+    """
+    def __init__(self, response, doc_service, sdf, signed_request=False):
+        self.response = response
+        self.doc_service = doc_service
+        self.sdf = sdf
+        self.signed_request = signed_request
+
+        if self.signed_request:
+            self.content = response
+        else:
+            _body = response.content.decode('utf-8')
+
+            try:
+                self.content = json.loads(_body)
+            except Exception:
+                boto.log.error('Error indexing documents.\nResponse Content:\n{0}'
+                               '\n\nSDF:\n{1}'.format(_body, self.sdf))
+                raise boto.exception.BotoServerError(self.response.status_code, '',
+                                                     body=_body)
+
+        self.status = self.content['status']
+        if self.status == 'error':
+            self.errors = [e.get('message') for e in self.content.get('errors',
+                                                                      [])]
+            for e in self.errors:
+                if "Illegal Unicode character" in e:
+                    raise EncodingError("Illegal Unicode character in document")
+                elif e == "The Content-Length is too long":
+                    raise ContentTooLongError("Content was too long")
+        else:
+            self.errors = []
+
+        self.adds = self.content['adds']
+        self.deletes = self.content['deletes']
+        self._check_num_ops('add', self.adds)
+        self._check_num_ops('delete', self.deletes)
+
+    def _check_num_ops(self, type_, response_num):
+        """Raise exception if number of ops in response doesn't match commit
+
+        :type type_: str
+        :param type_: Type of commit operation: 'add' or 'delete'
+
+        :type response_num: int
+        :param response_num: Number of adds or deletes in the response.
+
+        :raises: :class:`boto.cloudsearch2.document.CommitMismatchError`
+        """
+        commit_num = len([d for d in self.doc_service.documents_batch
+                          if d['type'] == type_])
+
+        if response_num != commit_num:
+            if self.signed_request:
+                boto.log.debug(self.response)
+            else:
+                boto.log.debug(self.response.content)
+            # There will always be a commit mismatch error if there are any
+            # errors on the CloudSearch side. self.errors would otherwise be
+            # lost when this CommitMismatchError is raised, leaving the caller
+            # with no idea why the commit failed and no way to report the
+            # cause from Amazon's error messages. So attach self.errors to the
+            # exception, since we already spent the effort collecting them
+            # from the response.
+            exc = CommitMismatchError(
+                'Incorrect number of {0}s returned. Commit: {1} Response: {2}'
+                .format(type_, commit_num, response_num)
+            )
+            exc.errors = self.errors
+            raise exc