Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/boltons/socketutils.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26e78fe6e8c4 |
---|---|
1 # -*- coding: utf-8 -*- | |
2 """At its heart, Python can be viewed as an extension of the C | |
3 programming language. Springing from the most popular systems | |
4 programming language has made Python itself a great language for | |
5 systems programming. One key to success in this domain is Python's | |
6 very serviceable :mod:`socket` module and its :class:`socket.socket` | |
7 type. | |
8 | |
9 The ``socketutils`` module provides natural next steps to the ``socket`` | |
10 builtin: straightforward, tested building blocks for higher-level | |
11 protocols. | |
12 | |
13 The :class:`BufferedSocket` wraps an ordinary socket, providing a | |
14 layer of intuitive buffering for both sending and receiving. This | |
15 facilitates parsing messages from streams, i.e., all sockets with type | |
16 ``SOCK_STREAM``. The BufferedSocket enables receiving until the next | |
17 relevant token, up to a certain size, or until the connection is | |
18 closed. For all of these, it provides consistent APIs to size | |
19 limiting, as well as timeouts that are compatible with multiple | |
20 concurrency paradigms. Use it to parse the next one-off text or binary | |
21 socket protocol you encounter. | |
22 | |
23 This module also provides the :class:`NetstringSocket`, a pure-Python | |
24 implementation of `the Netstring protocol`_, built on top of the | |
25 :class:`BufferedSocket`, serving as a ready-made, production-grade example. | |
26 | |
27 Special thanks to `Kurt Rose`_ for his original authorship and all his | |
28 contributions on this module. Also thanks to `Daniel J. Bernstein`_, the | |
29 original author of `Netstring`_. | |
30 | |
31 .. _the Netstring protocol: https://en.wikipedia.org/wiki/Netstring | |
32 .. _Kurt Rose: https://github.com/doublereedkurt | |
33 .. _Daniel J. Bernstein: https://cr.yp.to/ | |
34 .. _Netstring: https://cr.yp.to/proto/netstrings.txt | |
35 | |
36 """ | |
37 | |
38 import time | |
39 import socket | |
40 | |
try:
    from threading import RLock
except Exception:
    class RLock(object):
        """Dummy reentrant lock for builds without threads.

        Provides the parts of the :class:`threading.RLock` interface that
        lock users rely on: ``acquire``/``release`` and the context
        manager protocol. It performs no locking at all.
        """
        def acquire(self, blocking=True, timeout=-1):
            # Nothing to wait for; report success like an uncontended lock.
            return True

        def release(self):
            pass

        def __enter__(self):
            return self.acquire()

        def __exit__(self, exctype, excinst, exctb):
            self.release()
51 | |
52 | |
# Sentinel used as the default for optional arguments where ``None`` is
# itself a meaningful value (e.g. ``timeout=None`` means "no timeout").
# Falls back to a plain ``object()`` when ``typeutils`` is not importable.
try:
    from typeutils import make_sentinel
    _UNSET = make_sentinel(var_name='_UNSET')
except ImportError:
    _UNSET = object()


# Timeout applied by BufferedSocket when the wrapped socket has none set.
DEFAULT_TIMEOUT = 10  # 10 seconds
# Default cap on bytes buffered by a single receive operation.
DEFAULT_MAXSIZE = 32 * 1024  # 32kb
# Stand-in limit substituted when a caller passes ``maxsize=None``.
_RECV_LARGE_MAXSIZE = 1024 ** 5  # 1PB
63 | |
64 | |
class BufferedSocket(object):
    """Mainly provides recv_until and recv_size. recv, send, sendall, and
    peek all function as similarly as possible to the built-in socket
    API.

    This type has been tested against both the built-in socket type as
    well as those from gevent and eventlet. It also features support
    for sockets with timeouts set to 0 (aka nonblocking), provided the
    caller is prepared to handle the EWOULDBLOCK exceptions.

    Args:
        sock (socket): The connected socket to be wrapped.
        timeout (float): The default timeout for sends and recvs, in
            seconds. Set to ``None`` for no timeout, and 0 for
            nonblocking. Defaults to *sock*'s own timeout if already set,
            and 10 seconds otherwise.
        maxsize (int): The default maximum number of bytes to be received
            into the buffer before it is considered full and raises an
            exception. Defaults to 32 kilobytes.
        recvsize (int): The number of bytes to recv for every
            lower-level :meth:`socket.recv` call. Defaults to *maxsize*.

    *timeout* and *maxsize* can both be overridden on individual socket
    operations.

    All ``recv`` methods return bytestrings (:class:`bytes`) and can
    raise :exc:`socket.error`. :exc:`Timeout`,
    :exc:`ConnectionClosed`, and :exc:`MessageTooLong` all inherit
    from :exc:`socket.error` and exist to provide better error
    messages. Received bytes are always buffered, even if an exception
    is raised. Use :meth:`BufferedSocket.getrecvbuffer` to retrieve
    partial recvs.

    BufferedSocket does not replace the built-in socket by any
    means. While the overlapping parts of the API are kept parallel to
    the built-in :class:`socket.socket`, BufferedSocket does not
    inherit from socket, and most socket functionality is only
    available on the underlying socket. :meth:`socket.getpeername`,
    :meth:`socket.getsockname`, :meth:`socket.fileno`, and others are
    only available on the underlying socket that is wrapped. Use the
    ``BufferedSocket.sock`` attribute to access it. See the examples
    for more information on how to use BufferedSockets with built-in
    sockets.

    The BufferedSocket is threadsafe, but consider the semantics of
    your protocol before accessing a single socket from multiple
    threads. Similarly, once the BufferedSocket is constructed, avoid
    using the underlying socket directly. Only use it for operations
    unrelated to messages, e.g., :meth:`socket.getpeername`.

    """
    def __init__(self, sock, timeout=_UNSET,
                 maxsize=DEFAULT_MAXSIZE, recvsize=_UNSET):
        self.sock = sock
        self.rbuf = b''  # bytes received but not yet handed to a caller
        self.sbuf = []   # list of bytestrings queued for the next send
        self.maxsize = int(maxsize)

        if timeout is _UNSET:
            # Inherit the wrapped socket's timeout when it has one.
            if self.sock.gettimeout() is None:
                self.timeout = DEFAULT_TIMEOUT
            else:
                self.timeout = self.sock.gettimeout()
        else:
            if timeout is None:
                self.timeout = timeout
            else:
                self.timeout = float(timeout)

        if recvsize is _UNSET:
            self._recvsize = self.maxsize
        else:
            self._recvsize = int(recvsize)

        # Separate locks so a blocked recv does not stall senders.
        self._send_lock = RLock()
        self._recv_lock = RLock()

    def settimeout(self, timeout):
        "Set the default *timeout* for future operations, in seconds."
        self.timeout = timeout

    def gettimeout(self):
        "Return the default timeout used for future operations."
        return self.timeout

    def setblocking(self, blocking):
        """Set the default timeout to ``None`` (blocking) when *blocking*
        is true, and to ``0.0`` (nonblocking) otherwise.
        """
        self.timeout = None if blocking else 0.0

    def setmaxsize(self, maxsize):
        """Set the default maximum buffer size *maxsize* for future
        operations, in bytes. Does not truncate the current buffer.
        """
        self.maxsize = maxsize

    def getrecvbuffer(self):
        "Returns the receive buffer bytestring (rbuf)."
        with self._recv_lock:
            return self.rbuf

    def getsendbuffer(self):
        """Returns the contents of the send buffer joined into a single
        bytestring. The send buffer itself is left untouched.
        """
        with self._send_lock:
            return b''.join(self.sbuf)

    def recv(self, size, flags=0, timeout=_UNSET):
        """Returns **up to** *size* bytes, using the internal buffer before
        performing a single :meth:`socket.recv` operation.

        Args:
            size (int): The maximum number of bytes to receive.
            flags (int): Kept for API compatibility with sockets. Only
                the default, ``0``, is valid.
            timeout (float): The timeout for this operation. Can be
                ``0`` for nonblocking and ``None`` for no
                timeout. Defaults to the value set in the constructor
                of BufferedSocket.

        If the operation does not complete in *timeout* seconds, a
        :exc:`Timeout` is raised. Much like the built-in
        :class:`socket.socket`, if this method returns an empty string,
        then the socket is closed and recv buffer is empty. Further
        calls to recv will raise :exc:`socket.error`.

        """
        with self._recv_lock:
            if timeout is _UNSET:
                timeout = self.timeout
            if flags:
                raise ValueError("non-zero flags not supported: %r" % flags)
            if len(self.rbuf) >= size:
                # The buffer alone can satisfy the request.
                data, self.rbuf = self.rbuf[:size], self.rbuf[size:]
                return data
            if self.rbuf:
                # Hand back what is buffered rather than touch the socket.
                ret, self.rbuf = self.rbuf, b''
                return ret
            self.sock.settimeout(timeout)
            try:
                data = self.sock.recv(self._recvsize)
            except socket.timeout:
                raise Timeout(timeout)  # check the rbuf attr for more
            if len(data) > size:
                # Keep the surplus for the next recv call.
                data, self.rbuf = data[:size], data[size:]
        return data

    def peek(self, size, timeout=_UNSET):
        """Returns *size* bytes from the socket and/or internal buffer. Bytes
        are retained in BufferedSocket's internal recv buffer. To only
        see bytes in the recv buffer, use :meth:`getrecvbuffer`.

        Args:
            size (int): The exact number of bytes to peek at
            timeout (float): The timeout for this operation. Can be 0 for
                nonblocking and None for no timeout. Defaults to the value
                set in the constructor of BufferedSocket.

        If the appropriate number of bytes cannot be fetched from the
        buffer and socket before *timeout* expires, then a
        :exc:`Timeout` will be raised. If the connection is closed, a
        :exc:`ConnectionClosed` will be raised.
        """
        with self._recv_lock:
            if len(self.rbuf) >= size:
                return self.rbuf[:size]
            data = self.recv_size(size, timeout=timeout)
            # recv_size consumed the bytes; put them back in front.
            self.rbuf = data + self.rbuf
        return data

    def recv_close(self, timeout=_UNSET, maxsize=_UNSET):
        """Receive until the connection is closed, up to *maxsize* bytes. If
        more than *maxsize* bytes are received, raises :exc:`MessageTooLong`.
        """
        # recv_close works by using recv_size to request maxsize data,
        # and ignoring ConnectionClose, returning and clearing the
        # internal buffer instead. It raises an exception if
        # ConnectionClosed isn't raised.
        with self._recv_lock:
            if maxsize is _UNSET:
                maxsize = self.maxsize
            if maxsize is None:
                maxsize = _RECV_LARGE_MAXSIZE
            try:
                recvd = self.recv_size(maxsize + 1, timeout)
            except ConnectionClosed:
                ret, self.rbuf = self.rbuf, b''
            else:
                # put extra received bytes (now in rbuf) after recvd
                self.rbuf = recvd + self.rbuf
                size_read = min(maxsize, len(self.rbuf))
                raise MessageTooLong(size_read)  # check receive buffer
        return ret

    def recv_until(self, delimiter, timeout=_UNSET, maxsize=_UNSET,
                   with_delimiter=False):
        """Receive until *delimiter* is found, *maxsize* bytes have been read,
        or *timeout* is exceeded.

        Args:
            delimiter (bytes): One or more bytes to be searched for
                in the socket stream.
            timeout (float): The timeout for this operation. Can be 0 for
                nonblocking and None for no timeout. Defaults to the value
                set in the constructor of BufferedSocket.
            maxsize (int): The maximum size for the internal buffer.
                Defaults to the value set in the constructor.
            with_delimiter (bool): Whether or not to include the
                delimiter in the output. ``False`` by default, but
                ``True`` is useful in cases where one is simply
                forwarding the messages.

        ``recv_until`` will raise the following exceptions:

          * :exc:`Timeout` if more than *timeout* seconds expire.
          * :exc:`ConnectionClosed` if the underlying socket is closed
            by the sending end.
          * :exc:`MessageTooLong` if the delimiter is not found in the
            first *maxsize* bytes.
          * :exc:`socket.error` if operating in nonblocking mode
            (*timeout* equal to 0), or if some unexpected socket error
            occurs, such as operating on a closed socket.

        """
        with self._recv_lock:
            if maxsize is _UNSET:
                maxsize = self.maxsize
            if maxsize is None:
                maxsize = _RECV_LARGE_MAXSIZE
            if timeout is _UNSET:
                timeout = self.timeout
            len_delimiter = len(delimiter)

            sock = self.sock
            recvd = bytearray(self.rbuf)
            start = time.time()
            find_offset_start = 0  # becomes a negative index below

            if not timeout:  # covers None (no timeout) and 0 (nonblocking)
                sock.settimeout(timeout)
            try:
                while 1:
                    offset = recvd.find(delimiter, find_offset_start, maxsize)
                    if offset != -1:  # str.find returns -1 when no match found
                        if with_delimiter:  # include delimiter in return
                            offset += len_delimiter
                            rbuf_offset = offset
                        else:
                            rbuf_offset = offset + len_delimiter
                        break
                    elif len(recvd) > maxsize:
                        raise MessageTooLong(maxsize, delimiter)  # see rbuf
                    if timeout:
                        # Shrink the socket timeout to the time remaining.
                        cur_timeout = timeout - (time.time() - start)
                        if cur_timeout <= 0.0:
                            raise socket.timeout()
                        sock.settimeout(cur_timeout)
                    nxt = sock.recv(self._recvsize)
                    if not nxt:
                        args = (len(recvd), delimiter)
                        msg = ('connection closed after reading %s bytes'
                               ' without finding symbol: %r' % args)
                        raise ConnectionClosed(msg)  # check the recv buffer
                    recvd.extend(nxt)
                    # Only rescan the new bytes, plus enough of the old
                    # tail to catch a delimiter split across two recvs.
                    find_offset_start = -len(nxt) - len_delimiter + 1
            except socket.timeout:
                self.rbuf = bytes(recvd)
                msg = ('read %s bytes without finding delimiter: %r'
                       % (len(recvd), delimiter))
                raise Timeout(timeout, msg)  # check the recv buffer
            except Exception:
                self.rbuf = bytes(recvd)
                raise
            val, self.rbuf = bytes(recvd[:offset]), bytes(recvd[rbuf_offset:])
        return val

    def recv_size(self, size, timeout=_UNSET):
        """Read off of the internal buffer, then off the socket, until
        *size* bytes have been read.

        Args:
            size (int): number of bytes to read before returning.
            timeout (float): The timeout for this operation. Can be 0 for
                nonblocking and None for no timeout. Defaults to the value
                set in the constructor of BufferedSocket.

        If the appropriate number of bytes cannot be fetched from the
        buffer and socket before *timeout* expires, then a
        :exc:`Timeout` will be raised. If the connection is closed, a
        :exc:`ConnectionClosed` will be raised.
        """
        with self._recv_lock:
            if timeout is _UNSET:
                timeout = self.timeout
            chunks = []
            total_bytes = 0
            # Configure the socket *before* entering the try block: the
            # handlers below rebuild rbuf from ``chunks``, which is still
            # empty here, so a failure in settimeout() inside the try
            # would throw away bytes that were already buffered.
            self.sock.settimeout(timeout)
            try:
                start = time.time()
                nxt = self.rbuf or self.sock.recv(self._recvsize)
                while nxt:
                    total_bytes += len(nxt)
                    if total_bytes >= size:
                        break
                    chunks.append(nxt)
                    if timeout:
                        # Shrink the socket timeout to the time remaining.
                        cur_timeout = timeout - (time.time() - start)
                        if cur_timeout <= 0.0:
                            raise socket.timeout()
                        self.sock.settimeout(cur_timeout)
                    nxt = self.sock.recv(self._recvsize)
                else:
                    # An empty recv means the peer closed the connection.
                    msg = ('connection closed after reading %s of %s requested'
                           ' bytes' % (total_bytes, size))
                    raise ConnectionClosed(msg)  # check recv buffer
            except socket.timeout:
                self.rbuf = b''.join(chunks)
                msg = 'read %s of %s bytes' % (total_bytes, size)
                raise Timeout(timeout, msg)  # check recv buffer
            except Exception:
                # received data is still buffered in the case of errors
                self.rbuf = b''.join(chunks)
                raise
            # Split the final chunk: the part that completes the request
            # is returned, anything beyond it goes back into rbuf.
            extra_bytes = total_bytes - size
            if extra_bytes:
                last, self.rbuf = nxt[:-extra_bytes], nxt[-extra_bytes:]
            else:
                last, self.rbuf = nxt, b''
            chunks.append(last)
        return b''.join(chunks)

    def send(self, data, flags=0, timeout=_UNSET):
        """Send the contents of the internal send buffer, as well as *data*,
        to the receiving end of the connection. Returns the total
        number of bytes sent. If no exception is raised, all of *data* was
        sent and the internal send buffer is empty.

        Args:
            data (bytes): The bytes to send.
            flags (int): Kept for API compatibility with sockets. Only
                the default 0 is valid.
            timeout (float): The timeout for this operation. Can be 0 for
                nonblocking and None for no timeout. Defaults to the value
                set in the constructor of BufferedSocket.

        Will raise :exc:`Timeout` if the send operation fails to
        complete before *timeout*. In the event of an exception, use
        :meth:`BufferedSocket.getsendbuffer` to see which data was
        unsent.
        """
        with self._send_lock:
            if timeout is _UNSET:
                timeout = self.timeout
            if flags:
                raise ValueError("non-zero flags not supported")
            sbuf = self.sbuf
            sbuf.append(data)
            if len(sbuf) > 1:
                # Collapse everything queued into one bytestring in slot 0.
                sbuf[:] = [b''.join([s for s in sbuf if s])]
            self.sock.settimeout(timeout)
            start, total_sent = time.time(), 0
            try:
                while sbuf[0]:
                    sent = self.sock.send(sbuf[0])
                    total_sent += sent
                    sbuf[0] = sbuf[0][sent:]
                    if timeout:
                        # Shrink the socket timeout to the time remaining.
                        cur_timeout = timeout - (time.time() - start)
                        if cur_timeout <= 0.0:
                            raise socket.timeout()
                        self.sock.settimeout(cur_timeout)
            except socket.timeout:
                raise Timeout(timeout, '%s bytes unsent' % len(sbuf[0]))
        return total_sent

    def sendall(self, data, flags=0, timeout=_UNSET):
        """A passthrough to :meth:`~BufferedSocket.send`, retained for
        parallelism to the :class:`socket.socket` API.
        """
        return self.send(data, flags, timeout)

    def flush(self):
        "Send the contents of the internal send buffer."
        with self._send_lock:
            self.send(b'')
        return

    def buffer(self, data):
        "Buffer *data* bytes for the next send operation."
        with self._send_lock:
            self.sbuf.append(data)
        return

    # # #
    # # # Passing through some socket basics
    # # #

    def getsockname(self):
        """Convenience function to return the wrapped socket's own address.
        See :meth:`socket.getsockname` for more details.
        """
        return self.sock.getsockname()

    def getpeername(self):
        """Convenience function to return the remote address to which the
        wrapped socket is connected.  See :meth:`socket.getpeername`
        for more details.
        """
        return self.sock.getpeername()

    def getsockopt(self, level, optname, buflen=None):
        """Convenience function passing through to the wrapped socket's
        :meth:`socket.getsockopt`.
        """
        args = (level, optname)
        if buflen is not None:
            args += (buflen,)
        return self.sock.getsockopt(*args)

    def setsockopt(self, level, optname, value):
        """Convenience function passing through to the wrapped socket's
        :meth:`socket.setsockopt`.
        """
        return self.sock.setsockopt(level, optname, value)

    @property
    def type(self):
        """A passthrough to the wrapped socket's type. Valid usages should
        only ever see :data:`socket.SOCK_STREAM`.
        """
        return self.sock.type

    @property
    def family(self):
        """A passthrough to the wrapped socket's family. BufferedSocket
        supports all widely-used families, so this read-only attribute
        can be one of :data:`socket.AF_INET` for IP,
        :data:`socket.AF_INET6` for IPv6, and :data:`socket.AF_UNIX`
        for UDS.
        """
        return self.sock.family

    @property
    def proto(self):
        """A passthrough to the wrapped socket's protocol. The ``proto``
        attribute is very rarely used, so it's always 0, meaning "the
        default" protocol. Pretty much all the practical information
        is in :attr:`~BufferedSocket.type` and
        :attr:`~BufferedSocket.family`, so you can go back to never
        thinking about this.
        """
        return self.sock.proto

    # # #
    # # # Now for some more advanced interpretations of the builtin socket
    # # #

    def fileno(self):
        """Returns the file descriptor of the wrapped socket. -1 if it has
        been closed on this end.

        Note that this makes the BufferedSocket selectable, i.e.,
        usable for operating system event loops without any external
        libraries. Keep in mind that the operating system cannot know
        about data in BufferedSocket's internal buffer. Exercise
        discipline with calling ``recv*`` functions.
        """
        return self.sock.fileno()

    def close(self):
        """Closes the wrapped socket, and empties the internal buffers. The
        send buffer is not flushed automatically, so if you have been
        calling :meth:`~BufferedSocket.buffer`, be sure to call
        :meth:`~BufferedSocket.flush` before calling this
        method. After calling this method, future socket operations
        will raise :exc:`socket.error`.
        """
        with self._recv_lock:
            with self._send_lock:
                self.rbuf = b''
                # NOTE(review): rbuf_unconsumed is not read anywhere in
                # this class; kept in case external code looks at it.
                self.rbuf_unconsumed = self.rbuf
                self.sbuf[:] = []
                self.sock.close()
        return

    def shutdown(self, how):
        """Convenience method which passes through to the wrapped socket's
        :meth:`~socket.shutdown`. Semantics vary by platform, so no
        special internal handling is done with the buffers. This
        method exists to facilitate the most common usage, wherein a
        full ``shutdown`` is followed by a
        :meth:`~BufferedSocket.close`. Developers requiring more
        support, please open `an issue`_.

        .. _an issue: https://github.com/mahmoud/boltons/issues
        """
        with self._recv_lock:
            with self._send_lock:
                self.sock.shutdown(how)
        return
561 | |
562 # end BufferedSocket | |
563 | |
564 | |
class Error(socket.error):
    """Common base for every exception defined by ``socketutils``.

    It derives from :exc:`socket.error`, so code working with
    :class:`BufferedSocket` and the other ``socketutils`` types can
    catch either one of the more specific subclasses or plain
    :exc:`socket.error`.
    """
574 | |
575 | |
class ConnectionClosed(Error):
    """Signals that the sending end closed the connection while a
    receive was still in progress.

    :class:`BufferedSocket` raises it from
    :meth:`~BufferedSocket.peek`, :meth:`~BufferedSocket.recv_until`,
    and :meth:`~BufferedSocket.recv_size`. It is never raised from
    :meth:`~BufferedSocket.recv` or :meth:`~BufferedSocket.recv_close`.
    """
585 | |
586 | |
class MessageTooLong(Error):
    """Raised from :meth:`BufferedSocket.recv_until` and
    :meth:`BufferedSocket.recv_close` when more than *maxsize* bytes are
    read without encountering the delimiter or a closed connection,
    respectively.

    Args:
        bytes_read (int): Number of bytes read so far; added to the
            message when given.
        delimiter (bytes): The delimiter that was being searched for;
            added to the message when given.
    """
    def __init__(self, bytes_read=None, delimiter=None):
        # Assemble the message from its optional fragments.
        fragments = ['message exceeded maximum size']
        if bytes_read is not None:
            fragments.append('. %s bytes read' % (bytes_read,))
        if delimiter is not None:
            fragments.append('. Delimiter not found: %r' % (delimiter,))
        super(MessageTooLong, self).__init__(''.join(fragments))
600 | |
601 | |
class Timeout(socket.timeout, Error):
    """Indicates that a socket operation did not finish within the
    allotted time. Inherits from both :exc:`socket.timeout` and
    :exc:`Error`, and is raised by :class:`BufferedSocket`'s ``recv``
    methods as well as :meth:`~BufferedSocket.send`.

    Args:
        timeout (float): The timeout that expired, in seconds. ``None``
            omits the duration from the message.
        extra (str): Optional detail appended to the message.
    """
    def __init__(self, timeout, extra=""):
        # Assemble the message from its optional fragments.
        fragments = ['socket operation timed out']
        if timeout is not None:
            fragments.append(' after %sms.' % (timeout * 1000))
        if extra:
            fragments.append(' ' + extra)
        super(Timeout, self).__init__(''.join(fragments))
615 | |
616 | |
class NetstringSocket(object):
    """
    Reads and writes using the netstring protocol.

    A netstring is framed as ``<decimal size>:<payload>,``. The socket
    passed in is wrapped in a :class:`BufferedSocket`, available as the
    ``bsock`` attribute.

    Args:
        sock (socket): The connected socket to be wrapped.
        timeout (float): Default timeout for :meth:`read_ns`, in seconds.
        maxsize (int): Default maximum payload size, in bytes.

    More info: https://en.wikipedia.org/wiki/Netstring
    Even more info: http://cr.yp.to/proto/netstrings.txt
    """
    def __init__(self, sock, timeout=DEFAULT_TIMEOUT, maxsize=DEFAULT_MAXSIZE):
        self.bsock = BufferedSocket(sock)
        self.timeout = timeout
        self.maxsize = maxsize
        self._msgsize_maxsize = self._calc_msgsize_maxsize(maxsize)

    def fileno(self):
        "Return the file descriptor of the wrapped socket."
        return self.bsock.fileno()

    def settimeout(self, timeout):
        "Set the default *timeout* used by :meth:`read_ns`."
        self.timeout = timeout

    def setmaxsize(self, maxsize):
        "Set the default maximum payload size, in bytes."
        self.maxsize = maxsize
        self._msgsize_maxsize = self._calc_msgsize_maxsize(maxsize)

    def _calc_msgsize_maxsize(self, maxsize):
        # Upper bound on the size prefix: the digits of *maxsize*, plus one.
        return len(str(maxsize)) + 1  # len(str()) == log10

    def read_ns(self, timeout=_UNSET, maxsize=_UNSET):
        """Read one netstring from the socket and return its payload.

        Args:
            timeout (float): Timeout applied to each receive step (size
                prefix, payload, trailing comma). Defaults to the value
                set on this NetstringSocket.
            maxsize (int): Largest payload accepted for this call.
                Defaults to the value set on this NetstringSocket.

        Raises:
            NetstringInvalidSize: the size prefix is not an integer, or
                is negative.
            NetstringMessageTooLong: the declared size exceeds *maxsize*.
            NetstringProtocolError: the payload is not followed by ``,``.
        """
        if timeout is _UNSET:
            timeout = self.timeout

        if maxsize is _UNSET:
            maxsize = self.maxsize
            msgsize_maxsize = self._msgsize_maxsize
        else:
            msgsize_maxsize = self._calc_msgsize_maxsize(maxsize)

        size_prefix = self.bsock.recv_until(b':',
                                            timeout=timeout,
                                            maxsize=msgsize_maxsize)
        try:
            size = int(size_prefix)
        except ValueError:
            raise NetstringInvalidSize('netstring message size must be valid'
                                       ' integer, not %r' % size_prefix)

        if size < 0:
            # int() accepts a leading '-', but a length cannot be negative.
            raise NetstringInvalidSize('netstring message size must be'
                                       ' non-negative, not %r' % size_prefix)

        if size > maxsize:
            raise NetstringMessageTooLong(size, maxsize)
        # Forward the resolved timeout so the payload and terminator reads
        # honor it too, instead of the inner socket's own default.
        payload = self.bsock.recv_size(size, timeout=timeout)
        if self.bsock.recv(1, timeout=timeout) != b',':
            raise NetstringProtocolError("expected trailing ',' after message")

        return payload

    def write_ns(self, payload):
        """Frame *payload* (bytes) as a netstring and send it.

        Raises:
            NetstringMessageTooLong: *payload* is longer than the
                configured maxsize.
        """
        size = len(payload)
        if size > self.maxsize:
            raise NetstringMessageTooLong(size, self.maxsize)
        data = str(size).encode('ascii') + b':' + payload + b','
        self.bsock.send(data)
676 | |
677 | |
class NetstringProtocolError(Error):
    """Base class for all of socketutils' Netstring exception types."""
681 | |
682 | |
class NetstringInvalidSize(NetstringProtocolError):
    """Raised when the ``:``-delimited size prefix of a netstring does
    not hold a valid integer.

    A well-formed message looks like this::

      5:hello,

    The leading ``5`` is the size. If whatever occupies that position
    cannot be parsed as a Python integer (i.e., :class:`int`), this
    exception is raised.
    """
    def __init__(self, msg):
        # Explicit one-argument constructor; the message is passed through.
        super(NetstringInvalidSize, self).__init__(msg)
697 | |
698 | |
class NetstringMessageTooLong(NetstringProtocolError):
    """Raised when a netstring's size prefix is a valid integer but is
    larger than the :class:`NetstringSocket`'s configured *maxsize*.

    When this exception is raised, it's recommended to simply close
    the connection instead of trying to recover.

    Args:
        size (int): The message size that was rejected.
        maxsize (int): The limit that *size* exceeded.
    """
    def __init__(self, size, maxsize):
        template = ('netstring message length exceeds configured maxsize:'
                    ' %s > %s')
        super(NetstringMessageTooLong, self).__init__(template
                                                      % (size, maxsize))
711 | |
712 | |
713 """ | |
714 attrs worth adding/passing through: | |
715 | |
716 | |
717 properties: type, proto | |
718 | |
719 For its main functionality, BufferedSocket can wrap any object that | |
720 has the following methods: | |
721 | |
722 - gettimeout() | |
723 - settimeout() | |
724 - recv(size) | |
725 - send(data) | |
726 | |
727 The following methods are passed through: | |
728 | |
729 ... | |
730 | |
731 """ | |
732 | |
733 # TODO: buffered socket check socket.type == SOCK_STREAM? | |
734 # TODO: make recv_until support taking a regex | |
735 # TODO: including the delimiter in the recv_until return is not | |
736 # necessary, as ConnectionClosed differentiates empty messages | |
737 # from socket closes. |