Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/planemo/network_util.py @ 3:758bc20232e8 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:20:52 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
import socket try: from http.client import BadStatusLine except ImportError: from httplib import BadStatusLine from time import time as now from six.moves.urllib.error import URLError from six.moves.urllib.request import urlopen def get_free_port(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('localhost', 0)) port = sock.getsockname()[1] sock.close() return port def wait_http_service(url, timeout=None): if timeout: end = now() + timeout while True: try: if timeout: next_timeout = end - now() if next_timeout < 0: return False kwds = {} if timeout is None else dict(timeout=next_timeout) with urlopen(url, **kwds) as r: if r.getcode() != 200: continue return True except socket.error: pass except BadStatusLine: pass except URLError: pass # code.activestate.com/recipes/576655-wait-for-network-service-to-appear def wait_net_service(server, port, timeout=None): """ Wait for network service to appear. :param int timeout: in seconds, if None or 0 wait forever :return: A ``bool`` - if ``timeout`` is ``None`` may return only ``True`` or throw an unhandled network exception. """ if port is None: raise TypeError("wait_net_service passed NoneType port value.") port = int(port) if timeout: end = now() + timeout while True: s = socket.socket() # Following line prevents this method from interfering with process # it is waiting for on localhost. s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: if timeout: next_timeout = end - now() if next_timeout < 0: return False else: s.settimeout(next_timeout) s.connect((server, port)) except socket.timeout: # this exception occurs only if timeout is set if timeout: return False except socket.error: # if getattr(e, "errno") == 61: # refused_connections += 1 s.close() else: s.close() return True