diff planemo/lib/python3.7/site-packages/psutil/tests/test_misc.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/psutil/tests/test_misc.py	Fri Jul 31 00:32:28 2020 -0400
@@ -0,0 +1,772 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""
+Miscellaneous tests.
+"""
+
+import ast
+import collections
+import errno
+import json
+import os
+import pickle
+import socket
+import stat
+
+from psutil import LINUX
+from psutil import POSIX
+from psutil import WINDOWS
+from psutil._common import memoize
+from psutil._common import memoize_when_activated
+from psutil._common import supports_ipv6
+from psutil._common import wrap_numbers
+from psutil._compat import PY3
+from psutil.tests import APPVEYOR
+from psutil.tests import CI_TESTING
+from psutil.tests import DEVNULL
+from psutil.tests import HAS_BATTERY
+from psutil.tests import HAS_MEMORY_MAPS
+from psutil.tests import HAS_NET_IO_COUNTERS
+from psutil.tests import HAS_SENSORS_BATTERY
+from psutil.tests import HAS_SENSORS_FANS
+from psutil.tests import HAS_SENSORS_TEMPERATURES
+from psutil.tests import import_module_by_path
+from psutil.tests import mock
+from psutil.tests import PsutilTestCase
+from psutil.tests import PYTHON_EXE
+from psutil.tests import reload_module
+from psutil.tests import ROOT_DIR
+from psutil.tests import SCRIPTS_DIR
+from psutil.tests import sh
+from psutil.tests import TRAVIS
+from psutil.tests import unittest
+import psutil
+import psutil.tests
+
+
+# ===================================================================
+# --- Misc / generic tests.
+# ===================================================================
+
+
+class TestMisc(PsutilTestCase):
+
+    def test_process__repr__(self, func=repr):
+        p = psutil.Process(self.spawn_testproc().pid)
+        r = func(p)
+        self.assertIn("psutil.Process", r)
+        self.assertIn("pid=%s" % p.pid, r)
+        self.assertIn("name='%s'" % p.name(), r)
+        self.assertIn("status=", r)
+        self.assertNotIn("exitcode=", r)
+        p.terminate()
+        p.wait()
+        r = func(p)
+        self.assertIn("status='terminated'", r)
+        self.assertIn("exitcode=", r)
+
+        with mock.patch.object(psutil.Process, "name",
+                               side_effect=psutil.ZombieProcess(os.getpid())):
+            p = psutil.Process()
+            r = func(p)
+            self.assertIn("pid=%s" % p.pid, r)
+            self.assertIn("status='zombie'", r)
+            self.assertNotIn("name=", r)
+        with mock.patch.object(psutil.Process, "name",
+                               side_effect=psutil.NoSuchProcess(os.getpid())):
+            p = psutil.Process()
+            r = func(p)
+            self.assertIn("pid=%s" % p.pid, r)
+            self.assertIn("terminated", r)
+            self.assertNotIn("name=", r)
+        with mock.patch.object(psutil.Process, "name",
+                               side_effect=psutil.AccessDenied(os.getpid())):
+            p = psutil.Process()
+            r = func(p)
+            self.assertIn("pid=%s" % p.pid, r)
+            self.assertNotIn("name=", r)
+
+    def test_process__str__(self):
+        self.test_process__repr__(func=str)
+
+    def test_no_such_process__repr__(self, func=repr):
+        self.assertEqual(
+            repr(psutil.NoSuchProcess(321)),
+            "psutil.NoSuchProcess process no longer exists (pid=321)")
+        self.assertEqual(
+            repr(psutil.NoSuchProcess(321, name='foo')),
+            "psutil.NoSuchProcess process no longer exists (pid=321, "
+            "name='foo')")
+        self.assertEqual(
+            repr(psutil.NoSuchProcess(321, msg='foo')),
+            "psutil.NoSuchProcess foo")
+
+    def test_zombie_process__repr__(self, func=repr):
+        self.assertEqual(
+            repr(psutil.ZombieProcess(321)),
+            "psutil.ZombieProcess process still exists but it's a zombie "
+            "(pid=321)")
+        self.assertEqual(
+            repr(psutil.ZombieProcess(321, name='foo')),
+            "psutil.ZombieProcess process still exists but it's a zombie "
+            "(pid=321, name='foo')")
+        self.assertEqual(
+            repr(psutil.ZombieProcess(321, name='foo', ppid=1)),
+            "psutil.ZombieProcess process still exists but it's a zombie "
+            "(pid=321, name='foo', ppid=1)")
+        self.assertEqual(
+            repr(psutil.ZombieProcess(321, msg='foo')),
+            "psutil.ZombieProcess foo")
+
+    def test_access_denied__repr__(self, func=repr):
+        self.assertEqual(
+            repr(psutil.AccessDenied(321)),
+            "psutil.AccessDenied (pid=321)")
+        self.assertEqual(
+            repr(psutil.AccessDenied(321, name='foo')),
+            "psutil.AccessDenied (pid=321, name='foo')")
+        self.assertEqual(
+            repr(psutil.AccessDenied(321, msg='foo')),
+            "psutil.AccessDenied foo")
+
+    def test_timeout_expired__repr__(self, func=repr):
+        self.assertEqual(
+            repr(psutil.TimeoutExpired(321)),
+            "psutil.TimeoutExpired timeout after 321 seconds")
+        self.assertEqual(
+            repr(psutil.TimeoutExpired(321, pid=111)),
+            "psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
+        self.assertEqual(
+            repr(psutil.TimeoutExpired(321, pid=111, name='foo')),
+            "psutil.TimeoutExpired timeout after 321 seconds "
+            "(pid=111, name='foo')")
+
+    def test_process__eq__(self):
+        p1 = psutil.Process()
+        p2 = psutil.Process()
+        self.assertEqual(p1, p2)
+        p2._ident = (0, 0)
+        self.assertNotEqual(p1, p2)
+        self.assertNotEqual(p1, 'foo')
+
+    def test_process__hash__(self):
+        s = set([psutil.Process(), psutil.Process()])
+        self.assertEqual(len(s), 1)
+
+    def test__all__(self):
+        dir_psutil = dir(psutil)
+        for name in dir_psutil:
+            if name in ('long', 'tests', 'test', 'PermissionError',
+                        'ProcessLookupError'):
+                continue
+            if not name.startswith('_'):
+                try:
+                    __import__(name)
+                except ImportError:
+                    if name not in psutil.__all__:
+                        fun = getattr(psutil, name)
+                        if fun is None:
+                            continue
+                        if (fun.__doc__ is not None and
+                                'deprecated' not in fun.__doc__.lower()):
+                            self.fail('%r not in psutil.__all__' % name)
+
+        # Import 'star' will break if __all__ is inconsistent, see:
+        # https://github.com/giampaolo/psutil/issues/656
+        # Can't do `from psutil import *` as it won't work on python 3
+        # so we simply iterate over __all__.
+        for name in psutil.__all__:
+            self.assertIn(name, dir_psutil)
+
+    def test_version(self):
+        self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
+                         psutil.__version__)
+
+    def test_process_as_dict_no_new_names(self):
+        # See https://github.com/giampaolo/psutil/issues/813
+        p = psutil.Process()
+        p.foo = '1'
+        self.assertNotIn('foo', p.as_dict())
+
+    def test_memoize(self):
+        @memoize
+        def foo(*args, **kwargs):
+            "foo docstring"
+            calls.append(None)
+            return (args, kwargs)
+
+        calls = []
+        # no args
+        for x in range(2):
+            ret = foo()
+            expected = ((), {})
+            self.assertEqual(ret, expected)
+            self.assertEqual(len(calls), 1)
+        # with args
+        for x in range(2):
+            ret = foo(1)
+            expected = ((1, ), {})
+            self.assertEqual(ret, expected)
+            self.assertEqual(len(calls), 2)
+        # with args + kwargs
+        for x in range(2):
+            ret = foo(1, bar=2)
+            expected = ((1, ), {'bar': 2})
+            self.assertEqual(ret, expected)
+            self.assertEqual(len(calls), 3)
+        # clear cache
+        foo.cache_clear()
+        ret = foo()
+        expected = ((), {})
+        self.assertEqual(ret, expected)
+        self.assertEqual(len(calls), 4)
+        # docstring
+        self.assertEqual(foo.__doc__, "foo docstring")
+
+    def test_memoize_when_activated(self):
+        class Foo:
+
+            @memoize_when_activated
+            def foo(self):
+                calls.append(None)
+
+        f = Foo()
+        calls = []
+        f.foo()
+        f.foo()
+        self.assertEqual(len(calls), 2)
+
+        # activate
+        calls = []
+        f.foo.cache_activate(f)
+        f.foo()
+        f.foo()
+        self.assertEqual(len(calls), 1)
+
+        # deactivate
+        calls = []
+        f.foo.cache_deactivate(f)
+        f.foo()
+        f.foo()
+        self.assertEqual(len(calls), 2)
+
+    def test_parse_environ_block(self):
+        from psutil._common import parse_environ_block
+
+        def k(s):
+            return s.upper() if WINDOWS else s
+
+        self.assertEqual(parse_environ_block("a=1\0"),
+                         {k("a"): "1"})
+        self.assertEqual(parse_environ_block("a=1\0b=2\0\0"),
+                         {k("a"): "1", k("b"): "2"})
+        self.assertEqual(parse_environ_block("a=1\0b=\0\0"),
+                         {k("a"): "1", k("b"): ""})
+        # ignore everything after \0\0
+        self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"),
+                         {k("a"): "1", k("b"): "2"})
+        # ignore everything that is not an assignment
+        self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"})
+        self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"})
+        # do not fail if the block is incomplete
+        self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"})
+
+    def test_supports_ipv6(self):
+        self.addCleanup(supports_ipv6.cache_clear)
+        if supports_ipv6():
+            with mock.patch('psutil._common.socket') as s:
+                s.has_ipv6 = False
+                supports_ipv6.cache_clear()
+                assert not supports_ipv6()
+
+            supports_ipv6.cache_clear()
+            with mock.patch('psutil._common.socket.socket',
+                            side_effect=socket.error) as s:
+                assert not supports_ipv6()
+                assert s.called
+
+            supports_ipv6.cache_clear()
+            with mock.patch('psutil._common.socket.socket',
+                            side_effect=socket.gaierror) as s:
+                assert not supports_ipv6()
+                supports_ipv6.cache_clear()
+                assert s.called
+
+            supports_ipv6.cache_clear()
+            with mock.patch('psutil._common.socket.socket.bind',
+                            side_effect=socket.gaierror) as s:
+                assert not supports_ipv6()
+                supports_ipv6.cache_clear()
+                assert s.called
+        else:
+            with self.assertRaises(Exception):
+                sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+                try:
+                    sock.bind(("::1", 0))
+                finally:
+                    sock.close()
+
+    def test_isfile_strict(self):
+        from psutil._common import isfile_strict
+        this_file = os.path.abspath(__file__)
+        assert isfile_strict(this_file)
+        assert not isfile_strict(os.path.dirname(this_file))
+        with mock.patch('psutil._common.os.stat',
+                        side_effect=OSError(errno.EPERM, "foo")):
+            self.assertRaises(OSError, isfile_strict, this_file)
+        with mock.patch('psutil._common.os.stat',
+                        side_effect=OSError(errno.EACCES, "foo")):
+            self.assertRaises(OSError, isfile_strict, this_file)
+        with mock.patch('psutil._common.os.stat',
+                        side_effect=OSError(errno.ENOENT, "foo")):
+            assert not isfile_strict(this_file)
+        with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
+            assert not isfile_strict(this_file)
+
+    def test_serialization(self):
+        def check(ret):
+            if json is not None:
+                json.loads(json.dumps(ret))
+            a = pickle.dumps(ret)
+            b = pickle.loads(a)
+            self.assertEqual(ret, b)
+
+        check(psutil.Process().as_dict())
+        check(psutil.virtual_memory())
+        check(psutil.swap_memory())
+        check(psutil.cpu_times())
+        check(psutil.cpu_times_percent(interval=0))
+        check(psutil.net_io_counters())
+        if LINUX and not os.path.exists('/proc/diskstats'):
+            pass
+        else:
+            if not APPVEYOR:
+                check(psutil.disk_io_counters())
+        check(psutil.disk_partitions())
+        check(psutil.disk_usage(os.getcwd()))
+        check(psutil.users())
+
+    def test_setup_script(self):
+        setup_py = os.path.join(ROOT_DIR, 'setup.py')
+        if CI_TESTING and not os.path.exists(setup_py):
+            return self.skipTest("can't find setup.py")
+        module = import_module_by_path(setup_py)
+        self.assertRaises(SystemExit, module.setup)
+        self.assertEqual(module.get_version(), psutil.__version__)
+
+    def test_ad_on_process_creation(self):
+        # We are supposed to be able to instantiate Process also in case
+        # of zombie processes or access denied.
+        with mock.patch.object(psutil.Process, 'create_time',
+                               side_effect=psutil.AccessDenied) as meth:
+            psutil.Process()
+            assert meth.called
+        with mock.patch.object(psutil.Process, 'create_time',
+                               side_effect=psutil.ZombieProcess(1)) as meth:
+            psutil.Process()
+            assert meth.called
+        with mock.patch.object(psutil.Process, 'create_time',
+                               side_effect=ValueError) as meth:
+            with self.assertRaises(ValueError):
+                psutil.Process()
+            assert meth.called
+
+    def test_sanity_version_check(self):
+        # see: https://github.com/giampaolo/psutil/issues/564
+        with mock.patch(
+                "psutil._psplatform.cext.version", return_value="0.0.0"):
+            with self.assertRaises(ImportError) as cm:
+                reload_module(psutil)
+            self.assertIn("version conflict", str(cm.exception).lower())
+
+
+# ===================================================================
+# --- Tests for wrap_numbers() function.
+# ===================================================================
+
+
+nt = collections.namedtuple('foo', 'a b c')
+
+
+class TestWrapNumbers(PsutilTestCase):
+
+    def setUp(self):
+        wrap_numbers.cache_clear()
+
+    tearDown = setUp
+
+    def test_first_call(self):
+        input = {'disk1': nt(5, 5, 5)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+
+    def test_input_hasnt_changed(self):
+        input = {'disk1': nt(5, 5, 5)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+
+    def test_increase_but_no_wrap(self):
+        input = {'disk1': nt(5, 5, 5)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+        input = {'disk1': nt(10, 15, 20)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+        input = {'disk1': nt(20, 25, 30)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+        input = {'disk1': nt(20, 25, 30)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+
+    def test_wrap(self):
+        # let's say 100 is the threshold
+        input = {'disk1': nt(100, 100, 100)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+        # first wrap restarts from 10
+        input = {'disk1': nt(100, 100, 10)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'),
+                         {'disk1': nt(100, 100, 110)})
+        # then it remains the same
+        input = {'disk1': nt(100, 100, 10)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'),
+                         {'disk1': nt(100, 100, 110)})
+        # then it goes up
+        input = {'disk1': nt(100, 100, 90)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'),
+                         {'disk1': nt(100, 100, 190)})
+        # then it wraps again
+        input = {'disk1': nt(100, 100, 20)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'),
+                         {'disk1': nt(100, 100, 210)})
+        # and remains the same
+        input = {'disk1': nt(100, 100, 20)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'),
+                         {'disk1': nt(100, 100, 210)})
+        # now wrap another num
+        input = {'disk1': nt(50, 100, 20)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'),
+                         {'disk1': nt(150, 100, 210)})
+        # and again
+        input = {'disk1': nt(40, 100, 20)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'),
+                         {'disk1': nt(190, 100, 210)})
+        # keep it the same
+        input = {'disk1': nt(40, 100, 20)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'),
+                         {'disk1': nt(190, 100, 210)})
+
+    def test_changing_keys(self):
+        # Emulate a case where the second call to disk_io()
+        # (or whatever) provides a new disk, then the new disk
+        # disappears on the third call.
+        input = {'disk1': nt(5, 5, 5)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+        input = {'disk1': nt(5, 5, 5),
+                 'disk2': nt(7, 7, 7)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+        input = {'disk1': nt(8, 8, 8)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+
+    def test_changing_keys_w_wrap(self):
+        input = {'disk1': nt(50, 50, 50),
+                 'disk2': nt(100, 100, 100)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+        # disk 2 wraps
+        input = {'disk1': nt(50, 50, 50),
+                 'disk2': nt(100, 100, 10)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'),
+                         {'disk1': nt(50, 50, 50),
+                          'disk2': nt(100, 100, 110)})
+        # disk 2 disappears
+        input = {'disk1': nt(50, 50, 50)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+
+        # then it appears again; the old wrap is supposed to be
+        # gone.
+        input = {'disk1': nt(50, 50, 50),
+                 'disk2': nt(100, 100, 100)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+        # remains the same
+        input = {'disk1': nt(50, 50, 50),
+                 'disk2': nt(100, 100, 100)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'), input)
+        # and then wraps again
+        input = {'disk1': nt(50, 50, 50),
+                 'disk2': nt(100, 100, 10)}
+        self.assertEqual(wrap_numbers(input, 'disk_io'),
+                         {'disk1': nt(50, 50, 50),
+                          'disk2': nt(100, 100, 110)})
+
+    def test_real_data(self):
+        d = {'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
+             'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
+             'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
+             'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
+        self.assertEqual(wrap_numbers(d, 'disk_io'), d)
+        self.assertEqual(wrap_numbers(d, 'disk_io'), d)
+        # decrease this   ↓
+        d = {'nvme0n1': (100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
+             'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
+             'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
+             'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
+        out = wrap_numbers(d, 'disk_io')
+        self.assertEqual(out['nvme0n1'][0], 400)
+
+    # --- cache tests
+
+    def test_cache_first_call(self):
+        input = {'disk1': nt(5, 5, 5)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        self.assertEqual(cache[0], {'disk_io': input})
+        self.assertEqual(cache[1], {'disk_io': {}})
+        self.assertEqual(cache[2], {'disk_io': {}})
+
+    def test_cache_call_twice(self):
+        input = {'disk1': nt(5, 5, 5)}
+        wrap_numbers(input, 'disk_io')
+        input = {'disk1': nt(10, 10, 10)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        self.assertEqual(cache[0], {'disk_io': input})
+        self.assertEqual(
+            cache[1],
+            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
+        self.assertEqual(cache[2], {'disk_io': {}})
+
+    def test_cache_wrap(self):
+        # let's say 100 is the threshold
+        input = {'disk1': nt(100, 100, 100)}
+        wrap_numbers(input, 'disk_io')
+
+        # first wrap restarts from 10
+        input = {'disk1': nt(100, 100, 10)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        self.assertEqual(cache[0], {'disk_io': input})
+        self.assertEqual(
+            cache[1],
+            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 100}})
+        self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}})
+
+        def assert_():
+            cache = wrap_numbers.cache_info()
+            self.assertEqual(
+                cache[1],
+                {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0,
+                             ('disk1', 2): 100}})
+            self.assertEqual(cache[2],
+                             {'disk_io': {'disk1': set([('disk1', 2)])}})
+
+        # then it remains the same
+        input = {'disk1': nt(100, 100, 10)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        self.assertEqual(cache[0], {'disk_io': input})
+        assert_()
+
+        # then it goes up
+        input = {'disk1': nt(100, 100, 90)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        self.assertEqual(cache[0], {'disk_io': input})
+        assert_()
+
+        # then it wraps again
+        input = {'disk1': nt(100, 100, 20)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        self.assertEqual(cache[0], {'disk_io': input})
+        self.assertEqual(
+            cache[1],
+            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 190}})
+        self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}})
+
+    def test_cache_changing_keys(self):
+        input = {'disk1': nt(5, 5, 5)}
+        wrap_numbers(input, 'disk_io')
+        input = {'disk1': nt(5, 5, 5),
+                 'disk2': nt(7, 7, 7)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        self.assertEqual(cache[0], {'disk_io': input})
+        self.assertEqual(
+            cache[1],
+            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
+        self.assertEqual(cache[2], {'disk_io': {}})
+
+    def test_cache_clear(self):
+        input = {'disk1': nt(5, 5, 5)}
+        wrap_numbers(input, 'disk_io')
+        wrap_numbers(input, 'disk_io')
+        wrap_numbers.cache_clear('disk_io')
+        self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
+        wrap_numbers.cache_clear('disk_io')
+        wrap_numbers.cache_clear('?!?')
+
+    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
+    def test_cache_clear_public_apis(self):
+        if not psutil.disk_io_counters() or not psutil.net_io_counters():
+            return self.skipTest("no disks or NICs available")
+        psutil.disk_io_counters()
+        psutil.net_io_counters()
+        caches = wrap_numbers.cache_info()
+        for cache in caches:
+            self.assertIn('psutil.disk_io_counters', cache)
+            self.assertIn('psutil.net_io_counters', cache)
+
+        psutil.disk_io_counters.cache_clear()
+        caches = wrap_numbers.cache_info()
+        for cache in caches:
+            self.assertIn('psutil.net_io_counters', cache)
+            self.assertNotIn('psutil.disk_io_counters', cache)
+
+        psutil.net_io_counters.cache_clear()
+        caches = wrap_numbers.cache_info()
+        self.assertEqual(caches, ({}, {}, {}))
+
+
+# ===================================================================
+# --- Example script tests
+# ===================================================================
+
+
+@unittest.skipIf(not os.path.exists(SCRIPTS_DIR),
+                 "can't locate scripts directory")
+class TestScripts(PsutilTestCase):
+    """Tests for scripts in the "scripts" directory."""
+
+    @staticmethod
+    def assert_stdout(exe, *args, **kwargs):
+        exe = '%s' % os.path.join(SCRIPTS_DIR, exe)
+        cmd = [PYTHON_EXE, exe]
+        for arg in args:
+            cmd.append(arg)
+        try:
+            out = sh(cmd, **kwargs).strip()
+        except RuntimeError as err:
+            if 'AccessDenied' in str(err):
+                return str(err)
+            else:
+                raise
+        assert out, out
+        return out
+
+    @staticmethod
+    def assert_syntax(exe, args=None):
+        exe = os.path.join(SCRIPTS_DIR, exe)
+        if PY3:
+            f = open(exe, 'rt', encoding='utf8')
+        else:
+            f = open(exe, 'rt')
+        with f:
+            src = f.read()
+        ast.parse(src)
+
+    def test_coverage(self):
+        # make sure all example scripts have a test method defined
+        meths = dir(self)
+        for name in os.listdir(SCRIPTS_DIR):
+            if name.endswith('.py'):
+                if 'test_' + os.path.splitext(name)[0] not in meths:
+                    # self.assert_stdout(name)
+                    self.fail('no test defined for %r script'
+                              % os.path.join(SCRIPTS_DIR, name))
+
+    @unittest.skipIf(not POSIX, "POSIX only")
+    def test_executable(self):
+        for name in os.listdir(SCRIPTS_DIR):
+            if name.endswith('.py'):
+                path = os.path.join(SCRIPTS_DIR, name)
+                if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]:
+                    self.fail('%r is not executable' % path)
+
+    def test_disk_usage(self):
+        self.assert_stdout('disk_usage.py')
+
+    def test_free(self):
+        self.assert_stdout('free.py')
+
+    def test_meminfo(self):
+        self.assert_stdout('meminfo.py')
+
+    def test_procinfo(self):
+        self.assert_stdout('procinfo.py', str(os.getpid()))
+
+    @unittest.skipIf(CI_TESTING and not psutil.users(), "no users")
+    def test_who(self):
+        self.assert_stdout('who.py')
+
+    def test_ps(self):
+        self.assert_stdout('ps.py')
+
+    def test_pstree(self):
+        self.assert_stdout('pstree.py')
+
+    def test_netstat(self):
+        self.assert_stdout('netstat.py')
+
+    # permission denied on travis
+    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
+    def test_ifconfig(self):
+        self.assert_stdout('ifconfig.py')
+
+    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
+    def test_pmap(self):
+        self.assert_stdout('pmap.py', str(os.getpid()))
+
+    def test_procsmem(self):
+        if 'uss' not in psutil.Process().memory_full_info()._fields:
+            raise self.skipTest("not supported")
+        self.assert_stdout('procsmem.py', stderr=DEVNULL)
+
+    def test_killall(self):
+        self.assert_syntax('killall.py')
+
+    def test_nettop(self):
+        self.assert_syntax('nettop.py')
+
+    def test_top(self):
+        self.assert_syntax('top.py')
+
+    def test_iotop(self):
+        self.assert_syntax('iotop.py')
+
+    def test_pidof(self):
+        output = self.assert_stdout('pidof.py', psutil.Process().name())
+        self.assertIn(str(os.getpid()), output)
+
+    @unittest.skipIf(not WINDOWS, "WINDOWS only")
+    def test_winservices(self):
+        self.assert_stdout('winservices.py')
+
+    def test_cpu_distribution(self):
+        self.assert_syntax('cpu_distribution.py')
+
+    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
+    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
+    def test_temperatures(self):
+        if not psutil.sensors_temperatures():
+            self.skipTest("no temperatures")
+        self.assert_stdout('temperatures.py')
+
+    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
+    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
+    def test_fans(self):
+        if not psutil.sensors_fans():
+            self.skipTest("no fans")
+        self.assert_stdout('fans.py')
+
+    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
+    @unittest.skipIf(not HAS_BATTERY, "no battery")
+    def test_battery(self):
+        self.assert_stdout('battery.py')
+
+    def test_sensors(self):
+        self.assert_stdout('sensors.py')
+
+
+if __name__ == '__main__':
+    from psutil.tests.runner import run_from_name
+    run_from_name(__file__)