comparison env/lib/python3.7/site-packages/requests_toolbelt/multipart/decoder.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 # -*- coding: utf-8 -*-
2 """
3
4 requests_toolbelt.multipart.decoder
5 ===================================
6
7 This holds all the implementation details of the MultipartDecoder
8
9 """
10
11 import sys
12 import email.parser
13 from .encoder import encode_with
14 from requests.structures import CaseInsensitiveDict
15
16
17 def _split_on_find(content, bound):
18 point = content.find(bound)
19 return content[:point], content[point + len(bound):]
20
21
class ImproperBodyPartContentException(Exception):
    """Raised when the content of a body part is not properly formed."""
24
25
class NonMultipartContentTypeException(Exception):
    """Raised when a content type does not describe a multipart payload."""
28
29
30 def _header_parser(string, encoding):
31 major = sys.version_info[0]
32 if major == 3:
33 string = string.decode(encoding)
34 headers = email.parser.HeaderParser().parsestr(string).items()
35 return (
36 (encode_with(k, encoding), encode_with(v, encoding))
37 for k, v in headers
38 )
39
40
class BodyPart(object):
    """A ``Response``-like view of one subpart of a multipart response.

    Instances are normally created by ``MultipartDecoder`` rather than
    directly.

    Mirroring ``Response``, each instance exposes:

    - ``headers``: a ``CaseInsensitiveDict`` of the part's headers,
    - ``content``: the part's body as bytes,
    - ``text``: the part's body decoded to unicode,
    - ``encoding``: the name of the unicode codec in use.

    :param content: raw bytes of the part; must contain a CR-LF-CR-LF
        sequence separating the (possibly empty) header section from the body.
    :param encoding: codec name used for the headers and for ``text``.
    :raises ImproperBodyPartContentException: if ``content`` contains no
        CR-LF-CR-LF sequence.
    """

    def __init__(self, content, encoding):
        self.encoding = encoding
        separator = b'\r\n\r\n'
        if separator not in content:
            raise ImproperBodyPartContentException(
                'content does not contain CR-LF-CR-LF'
            )
        # Everything before the first blank line is the header section (it
        # may be empty); everything after it is the body.
        raw_headers, self.content = _split_on_find(content, separator)
        if raw_headers != b'':
            parsed = _header_parser(raw_headers.lstrip(), encoding)
        else:
            parsed = {}
        self.headers = CaseInsensitiveDict(parsed)

    @property
    def text(self):
        """Content of the ``BodyPart`` in unicode."""
        raw = self.content
        return raw.decode(self.encoding)
72
73
class MultipartDecoder(object):
    """Parse a multipart payload into ``Response``-like ``BodyPart`` objects.

    The ``MultipartDecoder`` object parses the multipart payload of
    a bytestring into a tuple of ``BodyPart`` objects, available as
    ``parts``.

    The basic usage is::

        import requests
        from requests_toolbelt import MultipartDecoder

        response = requests.get(url)
        decoder = MultipartDecoder.from_response(response)
        for part in decoder.parts:
            print(part.headers['content-type'])

    If the multipart content is not from a response, basic usage is::

        from requests_toolbelt import MultipartDecoder

        decoder = MultipartDecoder(content, content_type)
        for part in decoder.parts:
            print(part.headers['content-type'])

    For both these usages, there is an optional ``encoding`` parameter. This is
    a string, which is the name of the unicode codec to use (default is
    ``'utf-8'``).

    :raises NonMultipartContentTypeException: if the content type is missing,
        is not a ``multipart/*`` type, or carries no boundary parameter.
    """
    def __init__(self, content, content_type, encoding='utf-8'):
        #: Original Content-Type header
        self.content_type = content_type
        #: Response body encoding
        self.encoding = encoding
        #: Parsed parts of the multipart response body
        self.parts = tuple()
        self._find_boundary()
        self._parse_body(content)

    def _find_boundary(self):
        """Validate the content type and store its boundary in ``boundary``.

        :raises NonMultipartContentTypeException: if the major mimetype is not
            ``multipart`` or no non-empty boundary parameter is present.
        """
        # ``from_response`` passes ``None`` when the response has no
        # Content-Type header; treat that as an empty (non-multipart) type
        # instead of failing on ``None.split``.
        content_type = self.content_type or ''
        ct_info = tuple(x.strip() for x in content_type.split(';'))
        mimetype = ct_info[0]
        if mimetype.split('/')[0].lower() != 'multipart':
            raise NonMultipartContentTypeException(
                "Unexpected mimetype in content-type: '{0}'".format(mimetype)
            )
        boundary = None
        for item in ct_info[1:]:
            attr, value = _split_on_find(
                item,
                '='
            )
            if attr.lower() == 'boundary':
                boundary = encode_with(value.strip('"'), self.encoding)
        if not boundary:
            # Without a boundary the body cannot be split into parts; fail
            # here rather than with an AttributeError in ``_parse_body``.
            raise NonMultipartContentTypeException(
                "No boundary parameter in content-type: '{0}'".format(
                    content_type
                )
            )
        self.boundary = boundary

    @staticmethod
    def _fix_first_part(part, boundary_marker):
        """Return ``part`` without a leading ``boundary_marker``, if present.

        The body is split on CR-LF followed by the marker, so the first chunk
        can still begin with a bare marker that has no preceding CR-LF.
        """
        bm_len = len(boundary_marker)
        if boundary_marker == part[:bm_len]:
            return part[bm_len:]
        else:
            return part

    def _parse_body(self, content):
        """Split ``content`` on the boundary and store the parts in ``parts``.

        :param content: the raw multipart body as bytes.
        """
        boundary = b''.join((b'--', self.boundary))

        def body_part(part):
            fixed = MultipartDecoder._fix_first_part(part, boundary)
            return BodyPart(fixed, self.encoding)

        def test_part(part):
            # Skip chunks that are empty, a bare CR-LF, or the "--" tail left
            # behind by the closing delimiter.
            return (part != b'' and
                    part != b'\r\n' and
                    part[:4] != b'--\r\n' and
                    part != b'--')

        parts = content.split(b''.join((b'\r\n', boundary)))
        self.parts = tuple(body_part(x) for x in parts if test_part(x))

    @classmethod
    def from_response(cls, response, encoding='utf-8'):
        """Build a decoder from a response's body and Content-Type header.

        :param response: object exposing ``content`` (bytes) and a ``headers``
            mapping.
        :param encoding: name of the unicode codec to use.
        :returns: a new instance of ``cls``.
        """
        content = response.content
        content_type = response.headers.get('content-type', None)
        return cls(content, content_type, encoding)