Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/boltons/socketutils.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/boltons/socketutils.py Sat May 02 07:14:21 2020 -0400 @@ -0,0 +1,737 @@ +# -*- coding: utf-8 -*- +"""At its heart, Python can be viewed as an extension of the C +programming language. Springing from the most popular systems +programming language has made Python itself a great language for +systems programming. One key to success in this domain is Python's +very serviceable :mod:`socket` module and its :class:`socket.socket` +type. + +The ``socketutils`` module provides natural next steps to the ``socket`` +builtin: straightforward, tested building blocks for higher-level +protocols. + +The :class:`BufferedSocket` wraps an ordinary socket, providing a +layer of intuitive buffering for both sending and receiving. This +facilitates parsing messages from streams, i.e., all sockets with type +``SOCK_STREAM``. The BufferedSocket enables receiving until the next +relevant token, up to a certain size, or until the connection is +closed. For all of these, it provides consistent APIs to size +limiting, as well as timeouts that are compatible with multiple +concurrency paradigms. Use it to parse the next one-off text or binary +socket protocol you encounter. + +This module also provides the :class:`NetstringSocket`, a pure-Python +implementation of `the Netstring protocol`_, built on top of the +:class:`BufferedSocket`, serving as a ready-made, production-grade example. + +Special thanks to `Kurt Rose`_ for his original authorship and all his +contributions on this module. Also thanks to `Daniel J. Bernstein`_, the +original author of `Netstring`_. + +.. _the Netstring protocol: https://en.wikipedia.org/wiki/Netstring +.. _Kurt Rose: https://github.com/doublereedkurt +.. _Daniel J. Bernstein: https://cr.yp.to/ +.. _Netstring: https://cr.yp.to/proto/netstrings.txt + +""" + +import time +import socket + +try: + from threading import RLock +except Exception: + class RLock(object): + 'Dummy reentrant lock for builds without threads' + def __enter__(self): + pass + + def __exit__(self, exctype, excinst, exctb): + pass + + +try: + from typeutils import make_sentinel + _UNSET = make_sentinel(var_name='_UNSET') +except ImportError: + _UNSET = object() + + +DEFAULT_TIMEOUT = 10 # 10 seconds +DEFAULT_MAXSIZE = 32 * 1024 # 32kb +_RECV_LARGE_MAXSIZE = 1024 ** 5 # 1PB + + +class BufferedSocket(object): + """Mainly provides recv_until and recv_size. recv, send, sendall, and + peek all function as similarly as possible to the built-in socket + API. + + This type has been tested against both the built-in socket type as + well as those from gevent and eventlet. It also features support + for sockets with timeouts set to 0 (aka nonblocking), provided the + caller is prepared to handle the EWOULDBLOCK exceptions. + + Args: + sock (socket): The connected socket to be wrapped. + timeout (float): The default timeout for sends and recvs, in + seconds. Set to ``None`` for no timeout, and 0 for + nonblocking. Defaults to *sock*'s own timeout if already set, + and 10 seconds otherwise. + maxsize (int): The default maximum number of bytes to be received + into the buffer before it is considered full and raises an + exception. Defaults to 32 kilobytes. + recvsize (int): The number of bytes to recv for every + lower-level :meth:`socket.recv` call. Defaults to *maxsize*. + + *timeout* and *maxsize* can both be overridden on individual socket + operations. + + All ``recv`` methods return bytestrings (:class:`bytes`) and can + raise :exc:`socket.error`. :exc:`Timeout`, + :exc:`ConnectionClosed`, and :exc:`MessageTooLong` all inherit + from :exc:`socket.error` and exist to provide better error + messages. Received bytes are always buffered, even if an exception + is raised. Use :meth:`BufferedSocket.getrecvbuffer` to retrieve + partial recvs. + + BufferedSocket does not replace the built-in socket by any + means. While the overlapping parts of the API are kept parallel to + the built-in :class:`socket.socket`, BufferedSocket does not + inherit from socket, and most socket functionality is only + available on the underlying socket. :meth:`socket.getpeername`, + :meth:`socket.getsockname`, :meth:`socket.fileno`, and others are + only available on the underlying socket that is wrapped. Use the + ``BufferedSocket.sock`` attribute to access it. See the examples + for more information on how to use BufferedSockets with built-in + sockets. + + The BufferedSocket is threadsafe, but consider the semantics of + your protocol before accessing a single socket from multiple + threads. Similarly, once the BufferedSocket is constructed, avoid + using the underlying socket directly. Only use it for operations + unrelated to messages, e.g., :meth:`socket.getpeername`. + + """ + def __init__(self, sock, timeout=_UNSET, + maxsize=DEFAULT_MAXSIZE, recvsize=_UNSET): + self.sock = sock + self.rbuf = b'' + self.sbuf = [] + self.maxsize = int(maxsize) + + if timeout is _UNSET: + if self.sock.gettimeout() is None: + self.timeout = DEFAULT_TIMEOUT + else: + self.timeout = self.sock.gettimeout() + else: + if timeout is None: + self.timeout = timeout + else: + self.timeout = float(timeout) + + if recvsize is _UNSET: + self._recvsize = self.maxsize + else: + self._recvsize = int(recvsize) + + self._send_lock = RLock() + self._recv_lock = RLock() + + def settimeout(self, timeout): + "Set the default *timeout* for future operations, in seconds." + self.timeout = timeout + + def gettimeout(self): + return self.timeout + + def setblocking(self, blocking): + self.timeout = None if blocking else 0.0 + + def setmaxsize(self, maxsize): + """Set the default maximum buffer size *maxsize* for future + operations, in bytes. Does not truncate the current buffer. + """ + self.maxsize = maxsize + + def getrecvbuffer(self): + "Returns the receive buffer bytestring (rbuf)." + with self._recv_lock: + return self.rbuf + + def getsendbuffer(self): + "Returns a copy of the send buffer list." + with self._send_lock: + return b''.join(self.sbuf) + + def recv(self, size, flags=0, timeout=_UNSET): + """Returns **up to** *size* bytes, using the internal buffer before + performing a single :meth:`socket.recv` operation. + + Args: + size (int): The maximum number of bytes to receive. + flags (int): Kept for API compatibility with sockets. Only + the default, ``0``, is valid. + timeout (float): The timeout for this operation. Can be + ``0`` for nonblocking and ``None`` for no + timeout. Defaults to the value set in the constructor + of BufferedSocket. + + If the operation does not complete in *timeout* seconds, a + :exc:`Timeout` is raised. Much like the built-in + :class:`socket.socket`, if this method returns an empty string, + then the socket is closed and recv buffer is empty. Further + calls to recv will raise :exc:`socket.error`. + + """ + with self._recv_lock: + if timeout is _UNSET: + timeout = self.timeout + if flags: + raise ValueError("non-zero flags not supported: %r" % flags) + if len(self.rbuf) >= size: + data, self.rbuf = self.rbuf[:size], self.rbuf[size:] + return data + if self.rbuf: + ret, self.rbuf = self.rbuf, b'' + return ret + self.sock.settimeout(timeout) + try: + data = self.sock.recv(self._recvsize) + except socket.timeout: + raise Timeout(timeout) # check the rbuf attr for more + if len(data) > size: + data, self.rbuf = data[:size], data[size:] + return data + + def peek(self, size, timeout=_UNSET): + """Returns *size* bytes from the socket and/or internal buffer. Bytes + are retained in BufferedSocket's internal recv buffer. To only + see bytes in the recv buffer, use :meth:`getrecvbuffer`. + + Args: + size (int): The exact number of bytes to peek at + timeout (float): The timeout for this operation. Can be 0 for + nonblocking and None for no timeout. Defaults to the value + set in the constructor of BufferedSocket. + + If the appropriate number of bytes cannot be fetched from the + buffer and socket before *timeout* expires, then a + :exc:`Timeout` will be raised. If the connection is closed, a + :exc:`ConnectionClosed` will be raised. + """ + with self._recv_lock: + if len(self.rbuf) >= size: + return self.rbuf[:size] + data = self.recv_size(size, timeout=timeout) + self.rbuf = data + self.rbuf + return data + + def recv_close(self, timeout=_UNSET, maxsize=_UNSET): + """Receive until the connection is closed, up to *maxsize* bytes. If + more than *maxsize* bytes are received, raises :exc:`MessageTooLong`. + """ + # recv_close works by using recv_size to request maxsize data, + # and ignoring ConnectionClose, returning and clearing the + # internal buffer instead. It raises an exception if + # ConnectionClosed isn't raised. + with self._recv_lock: + if maxsize is _UNSET: + maxsize = self.maxsize + if maxsize is None: + maxsize = _RECV_LARGE_MAXSIZE + try: + recvd = self.recv_size(maxsize + 1, timeout) + except ConnectionClosed: + ret, self.rbuf = self.rbuf, b'' + else: + # put extra received bytes (now in rbuf) after recvd + self.rbuf = recvd + self.rbuf + size_read = min(maxsize, len(self.rbuf)) + raise MessageTooLong(size_read) # check receive buffer + return ret + + def recv_until(self, delimiter, timeout=_UNSET, maxsize=_UNSET, + with_delimiter=False): + """Receive until *delimiter* is found, *maxsize* bytes have been read, + or *timeout* is exceeded. + + Args: + delimiter (bytes): One or more bytes to be searched for + in the socket stream. + timeout (float): The timeout for this operation. Can be 0 for + nonblocking and None for no timeout. Defaults to the value + set in the constructor of BufferedSocket. + maxsize (int): The maximum size for the internal buffer. + Defaults to the value set in the constructor. + with_delimiter (bool): Whether or not to include the + delimiter in the output. ``False`` by default, but + ``True`` is useful in cases where one is simply + forwarding the messages. + + ``recv_until`` will raise the following exceptions: + + * :exc:`Timeout` if more than *timeout* seconds expire. + * :exc:`ConnectionClosed` if the underlying socket is closed + by the sending end. + * :exc:`MessageTooLong` if the delimiter is not found in the + first *maxsize* bytes. + * :exc:`socket.error` if operating in nonblocking mode + (*timeout* equal to 0), or if some unexpected socket error + occurs, such as operating on a closed socket. + + """ + with self._recv_lock: + if maxsize is _UNSET: + maxsize = self.maxsize + if maxsize is None: + maxsize = _RECV_LARGE_MAXSIZE + if timeout is _UNSET: + timeout = self.timeout + len_delimiter = len(delimiter) + + sock = self.sock + recvd = bytearray(self.rbuf) + start = time.time() + find_offset_start = 0 # becomes a negative index below + + if not timeout: # covers None (no timeout) and 0 (nonblocking) + sock.settimeout(timeout) + try: + while 1: + offset = recvd.find(delimiter, find_offset_start, maxsize) + if offset != -1: # str.find returns -1 when no match found + if with_delimiter: # include delimiter in return + offset += len_delimiter + rbuf_offset = offset + else: + rbuf_offset = offset + len_delimiter + break + elif len(recvd) > maxsize: + raise MessageTooLong(maxsize, delimiter) # see rbuf + if timeout: + cur_timeout = timeout - (time.time() - start) + if cur_timeout <= 0.0: + raise socket.timeout() + sock.settimeout(cur_timeout) + nxt = sock.recv(self._recvsize) + if not nxt: + args = (len(recvd), delimiter) + msg = ('connection closed after reading %s bytes' + ' without finding symbol: %r' % args) + raise ConnectionClosed(msg) # check the recv buffer + recvd.extend(nxt) + find_offset_start = -len(nxt) - len_delimiter + 1 + except socket.timeout: + self.rbuf = bytes(recvd) + msg = ('read %s bytes without finding delimiter: %r' + % (len(recvd), delimiter)) + raise Timeout(timeout, msg) # check the recv buffer + except Exception: + self.rbuf = bytes(recvd) + raise + val, self.rbuf = bytes(recvd[:offset]), bytes(recvd[rbuf_offset:]) + return val + + def recv_size(self, size, timeout=_UNSET): + """Read off of the internal buffer, then off the socket, until + *size* bytes have been read. + + Args: + size (int): number of bytes to read before returning. + timeout (float): The timeout for this operation. Can be 0 for + nonblocking and None for no timeout. Defaults to the value + set in the constructor of BufferedSocket. + + If the appropriate number of bytes cannot be fetched from the + buffer and socket before *timeout* expires, then a + :exc:`Timeout` will be raised. If the connection is closed, a + :exc:`ConnectionClosed` will be raised. + """ + with self._recv_lock: + if timeout is _UNSET: + timeout = self.timeout + chunks = [] + total_bytes = 0 + try: + start = time.time() + self.sock.settimeout(timeout) + nxt = self.rbuf or self.sock.recv(self._recvsize) + while nxt: + total_bytes += len(nxt) + if total_bytes >= size: + break + chunks.append(nxt) + if timeout: + cur_timeout = timeout - (time.time() - start) + if cur_timeout <= 0.0: + raise socket.timeout() + self.sock.settimeout(cur_timeout) + nxt = self.sock.recv(self._recvsize) + else: + msg = ('connection closed after reading %s of %s requested' + ' bytes' % (total_bytes, size)) + raise ConnectionClosed(msg) # check recv buffer + except socket.timeout: + self.rbuf = b''.join(chunks) + msg = 'read %s of %s bytes' % (total_bytes, size) + raise Timeout(timeout, msg) # check recv buffer + except Exception: + # received data is still buffered in the case of errors + self.rbuf = b''.join(chunks) + raise + extra_bytes = total_bytes - size + if extra_bytes: + last, self.rbuf = nxt[:-extra_bytes], nxt[-extra_bytes:] + else: + last, self.rbuf = nxt, b'' + chunks.append(last) + return b''.join(chunks) + + def send(self, data, flags=0, timeout=_UNSET): + """Send the contents of the internal send buffer, as well as *data*, + to the receiving end of the connection. Returns the total + number of bytes sent. If no exception is raised, all of *data* was + sent and the internal send buffer is empty. + + Args: + data (bytes): The bytes to send. + flags (int): Kept for API compatibility with sockets. Only + the default 0 is valid. + timeout (float): The timeout for this operation. Can be 0 for + nonblocking and None for no timeout. Defaults to the value + set in the constructor of BufferedSocket. + + Will raise :exc:`Timeout` if the send operation fails to + complete before *timeout*. In the event of an exception, use + :meth:`BufferedSocket.getsendbuffer` to see which data was + unsent. + """ + with self._send_lock: + if timeout is _UNSET: + timeout = self.timeout + if flags: + raise ValueError("non-zero flags not supported") + sbuf = self.sbuf + sbuf.append(data) + if len(sbuf) > 1: + sbuf[:] = [b''.join([s for s in sbuf if s])] + self.sock.settimeout(timeout) + start, total_sent = time.time(), 0 + try: + while sbuf[0]: + sent = self.sock.send(sbuf[0]) + total_sent += sent + sbuf[0] = sbuf[0][sent:] + if timeout: + cur_timeout = timeout - (time.time() - start) + if cur_timeout <= 0.0: + raise socket.timeout() + self.sock.settimeout(cur_timeout) + except socket.timeout: + raise Timeout(timeout, '%s bytes unsent' % len(sbuf[0])) + return total_sent + + def sendall(self, data, flags=0, timeout=_UNSET): + """A passthrough to :meth:`~BufferedSocket.send`, retained for + parallelism to the :class:`socket.socket` API. + """ + return self.send(data, flags, timeout) + + def flush(self): + "Send the contents of the internal send buffer." + with self._send_lock: + self.send(b'') + return + + def buffer(self, data): + "Buffer *data* bytes for the next send operation." + with self._send_lock: + self.sbuf.append(data) + return + + # # # + # # # Passing through some socket basics + # # # + + def getsockname(self): + """Convenience function to return the wrapped socket's own address. + See :meth:`socket.getsockname` for more details. + """ + return self.sock.getsockname() + + def getpeername(self): + """Convenience function to return the remote address to which the + wrapped socket is connected. See :meth:`socket.getpeername` + for more details. + """ + return self.sock.getpeername() + + def getsockopt(self, level, optname, buflen=None): + """Convenience function passing through to the wrapped socket's + :meth:`socket.getsockopt`. + """ + args = (level, optname) + if buflen is not None: + args += (buflen,) + return self.sock.getsockopt(*args) + + def setsockopt(self, level, optname, value): + """Convenience function passing through to the wrapped socket's + :meth:`socket.setsockopt`. + """ + return self.sock.setsockopt(level, optname, value) + + @property + def type(self): + """A passthrough to the wrapped socket's type. Valid usages should + only ever see :data:`socket.SOCK_STREAM`. + """ + return self.sock.type + + @property + def family(self): + """A passthrough to the wrapped socket's family. BufferedSocket + supports all widely-used families, so this read-only attribute + can be one of :data:`socket.AF_INET` for IP, + :data:`socket.AF_INET6` for IPv6, and :data:`socket.AF_UNIX` + for UDS. + """ + return self.sock.family + + @property + def proto(self): + """A passthrough to the wrapped socket's protocol. The ``proto`` + attribute is very rarely used, so it's always 0, meaning "the + default" protocol. Pretty much all the practical information + is in :attr:`~BufferedSocket.type` and + :attr:`~BufferedSocket.family`, so you can go back to never + thinking about this. + """ + return self.sock.proto + + # # # + # # # Now for some more advanced interpretations of the builtin socket + # # # + + def fileno(self): + """Returns the file descriptor of the wrapped socket. -1 if it has + been closed on this end. + + Note that this makes the BufferedSocket selectable, i.e., + usable for operating system event loops without any external + libraries. Keep in mind that the operating system cannot know + about data in BufferedSocket's internal buffer. Exercise + discipline with calling ``recv*`` functions. + """ + return self.sock.fileno() + + def close(self): + """Closes the wrapped socket, and empties the internal buffers. The + send buffer is not flushed automatically, so if you have been + calling :meth:`~BufferedSocket.buffer`, be sure to call + :meth:`~BufferedSocket.flush` before calling this + method. After calling this method, future socket operations + will raise :exc:`socket.error`. + """ + with self._recv_lock: + with self._send_lock: + self.rbuf = b'' + self.rbuf_unconsumed = self.rbuf + self.sbuf[:] = [] + self.sock.close() + return + + def shutdown(self, how): + """Convenience method which passes through to the wrapped socket's + :meth:`~socket.shutdown`. Semantics vary by platform, so no + special internal handling is done with the buffers. This + method exists to facilitate the most common usage, wherein a + full ``shutdown`` is followed by a + :meth:`~BufferedSocket.close`. Developers requiring more + support, please open `an issue`_. + + .. _an issue: https://github.com/mahmoud/boltons/issues + """ + with self._recv_lock: + with self._send_lock: + self.sock.shutdown(how) + return + + # end BufferedSocket + + +class Error(socket.error): + """A subclass of :exc:`socket.error` from which all other + ``socketutils`` exceptions inherit. + + When using :class:`BufferedSocket` and other ``socketutils`` + types, generally you want to catch one of the specific exception + types below, or :exc:`socket.error`. + """ + pass + + +class ConnectionClosed(Error): + """Raised when receiving and the connection is unexpectedly closed + from the sending end. Raised from :class:`BufferedSocket`'s + :meth:`~BufferedSocket.peek`, :meth:`~BufferedSocket.recv_until`, + and :meth:`~BufferedSocket.recv_size`, and never from its + :meth:`~BufferedSocket.recv` or + :meth:`~BufferedSocket.recv_close`. + """ + pass + + +class MessageTooLong(Error): + """Raised from :meth:`BufferedSocket.recv_until` and + :meth:`BufferedSocket.recv_closed` when more than *maxsize* bytes are + read without encountering the delimiter or a closed connection, + respectively. + """ + def __init__(self, bytes_read=None, delimiter=None): + msg = 'message exceeded maximum size' + if bytes_read is not None: + msg += '. %s bytes read' % (bytes_read,) + if delimiter is not None: + msg += '. Delimiter not found: %r' % (delimiter,) + super(MessageTooLong, self).__init__(msg) + + +class Timeout(socket.timeout, Error): + """Inheriting from :exc:`socket.timeout`, Timeout is used to indicate + when a socket operation did not complete within the time + specified. Raised from any of :class:`BufferedSocket`'s ``recv`` + methods. + """ + def __init__(self, timeout, extra=""): + msg = 'socket operation timed out' + if timeout is not None: + msg += ' after %sms.' % (timeout * 1000) + if extra: + msg += ' ' + extra + super(Timeout, self).__init__(msg) + + +class NetstringSocket(object): + """ + Reads and writes using the netstring protocol. + + More info: https://en.wikipedia.org/wiki/Netstring + Even more info: http://cr.yp.to/proto/netstrings.txt + """ + def __init__(self, sock, timeout=DEFAULT_TIMEOUT, maxsize=DEFAULT_MAXSIZE): + self.bsock = BufferedSocket(sock) + self.timeout = timeout + self.maxsize = maxsize + self._msgsize_maxsize = len(str(maxsize)) + 1 # len(str()) == log10 + + def fileno(self): + return self.bsock.fileno() + + def settimeout(self, timeout): + self.timeout = timeout + + def setmaxsize(self, maxsize): + self.maxsize = maxsize + self._msgsize_maxsize = self._calc_msgsize_maxsize(maxsize) + + def _calc_msgsize_maxsize(self, maxsize): + return len(str(maxsize)) + 1 # len(str()) == log10 + + def read_ns(self, timeout=_UNSET, maxsize=_UNSET): + if timeout is _UNSET: + timeout = self.timeout + + if maxsize is _UNSET: + maxsize = self.maxsize + msgsize_maxsize = self._msgsize_maxsize + else: + msgsize_maxsize = self._calc_msgsize_maxsize(maxsize) + + size_prefix = self.bsock.recv_until(b':', + timeout=timeout, + maxsize=msgsize_maxsize) + try: + size = int(size_prefix) + except ValueError: + raise NetstringInvalidSize('netstring message size must be valid' + ' integer, not %r' % size_prefix) + + if size > maxsize: + raise NetstringMessageTooLong(size, maxsize) + payload = self.bsock.recv_size(size) + if self.bsock.recv(1) != b',': + raise NetstringProtocolError("expected trailing ',' after message") + + return payload + + def write_ns(self, payload): + size = len(payload) + if size > self.maxsize: + raise NetstringMessageTooLong(size, self.maxsize) + data = str(size).encode('ascii') + b':' + payload + b',' + self.bsock.send(data) + + +class NetstringProtocolError(Error): + "Base class for all of socketutils' Netstring exception types." + pass + + +class NetstringInvalidSize(NetstringProtocolError): + """NetstringInvalidSize is raised when the ``:``-delimited size prefix + of the message does not contain a valid integer. + + Message showing valid size:: + + 5:hello, + + Here the ``5`` is the size. Anything in this prefix position that + is not parsable as a Python integer (i.e., :class:`int`) will raise + this exception. + """ + def __init__(self, msg): + super(NetstringInvalidSize, self).__init__(msg) + + +class NetstringMessageTooLong(NetstringProtocolError): + """NetstringMessageTooLong is raised when the size prefix contains a + valid integer, but that integer is larger than the + :class:`NetstringSocket`'s configured *maxsize*. + + When this exception is raised, it's recommended to simply close + the connection instead of trying to recover. + """ + def __init__(self, size, maxsize): + msg = ('netstring message length exceeds configured maxsize: %s > %s' + % (size, maxsize)) + super(NetstringMessageTooLong, self).__init__(msg) + + +""" +attrs worth adding/passing through: + + +properties: type, proto + +For its main functionality, BufferedSocket can wrap any object that +has the following methods: + + - gettimeout() + - settimeout() + - recv(size) + - send(data) + +The following methods are passed through: + +... + +""" + +# TODO: buffered socket check socket.type == SOCK_STREAM? +# TODO: make recv_until support taking a regex +# TODO: including the delimiter in the recv_until return is not +# necessary, as ConnectionClosed differentiates empty messages +# from socket closes.