Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/requests_toolbelt/multipart/decoder.py @ 3:758bc20232e8 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:20:52 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
# -*- coding: utf-8 -*-
"""

requests_toolbelt.multipart.decoder
===================================

This holds all the implementation details of the MultipartDecoder

"""

import sys
import email.parser
from .encoder import encode_with
from requests.structures import CaseInsensitiveDict


def _split_on_find(content, bound):
    """Split ``content`` around the first occurrence of ``bound``.

    :param content: the ``str`` or ``bytes`` value to split.
    :param bound: the delimiter to look for (same type as ``content``).
    :returns: a ``(head, tail)`` tuple with the delimiter itself removed.
        When ``bound`` does not occur in ``content`` the whole value is
        returned as ``head`` and ``tail`` is empty.
    """
    point = content.find(bound)
    if point == -1:
        # ``find`` signals "not found" with -1; slicing with that value would
        # silently drop the last character of ``content`` and mis-slice the
        # tail. ``content[:0]`` yields an empty value of the matching type.
        return content, content[:0]
    return content[:point], content[point + len(bound):]


class ImproperBodyPartContentException(Exception):
    """Raised when a body part lacks the blank line ending its headers."""
    pass


class NonMultipartContentTypeException(Exception):
    """Raised when a Content-Type cannot be used to decode multipart data."""
    pass


def _header_parser(string, encoding):
    """Parse a raw header section into encoded ``(name, value)`` pairs.

    :param string: the header section of a body part.
    :param encoding: name of the codec used to decode the header section on
        Python 3 and passed to ``encode_with`` for each name and value.
    :returns: a generator of ``(name, value)`` tuples, each element passed
        through ``encode_with``.
    """
    major = sys.version_info[0]
    if major == 3:
        # The email parser works on text, so decode before parsing on
        # Python 3.
        string = string.decode(encoding)
    headers = email.parser.HeaderParser().parsestr(string).items()
    return (
        (encode_with(k, encoding), encode_with(v, encoding))
        for k, v in headers
    )


class BodyPart(object):
    """

    The ``BodyPart`` object is a ``Response``-like interface to an individual
    subpart of a multipart response. It is expected that these will
    generally be created by objects of the ``MultipartDecoder`` class.

    Like ``Response``, there is a ``CaseInsensitiveDict`` object named headers,
    ``content`` to access bytes, ``text`` to access unicode, and ``encoding``
    to access the unicode codec.

    """

    def __init__(self, content, encoding):
        """Split ``content`` into headers and payload.

        :param content: the bytes of one part, without its boundary line.
        :param encoding: name of the codec used for the headers and for
            :attr:`text`.
        :raises ImproperBodyPartContentException: if ``content`` has no
            CR-LF-CR-LF sequence separating headers from payload.
        """
        self.encoding = encoding
        headers = {}
        # Split into header section (if any) and the content
        if b'\r\n\r\n' in content:
            first, self.content = _split_on_find(content, b'\r\n\r\n')
            # An empty header section leaves ``headers`` as an empty mapping.
            if first != b'':
                headers = _header_parser(first.lstrip(), encoding)
        else:
            raise ImproperBodyPartContentException(
                'content does not contain CR-LF-CR-LF'
            )
        self.headers = CaseInsensitiveDict(headers)

    @property
    def text(self):
        """Content of the ``BodyPart`` in unicode."""
        return self.content.decode(self.encoding)


class MultipartDecoder(object):
    """

    The ``MultipartDecoder`` object parses the multipart payload of
    a bytestring into a tuple of ``Response``-like ``BodyPart`` objects.

    The basic usage is::

        import requests
        from requests_toolbelt import MultipartDecoder

        response = requests.get(url)
        decoder = MultipartDecoder.from_response(response)
        for part in decoder.parts:
            print(part.headers['content-type'])

    If the multipart content is not from a response, basic usage is::

        from requests_toolbelt import MultipartDecoder

        decoder = MultipartDecoder(content, content_type)
        for part in decoder.parts:
            print(part.headers['content-type'])

    For both these usages, there is an optional ``encoding`` parameter. This is
    a string, which is the name of the unicode codec to use (default is
    ``'utf-8'``).

    """

    def __init__(self, content, content_type, encoding='utf-8'):
        """Locate the boundary in ``content_type`` and parse ``content``.

        :param content: the multipart payload as bytes.
        :param content_type: the Content-Type header value describing it.
        :param encoding: name of the unicode codec to use.
        :raises NonMultipartContentTypeException: if ``content_type`` is not
            a ``multipart/*`` type or carries no ``boundary`` parameter.
        :raises ImproperBodyPartContentException: if a part has no
            CR-LF-CR-LF separator.
        """
        #: Original Content-Type header
        self.content_type = content_type
        #: Response body encoding
        self.encoding = encoding
        #: Parsed parts of the multipart response body
        self.parts = tuple()
        self._find_boundary()
        self._parse_body(content)

    def _find_boundary(self):
        """Extract the ``boundary`` parameter from the Content-Type header.

        Stores the result on ``self.boundary`` after passing it through
        ``encode_with``; ``_parse_body`` joins it with ``b'--'``, so it is
        expected to be bytes at that point.

        :raises NonMultipartContentTypeException: if the major type is not
            ``multipart`` or no ``boundary`` parameter is present.
        """
        ct_info = tuple(x.strip() for x in self.content_type.split(';'))
        mimetype = ct_info[0]
        if mimetype.split('/')[0].lower() != 'multipart':
            raise NonMultipartContentTypeException(
                "Unexpected mimetype in content-type: '{0}'".format(mimetype)
            )

        boundary = None
        for item in ct_info[1:]:
            if '=' not in item:
                # Not an ``attr=value`` parameter (e.g. a stray token or the
                # empty string left by a trailing ';'); nothing to inspect.
                continue
            attr, value = _split_on_find(item, '=')
            if attr.lower() == 'boundary':
                # No early exit: if the parameter is repeated, the last
                # occurrence wins.
                boundary = encode_with(value.strip('"'), self.encoding)

        if boundary is None:
            # Fail here with a clear message instead of an AttributeError on
            # ``self.boundary`` later in ``_parse_body``.
            raise NonMultipartContentTypeException(
                "No boundary parameter in content-type: '{0}'".format(
                    self.content_type
                )
            )
        self.boundary = boundary

    @staticmethod
    def _fix_first_part(part, boundary_marker):
        """Strip a leading ``boundary_marker`` from ``part`` if present.

        The body is split on CR-LF followed by the marker, so the very first
        chunk can still begin with a marker that had no preceding CR-LF.
        """
        if part.startswith(boundary_marker):
            return part[len(boundary_marker):]
        return part

    def _parse_body(self, content):
        """Split ``content`` on the boundary and build ``self.parts``.

        :param content: the multipart payload as bytes.
        :raises ImproperBodyPartContentException: propagated from
            ``BodyPart`` when a part has no CR-LF-CR-LF separator.
        """
        boundary = b''.join((b'--', self.boundary))

        def body_part(part):
            fixed = MultipartDecoder._fix_first_part(part, boundary)
            return BodyPart(fixed, self.encoding)

        def test_part(part):
            # Skip empty chunks and the remainder left by the closing
            # delimiter (the trailing ``--``).
            return (part != b'' and
                    part != b'\r\n' and
                    part[:4] != b'--\r\n' and
                    part != b'--')

        parts = content.split(b''.join((b'\r\n', boundary)))
        self.parts = tuple(body_part(x) for x in parts if test_part(x))

    @classmethod
    def from_response(cls, response, encoding='utf-8'):
        """Build a decoder from a response object.

        :param response: an object exposing ``content`` and a ``headers``
            mapping with a ``get`` method.
        :param encoding: name of the unicode codec to use.
        :returns: a new instance of ``cls`` built from the response body and
            its ``content-type`` header (``None`` if the header is absent).
        """
        content = response.content
        content_type = response.headers.get('content-type', None)
        return cls(content, content_type, encoding)