comparison env/lib/python3.7/site-packages/boltons/socketutils.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |
| 4:79f47841a781 | 5:9b1c78e6ba9c |
|---|---|
| 1 # -*- coding: utf-8 -*- | |
| 2 """At its heart, Python can be viewed as an extension of the C | |
| 3 programming language. Springing from the most popular systems | |
| 4 programming language has made Python itself a great language for | |
| 5 systems programming. One key to success in this domain is Python's | |
| 6 very serviceable :mod:`socket` module and its :class:`socket.socket` | |
| 7 type. | |
| 8 | |
| 9 The ``socketutils`` module provides natural next steps to the ``socket`` | |
| 10 builtin: straightforward, tested building blocks for higher-level | |
| 11 protocols. | |
| 12 | |
| 13 The :class:`BufferedSocket` wraps an ordinary socket, providing a | |
| 14 layer of intuitive buffering for both sending and receiving. This | |
| 15 facilitates parsing messages from streams, i.e., all sockets with type | |
| 16 ``SOCK_STREAM``. The BufferedSocket enables receiving until the next | |
| 17 relevant token, up to a certain size, or until the connection is | |
| 18 closed. For all of these, it provides consistent APIs for size | |
| 19 limiting, as well as timeouts that are compatible with multiple | |
| 20 concurrency paradigms. Use it to parse the next one-off text or binary | |
| 21 socket protocol you encounter. | |
| 22 | |
| 23 This module also provides the :class:`NetstringSocket`, a pure-Python | |
| 24 implementation of `the Netstring protocol`_, built on top of the | |
| 25 :class:`BufferedSocket`, serving as a ready-made, production-grade example. | |
| 26 | |
| 27 Special thanks to `Kurt Rose`_ for his original authorship and all his | |
| 28 contributions to this module. Also thanks to `Daniel J. Bernstein`_, the | |
| 29 original author of `Netstring`_. | |
| 30 | |
| 31 .. _the Netstring protocol: https://en.wikipedia.org/wiki/Netstring | |
| 32 .. _Kurt Rose: https://github.com/doublereedkurt | |
| 33 .. _Daniel J. Bernstein: https://cr.yp.to/ | |
| 34 .. _Netstring: https://cr.yp.to/proto/netstrings.txt | |
| 35 | |
| 36 """ | |
| 37 | |
| 38 import time | |
| 39 import socket | |
| 40 | |
| 41 try: | |
| 42 from threading import RLock | |
| 43 except Exception: | |
| 44 class RLock(object): | |
| 45 'Dummy reentrant lock for builds without threads' | |
| 46 def __enter__(self): | |
| 47 pass | |
| 48 | |
| 49 def __exit__(self, exctype, excinst, exctb): | |
| 50 pass | |
| 51 | |
| 52 | |
| 53 try: | |
| 54 from typeutils import make_sentinel | |
| 55 _UNSET = make_sentinel(var_name='_UNSET') | |
| 56 except ImportError: | |
| 57 _UNSET = object() | |
| 58 | |
| 59 | |
| 60 DEFAULT_TIMEOUT = 10 # 10 seconds | |
| 61 DEFAULT_MAXSIZE = 32 * 1024 # 32 KiB | |
| 62 _RECV_LARGE_MAXSIZE = 1024 ** 5 # 1 PiB | |
| 63 | |
| 64 | |
| 65 class BufferedSocket(object): | |
| 66 """Mainly provides recv_until and recv_size. recv, send, sendall, and | |
| 67 peek all function as similarly as possible to the built-in socket | |
| 68 API. | |
| 69 | |
| 70 This type has been tested against both the built-in socket type as | |
| 71 well as those from gevent and eventlet. It also features support | |
| 72 for sockets with timeouts set to 0 (aka nonblocking), provided the | |
| 73 caller is prepared to handle the EWOULDBLOCK exceptions. | |
| 74 | |
| 75 Args: | |
| 76 sock (socket): The connected socket to be wrapped. | |
| 77 timeout (float): The default timeout for sends and recvs, in | |
| 78 seconds. Set to ``None`` for no timeout, and 0 for | |
| 79 nonblocking. Defaults to *sock*'s own timeout if already set, | |
| 80 and 10 seconds otherwise. | |
| 81 maxsize (int): The default maximum number of bytes to be received | |
| 82 into the buffer before it is considered full and raises an | |
| 83 exception. Defaults to 32 kilobytes. | |
| 84 recvsize (int): The number of bytes to recv for every | |
| 85 lower-level :meth:`socket.recv` call. Defaults to *maxsize*. | |
| 86 | |
| 87 *timeout* and *maxsize* can both be overridden on individual socket | |
| 88 operations. | |
| 89 | |
| 90 All ``recv`` methods return bytestrings (:class:`bytes`) and can | |
| 91 raise :exc:`socket.error`. :exc:`Timeout`, | |
| 92 :exc:`ConnectionClosed`, and :exc:`MessageTooLong` all inherit | |
| 93 from :exc:`socket.error` and exist to provide better error | |
| 94 messages. Received bytes are always buffered, even if an exception | |
| 95 is raised. Use :meth:`BufferedSocket.getrecvbuffer` to retrieve | |
| 96 partial recvs. | |
| 97 | |
| 98 BufferedSocket does not replace the built-in socket by any | |
| 99 means. While the overlapping parts of the API are kept parallel to | |
| 100 the built-in :class:`socket.socket`, BufferedSocket does not | |
| 101 inherit from socket, and most socket functionality is only | |
| 102 available on the underlying socket. :meth:`socket.getpeername`, | |
| 103 :meth:`socket.getsockname`, :meth:`socket.fileno`, and others are | |
| 104 only available on the underlying socket that is wrapped. Use the | |
| 105 ``BufferedSocket.sock`` attribute to access it. See the examples | |
| 106 for more information on how to use BufferedSockets with built-in | |
| 107 sockets. | |
| 108 | |
| 109 The BufferedSocket is threadsafe, but consider the semantics of | |
| 110 your protocol before accessing a single socket from multiple | |
| 111 threads. Similarly, once the BufferedSocket is constructed, avoid | |
| 112 using the underlying socket directly. Only use it for operations | |
| 113 unrelated to messages, e.g., :meth:`socket.getpeername`. | |
| 114 | |
| 115 """ | |
| 116 def __init__(self, sock, timeout=_UNSET, | |
| 117 maxsize=DEFAULT_MAXSIZE, recvsize=_UNSET): | |
| 118 self.sock = sock | |
| 119 self.rbuf = b'' | |
| 120 self.sbuf = [] | |
| 121 self.maxsize = int(maxsize) | |
| 122 | |
| 123 if timeout is _UNSET: | |
| 124 if self.sock.gettimeout() is None: | |
| 125 self.timeout = DEFAULT_TIMEOUT | |
| 126 else: | |
| 127 self.timeout = self.sock.gettimeout() | |
| 128 else: | |
| 129 if timeout is None: | |
| 130 self.timeout = timeout | |
| 131 else: | |
| 132 self.timeout = float(timeout) | |
| 133 | |
| 134 if recvsize is _UNSET: | |
| 135 self._recvsize = self.maxsize | |
| 136 else: | |
| 137 self._recvsize = int(recvsize) | |
| 138 | |
| 139 self._send_lock = RLock() | |
| 140 self._recv_lock = RLock() | |
| 141 | |
| 142 def settimeout(self, timeout): | |
| 143 "Set the default *timeout* for future operations, in seconds." | |
| 144 self.timeout = timeout | |
| 145 | |
| 146 def gettimeout(self): | |
| 147 return self.timeout | |
| 148 | |
| 149 def setblocking(self, blocking): | |
| 150 self.timeout = None if blocking else 0.0 | |
| 151 | |
| 152 def setmaxsize(self, maxsize): | |
| 153 """Set the default maximum buffer size *maxsize* for future | |
| 154 operations, in bytes. Does not truncate the current buffer. | |
| 155 """ | |
| 156 self.maxsize = maxsize | |
| 157 | |
| 158 def getrecvbuffer(self): | |
| 159 "Returns the receive buffer bytestring (rbuf)." | |
| 160 with self._recv_lock: | |
| 161 return self.rbuf | |
| 162 | |
| 163 def getsendbuffer(self): | |
| 164 "Returns the contents of the send buffer joined into one bytestring." | |
| 165 with self._send_lock: | |
| 166 return b''.join(self.sbuf) | |
| 167 | |
| 168 def recv(self, size, flags=0, timeout=_UNSET): | |
| 169 """Returns **up to** *size* bytes, using the internal buffer before | |
| 170 performing a single :meth:`socket.recv` operation. | |
| 171 | |
| 172 Args: | |
| 173 size (int): The maximum number of bytes to receive. | |
| 174 flags (int): Kept for API compatibility with sockets. Only | |
| 175 the default, ``0``, is valid. | |
| 176 timeout (float): The timeout for this operation. Can be | |
| 177 ``0`` for nonblocking and ``None`` for no | |
| 178 timeout. Defaults to the value set in the constructor | |
| 179 of BufferedSocket. | |
| 180 | |
| 181 If the operation does not complete in *timeout* seconds, a | |
| 182 :exc:`Timeout` is raised. Much like the built-in | |
| 183 :class:`socket.socket`, if this method returns an empty bytestring, | |
| 184 then the socket is closed and the recv buffer is empty. Further | |
| 185 calls to recv will raise :exc:`socket.error`. | |
| 186 | |
| 187 """ | |
| 188 with self._recv_lock: | |
| 189 if timeout is _UNSET: | |
| 190 timeout = self.timeout | |
| 191 if flags: | |
| 192 raise ValueError("non-zero flags not supported: %r" % flags) | |
| 193 if len(self.rbuf) >= size: | |
| 194 data, self.rbuf = self.rbuf[:size], self.rbuf[size:] | |
| 195 return data | |
| 196 if self.rbuf: | |
| 197 ret, self.rbuf = self.rbuf, b'' | |
| 198 return ret | |
| 199 self.sock.settimeout(timeout) | |
| 200 try: | |
| 201 data = self.sock.recv(self._recvsize) | |
| 202 except socket.timeout: | |
| 203 raise Timeout(timeout) # check the rbuf attr for more | |
| 204 if len(data) > size: | |
| 205 data, self.rbuf = data[:size], data[size:] | |
| 206 return data | |
| 207 | |
| 208 def peek(self, size, timeout=_UNSET): | |
| 209 """Returns *size* bytes from the socket and/or internal buffer. Bytes | |
| 210 are retained in BufferedSocket's internal recv buffer. To only | |
| 211 see bytes in the recv buffer, use :meth:`getrecvbuffer`. | |
| 212 | |
| 213 Args: | |
| 214 size (int): The exact number of bytes to peek at | |
| 215 timeout (float): The timeout for this operation. Can be 0 for | |
| 216 nonblocking and None for no timeout. Defaults to the value | |
| 217 set in the constructor of BufferedSocket. | |
| 218 | |
| 219 If the appropriate number of bytes cannot be fetched from the | |
| 220 buffer and socket before *timeout* expires, then a | |
| 221 :exc:`Timeout` will be raised. If the connection is closed, a | |
| 222 :exc:`ConnectionClosed` will be raised. | |
| 223 """ | |
| 224 with self._recv_lock: | |
| 225 if len(self.rbuf) >= size: | |
| 226 return self.rbuf[:size] | |
| 227 data = self.recv_size(size, timeout=timeout) | |
| 228 self.rbuf = data + self.rbuf | |
| 229 return data | |
| 230 | |
| 231 def recv_close(self, timeout=_UNSET, maxsize=_UNSET): | |
| 232 """Receive until the connection is closed, up to *maxsize* bytes. If | |
| 233 more than *maxsize* bytes are received, raises :exc:`MessageTooLong`. | |
| 234 """ | |
| 235 # recv_close works by using recv_size to request maxsize data, | |
| 236 # and ignoring ConnectionClosed, returning and clearing the | |
| 237 # internal buffer instead. It raises an exception if | |
| 238 # ConnectionClosed isn't raised. | |
| 239 with self._recv_lock: | |
| 240 if maxsize is _UNSET: | |
| 241 maxsize = self.maxsize | |
| 242 if maxsize is None: | |
| 243 maxsize = _RECV_LARGE_MAXSIZE | |
| 244 try: | |
| 245 recvd = self.recv_size(maxsize + 1, timeout) | |
| 246 except ConnectionClosed: | |
| 247 ret, self.rbuf = self.rbuf, b'' | |
| 248 else: | |
| 249 # put extra received bytes (now in rbuf) after recvd | |
| 250 self.rbuf = recvd + self.rbuf | |
| 251 size_read = min(maxsize, len(self.rbuf)) | |
| 252 raise MessageTooLong(size_read) # check receive buffer | |
| 253 return ret | |
| 254 | |
| 255 def recv_until(self, delimiter, timeout=_UNSET, maxsize=_UNSET, | |
| 256 with_delimiter=False): | |
| 257 """Receive until *delimiter* is found, *maxsize* bytes have been read, | |
| 258 or *timeout* is exceeded. | |
| 259 | |
| 260 Args: | |
| 261 delimiter (bytes): One or more bytes to be searched for | |
| 262 in the socket stream. | |
| 263 timeout (float): The timeout for this operation. Can be 0 for | |
| 264 nonblocking and None for no timeout. Defaults to the value | |
| 265 set in the constructor of BufferedSocket. | |
| 266 maxsize (int): The maximum size for the internal buffer. | |
| 267 Defaults to the value set in the constructor. | |
| 268 with_delimiter (bool): Whether or not to include the | |
| 269 delimiter in the output. ``False`` by default, but | |
| 270 ``True`` is useful in cases where one is simply | |
| 271 forwarding the messages. | |
| 272 | |
| 273 ``recv_until`` will raise the following exceptions: | |
| 274 | |
| 275 * :exc:`Timeout` if more than *timeout* seconds elapse. | |
| 276 * :exc:`ConnectionClosed` if the underlying socket is closed | |
| 277 by the sending end. | |
| 278 * :exc:`MessageTooLong` if the delimiter is not found in the | |
| 279 first *maxsize* bytes. | |
| 280 * :exc:`socket.error` if operating in nonblocking mode | |
| 281 (*timeout* equal to 0), or if some unexpected socket error | |
| 282 occurs, such as operating on a closed socket. | |
| 283 | |
| 284 """ | |
| 285 with self._recv_lock: | |
| 286 if maxsize is _UNSET: | |
| 287 maxsize = self.maxsize | |
| 288 if maxsize is None: | |
| 289 maxsize = _RECV_LARGE_MAXSIZE | |
| 290 if timeout is _UNSET: | |
| 291 timeout = self.timeout | |
| 292 len_delimiter = len(delimiter) | |
| 293 | |
| 294 sock = self.sock | |
| 295 recvd = bytearray(self.rbuf) | |
| 296 start = time.time() | |
| 297 find_offset_start = 0 # becomes a negative index below | |
| 298 | |
| 299 if not timeout: # covers None (no timeout) and 0 (nonblocking) | |
| 300 sock.settimeout(timeout) | |
| 301 try: | |
| 302 while 1: | |
| 303 offset = recvd.find(delimiter, find_offset_start, maxsize) | |
| 304 if offset != -1: # bytearray.find returns -1 when no match found | |
| 305 if with_delimiter: # include delimiter in return | |
| 306 offset += len_delimiter | |
| 307 rbuf_offset = offset | |
| 308 else: | |
| 309 rbuf_offset = offset + len_delimiter | |
| 310 break | |
| 311 elif len(recvd) > maxsize: | |
| 312 raise MessageTooLong(maxsize, delimiter) # see rbuf | |
| 313 if timeout: | |
| 314 cur_timeout = timeout - (time.time() - start) | |
| 315 if cur_timeout <= 0.0: | |
| 316 raise socket.timeout() | |
| 317 sock.settimeout(cur_timeout) | |
| 318 nxt = sock.recv(self._recvsize) | |
| 319 if not nxt: | |
| 320 args = (len(recvd), delimiter) | |
| 321 msg = ('connection closed after reading %s bytes' | |
| 322 ' without finding symbol: %r' % args) | |
| 323 raise ConnectionClosed(msg) # check the recv buffer | |
| 324 recvd.extend(nxt) | |
| 325 find_offset_start = -len(nxt) - len_delimiter + 1 | |
| 326 except socket.timeout: | |
| 327 self.rbuf = bytes(recvd) | |
| 328 msg = ('read %s bytes without finding delimiter: %r' | |
| 329 % (len(recvd), delimiter)) | |
| 330 raise Timeout(timeout, msg) # check the recv buffer | |
| 331 except Exception: | |
| 332 self.rbuf = bytes(recvd) | |
| 333 raise | |
| 334 val, self.rbuf = bytes(recvd[:offset]), bytes(recvd[rbuf_offset:]) | |
| 335 return val | |
| 336 | |
| 337 def recv_size(self, size, timeout=_UNSET): | |
| 338 """Read off of the internal buffer, then off the socket, until | |
| 339 *size* bytes have been read. | |
| 340 | |
| 341 Args: | |
| 342 size (int): number of bytes to read before returning. | |
| 343 timeout (float): The timeout for this operation. Can be 0 for | |
| 344 nonblocking and None for no timeout. Defaults to the value | |
| 345 set in the constructor of BufferedSocket. | |
| 346 | |
| 347 If the appropriate number of bytes cannot be fetched from the | |
| 348 buffer and socket before *timeout* expires, then a | |
| 349 :exc:`Timeout` will be raised. If the connection is closed, a | |
| 350 :exc:`ConnectionClosed` will be raised. | |
| 351 """ | |
| 352 with self._recv_lock: | |
| 353 if timeout is _UNSET: | |
| 354 timeout = self.timeout | |
| 355 chunks = [] | |
| 356 total_bytes = 0 | |
| 357 try: | |
| 358 start = time.time() | |
| 359 self.sock.settimeout(timeout) | |
| 360 nxt = self.rbuf or self.sock.recv(self._recvsize) | |
| 361 while nxt: | |
| 362 total_bytes += len(nxt) | |
| 363 if total_bytes >= size: | |
| 364 break | |
| 365 chunks.append(nxt) | |
| 366 if timeout: | |
| 367 cur_timeout = timeout - (time.time() - start) | |
| 368 if cur_timeout <= 0.0: | |
| 369 raise socket.timeout() | |
| 370 self.sock.settimeout(cur_timeout) | |
| 371 nxt = self.sock.recv(self._recvsize) | |
| 372 else: | |
| 373 msg = ('connection closed after reading %s of %s requested' | |
| 374 ' bytes' % (total_bytes, size)) | |
| 375 raise ConnectionClosed(msg) # check recv buffer | |
| 376 except socket.timeout: | |
| 377 self.rbuf = b''.join(chunks) | |
| 378 msg = 'read %s of %s bytes' % (total_bytes, size) | |
| 379 raise Timeout(timeout, msg) # check recv buffer | |
| 380 except Exception: | |
| 381 # received data is still buffered in the case of errors | |
| 382 self.rbuf = b''.join(chunks) | |
| 383 raise | |
| 384 extra_bytes = total_bytes - size | |
| 385 if extra_bytes: | |
| 386 last, self.rbuf = nxt[:-extra_bytes], nxt[-extra_bytes:] | |
| 387 else: | |
| 388 last, self.rbuf = nxt, b'' | |
| 389 chunks.append(last) | |
| 390 return b''.join(chunks) | |
| 391 | |
| 392 def send(self, data, flags=0, timeout=_UNSET): | |
| 393 """Send the contents of the internal send buffer, as well as *data*, | |
| 394 to the receiving end of the connection. Returns the total | |
| 395 number of bytes sent. If no exception is raised, all of *data* was | |
| 396 sent and the internal send buffer is empty. | |
| 397 | |
| 398 Args: | |
| 399 data (bytes): The bytes to send. | |
| 400 flags (int): Kept for API compatibility with sockets. Only | |
| 401 the default 0 is valid. | |
| 402 timeout (float): The timeout for this operation. Can be 0 for | |
| 403 nonblocking and None for no timeout. Defaults to the value | |
| 404 set in the constructor of BufferedSocket. | |
| 405 | |
| 406 Will raise :exc:`Timeout` if the send operation fails to | |
| 407 complete before *timeout*. In the event of an exception, use | |
| 408 :meth:`BufferedSocket.getsendbuffer` to see which data was | |
| 409 unsent. | |
| 410 """ | |
| 411 with self._send_lock: | |
| 412 if timeout is _UNSET: | |
| 413 timeout = self.timeout | |
| 414 if flags: | |
| 415 raise ValueError("non-zero flags not supported") | |
| 416 sbuf = self.sbuf | |
| 417 sbuf.append(data) | |
| 418 if len(sbuf) > 1: | |
| 419 sbuf[:] = [b''.join([s for s in sbuf if s])] | |
| 420 self.sock.settimeout(timeout) | |
| 421 start, total_sent = time.time(), 0 | |
| 422 try: | |
| 423 while sbuf[0]: | |
| 424 sent = self.sock.send(sbuf[0]) | |
| 425 total_sent += sent | |
| 426 sbuf[0] = sbuf[0][sent:] | |
| 427 if timeout: | |
| 428 cur_timeout = timeout - (time.time() - start) | |
| 429 if cur_timeout <= 0.0: | |
| 430 raise socket.timeout() | |
| 431 self.sock.settimeout(cur_timeout) | |
| 432 except socket.timeout: | |
| 433 raise Timeout(timeout, '%s bytes unsent' % len(sbuf[0])) | |
| 434 return total_sent | |
| 435 | |
| 436 def sendall(self, data, flags=0, timeout=_UNSET): | |
| 437 """A passthrough to :meth:`~BufferedSocket.send`, retained for | |
| 438 parallelism to the :class:`socket.socket` API. | |
| 439 """ | |
| 440 return self.send(data, flags, timeout) | |
| 441 | |
| 442 def flush(self): | |
| 443 "Send the contents of the internal send buffer." | |
| 444 with self._send_lock: | |
| 445 self.send(b'') | |
| 446 return | |
| 447 | |
| 448 def buffer(self, data): | |
| 449 "Buffer *data* bytes for the next send operation." | |
| 450 with self._send_lock: | |
| 451 self.sbuf.append(data) | |
| 452 return | |
| 453 | |
| 454 # # # | |
| 455 # # # Passing through some socket basics | |
| 456 # # # | |
| 457 | |
| 458 def getsockname(self): | |
| 459 """Convenience function to return the wrapped socket's own address. | |
| 460 See :meth:`socket.getsockname` for more details. | |
| 461 """ | |
| 462 return self.sock.getsockname() | |
| 463 | |
| 464 def getpeername(self): | |
| 465 """Convenience function to return the remote address to which the | |
| 466 wrapped socket is connected. See :meth:`socket.getpeername` | |
| 467 for more details. | |
| 468 """ | |
| 469 return self.sock.getpeername() | |
| 470 | |
| 471 def getsockopt(self, level, optname, buflen=None): | |
| 472 """Convenience function passing through to the wrapped socket's | |
| 473 :meth:`socket.getsockopt`. | |
| 474 """ | |
| 475 args = (level, optname) | |
| 476 if buflen is not None: | |
| 477 args += (buflen,) | |
| 478 return self.sock.getsockopt(*args) | |
| 479 | |
| 480 def setsockopt(self, level, optname, value): | |
| 481 """Convenience function passing through to the wrapped socket's | |
| 482 :meth:`socket.setsockopt`. | |
| 483 """ | |
| 484 return self.sock.setsockopt(level, optname, value) | |
| 485 | |
| 486 @property | |
| 487 def type(self): | |
| 488 """A passthrough to the wrapped socket's type. Valid usages should | |
| 489 only ever see :data:`socket.SOCK_STREAM`. | |
| 490 """ | |
| 491 return self.sock.type | |
| 492 | |
| 493 @property | |
| 494 def family(self): | |
| 495 """A passthrough to the wrapped socket's family. BufferedSocket | |
| 496 supports all widely-used families, so this read-only attribute | |
| 497 can be one of :data:`socket.AF_INET` for IP, | |
| 498 :data:`socket.AF_INET6` for IPv6, and :data:`socket.AF_UNIX` | |
| 499 for UDS. | |
| 500 """ | |
| 501 return self.sock.family | |
| 502 | |
| 503 @property | |
| 504 def proto(self): | |
| 505 """A passthrough to the wrapped socket's protocol. The ``proto`` | |
| 506 attribute is very rarely used, so it's almost always 0, meaning "the | |
| 507 default" protocol. Pretty much all the practical information | |
| 508 is in :attr:`~BufferedSocket.type` and | |
| 509 :attr:`~BufferedSocket.family`, so you can go back to never | |
| 510 thinking about this. | |
| 511 """ | |
| 512 return self.sock.proto | |
| 513 | |
| 514 # # # | |
| 515 # # # Now for some more advanced interpretations of the builtin socket | |
| 516 # # # | |
| 517 | |
| 518 def fileno(self): | |
| 519 """Returns the file descriptor of the wrapped socket. -1 if it has | |
| 520 been closed on this end. | |
| 521 | |
| 522 Note that this makes the BufferedSocket selectable, i.e., | |
| 523 usable for operating system event loops without any external | |
| 524 libraries. Keep in mind that the operating system cannot know | |
| 525 about data in BufferedSocket's internal buffer. Exercise | |
| 526 discipline with calling ``recv*`` functions. | |
| 527 """ | |
| 528 return self.sock.fileno() | |
| 529 | |
| 530 def close(self): | |
| 531 """Closes the wrapped socket, and empties the internal buffers. The | |
| 532 send buffer is not flushed automatically, so if you have been | |
| 533 calling :meth:`~BufferedSocket.buffer`, be sure to call | |
| 534 :meth:`~BufferedSocket.flush` before calling this | |
| 535 method. After calling this method, future socket operations | |
| 536 will raise :exc:`socket.error`. | |
| 537 """ | |
| 538 with self._recv_lock: | |
| 539 with self._send_lock: | |
| 540 self.rbuf = b'' | |
| 541 self.rbuf_unconsumed = self.rbuf | |
| 542 self.sbuf[:] = [] | |
| 543 self.sock.close() | |
| 544 return | |
| 545 | |
| 546 def shutdown(self, how): | |
| 547 """Convenience method which passes through to the wrapped socket's | |
| 548 :meth:`~socket.shutdown`. Semantics vary by platform, so no | |
| 549 special internal handling is done with the buffers. This | |
| 550 method exists to facilitate the most common usage, wherein a | |
| 551 full ``shutdown`` is followed by a | |
| 552 :meth:`~BufferedSocket.close`. Developers requiring more | |
| 553 support, please open `an issue`_. | |
| 554 | |
| 555 .. _an issue: https://github.com/mahmoud/boltons/issues | |
| 556 """ | |
| 557 with self._recv_lock: | |
| 558 with self._send_lock: | |
| 559 self.sock.shutdown(how) | |
| 560 return | |
| 561 | |
| 562 # end BufferedSocket | |
| 563 | |
| 564 | |
| 565 class Error(socket.error): | |
| 566 """A subclass of :exc:`socket.error` from which all other | |
| 567 ``socketutils`` exceptions inherit. | |
| 568 | |
| 569 When using :class:`BufferedSocket` and other ``socketutils`` | |
| 570 types, generally you want to catch one of the specific exception | |
| 571 types below, or :exc:`socket.error`. | |
| 572 """ | |
| 573 pass | |
| 574 | |
| 575 | |
| 576 class ConnectionClosed(Error): | |
| 577 """Raised when receiving and the connection is unexpectedly closed | |
| 578 from the sending end. Raised from :class:`BufferedSocket`'s | |
| 579 :meth:`~BufferedSocket.peek`, :meth:`~BufferedSocket.recv_until`, | |
| 580 and :meth:`~BufferedSocket.recv_size`, and never from its | |
| 581 :meth:`~BufferedSocket.recv` or | |
| 582 :meth:`~BufferedSocket.recv_close`. | |
| 583 """ | |
| 584 pass | |
| 585 | |
| 586 | |
| 587 class MessageTooLong(Error): | |
| 588 """Raised from :meth:`BufferedSocket.recv_until` and | |
| 589 :meth:`BufferedSocket.recv_close` when more than *maxsize* bytes are | |
| 590 read without encountering the delimiter or a closed connection, | |
| 591 respectively. | |
| 592 """ | |
| 593 def __init__(self, bytes_read=None, delimiter=None): | |
| 594 msg = 'message exceeded maximum size' | |
| 595 if bytes_read is not None: | |
| 596 msg += '. %s bytes read' % (bytes_read,) | |
| 597 if delimiter is not None: | |
| 598 msg += '. Delimiter not found: %r' % (delimiter,) | |
| 599 super(MessageTooLong, self).__init__(msg) | |
| 600 | |
| 601 | |
| 602 class Timeout(socket.timeout, Error): | |
| 603 """Inheriting from :exc:`socket.timeout`, Timeout is used to indicate | |
| 604 when a socket operation did not complete within the time | |
| 605 specified. Raised from any of :class:`BufferedSocket`'s ``recv`` | |
| 606 and ``send`` methods. | |
| 607 """ | |
| 608 def __init__(self, timeout, extra=""): | |
| 609 msg = 'socket operation timed out' | |
| 610 if timeout is not None: | |
| 611 msg += ' after %sms.' % (timeout * 1000) | |
| 612 if extra: | |
| 613 msg += ' ' + extra | |
| 614 super(Timeout, self).__init__(msg) | |
| 615 | |
| 616 | |
| 617 class NetstringSocket(object): | |
| 618 """ | |
| 619 Reads and writes using the netstring protocol. | |
| 620 | |
| 621 More info: https://en.wikipedia.org/wiki/Netstring | |
| 622 Even more info: http://cr.yp.to/proto/netstrings.txt | |
| 623 """ | |
| 624 def __init__(self, sock, timeout=DEFAULT_TIMEOUT, maxsize=DEFAULT_MAXSIZE): | |
| 625 self.bsock = BufferedSocket(sock) | |
| 626 self.timeout = timeout | |
| 627 self.maxsize = maxsize | |
| 628 self._msgsize_maxsize = len(str(maxsize)) + 1 # decimal digits in maxsize (~log10), plus 1 for b':' | |
| 629 | |
| 630 def fileno(self): | |
| 631 return self.bsock.fileno() | |
| 632 | |
| 633 def settimeout(self, timeout): | |
| 634 self.timeout = timeout | |
| 635 | |
| 636 def setmaxsize(self, maxsize): | |
| 637 self.maxsize = maxsize | |
| 638 self._msgsize_maxsize = self._calc_msgsize_maxsize(maxsize) | |
| 639 | |
| 640 def _calc_msgsize_maxsize(self, maxsize): | |
| 641 return len(str(maxsize)) + 1 # decimal digits in maxsize (~log10), plus 1 for b':' | |
| 642 | |
| 643 def read_ns(self, timeout=_UNSET, maxsize=_UNSET): | |
| 644 if timeout is _UNSET: | |
| 645 timeout = self.timeout | |
| 646 | |
| 647 if maxsize is _UNSET: | |
| 648 maxsize = self.maxsize | |
| 649 msgsize_maxsize = self._msgsize_maxsize | |
| 650 else: | |
| 651 msgsize_maxsize = self._calc_msgsize_maxsize(maxsize) | |
| 652 | |
| 653 size_prefix = self.bsock.recv_until(b':', | |
| 654 timeout=timeout, | |
| 655 maxsize=msgsize_maxsize) | |
| 656 try: | |
| 657 size = int(size_prefix) | |
| 658 except ValueError: | |
| 659 raise NetstringInvalidSize('netstring message size must be valid' | |
| 660 ' integer, not %r' % size_prefix) | |
| 661 | |
| 662 if size > maxsize: | |
| 663 raise NetstringMessageTooLong(size, maxsize) | |
| 664 payload = self.bsock.recv_size(size) | |
| 665 if self.bsock.recv(1) != b',': | |
| 666 raise NetstringProtocolError("expected trailing ',' after message") | |
| 667 | |
| 668 return payload | |
| 669 | |
| 670 def write_ns(self, payload): | |
| 671 size = len(payload) | |
| 672 if size > self.maxsize: | |
| 673 raise NetstringMessageTooLong(size, self.maxsize) | |
| 674 data = str(size).encode('ascii') + b':' + payload + b',' | |
| 675 self.bsock.send(data) | |
| 676 | |
| 677 | |
| 678 class NetstringProtocolError(Error): | |
| 679 "Base class for all of socketutils' Netstring exception types." | |
| 680 pass | |
| 681 | |
| 682 | |
| 683 class NetstringInvalidSize(NetstringProtocolError): | |
| 684 """NetstringInvalidSize is raised when the ``:``-delimited size prefix | |
| 685 of the message does not contain a valid integer. | |
| 686 | |
| 687 Message showing valid size:: | |
| 688 | |
| 689 5:hello, | |
| 690 | |
| 691 Here the ``5`` is the size. Anything in this prefix position that | |
| 692 is not parsable as a Python integer (i.e., :class:`int`) will raise | |
| 693 this exception. | |
| 694 """ | |
| 695 def __init__(self, msg): | |
| 696 super(NetstringInvalidSize, self).__init__(msg) | |
| 697 | |
| 698 | |
| 699 class NetstringMessageTooLong(NetstringProtocolError): | |
| 700 """NetstringMessageTooLong is raised when the size prefix contains a | |
| 701 valid integer, but that integer is larger than the | |
| 702 :class:`NetstringSocket`'s configured *maxsize*. | |
| 703 | |
| 704 When this exception is raised, it's recommended to simply close | |
| 705 the connection instead of trying to recover. | |
| 706 """ | |
| 707 def __init__(self, size, maxsize): | |
| 708 msg = ('netstring message length exceeds configured maxsize: %s > %s' | |
| 709 % (size, maxsize)) | |
| 710 super(NetstringMessageTooLong, self).__init__(msg) | |
| 711 | |
| 712 | |
| 713 """ | |
| 714 attrs worth adding/passing through: | |
| 715 | |
| 716 | |
| 717 properties: type, proto | |
| 718 | |
| 719 For its main functionality, BufferedSocket can wrap any object that | |
| 720 has the following methods: | |
| 721 | |
| 722 - gettimeout() | |
| 723 - settimeout() | |
| 724 - recv(size) | |
| 725 - send(data) | |
| 726 | |
| 727 The following methods are passed through: | |
| 728 | |
| 729 ... | |
| 730 | |
| 731 """ | |
| 732 | |
| 733 # TODO: buffered socket check socket.type == SOCK_STREAM? | |
| 734 # TODO: make recv_until support taking a regex | |
| 735 # TODO: including the delimiter in the recv_until return is not | |
| 736 # necessary, as ConnectionClosed differentiates empty messages | |
| 737 # from socket closes. |
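
The listing above describes two ideas that are easier to see in a small, standalone form: reading from a stream until a delimiter while keeping leftover bytes buffered (as `recv_until` and `recv_size` do), and netstring framing (`<size>:<payload>,`) built on top of that. The following is a minimal sketch using only the standard library. `TinyBufferedReader`, `encode_netstring`, and `read_netstring` are hypothetical names invented for this illustration and are not part of boltons; timeouts, locking, and the module's own exception types are deliberately left out.

```python
import socket


class TinyBufferedReader:
    """Hypothetical, receive-only stand-in for the buffering idea above."""

    def __init__(self, sock, maxsize=32 * 1024):
        self.sock = sock
        self.rbuf = b''  # bytes received from the socket but not yet consumed
        self.maxsize = maxsize

    def recv_until(self, delimiter):
        # Read until the delimiter appears; bytes after it stay in rbuf.
        while delimiter not in self.rbuf:
            if len(self.rbuf) > self.maxsize:
                raise ValueError('no %r within %d bytes' % (delimiter, self.maxsize))
            chunk = self.sock.recv(4096)
            if not chunk:  # an empty read means the peer closed the connection
                raise ConnectionError('closed before finding %r' % delimiter)
            self.rbuf += chunk
        head, _, self.rbuf = self.rbuf.partition(delimiter)
        return head

    def recv_size(self, size):
        # Read until exactly *size* bytes are available, then split them off.
        while len(self.rbuf) < size:
            chunk = self.sock.recv(4096)
            if not chunk:
                raise ConnectionError('closed after %d of %d bytes'
                                      % (len(self.rbuf), size))
            self.rbuf += chunk
        data, self.rbuf = self.rbuf[:size], self.rbuf[size:]
        return data


def encode_netstring(payload):
    # Same framing as write_ns in the listing: b'<size>:<payload>,'
    return str(len(payload)).encode('ascii') + b':' + payload + b','


def read_netstring(reader, maxsize=32 * 1024):
    size = int(reader.recv_until(b':'))  # int() accepts ASCII digits as bytes
    if size > maxsize:
        raise ValueError('netstring too long: %d > %d' % (size, maxsize))
    payload = reader.recv_size(size)
    if reader.recv_size(1) != b',':
        raise ValueError("expected trailing ',' after message")
    return payload


# Two messages sent back to back over a local socket pair; the second one
# is served out of the reader's buffer if it arrived with the first.
left, right = socket.socketpair()
left.sendall(encode_netstring(b'hello') + encode_netstring(b'world!'))
left.close()

reader = TinyBufferedReader(right)
first = read_netstring(reader)
second = read_netstring(reader)
right.close()
print(first, second)
```

The point of the sketch is the carry-over buffer: a single `recv` call may return more than one message, so whatever follows the delimiter or the requested size has to be kept for the next read rather than discarded.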
