diff env/lib/python3.7/site-packages/psutil/tests/test_misc.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/psutil/tests/test_misc.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1065 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-Miscellaneous tests.
-import ast
-import collections
-import contextlib
-import errno
-import json
-import os
-import pickle
-import socket
-import stat
-from psutil import FREEBSD
-from psutil import LINUX
-from psutil import NETBSD
-from psutil import POSIX
-from psutil import WINDOWS
-from psutil._common import memoize
-from psutil._common import memoize_when_activated
-from psutil._common import supports_ipv6
-from psutil._common import wrap_numbers
-from psutil._common import open_text
-from psutil._common import open_binary
-from psutil._compat import PY3
-from psutil.tests import APPVEYOR
-from psutil.tests import bind_socket
-from psutil.tests import bind_unix_socket
-from psutil.tests import call_until
-from psutil.tests import chdir
-from psutil.tests import CI_TESTING
-from psutil.tests import create_proc_children_pair
-from psutil.tests import create_sockets
-from psutil.tests import create_zombie_proc
-from psutil.tests import DEVNULL
-from psutil.tests import get_free_port
-from psutil.tests import get_test_subprocess
-from psutil.tests import HAS_BATTERY
-from psutil.tests import HAS_CONNECTIONS_UNIX
-from psutil.tests import HAS_MEMORY_MAPS
-from psutil.tests import HAS_NET_IO_COUNTERS
-from psutil.tests import HAS_SENSORS_BATTERY
-from psutil.tests import HAS_SENSORS_FANS
-from psutil.tests import HAS_SENSORS_TEMPERATURES
-from psutil.tests import import_module_by_path
-from psutil.tests import is_namedtuple
-from psutil.tests import mock
-from psutil.tests import PYTHON_EXE
-from psutil.tests import reap_children
-from psutil.tests import reload_module
-from psutil.tests import retry
-from psutil.tests import ROOT_DIR
-from psutil.tests import safe_mkdir
-from psutil.tests import safe_rmpath
-from psutil.tests import SCRIPTS_DIR
-from psutil.tests import sh
-from psutil.tests import tcp_socketpair
-from psutil.tests import TESTFN
-from psutil.tests import TOX
-from psutil.tests import TRAVIS
-from psutil.tests import unittest
-from psutil.tests import unix_socket_path
-from psutil.tests import unix_socketpair
-from psutil.tests import wait_for_file
-from psutil.tests import wait_for_pid
-import psutil
-import psutil.tests
-# ===================================================================
-# --- Misc / generic tests.
-# ===================================================================
-class TestMisc(unittest.TestCase):
-    def test_process__repr__(self, func=repr):
-        p = psutil.Process()
-        r = func(p)
-        self.assertIn("psutil.Process", r)
-        self.assertIn("pid=%s" % p.pid, r)
-        self.assertIn("name=", r)
-        self.assertIn(p.name(), r)
-        with mock.patch.object(psutil.Process, "name",
-                               side_effect=psutil.ZombieProcess(os.getpid())):
-            p = psutil.Process()
-            r = func(p)
-            self.assertIn("pid=%s" % p.pid, r)
-            self.assertIn("zombie", r)
-            self.assertNotIn("name=", r)
-        with mock.patch.object(psutil.Process, "name",
-                               side_effect=psutil.NoSuchProcess(os.getpid())):
-            p = psutil.Process()
-            r = func(p)
-            self.assertIn("pid=%s" % p.pid, r)
-            self.assertIn("terminated", r)
-            self.assertNotIn("name=", r)
-        with mock.patch.object(psutil.Process, "name",
-                               side_effect=psutil.AccessDenied(os.getpid())):
-            p = psutil.Process()
-            r = func(p)
-            self.assertIn("pid=%s" % p.pid, r)
-            self.assertNotIn("name=", r)
-    def test_process__str__(self):
-        self.test_process__repr__(func=str)
-    def test_no_such_process__repr__(self, func=repr):
-        self.assertEqual(
-            repr(psutil.NoSuchProcess(321)),
-            "psutil.NoSuchProcess process no longer exists (pid=321)")
-        self.assertEqual(
-            repr(psutil.NoSuchProcess(321, name='foo')),
-            "psutil.NoSuchProcess process no longer exists (pid=321, "
-            "name='foo')")
-        self.assertEqual(
-            repr(psutil.NoSuchProcess(321, msg='foo')),
-            "psutil.NoSuchProcess foo")
-    def test_zombie_process__repr__(self, func=repr):
-        self.assertEqual(
-            repr(psutil.ZombieProcess(321)),
-            "psutil.ZombieProcess process still exists but it's a zombie "
-            "(pid=321)")
-        self.assertEqual(
-            repr(psutil.ZombieProcess(321, name='foo')),
-            "psutil.ZombieProcess process still exists but it's a zombie "
-            "(pid=321, name='foo')")
-        self.assertEqual(
-            repr(psutil.ZombieProcess(321, name='foo', ppid=1)),
-            "psutil.ZombieProcess process still exists but it's a zombie "
-            "(pid=321, name='foo', ppid=1)")
-        self.assertEqual(
-            repr(psutil.ZombieProcess(321, msg='foo')),
-            "psutil.ZombieProcess foo")
-    def test_access_denied__repr__(self, func=repr):
-        self.assertEqual(
-            repr(psutil.AccessDenied(321)),
-            "psutil.AccessDenied (pid=321)")
-        self.assertEqual(
-            repr(psutil.AccessDenied(321, name='foo')),
-            "psutil.AccessDenied (pid=321, name='foo')")
-        self.assertEqual(
-            repr(psutil.AccessDenied(321, msg='foo')),
-            "psutil.AccessDenied foo")
-    def test_timeout_expired__repr__(self, func=repr):
-        self.assertEqual(
-            repr(psutil.TimeoutExpired(321)),
-            "psutil.TimeoutExpired timeout after 321 seconds")
-        self.assertEqual(
-            repr(psutil.TimeoutExpired(321, pid=111)),
-            "psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
-        self.assertEqual(
-            repr(psutil.TimeoutExpired(321, pid=111, name='foo')),
-            "psutil.TimeoutExpired timeout after 321 seconds "
-            "(pid=111, name='foo')")
-    def test_process__eq__(self):
-        p1 = psutil.Process()
-        p2 = psutil.Process()
-        self.assertEqual(p1, p2)
-        p2._ident = (0, 0)
-        self.assertNotEqual(p1, p2)
-        self.assertNotEqual(p1, 'foo')
-    def test_process__hash__(self):
-        s = set([psutil.Process(), psutil.Process()])
-        self.assertEqual(len(s), 1)
-    def test__all__(self):
-        dir_psutil = dir(psutil)
-        for name in dir_psutil:
-            if name in ('callable', 'error', 'namedtuple', 'tests',
-                        'long', 'test', 'NUM_CPUS', 'BOOT_TIME',
-                        'TOTAL_PHYMEM', 'PermissionError',
-                        'ProcessLookupError'):
-                continue
-            if not name.startswith('_'):
-                try:
-                    __import__(name)
-                except ImportError:
-                    if name not in psutil.__all__:
-                        fun = getattr(psutil, name)
-                        if fun is None:
-                            continue
-                        if (fun.__doc__ is not None and
-                                'deprecated' not in fun.__doc__.lower()):
-                            self.fail('%r not in psutil.__all__' % name)
-        # Import 'star' will break if __all__ is inconsistent, see:
-        # https://github.com/giampaolo/psutil/issues/656
-        # Can't do `from psutil import *` as it won't work on python 3
-        # so we simply iterate over __all__.
-        for name in psutil.__all__:
-            self.assertIn(name, dir_psutil)
-    def test_version(self):
-        self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
-                         psutil.__version__)
-    def test_process_as_dict_no_new_names(self):
-        # See https://github.com/giampaolo/psutil/issues/813
-        p = psutil.Process()
-        p.foo = '1'
-        self.assertNotIn('foo', p.as_dict())
-    def test_memoize(self):
-        @memoize
-        def foo(*args, **kwargs):
-            "foo docstring"
-            calls.append(None)
-            return (args, kwargs)
-        calls = []
-        # no args
-        for x in range(2):
-            ret = foo()
-            expected = ((), {})
-            self.assertEqual(ret, expected)
-            self.assertEqual(len(calls), 1)
-        # with args
-        for x in range(2):
-            ret = foo(1)
-            expected = ((1, ), {})
-            self.assertEqual(ret, expected)
-            self.assertEqual(len(calls), 2)
-        # with args + kwargs
-        for x in range(2):
-            ret = foo(1, bar=2)
-            expected = ((1, ), {'bar': 2})
-            self.assertEqual(ret, expected)
-            self.assertEqual(len(calls), 3)
-        # clear cache
-        foo.cache_clear()
-        ret = foo()
-        expected = ((), {})
-        self.assertEqual(ret, expected)
-        self.assertEqual(len(calls), 4)
-        # docstring
-        self.assertEqual(foo.__doc__, "foo docstring")
-    def test_memoize_when_activated(self):
-        class Foo:
-            @memoize_when_activated
-            def foo(self):
-                calls.append(None)
-        f = Foo()
-        calls = []
-        f.foo()
-        f.foo()
-        self.assertEqual(len(calls), 2)
-        # activate
-        calls = []
-        f.foo.cache_activate(f)
-        f.foo()
-        f.foo()
-        self.assertEqual(len(calls), 1)
-        # deactivate
-        calls = []
-        f.foo.cache_deactivate(f)
-        f.foo()
-        f.foo()
-        self.assertEqual(len(calls), 2)
-    def test_parse_environ_block(self):
-        from psutil._common import parse_environ_block
-        def k(s):
-            return s.upper() if WINDOWS else s
-        self.assertEqual(parse_environ_block("a=1\0"),
-                         {k("a"): "1"})
-        self.assertEqual(parse_environ_block("a=1\0b=2\0\0"),
-                         {k("a"): "1", k("b"): "2"})
-        self.assertEqual(parse_environ_block("a=1\0b=\0\0"),
-                         {k("a"): "1", k("b"): ""})
-        # ignore everything after \0\0
-        self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"),
-                         {k("a"): "1", k("b"): "2"})
-        # ignore everything that is not an assignment
-        self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"})
-        self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"})
-        # do not fail if the block is incomplete
-        self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"})
-    def test_supports_ipv6(self):
-        self.addCleanup(supports_ipv6.cache_clear)
-        if supports_ipv6():
-            with mock.patch('psutil._common.socket') as s:
-                s.has_ipv6 = False
-                supports_ipv6.cache_clear()
-                assert not supports_ipv6()
-            supports_ipv6.cache_clear()
-            with mock.patch('psutil._common.socket.socket',
-                            side_effect=socket.error) as s:
-                assert not supports_ipv6()
-                assert s.called
-            supports_ipv6.cache_clear()
-            with mock.patch('psutil._common.socket.socket',
-                            side_effect=socket.gaierror) as s:
-                assert not supports_ipv6()
-                supports_ipv6.cache_clear()
-                assert s.called
-            supports_ipv6.cache_clear()
-            with mock.patch('psutil._common.socket.socket.bind',
-                            side_effect=socket.gaierror) as s:
-                assert not supports_ipv6()
-                supports_ipv6.cache_clear()
-                assert s.called
-        else:
-            with self.assertRaises(Exception):
-                sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
-                sock.bind(("::1", 0))
-    def test_isfile_strict(self):
-        from psutil._common import isfile_strict
-        this_file = os.path.abspath(__file__)
-        assert isfile_strict(this_file)
-        assert not isfile_strict(os.path.dirname(this_file))
-        with mock.patch('psutil._common.os.stat',
-                        side_effect=OSError(errno.EPERM, "foo")):
-            self.assertRaises(OSError, isfile_strict, this_file)
-        with mock.patch('psutil._common.os.stat',
-                        side_effect=OSError(errno.EACCES, "foo")):
-            self.assertRaises(OSError, isfile_strict, this_file)
-        with mock.patch('psutil._common.os.stat',
-                        side_effect=OSError(errno.EINVAL, "foo")):
-            assert not isfile_strict(this_file)
-        with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
-            assert not isfile_strict(this_file)
-    def test_serialization(self):
-        def check(ret):
-            if json is not None:
-                json.loads(json.dumps(ret))
-            a = pickle.dumps(ret)
-            b = pickle.loads(a)
-            self.assertEqual(ret, b)
-        check(psutil.Process().as_dict())
-        check(psutil.virtual_memory())
-        check(psutil.swap_memory())
-        check(psutil.cpu_times())
-        check(psutil.cpu_times_percent(interval=0))
-        check(psutil.net_io_counters())
-        if LINUX and not os.path.exists('/proc/diskstats'):
-            pass
-        else:
-            if not APPVEYOR:
-                check(psutil.disk_io_counters())
-        check(psutil.disk_partitions())
-        check(psutil.disk_usage(os.getcwd()))
-        check(psutil.users())
-    def test_setup_script(self):
-        setup_py = os.path.join(ROOT_DIR, 'setup.py')
-        if TRAVIS and not os.path.exists(setup_py):
-            return self.skipTest("can't find setup.py")
-        module = import_module_by_path(setup_py)
-        self.assertRaises(SystemExit, module.setup)
-        self.assertEqual(module.get_version(), psutil.__version__)
-    def test_ad_on_process_creation(self):
-        # We are supposed to be able to instantiate Process also in case
-        # of zombie processes or access denied.
-        with mock.patch.object(psutil.Process, 'create_time',
-                               side_effect=psutil.AccessDenied) as meth:
-            psutil.Process()
-            assert meth.called
-        with mock.patch.object(psutil.Process, 'create_time',
-                               side_effect=psutil.ZombieProcess(1)) as meth:
-            psutil.Process()
-            assert meth.called
-        with mock.patch.object(psutil.Process, 'create_time',
-                               side_effect=ValueError) as meth:
-            with self.assertRaises(ValueError):
-                psutil.Process()
-            assert meth.called
-    def test_sanity_version_check(self):
-        # see: https://github.com/giampaolo/psutil/issues/564
-        with mock.patch(
-                "psutil._psplatform.cext.version", return_value="0.0.0"):
-            with self.assertRaises(ImportError) as cm:
-                reload_module(psutil)
-            self.assertIn("version conflict", str(cm.exception).lower())
-# ===================================================================
-# --- Tests for wrap_numbers() function.
-# ===================================================================
-nt = collections.namedtuple('foo', 'a b c')
-class TestWrapNumbers(unittest.TestCase):
-    def setUp(self):
-        wrap_numbers.cache_clear()
-    tearDown = setUp
-    def test_first_call(self):
-        input = {'disk1': nt(5, 5, 5)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-    def test_input_hasnt_changed(self):
-        input = {'disk1': nt(5, 5, 5)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-    def test_increase_but_no_wrap(self):
-        input = {'disk1': nt(5, 5, 5)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-        input = {'disk1': nt(10, 15, 20)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-        input = {'disk1': nt(20, 25, 30)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-        input = {'disk1': nt(20, 25, 30)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-    def test_wrap(self):
-        # let's say 100 is the threshold
-        input = {'disk1': nt(100, 100, 100)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-        # first wrap restarts from 10
-        input = {'disk1': nt(100, 100, 10)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'),
-                         {'disk1': nt(100, 100, 110)})
-        # then it remains the same
-        input = {'disk1': nt(100, 100, 10)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'),
-                         {'disk1': nt(100, 100, 110)})
-        # then it goes up
-        input = {'disk1': nt(100, 100, 90)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'),
-                         {'disk1': nt(100, 100, 190)})
-        # then it wraps again
-        input = {'disk1': nt(100, 100, 20)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'),
-                         {'disk1': nt(100, 100, 210)})
-        # and remains the same
-        input = {'disk1': nt(100, 100, 20)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'),
-                         {'disk1': nt(100, 100, 210)})
-        # now wrap another num
-        input = {'disk1': nt(50, 100, 20)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'),
-                         {'disk1': nt(150, 100, 210)})
-        # and again
-        input = {'disk1': nt(40, 100, 20)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'),
-                         {'disk1': nt(190, 100, 210)})
-        # keep it the same
-        input = {'disk1': nt(40, 100, 20)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'),
-                         {'disk1': nt(190, 100, 210)})
-    def test_changing_keys(self):
-        # Emulate a case where the second call to disk_io()
-        # (or whatever) provides a new disk, then the new disk
-        # disappears on the third call.
-        input = {'disk1': nt(5, 5, 5)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-        input = {'disk1': nt(5, 5, 5),
-                 'disk2': nt(7, 7, 7)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-        input = {'disk1': nt(8, 8, 8)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-    def test_changing_keys_w_wrap(self):
-        input = {'disk1': nt(50, 50, 50),
-                 'disk2': nt(100, 100, 100)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-        # disk 2 wraps
-        input = {'disk1': nt(50, 50, 50),
-                 'disk2': nt(100, 100, 10)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'),
-                         {'disk1': nt(50, 50, 50),
-                          'disk2': nt(100, 100, 110)})
-        # disk 2 disappears
-        input = {'disk1': nt(50, 50, 50)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-        # then it appears again; the old wrap is supposed to be
-        # gone.
-        input = {'disk1': nt(50, 50, 50),
-                 'disk2': nt(100, 100, 100)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-        # remains the same
-        input = {'disk1': nt(50, 50, 50),
-                 'disk2': nt(100, 100, 100)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
-        # and then wraps again
-        input = {'disk1': nt(50, 50, 50),
-                 'disk2': nt(100, 100, 10)}
-        self.assertEqual(wrap_numbers(input, 'disk_io'),
-                         {'disk1': nt(50, 50, 50),
-                          'disk2': nt(100, 100, 110)})
-    def test_real_data(self):
-        d = {'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
-             'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
-             'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
-             'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
-        self.assertEqual(wrap_numbers(d, 'disk_io'), d)
-        self.assertEqual(wrap_numbers(d, 'disk_io'), d)
-        # decrease this   ↓
-        d = {'nvme0n1': (100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
-             'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
-             'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
-             'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
-        out = wrap_numbers(d, 'disk_io')
-        self.assertEqual(out['nvme0n1'][0], 400)
-    # --- cache tests
-    def test_cache_first_call(self):
-        input = {'disk1': nt(5, 5, 5)}
-        wrap_numbers(input, 'disk_io')
-        cache = wrap_numbers.cache_info()
-        self.assertEqual(cache[0], {'disk_io': input})
-        self.assertEqual(cache[1], {'disk_io': {}})
-        self.assertEqual(cache[2], {'disk_io': {}})
-    def test_cache_call_twice(self):
-        input = {'disk1': nt(5, 5, 5)}
-        wrap_numbers(input, 'disk_io')
-        input = {'disk1': nt(10, 10, 10)}
-        wrap_numbers(input, 'disk_io')
-        cache = wrap_numbers.cache_info()
-        self.assertEqual(cache[0], {'disk_io': input})
-        self.assertEqual(
-            cache[1],
-            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
-        self.assertEqual(cache[2], {'disk_io': {}})
-    def test_cache_wrap(self):
-        # let's say 100 is the threshold
-        input = {'disk1': nt(100, 100, 100)}
-        wrap_numbers(input, 'disk_io')
-        # first wrap restarts from 10
-        input = {'disk1': nt(100, 100, 10)}
-        wrap_numbers(input, 'disk_io')
-        cache = wrap_numbers.cache_info()
-        self.assertEqual(cache[0], {'disk_io': input})
-        self.assertEqual(
-            cache[1],
-            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 100}})
-        self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}})
-        def assert_():
-            cache = wrap_numbers.cache_info()
-            self.assertEqual(
-                cache[1],
-                {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0,
-                             ('disk1', 2): 100}})
-            self.assertEqual(cache[2],
-                             {'disk_io': {'disk1': set([('disk1', 2)])}})
-        # then it remains the same
-        input = {'disk1': nt(100, 100, 10)}
-        wrap_numbers(input, 'disk_io')
-        cache = wrap_numbers.cache_info()
-        self.assertEqual(cache[0], {'disk_io': input})
-        assert_()
-        # then it goes up
-        input = {'disk1': nt(100, 100, 90)}
-        wrap_numbers(input, 'disk_io')
-        cache = wrap_numbers.cache_info()
-        self.assertEqual(cache[0], {'disk_io': input})
-        assert_()
-        # then it wraps again
-        input = {'disk1': nt(100, 100, 20)}
-        wrap_numbers(input, 'disk_io')
-        cache = wrap_numbers.cache_info()
-        self.assertEqual(cache[0], {'disk_io': input})
-        self.assertEqual(
-            cache[1],
-            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 190}})
-        self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}})
-    def test_cache_changing_keys(self):
-        input = {'disk1': nt(5, 5, 5)}
-        wrap_numbers(input, 'disk_io')
-        input = {'disk1': nt(5, 5, 5),
-                 'disk2': nt(7, 7, 7)}
-        wrap_numbers(input, 'disk_io')
-        cache = wrap_numbers.cache_info()
-        self.assertEqual(cache[0], {'disk_io': input})
-        self.assertEqual(
-            cache[1],
-            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
-        self.assertEqual(cache[2], {'disk_io': {}})
-    def test_cache_clear(self):
-        input = {'disk1': nt(5, 5, 5)}
-        wrap_numbers(input, 'disk_io')
-        wrap_numbers(input, 'disk_io')
-        wrap_numbers.cache_clear('disk_io')
-        self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
-        wrap_numbers.cache_clear('disk_io')
-        wrap_numbers.cache_clear('?!?')
-    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
-    def test_cache_clear_public_apis(self):
-        if not psutil.disk_io_counters() or not psutil.net_io_counters():
-            return self.skipTest("no disks or NICs available")
-        psutil.disk_io_counters()
-        psutil.net_io_counters()
-        caches = wrap_numbers.cache_info()
-        for cache in caches:
-            self.assertIn('psutil.disk_io_counters', cache)
-            self.assertIn('psutil.net_io_counters', cache)
-        psutil.disk_io_counters.cache_clear()
-        caches = wrap_numbers.cache_info()
-        for cache in caches:
-            self.assertIn('psutil.net_io_counters', cache)
-            self.assertNotIn('psutil.disk_io_counters', cache)
-        psutil.net_io_counters.cache_clear()
-        caches = wrap_numbers.cache_info()
-        self.assertEqual(caches, ({}, {}, {}))
-# ===================================================================
-# --- Example script tests
-# ===================================================================
-@unittest.skipIf(TOX, "can't test on TOX")
-# See: https://travis-ci.org/giampaolo/psutil/jobs/295224806
-@unittest.skipIf(TRAVIS and not os.path.exists(SCRIPTS_DIR),
-                 "can't locate scripts directory")
-class TestScripts(unittest.TestCase):
-    """Tests for scripts in the "scripts" directory."""
-    @staticmethod
-    def assert_stdout(exe, *args, **kwargs):
-        exe = '%s' % os.path.join(SCRIPTS_DIR, exe)
-        cmd = [PYTHON_EXE, exe]
-        for arg in args:
-            cmd.append(arg)
-        try:
-            out = sh(cmd, **kwargs).strip()
-        except RuntimeError as err:
-            if 'AccessDenied' in str(err):
-                return str(err)
-            else:
-                raise
-        assert out, out
-        return out
-    @staticmethod
-    def assert_syntax(exe, args=None):
-        exe = os.path.join(SCRIPTS_DIR, exe)
-        if PY3:
-            f = open(exe, 'rt', encoding='utf8')
-        else:
-            f = open(exe, 'rt')
-        with f:
-            src = f.read()
-        ast.parse(src)
-    def test_coverage(self):
-        # make sure all example scripts have a test method defined
-        meths = dir(self)
-        for name in os.listdir(SCRIPTS_DIR):
-            if name.endswith('.py'):
-                if 'test_' + os.path.splitext(name)[0] not in meths:
-                    # self.assert_stdout(name)
-                    self.fail('no test defined for %r script'
-                              % os.path.join(SCRIPTS_DIR, name))
-    @unittest.skipIf(not POSIX, "POSIX only")
-    def test_executable(self):
-        for name in os.listdir(SCRIPTS_DIR):
-            if name.endswith('.py'):
-                path = os.path.join(SCRIPTS_DIR, name)
-                if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]:
-                    self.fail('%r is not executable' % path)
-    def test_disk_usage(self):
-        self.assert_stdout('disk_usage.py')
-    def test_free(self):
-        self.assert_stdout('free.py')
-    def test_meminfo(self):
-        self.assert_stdout('meminfo.py')
-    def test_procinfo(self):
-        self.assert_stdout('procinfo.py', str(os.getpid()))
-    @unittest.skipIf(CI_TESTING and not psutil.users(), "no users")
-    def test_who(self):
-        self.assert_stdout('who.py')
-    def test_ps(self):
-        self.assert_stdout('ps.py')
-    def test_pstree(self):
-        self.assert_stdout('pstree.py')
-    def test_netstat(self):
-        self.assert_stdout('netstat.py')
-    # permission denied on travis
-    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
-    def test_ifconfig(self):
-        self.assert_stdout('ifconfig.py')
-    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
-    def test_pmap(self):
-        self.assert_stdout('pmap.py', str(os.getpid()))
-    def test_procsmem(self):
-        if 'uss' not in psutil.Process().memory_full_info()._fields:
-            raise self.skipTest("not supported")
-        self.assert_stdout('procsmem.py', stderr=DEVNULL)
-    def test_killall(self):
-        self.assert_syntax('killall.py')
-    def test_nettop(self):
-        self.assert_syntax('nettop.py')
-    def test_top(self):
-        self.assert_syntax('top.py')
-    def test_iotop(self):
-        self.assert_syntax('iotop.py')
-    def test_pidof(self):
-        output = self.assert_stdout('pidof.py', psutil.Process().name())
-        self.assertIn(str(os.getpid()), output)
-    @unittest.skipIf(not WINDOWS, "WINDOWS only")
-    def test_winservices(self):
-        self.assert_stdout('winservices.py')
-    def test_cpu_distribution(self):
-        self.assert_syntax('cpu_distribution.py')
-    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
-    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
-    def test_temperatures(self):
-        if not psutil.sensors_temperatures():
-            self.skipTest("no temperatures")
-        self.assert_stdout('temperatures.py')
-    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
-    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
-    def test_fans(self):
-        if not psutil.sensors_fans():
-            self.skipTest("no fans")
-        self.assert_stdout('fans.py')
-    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
-    @unittest.skipIf(not HAS_BATTERY, "no battery")
-    def test_battery(self):
-        self.assert_stdout('battery.py')
-    def test_sensors(self):
-        self.assert_stdout('sensors.py')
-# ===================================================================
-# --- Unit tests for test utilities.
-# ===================================================================
-class TestRetryDecorator(unittest.TestCase):
-    @mock.patch('time.sleep')
-    def test_retry_success(self, sleep):
-        # Fail 3 times out of 5; make sure the decorated fun returns.
-        @retry(retries=5, interval=1, logfun=None)
-        def foo():
-            while queue:
-                queue.pop()
-                1 / 0
-            return 1
-        queue = list(range(3))
-        self.assertEqual(foo(), 1)
-        self.assertEqual(sleep.call_count, 3)
-    @mock.patch('time.sleep')
-    def test_retry_failure(self, sleep):
-        # Fail 6 times out of 5; th function is supposed to raise exc.
-        @retry(retries=5, interval=1, logfun=None)
-        def foo():
-            while queue:
-                queue.pop()
-                1 / 0
-            return 1
-        queue = list(range(6))
-        self.assertRaises(ZeroDivisionError, foo)
-        self.assertEqual(sleep.call_count, 5)
-    @mock.patch('time.sleep')
-    def test_exception_arg(self, sleep):
-        @retry(exception=ValueError, interval=1)
-        def foo():
-            raise TypeError
-        self.assertRaises(TypeError, foo)
-        self.assertEqual(sleep.call_count, 0)
-    @mock.patch('time.sleep')
-    def test_no_interval_arg(self, sleep):
-        # if interval is not specified sleep is not supposed to be called
-        @retry(retries=5, interval=None, logfun=None)
-        def foo():
-            1 / 0
-        self.assertRaises(ZeroDivisionError, foo)
-        self.assertEqual(sleep.call_count, 0)
-    @mock.patch('time.sleep')
-    def test_retries_arg(self, sleep):
-        @retry(retries=5, interval=1, logfun=None)
-        def foo():
-            1 / 0
-        self.assertRaises(ZeroDivisionError, foo)
-        self.assertEqual(sleep.call_count, 5)
-    @mock.patch('time.sleep')
-    def test_retries_and_timeout_args(self, sleep):
-        self.assertRaises(ValueError, retry, retries=5, timeout=1)
-class TestSyncTestUtils(unittest.TestCase):
-    def tearDown(self):
-        safe_rmpath(TESTFN)
-    def test_wait_for_pid(self):
-        wait_for_pid(os.getpid())
-        nopid = max(psutil.pids()) + 99999
-        with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
-            self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid)
-    def test_wait_for_file(self):
-        with open(TESTFN, 'w') as f:
-            f.write('foo')
-        wait_for_file(TESTFN)
-        assert not os.path.exists(TESTFN)
-    def test_wait_for_file_empty(self):
-        with open(TESTFN, 'w'):
-            pass
-        wait_for_file(TESTFN, empty=True)
-        assert not os.path.exists(TESTFN)
-    def test_wait_for_file_no_file(self):
-        with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
-            self.assertRaises(IOError, wait_for_file, TESTFN)
-    def test_wait_for_file_no_delete(self):
-        with open(TESTFN, 'w') as f:
-            f.write('foo')
-        wait_for_file(TESTFN, delete=False)
-        assert os.path.exists(TESTFN)
-    def test_call_until(self):
-        ret = call_until(lambda: 1, "ret == 1")
-        self.assertEqual(ret, 1)
-class TestFSTestUtils(unittest.TestCase):
-    def setUp(self):
-        safe_rmpath(TESTFN)
-    tearDown = setUp
-    def test_open_text(self):
-        with open_text(__file__) as f:
-            self.assertEqual(f.mode, 'rt')
-    def test_open_binary(self):
-        with open_binary(__file__) as f:
-            self.assertEqual(f.mode, 'rb')
-    def test_safe_mkdir(self):
-        safe_mkdir(TESTFN)
-        assert os.path.isdir(TESTFN)
-        safe_mkdir(TESTFN)
-        assert os.path.isdir(TESTFN)
-    def test_safe_rmpath(self):
-        # test file is removed
-        open(TESTFN, 'w').close()
-        safe_rmpath(TESTFN)
-        assert not os.path.exists(TESTFN)
-        # test no exception if path does not exist
-        safe_rmpath(TESTFN)
-        # test dir is removed
-        os.mkdir(TESTFN)
-        safe_rmpath(TESTFN)
-        assert not os.path.exists(TESTFN)
-        # test other exceptions are raised
-        with mock.patch('psutil.tests.os.stat',
-                        side_effect=OSError(errno.EINVAL, "")) as m:
-            with self.assertRaises(OSError):
-                safe_rmpath(TESTFN)
-            assert m.called
-    def test_chdir(self):
-        base = os.getcwd()
-        os.mkdir(TESTFN)
-        with chdir(TESTFN):
-            self.assertEqual(os.getcwd(), os.path.join(base, TESTFN))
-        self.assertEqual(os.getcwd(), base)
-class TestProcessUtils(unittest.TestCase):
-    def test_reap_children(self):
-        subp = get_test_subprocess()
-        p = psutil.Process(subp.pid)
-        assert p.is_running()
-        reap_children()
-        assert not p.is_running()
-        assert not psutil.tests._pids_started
-        assert not psutil.tests._subprocesses_started
-    def test_create_proc_children_pair(self):
-        p1, p2 = create_proc_children_pair()
-        self.assertNotEqual(p1.pid, p2.pid)
-        assert p1.is_running()
-        assert p2.is_running()
-        children = psutil.Process().children(recursive=True)
-        self.assertEqual(len(children), 2)
-        self.assertIn(p1, children)
-        self.assertIn(p2, children)
-        self.assertEqual(p1.ppid(), os.getpid())
-        self.assertEqual(p2.ppid(), p1.pid)
-        # make sure both of them are cleaned up
-        reap_children()
-        assert not p1.is_running()
-        assert not p2.is_running()
-        assert not psutil.tests._pids_started
-        assert not psutil.tests._subprocesses_started
-    @unittest.skipIf(not POSIX, "POSIX only")
-    def test_create_zombie_proc(self):
-        zpid = create_zombie_proc()
-        self.addCleanup(reap_children, recursive=True)
-        p = psutil.Process(zpid)
-        self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
-class TestNetUtils(unittest.TestCase):
-    def bind_socket(self):
-        port = get_free_port()
-        with contextlib.closing(bind_socket(addr=('', port))) as s:
-            self.assertEqual(s.getsockname()[1], port)
-    @unittest.skipIf(not POSIX, "POSIX only")
-    def test_bind_unix_socket(self):
-        with unix_socket_path() as name:
-            sock = bind_unix_socket(name)
-            with contextlib.closing(sock):
-                self.assertEqual(sock.family, socket.AF_UNIX)
-                self.assertEqual(sock.type, socket.SOCK_STREAM)
-                self.assertEqual(sock.getsockname(), name)
-                assert os.path.exists(name)
-                assert stat.S_ISSOCK(os.stat(name).st_mode)
-        # UDP
-        with unix_socket_path() as name:
-            sock = bind_unix_socket(name, type=socket.SOCK_DGRAM)
-            with contextlib.closing(sock):
-                self.assertEqual(sock.type, socket.SOCK_DGRAM)
-    def tcp_tcp_socketpair(self):
-        addr = ("", get_free_port())
-        server, client = tcp_socketpair(socket.AF_INET, addr=addr)
-        with contextlib.closing(server):
-            with contextlib.closing(client):
-                # Ensure they are connected and the positions are
-                # correct.
-                self.assertEqual(server.getsockname(), addr)
-                self.assertEqual(client.getpeername(), addr)
-                self.assertNotEqual(client.getsockname(), addr)
-    @unittest.skipIf(not POSIX, "POSIX only")
-    @unittest.skipIf(NETBSD or FREEBSD,
-                     "/var/run/log UNIX socket opened by default")
-    def test_unix_socketpair(self):
-        p = psutil.Process()
-        num_fds = p.num_fds()
-        assert not p.connections(kind='unix')
-        with unix_socket_path() as name:
-            server, client = unix_socketpair(name)
-            try:
-                assert os.path.exists(name)
-                assert stat.S_ISSOCK(os.stat(name).st_mode)
-                self.assertEqual(p.num_fds() - num_fds, 2)
-                self.assertEqual(len(p.connections(kind='unix')), 2)
-                self.assertEqual(server.getsockname(), name)
-                self.assertEqual(client.getpeername(), name)
-            finally:
-                client.close()
-                server.close()
-    def test_create_sockets(self):
-        with create_sockets() as socks:
-            fams = collections.defaultdict(int)
-            types = collections.defaultdict(int)
-            for s in socks:
-                fams[s.family] += 1
-                # work around http://bugs.python.org/issue30204
-                types[s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)] += 1
-            self.assertGreaterEqual(fams[socket.AF_INET], 2)
-            if supports_ipv6():
-                self.assertGreaterEqual(fams[socket.AF_INET6], 2)
-            if POSIX and HAS_CONNECTIONS_UNIX:
-                self.assertGreaterEqual(fams[socket.AF_UNIX], 2)
-            self.assertGreaterEqual(types[socket.SOCK_STREAM], 2)
-            self.assertGreaterEqual(types[socket.SOCK_DGRAM], 2)
-class TestOtherUtils(unittest.TestCase):
-    def test_is_namedtuple(self):
-        assert is_namedtuple(collections.namedtuple('foo', 'a b c')(1, 2, 3))
-        assert not is_namedtuple(tuple())
-if __name__ == '__main__':
-    from psutil.tests.runner import run
-    run(__file__)