diff planemo/lib/python3.7/site-packages/psutil/tests/test_connections.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/psutil/tests/test_connections.py	Fri Jul 31 00:32:28 2020 -0400
@@ -0,0 +1,639 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tests for net_connections() and Process.connections() APIs."""
+
+import contextlib
+import errno
+import os
+import socket
+import textwrap
+from contextlib import closing
+from socket import AF_INET
+from socket import AF_INET6
+from socket import SOCK_DGRAM
+from socket import SOCK_STREAM
+
+import psutil
+from psutil import FREEBSD
+from psutil import LINUX
+from psutil import MACOS
+from psutil import NETBSD
+from psutil import OPENBSD
+from psutil import POSIX
+from psutil import SUNOS
+from psutil import WINDOWS
+from psutil._common import supports_ipv6
+from psutil._compat import PY3
+from psutil.tests import AF_UNIX
+from psutil.tests import bind_socket
+from psutil.tests import bind_unix_socket
+from psutil.tests import check_net_address
+from psutil.tests import CIRRUS
+from psutil.tests import create_sockets
+from psutil.tests import enum
+from psutil.tests import get_free_port
+from psutil.tests import HAS_CONNECTIONS_UNIX
+from psutil.tests import PsutilTestCase
+from psutil.tests import reap_children
+from psutil.tests import retry_on_failure
+from psutil.tests import serialrun
+from psutil.tests import skip_on_access_denied
+from psutil.tests import SKIP_SYSCONS
+from psutil.tests import tcp_socketpair
+from psutil.tests import TRAVIS
+from psutil.tests import unittest
+from psutil.tests import unix_socketpair
+from psutil.tests import wait_for_file
+
+
+thisproc = psutil.Process()
+SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object())
+
+
+@serialrun
+class _ConnTestCase(PsutilTestCase):
+
+    def setUp(self):
+        if not (NETBSD or FREEBSD):
+            # process opens a UNIX socket to /var/log/run.
+            cons = thisproc.connections(kind='all')
+            assert not cons, cons
+
+    def tearDown(self):
+        if not (FREEBSD or NETBSD):
+            # Make sure we closed all resources.
+            # NetBSD opens a UNIX socket to /var/log/run.
+            cons = thisproc.connections(kind='all')
+            assert not cons, cons
+
+    def compare_procsys_connections(self, pid, proc_cons, kind='all'):
+        """Given a process PID and its list of connections compare
+        those against system-wide connections retrieved via
+        psutil.net_connections.
+        """
+        try:
+            sys_cons = psutil.net_connections(kind=kind)
+        except psutil.AccessDenied:
+            # On MACOS, system-wide connections are retrieved by iterating
+            # over all processes
+            if MACOS:
+                return
+            else:
+                raise
+        # Filter for this proc PID and exlucde PIDs from the tuple.
+        sys_cons = [c[:-1] for c in sys_cons if c.pid == pid]
+        sys_cons.sort()
+        proc_cons.sort()
+        self.assertEqual(proc_cons, sys_cons)
+
+    def check_connection_ntuple(self, conn):
+        """Check validity of a connection namedtuple."""
+        def check_ntuple(conn):
+            has_pid = len(conn) == 7
+            self.assertIn(len(conn), (6, 7))
+            self.assertEqual(conn[0], conn.fd)
+            self.assertEqual(conn[1], conn.family)
+            self.assertEqual(conn[2], conn.type)
+            self.assertEqual(conn[3], conn.laddr)
+            self.assertEqual(conn[4], conn.raddr)
+            self.assertEqual(conn[5], conn.status)
+            if has_pid:
+                self.assertEqual(conn[6], conn.pid)
+
+        def check_family(conn):
+            self.assertIn(conn.family, (AF_INET, AF_INET6, AF_UNIX))
+            if enum is not None:
+                assert isinstance(conn.family, enum.IntEnum), conn
+            else:
+                assert isinstance(conn.family, int), conn
+            if conn.family == AF_INET:
+                # actually try to bind the local socket; ignore IPv6
+                # sockets as their address might be represented as
+                # an IPv4-mapped-address (e.g. "::127.0.0.1")
+                # and that's rejected by bind()
+                s = socket.socket(conn.family, conn.type)
+                with contextlib.closing(s):
+                    try:
+                        s.bind((conn.laddr[0], 0))
+                    except socket.error as err:
+                        if err.errno != errno.EADDRNOTAVAIL:
+                            raise
+            elif conn.family == AF_UNIX:
+                self.assertEqual(conn.status, psutil.CONN_NONE)
+
+        def check_type(conn):
+            # SOCK_SEQPACKET may happen in case of AF_UNIX socks
+            self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET))
+            if enum is not None:
+                assert isinstance(conn.type, enum.IntEnum), conn
+            else:
+                assert isinstance(conn.type, int), conn
+            if conn.type == SOCK_DGRAM:
+                self.assertEqual(conn.status, psutil.CONN_NONE)
+
+        def check_addrs(conn):
+            # check IP address and port sanity
+            for addr in (conn.laddr, conn.raddr):
+                if conn.family in (AF_INET, AF_INET6):
+                    self.assertIsInstance(addr, tuple)
+                    if not addr:
+                        continue
+                    self.assertIsInstance(addr.port, int)
+                    assert 0 <= addr.port <= 65535, addr.port
+                    check_net_address(addr.ip, conn.family)
+                elif conn.family == AF_UNIX:
+                    self.assertIsInstance(addr, str)
+
+        def check_status(conn):
+            self.assertIsInstance(conn.status, str)
+            valids = [getattr(psutil, x) for x in dir(psutil)
+                      if x.startswith('CONN_')]
+            self.assertIn(conn.status, valids)
+            if conn.family in (AF_INET, AF_INET6) and conn.type == SOCK_STREAM:
+                self.assertNotEqual(conn.status, psutil.CONN_NONE)
+            else:
+                self.assertEqual(conn.status, psutil.CONN_NONE)
+
+        check_ntuple(conn)
+        check_family(conn)
+        check_type(conn)
+        check_addrs(conn)
+        check_status(conn)
+
+
+class TestBasicOperations(_ConnTestCase):
+
+    @unittest.skipIf(SKIP_SYSCONS, "requires root")
+    def test_system(self):
+        with create_sockets():
+            for conn in psutil.net_connections(kind='all'):
+                self.check_connection_ntuple(conn)
+
+    def test_process(self):
+        with create_sockets():
+            for conn in psutil.Process().connections(kind='all'):
+                self.check_connection_ntuple(conn)
+
+    def test_invalid_kind(self):
+        self.assertRaises(ValueError, thisproc.connections, kind='???')
+        self.assertRaises(ValueError, psutil.net_connections, kind='???')
+
+
+@serialrun
+class TestUnconnectedSockets(_ConnTestCase):
+    """Tests sockets which are open but not connected to anything."""
+
+    def get_conn_from_sock(self, sock):
+        cons = thisproc.connections(kind='all')
+        smap = dict([(c.fd, c) for c in cons])
+        if NETBSD or FREEBSD:
+            # NetBSD opens a UNIX socket to /var/log/run
+            # so there may be more connections.
+            return smap[sock.fileno()]
+        else:
+            self.assertEqual(len(cons), 1)
+            if cons[0].fd != -1:
+                self.assertEqual(smap[sock.fileno()].fd, sock.fileno())
+            return cons[0]
+
+    def check_socket(self, sock):
+        """Given a socket, makes sure it matches the one obtained
+        via psutil. It assumes this process created one connection
+        only (the one supposed to be checked).
+        """
+        conn = self.get_conn_from_sock(sock)
+        self.check_connection_ntuple(conn)
+
+        # fd, family, type
+        if conn.fd != -1:
+            self.assertEqual(conn.fd, sock.fileno())
+        self.assertEqual(conn.family, sock.family)
+        # see: http://bugs.python.org/issue30204
+        self.assertEqual(
+            conn.type, sock.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE))
+
+        # local address
+        laddr = sock.getsockname()
+        if not laddr and PY3 and isinstance(laddr, bytes):
+            # See: http://bugs.python.org/issue30205
+            laddr = laddr.decode()
+        if sock.family == AF_INET6:
+            laddr = laddr[:2]
+        if sock.family == AF_UNIX and OPENBSD:
+            # No addresses are set for UNIX sockets on OpenBSD.
+            pass
+        else:
+            self.assertEqual(conn.laddr, laddr)
+
+        # XXX Solaris can't retrieve system-wide UNIX sockets
+        if sock.family == AF_UNIX and HAS_CONNECTIONS_UNIX:
+            cons = thisproc.connections(kind='all')
+            self.compare_procsys_connections(os.getpid(), cons, kind='all')
+        return conn
+
+    def test_tcp_v4(self):
+        addr = ("127.0.0.1", get_free_port())
+        with closing(bind_socket(AF_INET, SOCK_STREAM, addr=addr)) as sock:
+            conn = self.check_socket(sock)
+            assert not conn.raddr
+            self.assertEqual(conn.status, psutil.CONN_LISTEN)
+
+    @unittest.skipIf(not supports_ipv6(), "IPv6 not supported")
+    def test_tcp_v6(self):
+        addr = ("::1", get_free_port())
+        with closing(bind_socket(AF_INET6, SOCK_STREAM, addr=addr)) as sock:
+            conn = self.check_socket(sock)
+            assert not conn.raddr
+            self.assertEqual(conn.status, psutil.CONN_LISTEN)
+
+    def test_udp_v4(self):
+        addr = ("127.0.0.1", get_free_port())
+        with closing(bind_socket(AF_INET, SOCK_DGRAM, addr=addr)) as sock:
+            conn = self.check_socket(sock)
+            assert not conn.raddr
+            self.assertEqual(conn.status, psutil.CONN_NONE)
+
+    @unittest.skipIf(not supports_ipv6(), "IPv6 not supported")
+    def test_udp_v6(self):
+        addr = ("::1", get_free_port())
+        with closing(bind_socket(AF_INET6, SOCK_DGRAM, addr=addr)) as sock:
+            conn = self.check_socket(sock)
+            assert not conn.raddr
+            self.assertEqual(conn.status, psutil.CONN_NONE)
+
+    @unittest.skipIf(not POSIX, 'POSIX only')
+    def test_unix_tcp(self):
+        testfn = self.get_testfn()
+        with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock:
+            conn = self.check_socket(sock)
+            assert not conn.raddr
+            self.assertEqual(conn.status, psutil.CONN_NONE)
+
+    @unittest.skipIf(not POSIX, 'POSIX only')
+    def test_unix_udp(self):
+        testfn = self.get_testfn()
+        with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock:
+            conn = self.check_socket(sock)
+            assert not conn.raddr
+            self.assertEqual(conn.status, psutil.CONN_NONE)
+
+
+@serialrun
+class TestConnectedSocket(_ConnTestCase):
+    """Test socket pairs which are are actually connected to
+    each other.
+    """
+
+    # On SunOS, even after we close() it, the server socket stays around
+    # in TIME_WAIT state.
+    @unittest.skipIf(SUNOS, "unreliable on SUONS")
+    def test_tcp(self):
+        addr = ("127.0.0.1", get_free_port())
+        assert not thisproc.connections(kind='tcp4')
+        server, client = tcp_socketpair(AF_INET, addr=addr)
+        try:
+            cons = thisproc.connections(kind='tcp4')
+            self.assertEqual(len(cons), 2)
+            self.assertEqual(cons[0].status, psutil.CONN_ESTABLISHED)
+            self.assertEqual(cons[1].status, psutil.CONN_ESTABLISHED)
+            # May not be fast enough to change state so it stays
+            # commenteed.
+            # client.close()
+            # cons = thisproc.connections(kind='all')
+            # self.assertEqual(len(cons), 1)
+            # self.assertEqual(cons[0].status, psutil.CONN_CLOSE_WAIT)
+        finally:
+            server.close()
+            client.close()
+
+    @unittest.skipIf(not POSIX, 'POSIX only')
+    def test_unix(self):
+        testfn = self.get_testfn()
+        server, client = unix_socketpair(testfn)
+        try:
+            cons = thisproc.connections(kind='unix')
+            assert not (cons[0].laddr and cons[0].raddr)
+            assert not (cons[1].laddr and cons[1].raddr)
+            if NETBSD or FREEBSD:
+                # On NetBSD creating a UNIX socket will cause
+                # a UNIX connection to  /var/run/log.
+                cons = [c for c in cons if c.raddr != '/var/run/log']
+                if CIRRUS:
+                    cons = [c for c in cons if c.fd in
+                            (server.fileno(), client.fileno())]
+            self.assertEqual(len(cons), 2, msg=cons)
+            if LINUX or FREEBSD or SUNOS:
+                # remote path is never set
+                self.assertEqual(cons[0].raddr, "")
+                self.assertEqual(cons[1].raddr, "")
+                # one local address should though
+                self.assertEqual(testfn, cons[0].laddr or cons[1].laddr)
+            elif OPENBSD:
+                # No addresses whatsoever here.
+                for addr in (cons[0].laddr, cons[0].raddr,
+                             cons[1].laddr, cons[1].raddr):
+                    self.assertEqual(addr, "")
+            else:
+                # On other systems either the laddr or raddr
+                # of both peers are set.
+                self.assertEqual(cons[0].laddr or cons[1].laddr, testfn)
+                self.assertEqual(cons[0].raddr or cons[1].raddr, testfn)
+        finally:
+            server.close()
+            client.close()
+
+
+class TestFilters(_ConnTestCase):
+
+    def test_filters(self):
+        def check(kind, families, types):
+            for conn in thisproc.connections(kind=kind):
+                self.assertIn(conn.family, families)
+                self.assertIn(conn.type, types)
+            if not SKIP_SYSCONS:
+                for conn in psutil.net_connections(kind=kind):
+                    self.assertIn(conn.family, families)
+                    self.assertIn(conn.type, types)
+
+        with create_sockets():
+            check('all',
+                  [AF_INET, AF_INET6, AF_UNIX],
+                  [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET])
+            check('inet',
+                  [AF_INET, AF_INET6],
+                  [SOCK_STREAM, SOCK_DGRAM])
+            check('inet4',
+                  [AF_INET],
+                  [SOCK_STREAM, SOCK_DGRAM])
+            check('tcp',
+                  [AF_INET, AF_INET6],
+                  [SOCK_STREAM])
+            check('tcp4',
+                  [AF_INET],
+                  [SOCK_STREAM])
+            check('tcp6',
+                  [AF_INET6],
+                  [SOCK_STREAM])
+            check('udp',
+                  [AF_INET, AF_INET6],
+                  [SOCK_DGRAM])
+            check('udp4',
+                  [AF_INET],
+                  [SOCK_DGRAM])
+            check('udp6',
+                  [AF_INET6],
+                  [SOCK_DGRAM])
+            if HAS_CONNECTIONS_UNIX:
+                check('unix',
+                      [AF_UNIX],
+                      [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET])
+
+    @skip_on_access_denied(only_if=MACOS)
+    def test_combos(self):
+        reap_children()
+
+        def check_conn(proc, conn, family, type, laddr, raddr, status, kinds):
+            all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4",
+                         "tcp6", "udp", "udp4", "udp6")
+            self.check_connection_ntuple(conn)
+            self.assertEqual(conn.family, family)
+            self.assertEqual(conn.type, type)
+            self.assertEqual(conn.laddr, laddr)
+            self.assertEqual(conn.raddr, raddr)
+            self.assertEqual(conn.status, status)
+            for kind in all_kinds:
+                cons = proc.connections(kind=kind)
+                if kind in kinds:
+                    assert cons
+                else:
+                    assert not cons, cons
+            # compare against system-wide connections
+            # XXX Solaris can't retrieve system-wide UNIX
+            # sockets.
+            if HAS_CONNECTIONS_UNIX:
+                self.compare_procsys_connections(proc.pid, [conn])
+
+        tcp_template = textwrap.dedent("""
+            import socket, time
+            s = socket.socket({family}, socket.SOCK_STREAM)
+            s.bind(('{addr}', 0))
+            s.listen(5)
+            with open('{testfn}', 'w') as f:
+                f.write(str(s.getsockname()[:2]))
+            time.sleep(60)
+            """)
+
+        udp_template = textwrap.dedent("""
+            import socket, time
+            s = socket.socket({family}, socket.SOCK_DGRAM)
+            s.bind(('{addr}', 0))
+            with open('{testfn}', 'w') as f:
+                f.write(str(s.getsockname()[:2]))
+            time.sleep(60)
+            """)
+
+        # must be relative on Windows
+        testfile = os.path.basename(self.get_testfn(dir=os.getcwd()))
+        tcp4_template = tcp_template.format(
+            family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
+        udp4_template = udp_template.format(
+            family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
+        tcp6_template = tcp_template.format(
+            family=int(AF_INET6), addr="::1", testfn=testfile)
+        udp6_template = udp_template.format(
+            family=int(AF_INET6), addr="::1", testfn=testfile)
+
+        # launch various subprocess instantiating a socket of various
+        # families and types to enrich psutil results
+        tcp4_proc = self.pyrun(tcp4_template)
+        tcp4_addr = eval(wait_for_file(testfile, delete=True))
+        udp4_proc = self.pyrun(udp4_template)
+        udp4_addr = eval(wait_for_file(testfile, delete=True))
+        if supports_ipv6():
+            tcp6_proc = self.pyrun(tcp6_template)
+            tcp6_addr = eval(wait_for_file(testfile, delete=True))
+            udp6_proc = self.pyrun(udp6_template)
+            udp6_addr = eval(wait_for_file(testfile, delete=True))
+        else:
+            tcp6_proc = None
+            udp6_proc = None
+            tcp6_addr = None
+            udp6_addr = None
+
+        for p in thisproc.children():
+            cons = p.connections()
+            self.assertEqual(len(cons), 1)
+            for conn in cons:
+                # TCP v4
+                if p.pid == tcp4_proc.pid:
+                    check_conn(p, conn, AF_INET, SOCK_STREAM, tcp4_addr, (),
+                               psutil.CONN_LISTEN,
+                               ("all", "inet", "inet4", "tcp", "tcp4"))
+                # UDP v4
+                elif p.pid == udp4_proc.pid:
+                    check_conn(p, conn, AF_INET, SOCK_DGRAM, udp4_addr, (),
+                               psutil.CONN_NONE,
+                               ("all", "inet", "inet4", "udp", "udp4"))
+                # TCP v6
+                elif p.pid == getattr(tcp6_proc, "pid", None):
+                    check_conn(p, conn, AF_INET6, SOCK_STREAM, tcp6_addr, (),
+                               psutil.CONN_LISTEN,
+                               ("all", "inet", "inet6", "tcp", "tcp6"))
+                # UDP v6
+                elif p.pid == getattr(udp6_proc, "pid", None):
+                    check_conn(p, conn, AF_INET6, SOCK_DGRAM, udp6_addr, (),
+                               psutil.CONN_NONE,
+                               ("all", "inet", "inet6", "udp", "udp6"))
+
+    def test_count(self):
+        with create_sockets():
+            # tcp
+            cons = thisproc.connections(kind='tcp')
+            self.assertEqual(len(cons), 2 if supports_ipv6() else 1)
+            for conn in cons:
+                self.assertIn(conn.family, (AF_INET, AF_INET6))
+                self.assertEqual(conn.type, SOCK_STREAM)
+            # tcp4
+            cons = thisproc.connections(kind='tcp4')
+            self.assertEqual(len(cons), 1)
+            self.assertEqual(cons[0].family, AF_INET)
+            self.assertEqual(cons[0].type, SOCK_STREAM)
+            # tcp6
+            if supports_ipv6():
+                cons = thisproc.connections(kind='tcp6')
+                self.assertEqual(len(cons), 1)
+                self.assertEqual(cons[0].family, AF_INET6)
+                self.assertEqual(cons[0].type, SOCK_STREAM)
+            # udp
+            cons = thisproc.connections(kind='udp')
+            self.assertEqual(len(cons), 2 if supports_ipv6() else 1)
+            for conn in cons:
+                self.assertIn(conn.family, (AF_INET, AF_INET6))
+                self.assertEqual(conn.type, SOCK_DGRAM)
+            # udp4
+            cons = thisproc.connections(kind='udp4')
+            self.assertEqual(len(cons), 1)
+            self.assertEqual(cons[0].family, AF_INET)
+            self.assertEqual(cons[0].type, SOCK_DGRAM)
+            # udp6
+            if supports_ipv6():
+                cons = thisproc.connections(kind='udp6')
+                self.assertEqual(len(cons), 1)
+                self.assertEqual(cons[0].family, AF_INET6)
+                self.assertEqual(cons[0].type, SOCK_DGRAM)
+            # inet
+            cons = thisproc.connections(kind='inet')
+            self.assertEqual(len(cons), 4 if supports_ipv6() else 2)
+            for conn in cons:
+                self.assertIn(conn.family, (AF_INET, AF_INET6))
+                self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM))
+            # inet6
+            if supports_ipv6():
+                cons = thisproc.connections(kind='inet6')
+                self.assertEqual(len(cons), 2)
+                for conn in cons:
+                    self.assertEqual(conn.family, AF_INET6)
+                    self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM))
+            # Skipped on BSD becayse by default the Python process
+            # creates a UNIX socket to '/var/run/log'.
+            if HAS_CONNECTIONS_UNIX and not (FREEBSD or NETBSD):
+                cons = thisproc.connections(kind='unix')
+                self.assertEqual(len(cons), 3)
+                for conn in cons:
+                    self.assertEqual(conn.family, AF_UNIX)
+                    self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM))
+
+
+@unittest.skipIf(SKIP_SYSCONS, "requires root")
+class TestSystemWideConnections(_ConnTestCase):
+    """Tests for net_connections()."""
+
+    def test_it(self):
+        def check(cons, families, types_):
+            for conn in cons:
+                self.assertIn(conn.family, families, msg=conn)
+                if conn.family != AF_UNIX:
+                    self.assertIn(conn.type, types_, msg=conn)
+                self.check_connection_ntuple(conn)
+
+        with create_sockets():
+            from psutil._common import conn_tmap
+            for kind, groups in conn_tmap.items():
+                # XXX: SunOS does not retrieve UNIX sockets.
+                if kind == 'unix' and not HAS_CONNECTIONS_UNIX:
+                    continue
+                families, types_ = groups
+                cons = psutil.net_connections(kind)
+                self.assertEqual(len(cons), len(set(cons)))
+                check(cons, families, types_)
+
+    # See: https://travis-ci.org/giampaolo/psutil/jobs/237566297
+    @unittest.skipIf(MACOS and TRAVIS, "unreliable on MACOS + TRAVIS")
+    @retry_on_failure()
+    def test_multi_sockets_procs(self):
+        # Creates multiple sub processes, each creating different
+        # sockets. For each process check that proc.connections()
+        # and net_connections() return the same results.
+        # This is done mainly to check whether net_connections()'s
+        # pid is properly set, see:
+        # https://github.com/giampaolo/psutil/issues/1013
+        with create_sockets() as socks:
+            expected = len(socks)
+        pids = []
+        times = 10
+        fnames = []
+        for i in range(times):
+            fname = self.get_testfn()
+            fnames.append(fname)
+            src = textwrap.dedent("""\
+                import time, os
+                from psutil.tests import create_sockets
+                with create_sockets():
+                    with open(r'%s', 'w') as f:
+                        f.write("hello")
+                    time.sleep(60)
+                """ % fname)
+            sproc = self.pyrun(src)
+            pids.append(sproc.pid)
+
+        # sync
+        for fname in fnames:
+            wait_for_file(fname)
+
+        syscons = [x for x in psutil.net_connections(kind='all') if x.pid
+                   in pids]
+        for pid in pids:
+            self.assertEqual(len([x for x in syscons if x.pid == pid]),
+                             expected)
+            p = psutil.Process(pid)
+            self.assertEqual(len(p.connections('all')), expected)
+
+
+class TestMisc(PsutilTestCase):
+
+    def test_connection_constants(self):
+        ints = []
+        strs = []
+        for name in dir(psutil):
+            if name.startswith('CONN_'):
+                num = getattr(psutil, name)
+                str_ = str(num)
+                assert str_.isupper(), str_
+                self.assertNotIn(str, strs)
+                self.assertNotIn(num, ints)
+                ints.append(num)
+                strs.append(str_)
+        if SUNOS:
+            psutil.CONN_IDLE
+            psutil.CONN_BOUND
+        if WINDOWS:
+            psutil.CONN_DELETE_TCB
+
+
+if __name__ == '__main__':
+    from psutil.tests.runner import run_from_name
+    run_from_name(__file__)