diff env/lib/python3.7/site-packages/requests_toolbelt/multipart/decoder.py @ 2:6af9afd405e9 draft

"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author shellac
date Thu, 14 May 2020 14:56:58 -0400
parents 26e78fe6e8c4
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/requests_toolbelt/multipart/decoder.py	Thu May 14 14:56:58 2020 -0400
@@ -0,0 +1,156 @@
+# -*- coding: utf-8 -*-
+"""
+
+requests_toolbelt.multipart.decoder
+===================================
+
+This holds all the implementation details of the MultipartDecoder.
+
+"""
+
+import sys
+import email.parser
+from .encoder import encode_with
+from requests.structures import CaseInsensitiveDict
+
+
+def _split_on_find(content, bound):
+    """Split ``content`` around the first occurrence of ``bound``.
+
+    Works for byte strings and text strings alike, provided ``content``
+    and ``bound`` are of the same type.
+
+    :param content: the string to split
+    :param bound: the separator to look for
+    :returns: a 2-tuple ``(before, after)`` with the separator removed.
+        When ``bound`` does not occur in ``content`` the result is
+        ``(content, <empty string of the same type>)``.
+    """
+    point = content.find(bound)
+    if point == -1:
+        # Without this guard the -1 would be used as a slice index and
+        # silently produce a bogus split. ``content[:0]`` yields an empty
+        # value of the same type as ``content`` (bytes or text).
+        return content, content[:0]
+    return content[:point], content[point + len(bound):]
+
+
+class ImproperBodyPartContentException(Exception):
+    """Raised by ``BodyPart`` when the content it is given does not
+    contain the CR-LF-CR-LF sequence."""
+
+
+class NonMultipartContentTypeException(Exception):
+    """Raised by ``MultipartDecoder`` when the major type of the supplied
+    content type is not ``multipart``."""
+
+
+def _header_parser(string, encoding):
+    """Parse a raw header section into encoded ``(name, value)`` pairs.
+
+    On Python 3 ``string`` is first decoded with ``encoding``. It is then
+    parsed with ``email.parser.HeaderParser`` and every header name and
+    value is passed through ``encode_with`` using the same ``encoding``.
+
+    :param string: the header section of a body part
+    :param encoding: name of the codec used for decoding and re-encoding
+    :returns: a generator yielding ``(name, value)`` 2-tuples
+    """
+    running_python3 = sys.version_info[0] == 3
+    if running_python3:
+        string = string.decode(encoding)
+    message = email.parser.HeaderParser().parsestr(string)
+    header_items = message.items()
+    return (
+        (encode_with(name, encoding), encode_with(value, encoding))
+        for name, value in header_items
+    )
+
+
+class BodyPart(object):
+    """
+
+    A ``BodyPart`` is a ``Response``-like view of one subpart of a
+    multipart response. Instances are normally created by a
+    ``MultipartDecoder`` rather than directly.
+
+    As with ``Response``, it exposes ``headers`` (a ``CaseInsensitiveDict``),
+    ``content`` (the bytes of the part), ``text`` (``content`` decoded to
+    unicode) and ``encoding`` (the name of the codec used for decoding).
+
+    """
+
+    def __init__(self, content, encoding):
+        """Split ``content`` into its header section and its body.
+
+        :param content: raw bytes of one part; the header section and the
+            body must be separated by CR-LF-CR-LF
+        :param encoding: name of the codec used to parse the headers and
+            to decode ``text``
+        :raises ImproperBodyPartContentException: if ``content`` does not
+            contain CR-LF-CR-LF
+        """
+        self.encoding = encoding
+        separator = b'\r\n\r\n'
+        if separator not in content:
+            raise ImproperBodyPartContentException(
+                'content does not contain CR-LF-CR-LF'
+            )
+        # Everything before the first blank line is the header section;
+        # the remainder is the payload of this part.
+        raw_headers, self.content = _split_on_find(content, separator)
+        if raw_headers != b'':
+            parsed = _header_parser(raw_headers.lstrip(), encoding)
+        else:
+            parsed = {}
+        self.headers = CaseInsensitiveDict(parsed)
+
+    @property
+    def text(self):
+        """Content of the ``BodyPart`` in unicode."""
+        raw = self.content
+        return raw.decode(self.encoding)
+
+
+class MultipartDecoder(object):
+    """
+
+    The ``MultipartDecoder`` object parses the multipart payload of
+    a bytestring into a tuple of ``Response``-like ``BodyPart`` objects.
+
+    The basic usage is::
+
+        import requests
+        from requests_toolbelt import MultipartDecoder
+
+        response = requests.get(url)
+        decoder = MultipartDecoder.from_response(response)
+        for part in decoder.parts:
+            print(part.headers['content-type'])
+
+    If the multipart content is not from a response, basic usage is::
+
+        from requests_toolbelt import MultipartDecoder
+
+        decoder = MultipartDecoder(content, content_type)
+        for part in decoder.parts:
+            print(part.headers['content-type'])
+
+    For both these usages, there is an optional ``encoding`` parameter. This is
+    a string, which is the name of the unicode codec to use (default is
+    ``'utf-8'``).
+
+    """
+    def __init__(self, content, content_type, encoding='utf-8'):
+        """Parse ``content`` using the boundary named in ``content_type``.
+
+        :param content: the multipart payload as a bytestring
+        :param content_type: value of the Content-Type header
+        :param encoding: name of the unicode codec to use
+        :raises NonMultipartContentTypeException: if ``content_type`` is
+            missing or its major type is not ``multipart``
+        :raises ImproperBodyPartContentException: if a part does not
+            contain CR-LF-CR-LF
+        """
+        #: Original Content-Type header
+        self.content_type = content_type
+        #: Response body encoding
+        self.encoding = encoding
+        #: Parsed parts of the multipart response body
+        self.parts = tuple()
+        self._find_boundary()
+        self._parse_body(content)
+
+    def _find_boundary(self):
+        """Validate the content type and record its ``boundary`` parameter.
+
+        On success ``self.boundary`` holds the boundary value (surrounding
+        double quotes removed) passed through ``encode_with`` with
+        ``self.encoding``. If the content type carries no ``boundary``
+        parameter, ``self.boundary`` is left unset.
+
+        :raises NonMultipartContentTypeException: if the content type is
+            missing or its major type is not ``multipart``
+        """
+        # A missing Content-Type header reaches us as ``None``; treat it as
+        # an empty value so it is reported as a non-multipart content type
+        # instead of failing on ``None.split``.
+        content_type = self.content_type or ''
+        ct_info = tuple(x.strip() for x in content_type.split(';'))
+        mimetype = ct_info[0]
+        if mimetype.split('/')[0].lower() != 'multipart':
+            raise NonMultipartContentTypeException(
+                "Unexpected mimetype in content-type: '{0}'".format(mimetype)
+            )
+        for item in ct_info[1:]:
+            # ``partition`` leaves a parameter without '=' intact as the
+            # attribute name (with an empty value) rather than mis-slicing
+            # it, so e.g. ``boundaryZ`` is not mistaken for ``boundary``.
+            attr, _, value = item.partition('=')
+            if attr.lower() == 'boundary':
+                self.boundary = encode_with(value.strip('"'), self.encoding)
+
+    @staticmethod
+    def _fix_first_part(part, boundary_marker):
+        """Return ``part`` with a leading ``boundary_marker`` removed.
+
+        ``part`` is returned unchanged when it does not start with the
+        marker.
+        """
+        if part.startswith(boundary_marker):
+            return part[len(boundary_marker):]
+        return part
+
+    def _parse_body(self, content):
+        """Split ``content`` on the boundary and build ``self.parts``.
+
+        :param content: the multipart payload as a bytestring
+        :raises ImproperBodyPartContentException: if a part does not
+            contain CR-LF-CR-LF
+        """
+        boundary = b''.join((b'--', self.boundary))
+
+        def body_part(part):
+            # The split below is on CRLF + boundary, so a payload that
+            # starts directly with the boundary keeps it on its first
+            # chunk; strip it before building the ``BodyPart``.
+            fixed = MultipartDecoder._fix_first_part(part, boundary)
+            return BodyPart(fixed, self.encoding)
+
+        def test_part(part):
+            # Skip chunks that are not real parts: empty chunks, a bare
+            # CRLF, and the ``--`` left behind by the closing delimiter.
+            return (part != b'' and
+                    part != b'\r\n' and
+                    part[:4] != b'--\r\n' and
+                    part != b'--')
+
+        parts = content.split(b''.join((b'\r\n', boundary)))
+        self.parts = tuple(body_part(x) for x in parts if test_part(x))
+
+    @classmethod
+    def from_response(cls, response, encoding='utf-8'):
+        """Build a decoder from a response object.
+
+        Reads the payload from ``response.content`` and the content type
+        from the ``content-type`` entry of ``response.headers``.
+
+        :param response: object exposing ``content`` and ``headers``
+        :param encoding: name of the unicode codec to use
+        :returns: a new instance of ``cls``
+        """
+        content = response.content
+        content_type = response.headers.get('content-type', None)
+        return cls(content, content_type, encoding)