Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/boltons/socketutils.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/boltons/socketutils.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,737 +0,0 @@ -# -*- coding: utf-8 -*- -"""At its heart, Python can be viewed as an extension of the C -programming language. Springing from the most popular systems -programming language has made Python itself a great language for -systems programming. One key to success in this domain is Python's -very serviceable :mod:`socket` module and its :class:`socket.socket` -type. - -The ``socketutils`` module provides natural next steps to the ``socket`` -builtin: straightforward, tested building blocks for higher-level -protocols. - -The :class:`BufferedSocket` wraps an ordinary socket, providing a -layer of intuitive buffering for both sending and receiving. This -facilitates parsing messages from streams, i.e., all sockets with type -``SOCK_STREAM``. The BufferedSocket enables receiving until the next -relevant token, up to a certain size, or until the connection is -closed. For all of these, it provides consistent APIs to size -limiting, as well as timeouts that are compatible with multiple -concurrency paradigms. Use it to parse the next one-off text or binary -socket protocol you encounter. - -This module also provides the :class:`NetstringSocket`, a pure-Python -implementation of `the Netstring protocol`_, built on top of the -:class:`BufferedSocket`, serving as a ready-made, production-grade example. - -Special thanks to `Kurt Rose`_ for his original authorship and all his -contributions on this module. Also thanks to `Daniel J. Bernstein`_, the -original author of `Netstring`_. - -.. _the Netstring protocol: https://en.wikipedia.org/wiki/Netstring -.. _Kurt Rose: https://github.com/doublereedkurt -.. _Daniel J. Bernstein: https://cr.yp.to/ -.. _Netstring: https://cr.yp.to/proto/netstrings.txt - -""" - -import time -import socket - -try: - from threading import RLock -except Exception: - class RLock(object): - 'Dummy reentrant lock for builds without threads' - def __enter__(self): - pass - - def __exit__(self, exctype, excinst, exctb): - pass - - -try: - from typeutils import make_sentinel - _UNSET = make_sentinel(var_name='_UNSET') -except ImportError: - _UNSET = object() - - -DEFAULT_TIMEOUT = 10 # 10 seconds -DEFAULT_MAXSIZE = 32 * 1024 # 32kb -_RECV_LARGE_MAXSIZE = 1024 ** 5 # 1PB - - -class BufferedSocket(object): - """Mainly provides recv_until and recv_size. recv, send, sendall, and - peek all function as similarly as possible to the built-in socket - API. - - This type has been tested against both the built-in socket type as - well as those from gevent and eventlet. It also features support - for sockets with timeouts set to 0 (aka nonblocking), provided the - caller is prepared to handle the EWOULDBLOCK exceptions. - - Args: - sock (socket): The connected socket to be wrapped. - timeout (float): The default timeout for sends and recvs, in - seconds. Set to ``None`` for no timeout, and 0 for - nonblocking. Defaults to *sock*'s own timeout if already set, - and 10 seconds otherwise. - maxsize (int): The default maximum number of bytes to be received - into the buffer before it is considered full and raises an - exception. Defaults to 32 kilobytes. - recvsize (int): The number of bytes to recv for every - lower-level :meth:`socket.recv` call. Defaults to *maxsize*. - - *timeout* and *maxsize* can both be overridden on individual socket - operations. - - All ``recv`` methods return bytestrings (:class:`bytes`) and can - raise :exc:`socket.error`. :exc:`Timeout`, - :exc:`ConnectionClosed`, and :exc:`MessageTooLong` all inherit - from :exc:`socket.error` and exist to provide better error - messages. Received bytes are always buffered, even if an exception - is raised. Use :meth:`BufferedSocket.getrecvbuffer` to retrieve - partial recvs. - - BufferedSocket does not replace the built-in socket by any - means. While the overlapping parts of the API are kept parallel to - the built-in :class:`socket.socket`, BufferedSocket does not - inherit from socket, and most socket functionality is only - available on the underlying socket. :meth:`socket.getpeername`, - :meth:`socket.getsockname`, :meth:`socket.fileno`, and others are - only available on the underlying socket that is wrapped. Use the - ``BufferedSocket.sock`` attribute to access it. See the examples - for more information on how to use BufferedSockets with built-in - sockets. - - The BufferedSocket is threadsafe, but consider the semantics of - your protocol before accessing a single socket from multiple - threads. Similarly, once the BufferedSocket is constructed, avoid - using the underlying socket directly. Only use it for operations - unrelated to messages, e.g., :meth:`socket.getpeername`. - - """ - def __init__(self, sock, timeout=_UNSET, - maxsize=DEFAULT_MAXSIZE, recvsize=_UNSET): - self.sock = sock - self.rbuf = b'' - self.sbuf = [] - self.maxsize = int(maxsize) - - if timeout is _UNSET: - if self.sock.gettimeout() is None: - self.timeout = DEFAULT_TIMEOUT - else: - self.timeout = self.sock.gettimeout() - else: - if timeout is None: - self.timeout = timeout - else: - self.timeout = float(timeout) - - if recvsize is _UNSET: - self._recvsize = self.maxsize - else: - self._recvsize = int(recvsize) - - self._send_lock = RLock() - self._recv_lock = RLock() - - def settimeout(self, timeout): - "Set the default *timeout* for future operations, in seconds." - self.timeout = timeout - - def gettimeout(self): - return self.timeout - - def setblocking(self, blocking): - self.timeout = None if blocking else 0.0 - - def setmaxsize(self, maxsize): - """Set the default maximum buffer size *maxsize* for future - operations, in bytes. Does not truncate the current buffer. - """ - self.maxsize = maxsize - - def getrecvbuffer(self): - "Returns the receive buffer bytestring (rbuf)." - with self._recv_lock: - return self.rbuf - - def getsendbuffer(self): - "Returns a copy of the send buffer list." - with self._send_lock: - return b''.join(self.sbuf) - - def recv(self, size, flags=0, timeout=_UNSET): - """Returns **up to** *size* bytes, using the internal buffer before - performing a single :meth:`socket.recv` operation. - - Args: - size (int): The maximum number of bytes to receive. - flags (int): Kept for API compatibility with sockets. Only - the default, ``0``, is valid. - timeout (float): The timeout for this operation. Can be - ``0`` for nonblocking and ``None`` for no - timeout. Defaults to the value set in the constructor - of BufferedSocket. - - If the operation does not complete in *timeout* seconds, a - :exc:`Timeout` is raised. Much like the built-in - :class:`socket.socket`, if this method returns an empty string, - then the socket is closed and recv buffer is empty. Further - calls to recv will raise :exc:`socket.error`. - - """ - with self._recv_lock: - if timeout is _UNSET: - timeout = self.timeout - if flags: - raise ValueError("non-zero flags not supported: %r" % flags) - if len(self.rbuf) >= size: - data, self.rbuf = self.rbuf[:size], self.rbuf[size:] - return data - if self.rbuf: - ret, self.rbuf = self.rbuf, b'' - return ret - self.sock.settimeout(timeout) - try: - data = self.sock.recv(self._recvsize) - except socket.timeout: - raise Timeout(timeout) # check the rbuf attr for more - if len(data) > size: - data, self.rbuf = data[:size], data[size:] - return data - - def peek(self, size, timeout=_UNSET): - """Returns *size* bytes from the socket and/or internal buffer. Bytes - are retained in BufferedSocket's internal recv buffer. To only - see bytes in the recv buffer, use :meth:`getrecvbuffer`. - - Args: - size (int): The exact number of bytes to peek at - timeout (float): The timeout for this operation. Can be 0 for - nonblocking and None for no timeout. Defaults to the value - set in the constructor of BufferedSocket. - - If the appropriate number of bytes cannot be fetched from the - buffer and socket before *timeout* expires, then a - :exc:`Timeout` will be raised. If the connection is closed, a - :exc:`ConnectionClosed` will be raised. - """ - with self._recv_lock: - if len(self.rbuf) >= size: - return self.rbuf[:size] - data = self.recv_size(size, timeout=timeout) - self.rbuf = data + self.rbuf - return data - - def recv_close(self, timeout=_UNSET, maxsize=_UNSET): - """Receive until the connection is closed, up to *maxsize* bytes. If - more than *maxsize* bytes are received, raises :exc:`MessageTooLong`. - """ - # recv_close works by using recv_size to request maxsize data, - # and ignoring ConnectionClose, returning and clearing the - # internal buffer instead. It raises an exception if - # ConnectionClosed isn't raised. - with self._recv_lock: - if maxsize is _UNSET: - maxsize = self.maxsize - if maxsize is None: - maxsize = _RECV_LARGE_MAXSIZE - try: - recvd = self.recv_size(maxsize + 1, timeout) - except ConnectionClosed: - ret, self.rbuf = self.rbuf, b'' - else: - # put extra received bytes (now in rbuf) after recvd - self.rbuf = recvd + self.rbuf - size_read = min(maxsize, len(self.rbuf)) - raise MessageTooLong(size_read) # check receive buffer - return ret - - def recv_until(self, delimiter, timeout=_UNSET, maxsize=_UNSET, - with_delimiter=False): - """Receive until *delimiter* is found, *maxsize* bytes have been read, - or *timeout* is exceeded. - - Args: - delimiter (bytes): One or more bytes to be searched for - in the socket stream. - timeout (float): The timeout for this operation. Can be 0 for - nonblocking and None for no timeout. Defaults to the value - set in the constructor of BufferedSocket. - maxsize (int): The maximum size for the internal buffer. - Defaults to the value set in the constructor. - with_delimiter (bool): Whether or not to include the - delimiter in the output. ``False`` by default, but - ``True`` is useful in cases where one is simply - forwarding the messages. - - ``recv_until`` will raise the following exceptions: - - * :exc:`Timeout` if more than *timeout* seconds expire. - * :exc:`ConnectionClosed` if the underlying socket is closed - by the sending end. - * :exc:`MessageTooLong` if the delimiter is not found in the - first *maxsize* bytes. - * :exc:`socket.error` if operating in nonblocking mode - (*timeout* equal to 0), or if some unexpected socket error - occurs, such as operating on a closed socket. - - """ - with self._recv_lock: - if maxsize is _UNSET: - maxsize = self.maxsize - if maxsize is None: - maxsize = _RECV_LARGE_MAXSIZE - if timeout is _UNSET: - timeout = self.timeout - len_delimiter = len(delimiter) - - sock = self.sock - recvd = bytearray(self.rbuf) - start = time.time() - find_offset_start = 0 # becomes a negative index below - - if not timeout: # covers None (no timeout) and 0 (nonblocking) - sock.settimeout(timeout) - try: - while 1: - offset = recvd.find(delimiter, find_offset_start, maxsize) - if offset != -1: # str.find returns -1 when no match found - if with_delimiter: # include delimiter in return - offset += len_delimiter - rbuf_offset = offset - else: - rbuf_offset = offset + len_delimiter - break - elif len(recvd) > maxsize: - raise MessageTooLong(maxsize, delimiter) # see rbuf - if timeout: - cur_timeout = timeout - (time.time() - start) - if cur_timeout <= 0.0: - raise socket.timeout() - sock.settimeout(cur_timeout) - nxt = sock.recv(self._recvsize) - if not nxt: - args = (len(recvd), delimiter) - msg = ('connection closed after reading %s bytes' - ' without finding symbol: %r' % args) - raise ConnectionClosed(msg) # check the recv buffer - recvd.extend(nxt) - find_offset_start = -len(nxt) - len_delimiter + 1 - except socket.timeout: - self.rbuf = bytes(recvd) - msg = ('read %s bytes without finding delimiter: %r' - % (len(recvd), delimiter)) - raise Timeout(timeout, msg) # check the recv buffer - except Exception: - self.rbuf = bytes(recvd) - raise - val, self.rbuf = bytes(recvd[:offset]), bytes(recvd[rbuf_offset:]) - return val - - def recv_size(self, size, timeout=_UNSET): - """Read off of the internal buffer, then off the socket, until - *size* bytes have been read. - - Args: - size (int): number of bytes to read before returning. - timeout (float): The timeout for this operation. Can be 0 for - nonblocking and None for no timeout. Defaults to the value - set in the constructor of BufferedSocket. - - If the appropriate number of bytes cannot be fetched from the - buffer and socket before *timeout* expires, then a - :exc:`Timeout` will be raised. If the connection is closed, a - :exc:`ConnectionClosed` will be raised. - """ - with self._recv_lock: - if timeout is _UNSET: - timeout = self.timeout - chunks = [] - total_bytes = 0 - try: - start = time.time() - self.sock.settimeout(timeout) - nxt = self.rbuf or self.sock.recv(self._recvsize) - while nxt: - total_bytes += len(nxt) - if total_bytes >= size: - break - chunks.append(nxt) - if timeout: - cur_timeout = timeout - (time.time() - start) - if cur_timeout <= 0.0: - raise socket.timeout() - self.sock.settimeout(cur_timeout) - nxt = self.sock.recv(self._recvsize) - else: - msg = ('connection closed after reading %s of %s requested' - ' bytes' % (total_bytes, size)) - raise ConnectionClosed(msg) # check recv buffer - except socket.timeout: - self.rbuf = b''.join(chunks) - msg = 'read %s of %s bytes' % (total_bytes, size) - raise Timeout(timeout, msg) # check recv buffer - except Exception: - # received data is still buffered in the case of errors - self.rbuf = b''.join(chunks) - raise - extra_bytes = total_bytes - size - if extra_bytes: - last, self.rbuf = nxt[:-extra_bytes], nxt[-extra_bytes:] - else: - last, self.rbuf = nxt, b'' - chunks.append(last) - return b''.join(chunks) - - def send(self, data, flags=0, timeout=_UNSET): - """Send the contents of the internal send buffer, as well as *data*, - to the receiving end of the connection. Returns the total - number of bytes sent. If no exception is raised, all of *data* was - sent and the internal send buffer is empty. - - Args: - data (bytes): The bytes to send. - flags (int): Kept for API compatibility with sockets. Only - the default 0 is valid. - timeout (float): The timeout for this operation. Can be 0 for - nonblocking and None for no timeout. Defaults to the value - set in the constructor of BufferedSocket. - - Will raise :exc:`Timeout` if the send operation fails to - complete before *timeout*. In the event of an exception, use - :meth:`BufferedSocket.getsendbuffer` to see which data was - unsent. - """ - with self._send_lock: - if timeout is _UNSET: - timeout = self.timeout - if flags: - raise ValueError("non-zero flags not supported") - sbuf = self.sbuf - sbuf.append(data) - if len(sbuf) > 1: - sbuf[:] = [b''.join([s for s in sbuf if s])] - self.sock.settimeout(timeout) - start, total_sent = time.time(), 0 - try: - while sbuf[0]: - sent = self.sock.send(sbuf[0]) - total_sent += sent - sbuf[0] = sbuf[0][sent:] - if timeout: - cur_timeout = timeout - (time.time() - start) - if cur_timeout <= 0.0: - raise socket.timeout() - self.sock.settimeout(cur_timeout) - except socket.timeout: - raise Timeout(timeout, '%s bytes unsent' % len(sbuf[0])) - return total_sent - - def sendall(self, data, flags=0, timeout=_UNSET): - """A passthrough to :meth:`~BufferedSocket.send`, retained for - parallelism to the :class:`socket.socket` API. - """ - return self.send(data, flags, timeout) - - def flush(self): - "Send the contents of the internal send buffer." - with self._send_lock: - self.send(b'') - return - - def buffer(self, data): - "Buffer *data* bytes for the next send operation." - with self._send_lock: - self.sbuf.append(data) - return - - # # # - # # # Passing through some socket basics - # # # - - def getsockname(self): - """Convenience function to return the wrapped socket's own address. - See :meth:`socket.getsockname` for more details. - """ - return self.sock.getsockname() - - def getpeername(self): - """Convenience function to return the remote address to which the - wrapped socket is connected. See :meth:`socket.getpeername` - for more details. - """ - return self.sock.getpeername() - - def getsockopt(self, level, optname, buflen=None): - """Convenience function passing through to the wrapped socket's - :meth:`socket.getsockopt`. - """ - args = (level, optname) - if buflen is not None: - args += (buflen,) - return self.sock.getsockopt(*args) - - def setsockopt(self, level, optname, value): - """Convenience function passing through to the wrapped socket's - :meth:`socket.setsockopt`. - """ - return self.sock.setsockopt(level, optname, value) - - @property - def type(self): - """A passthrough to the wrapped socket's type. Valid usages should - only ever see :data:`socket.SOCK_STREAM`. - """ - return self.sock.type - - @property - def family(self): - """A passthrough to the wrapped socket's family. BufferedSocket - supports all widely-used families, so this read-only attribute - can be one of :data:`socket.AF_INET` for IP, - :data:`socket.AF_INET6` for IPv6, and :data:`socket.AF_UNIX` - for UDS. - """ - return self.sock.family - - @property - def proto(self): - """A passthrough to the wrapped socket's protocol. The ``proto`` - attribute is very rarely used, so it's always 0, meaning "the - default" protocol. Pretty much all the practical information - is in :attr:`~BufferedSocket.type` and - :attr:`~BufferedSocket.family`, so you can go back to never - thinking about this. - """ - return self.sock.proto - - # # # - # # # Now for some more advanced interpretations of the builtin socket - # # # - - def fileno(self): - """Returns the file descriptor of the wrapped socket. -1 if it has - been closed on this end. - - Note that this makes the BufferedSocket selectable, i.e., - usable for operating system event loops without any external - libraries. Keep in mind that the operating system cannot know - about data in BufferedSocket's internal buffer. Exercise - discipline with calling ``recv*`` functions. - """ - return self.sock.fileno() - - def close(self): - """Closes the wrapped socket, and empties the internal buffers. The - send buffer is not flushed automatically, so if you have been - calling :meth:`~BufferedSocket.buffer`, be sure to call - :meth:`~BufferedSocket.flush` before calling this - method. After calling this method, future socket operations - will raise :exc:`socket.error`. - """ - with self._recv_lock: - with self._send_lock: - self.rbuf = b'' - self.rbuf_unconsumed = self.rbuf - self.sbuf[:] = [] - self.sock.close() - return - - def shutdown(self, how): - """Convenience method which passes through to the wrapped socket's - :meth:`~socket.shutdown`. Semantics vary by platform, so no - special internal handling is done with the buffers. This - method exists to facilitate the most common usage, wherein a - full ``shutdown`` is followed by a - :meth:`~BufferedSocket.close`. Developers requiring more - support, please open `an issue`_. - - .. _an issue: https://github.com/mahmoud/boltons/issues - """ - with self._recv_lock: - with self._send_lock: - self.sock.shutdown(how) - return - - # end BufferedSocket - - -class Error(socket.error): - """A subclass of :exc:`socket.error` from which all other - ``socketutils`` exceptions inherit. - - When using :class:`BufferedSocket` and other ``socketutils`` - types, generally you want to catch one of the specific exception - types below, or :exc:`socket.error`. - """ - pass - - -class ConnectionClosed(Error): - """Raised when receiving and the connection is unexpectedly closed - from the sending end. Raised from :class:`BufferedSocket`'s - :meth:`~BufferedSocket.peek`, :meth:`~BufferedSocket.recv_until`, - and :meth:`~BufferedSocket.recv_size`, and never from its - :meth:`~BufferedSocket.recv` or - :meth:`~BufferedSocket.recv_close`. - """ - pass - - -class MessageTooLong(Error): - """Raised from :meth:`BufferedSocket.recv_until` and - :meth:`BufferedSocket.recv_closed` when more than *maxsize* bytes are - read without encountering the delimiter or a closed connection, - respectively. - """ - def __init__(self, bytes_read=None, delimiter=None): - msg = 'message exceeded maximum size' - if bytes_read is not None: - msg += '. %s bytes read' % (bytes_read,) - if delimiter is not None: - msg += '. Delimiter not found: %r' % (delimiter,) - super(MessageTooLong, self).__init__(msg) - - -class Timeout(socket.timeout, Error): - """Inheriting from :exc:`socket.timeout`, Timeout is used to indicate - when a socket operation did not complete within the time - specified. Raised from any of :class:`BufferedSocket`'s ``recv`` - methods. - """ - def __init__(self, timeout, extra=""): - msg = 'socket operation timed out' - if timeout is not None: - msg += ' after %sms.' % (timeout * 1000) - if extra: - msg += ' ' + extra - super(Timeout, self).__init__(msg) - - -class NetstringSocket(object): - """ - Reads and writes using the netstring protocol. - - More info: https://en.wikipedia.org/wiki/Netstring - Even more info: http://cr.yp.to/proto/netstrings.txt - """ - def __init__(self, sock, timeout=DEFAULT_TIMEOUT, maxsize=DEFAULT_MAXSIZE): - self.bsock = BufferedSocket(sock) - self.timeout = timeout - self.maxsize = maxsize - self._msgsize_maxsize = len(str(maxsize)) + 1 # len(str()) == log10 - - def fileno(self): - return self.bsock.fileno() - - def settimeout(self, timeout): - self.timeout = timeout - - def setmaxsize(self, maxsize): - self.maxsize = maxsize - self._msgsize_maxsize = self._calc_msgsize_maxsize(maxsize) - - def _calc_msgsize_maxsize(self, maxsize): - return len(str(maxsize)) + 1 # len(str()) == log10 - - def read_ns(self, timeout=_UNSET, maxsize=_UNSET): - if timeout is _UNSET: - timeout = self.timeout - - if maxsize is _UNSET: - maxsize = self.maxsize - msgsize_maxsize = self._msgsize_maxsize - else: - msgsize_maxsize = self._calc_msgsize_maxsize(maxsize) - - size_prefix = self.bsock.recv_until(b':', - timeout=timeout, - maxsize=msgsize_maxsize) - try: - size = int(size_prefix) - except ValueError: - raise NetstringInvalidSize('netstring message size must be valid' - ' integer, not %r' % size_prefix) - - if size > maxsize: - raise NetstringMessageTooLong(size, maxsize) - payload = self.bsock.recv_size(size) - if self.bsock.recv(1) != b',': - raise NetstringProtocolError("expected trailing ',' after message") - - return payload - - def write_ns(self, payload): - size = len(payload) - if size > self.maxsize: - raise NetstringMessageTooLong(size, self.maxsize) - data = str(size).encode('ascii') + b':' + payload + b',' - self.bsock.send(data) - - -class NetstringProtocolError(Error): - "Base class for all of socketutils' Netstring exception types." - pass - - -class NetstringInvalidSize(NetstringProtocolError): - """NetstringInvalidSize is raised when the ``:``-delimited size prefix - of the message does not contain a valid integer. - - Message showing valid size:: - - 5:hello, - - Here the ``5`` is the size. Anything in this prefix position that - is not parsable as a Python integer (i.e., :class:`int`) will raise - this exception. - """ - def __init__(self, msg): - super(NetstringInvalidSize, self).__init__(msg) - - -class NetstringMessageTooLong(NetstringProtocolError): - """NetstringMessageTooLong is raised when the size prefix contains a - valid integer, but that integer is larger than the - :class:`NetstringSocket`'s configured *maxsize*. - - When this exception is raised, it's recommended to simply close - the connection instead of trying to recover. - """ - def __init__(self, size, maxsize): - msg = ('netstring message length exceeds configured maxsize: %s > %s' - % (size, maxsize)) - super(NetstringMessageTooLong, self).__init__(msg) - - -""" -attrs worth adding/passing through: - - -properties: type, proto - -For its main functionality, BufferedSocket can wrap any object that -has the following methods: - - - gettimeout() - - settimeout() - - recv(size) - - send(data) - -The following methods are passed through: - -... - -""" - -# TODO: buffered socket check socket.type == SOCK_STREAM? -# TODO: make recv_until support taking a regex -# TODO: including the delimiter in the recv_until return is not -# necessary, as ConnectionClosed differentiates empty messages -# from socket closes.