diff env/lib/python3.7/site-packages/psutil/tests/test_linux.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/psutil/tests/test_linux.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,2118 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Linux specific tests."""
+
+from __future__ import division
+import collections
+import contextlib
+import errno
+import glob
+import io
+import os
+import re
+import shutil
+import socket
+import struct
+import tempfile
+import textwrap
+import time
+import warnings
+
+import psutil
+from psutil import LINUX
+from psutil._compat import basestring
+from psutil._compat import FileNotFoundError
+from psutil._compat import PY3
+from psutil._compat import u
+from psutil.tests import call_until
+from psutil.tests import HAS_BATTERY
+from psutil.tests import HAS_CPU_FREQ
+from psutil.tests import HAS_GETLOADAVG
+from psutil.tests import HAS_RLIMIT
+from psutil.tests import MEMORY_TOLERANCE
+from psutil.tests import mock
+from psutil.tests import PYPY
+from psutil.tests import pyrun
+from psutil.tests import reap_children
+from psutil.tests import reload_module
+from psutil.tests import retry_on_failure
+from psutil.tests import safe_rmpath
+from psutil.tests import sh
+from psutil.tests import skip_on_not_implemented
+from psutil.tests import TESTFN
+from psutil.tests import ThreadTask
+from psutil.tests import TRAVIS
+from psutil.tests import unittest
+from psutil.tests import which
+
+
+HERE = os.path.abspath(os.path.dirname(__file__))
+SIOCGIFADDR = 0x8915
+SIOCGIFCONF = 0x8912
+SIOCGIFHWADDR = 0x8927
+if LINUX:
+    SECTOR_SIZE = 512
+EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*')
+
+# =====================================================================
+# --- utils
+# =====================================================================
+
+
+def get_ipv4_address(ifname):
+    import fcntl
+    ifname = ifname[:15]
+    if PY3:
+        ifname = bytes(ifname, 'ascii')
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    with contextlib.closing(s):
+        return socket.inet_ntoa(
+            fcntl.ioctl(s.fileno(),
+                        SIOCGIFADDR,
+                        struct.pack('256s', ifname))[20:24])
+
+
+def get_mac_address(ifname):
+    import fcntl
+    ifname = ifname[:15]
+    if PY3:
+        ifname = bytes(ifname, 'ascii')
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    with contextlib.closing(s):
+        info = fcntl.ioctl(
+            s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname))
+        if PY3:
+            def ord(x):
+                return x
+        else:
+            import __builtin__
+            ord = __builtin__.ord
+        return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
+
+
+def free_swap():
+    """Parse 'free' cmd and return swap memory's s total, used and free
+    values.
+    """
+    out = sh('free -b', env={"LANG": "C.UTF-8"})
+    lines = out.split('\n')
+    for line in lines:
+        if line.startswith('Swap'):
+            _, total, used, free = line.split()
+            nt = collections.namedtuple('free', 'total used free')
+            return nt(int(total), int(used), int(free))
+    raise ValueError(
+        "can't find 'Swap' in 'free' output:\n%s" % '\n'.join(lines))
+
+
+def free_physmem():
+    """Parse 'free' cmd and return physical memory's total, used
+    and free values.
+    """
+    # Note: free can have 2 different formats, invalidating 'shared'
+    # and 'cached' memory which may have different positions so we
+    # do not return them.
+    # https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946
+    out = sh('free -b', env={"LANG": "C.UTF-8"})
+    lines = out.split('\n')
+    for line in lines:
+        if line.startswith('Mem'):
+            total, used, free, shared = \
+                [int(x) for x in line.split()[1:5]]
+            nt = collections.namedtuple(
+                'free', 'total used free shared output')
+            return nt(total, used, free, shared, out)
+    raise ValueError(
+        "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(lines))
+
+
+def vmstat(stat):
+    out = sh("vmstat -s", env={"LANG": "C.UTF-8"})
+    for line in out.split("\n"):
+        line = line.strip()
+        if stat in line:
+            return int(line.split(' ')[0])
+    raise ValueError("can't find %r in 'vmstat' output" % stat)
+
+
+def get_free_version_info():
+    out = sh("free -V").strip()
+    return tuple(map(int, out.split()[-1].split('.')))
+
+
+@contextlib.contextmanager
+def mock_open_content(for_path, content):
+    """Mock open() builtin and forces it to return a certain `content`
+    on read() if the path being opened matches `for_path`.
+    """
+    def open_mock(name, *args, **kwargs):
+        if name == for_path:
+            if PY3:
+                if isinstance(content, basestring):
+                    return io.StringIO(content)
+                else:
+                    return io.BytesIO(content)
+            else:
+                return io.BytesIO(content)
+        else:
+            return orig_open(name, *args, **kwargs)
+
+    orig_open = open
+    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+    with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
+        yield m
+
+
+@contextlib.contextmanager
+def mock_open_exception(for_path, exc):
+    """Mock open() builtin and raises `exc` if the path being opened
+    matches `for_path`.
+    """
+    def open_mock(name, *args, **kwargs):
+        if name == for_path:
+            raise exc
+        else:
+            return orig_open(name, *args, **kwargs)
+
+    orig_open = open
+    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+    with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
+        yield m
+
+
+# =====================================================================
+# --- system virtual memory
+# =====================================================================
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSystemVirtualMemory(unittest.TestCase):
+
+    def test_total(self):
+        # free_value = free_physmem().total
+        # psutil_value = psutil.virtual_memory().total
+        # self.assertEqual(free_value, psutil_value)
+        vmstat_value = vmstat('total memory') * 1024
+        psutil_value = psutil.virtual_memory().total
+        self.assertAlmostEqual(vmstat_value, psutil_value)
+
+    # Older versions of procps used slab memory to calculate used memory.
+    # This got changed in:
+    # https://gitlab.com/procps-ng/procps/commit/
+    #     05d751c4f076a2f0118b914c5e51cfbb4762ad8e
+    @unittest.skipIf(LINUX and get_free_version_info() < (3, 3, 12),
+                     "old free version")
+    @retry_on_failure()
+    def test_used(self):
+        free = free_physmem()
+        free_value = free.used
+        psutil_value = psutil.virtual_memory().used
+        self.assertAlmostEqual(
+            free_value, psutil_value, delta=MEMORY_TOLERANCE,
+            msg='%s %s \n%s' % (free_value, psutil_value, free.output))
+
+    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
+    @retry_on_failure()
+    def test_free(self):
+        vmstat_value = vmstat('free memory') * 1024
+        psutil_value = psutil.virtual_memory().free
+        self.assertAlmostEqual(
+            vmstat_value, psutil_value, delta=MEMORY_TOLERANCE)
+
+    @retry_on_failure()
+    def test_buffers(self):
+        vmstat_value = vmstat('buffer memory') * 1024
+        psutil_value = psutil.virtual_memory().buffers
+        self.assertAlmostEqual(
+            vmstat_value, psutil_value, delta=MEMORY_TOLERANCE)
+
+    # https://travis-ci.org/giampaolo/psutil/jobs/226719664
+    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
+    @retry_on_failure()
+    def test_active(self):
+        vmstat_value = vmstat('active memory') * 1024
+        psutil_value = psutil.virtual_memory().active
+        self.assertAlmostEqual(
+            vmstat_value, psutil_value, delta=MEMORY_TOLERANCE)
+
+    # https://travis-ci.org/giampaolo/psutil/jobs/227242952
+    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
+    @retry_on_failure()
+    def test_inactive(self):
+        vmstat_value = vmstat('inactive memory') * 1024
+        psutil_value = psutil.virtual_memory().inactive
+        self.assertAlmostEqual(
+            vmstat_value, psutil_value, delta=MEMORY_TOLERANCE)
+
+    @retry_on_failure()
+    def test_shared(self):
+        free = free_physmem()
+        free_value = free.shared
+        if free_value == 0:
+            raise unittest.SkipTest("free does not support 'shared' column")
+        psutil_value = psutil.virtual_memory().shared
+        self.assertAlmostEqual(
+            free_value, psutil_value, delta=MEMORY_TOLERANCE,
+            msg='%s %s \n%s' % (free_value, psutil_value, free.output))
+
+    @retry_on_failure()
+    def test_available(self):
+        # "free" output format has changed at some point:
+        # https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098
+        out = sh("free -b")
+        lines = out.split('\n')
+        if 'available' not in lines[0]:
+            raise unittest.SkipTest("free does not support 'available' column")
+        else:
+            free_value = int(lines[1].split()[-1])
+            psutil_value = psutil.virtual_memory().available
+            self.assertAlmostEqual(
+                free_value, psutil_value, delta=MEMORY_TOLERANCE,
+                msg='%s %s \n%s' % (free_value, psutil_value, out))
+
+    def test_warnings_on_misses(self):
+        # Emulate a case where /proc/meminfo provides few info.
+        # psutil is supposed to set the missing fields to 0 and
+        # raise a warning.
+        with mock_open_content(
+            '/proc/meminfo',
+            textwrap.dedent("""\
+                Active(anon):    6145416 kB
+                Active(file):    2950064 kB
+                Inactive(anon):   574764 kB
+                Inactive(file):  1567648 kB
+                MemAvailable:         -1 kB
+                MemFree:         2057400 kB
+                MemTotal:       16325648 kB
+                SReclaimable:     346648 kB
+                """).encode()) as m:
+            with warnings.catch_warnings(record=True) as ws:
+                warnings.simplefilter("always")
+                ret = psutil.virtual_memory()
+                assert m.called
+                self.assertEqual(len(ws), 1)
+                w = ws[0]
+                assert w.filename.endswith('psutil/_pslinux.py')
+                self.assertIn(
+                    "memory stats couldn't be determined", str(w.message))
+                self.assertIn("cached", str(w.message))
+                self.assertIn("shared", str(w.message))
+                self.assertIn("active", str(w.message))
+                self.assertIn("inactive", str(w.message))
+                self.assertIn("buffers", str(w.message))
+                self.assertIn("available", str(w.message))
+                self.assertEqual(ret.cached, 0)
+                self.assertEqual(ret.active, 0)
+                self.assertEqual(ret.inactive, 0)
+                self.assertEqual(ret.shared, 0)
+                self.assertEqual(ret.buffers, 0)
+                self.assertEqual(ret.available, 0)
+                self.assertEqual(ret.slab, 0)
+
+    @retry_on_failure()
+    def test_avail_old_percent(self):
+        # Make sure that our calculation of avail mem for old kernels
+        # is off by max 15%.
+        from psutil._pslinux import calculate_avail_vmem
+        from psutil._pslinux import open_binary
+
+        mems = {}
+        with open_binary('/proc/meminfo') as f:
+            for line in f:
+                fields = line.split()
+                mems[fields[0]] = int(fields[1]) * 1024
+
+        a = calculate_avail_vmem(mems)
+        if b'MemAvailable:' in mems:
+            b = mems[b'MemAvailable:']
+            diff_percent = abs(a - b) / a * 100
+            self.assertLess(diff_percent, 15)
+
+    def test_avail_old_comes_from_kernel(self):
+        # Make sure "MemAvailable:" coluimn is used instead of relying
+        # on our internal algorithm to calculate avail mem.
+        with mock_open_content(
+            '/proc/meminfo',
+            textwrap.dedent("""\
+                Active:          9444728 kB
+                Active(anon):    6145416 kB
+                Active(file):    2950064 kB
+                Buffers:          287952 kB
+                Cached:          4818144 kB
+                Inactive(file):  1578132 kB
+                Inactive(anon):   574764 kB
+                Inactive(file):  1567648 kB
+                MemAvailable:    6574984 kB
+                MemFree:         2057400 kB
+                MemTotal:       16325648 kB
+                Shmem:            577588 kB
+                SReclaimable:     346648 kB
+                """).encode()) as m:
+            with warnings.catch_warnings(record=True) as ws:
+                ret = psutil.virtual_memory()
+            assert m.called
+            self.assertEqual(ret.available, 6574984 * 1024)
+            w = ws[0]
+            self.assertIn(
+                "inactive memory stats couldn't be determined", str(w.message))
+
+    def test_avail_old_missing_fields(self):
+        # Remove Active(file), Inactive(file) and SReclaimable
+        # from /proc/meminfo and make sure the fallback is used
+        # (free + cached),
+        with mock_open_content(
+            "/proc/meminfo",
+            textwrap.dedent("""\
+                    Active:          9444728 kB
+                    Active(anon):    6145416 kB
+                    Buffers:          287952 kB
+                    Cached:          4818144 kB
+                    Inactive(file):  1578132 kB
+                    Inactive(anon):   574764 kB
+                    MemFree:         2057400 kB
+                    MemTotal:       16325648 kB
+                    Shmem:            577588 kB
+                    """).encode()) as m:
+            with warnings.catch_warnings(record=True) as ws:
+                ret = psutil.virtual_memory()
+            assert m.called
+            self.assertEqual(ret.available, 2057400 * 1024 + 4818144 * 1024)
+            w = ws[0]
+            self.assertIn(
+                "inactive memory stats couldn't be determined", str(w.message))
+
+    def test_avail_old_missing_zoneinfo(self):
+        # Remove /proc/zoneinfo file. Make sure fallback is used
+        # (free + cached).
+        with mock_open_content(
+                "/proc/meminfo",
+                textwrap.dedent("""\
+                    Active:          9444728 kB
+                    Active(anon):    6145416 kB
+                    Active(file):    2950064 kB
+                    Buffers:          287952 kB
+                    Cached:          4818144 kB
+                    Inactive(file):  1578132 kB
+                    Inactive(anon):   574764 kB
+                    Inactive(file):  1567648 kB
+                    MemFree:         2057400 kB
+                    MemTotal:       16325648 kB
+                    Shmem:            577588 kB
+                    SReclaimable:     346648 kB
+                    """).encode()):
+            with mock_open_exception(
+                    "/proc/zoneinfo",
+                    IOError(errno.ENOENT, 'no such file or directory')):
+                with warnings.catch_warnings(record=True) as ws:
+                    ret = psutil.virtual_memory()
+                    self.assertEqual(
+                        ret.available, 2057400 * 1024 + 4818144 * 1024)
+                    w = ws[0]
+                    self.assertIn(
+                        "inactive memory stats couldn't be determined",
+                        str(w.message))
+
+    def test_virtual_memory_mocked(self):
+        # Emulate /proc/meminfo because neither vmstat nor free return slab.
+        def open_mock(name, *args, **kwargs):
+            if name == '/proc/meminfo':
+                return io.BytesIO(textwrap.dedent("""\
+                    MemTotal:              100 kB
+                    MemFree:               2 kB
+                    MemAvailable:          3 kB
+                    Buffers:               4 kB
+                    Cached:                5 kB
+                    SwapCached:            6 kB
+                    Active:                7 kB
+                    Inactive:              8 kB
+                    Active(anon):          9 kB
+                    Inactive(anon):        10 kB
+                    Active(file):          11 kB
+                    Inactive(file):        12 kB
+                    Unevictable:           13 kB
+                    Mlocked:               14 kB
+                    SwapTotal:             15 kB
+                    SwapFree:              16 kB
+                    Dirty:                 17 kB
+                    Writeback:             18 kB
+                    AnonPages:             19 kB
+                    Mapped:                20 kB
+                    Shmem:                 21 kB
+                    Slab:                  22 kB
+                    SReclaimable:          23 kB
+                    SUnreclaim:            24 kB
+                    KernelStack:           25 kB
+                    PageTables:            26 kB
+                    NFS_Unstable:          27 kB
+                    Bounce:                28 kB
+                    WritebackTmp:          29 kB
+                    CommitLimit:           30 kB
+                    Committed_AS:          31 kB
+                    VmallocTotal:          32 kB
+                    VmallocUsed:           33 kB
+                    VmallocChunk:          34 kB
+                    HardwareCorrupted:     35 kB
+                    AnonHugePages:         36 kB
+                    ShmemHugePages:        37 kB
+                    ShmemPmdMapped:        38 kB
+                    CmaTotal:              39 kB
+                    CmaFree:               40 kB
+                    HugePages_Total:       41 kB
+                    HugePages_Free:        42 kB
+                    HugePages_Rsvd:        43 kB
+                    HugePages_Surp:        44 kB
+                    Hugepagesize:          45 kB
+                    DirectMap46k:          46 kB
+                    DirectMap47M:          47 kB
+                    DirectMap48G:          48 kB
+                    """).encode())
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        orig_open = open
+        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+        with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
+            mem = psutil.virtual_memory()
+            assert m.called
+            self.assertEqual(mem.total, 100 * 1024)
+            self.assertEqual(mem.free, 2 * 1024)
+            self.assertEqual(mem.buffers, 4 * 1024)
+            # cached mem also includes reclaimable memory
+            self.assertEqual(mem.cached, (5 + 23) * 1024)
+            self.assertEqual(mem.shared, 21 * 1024)
+            self.assertEqual(mem.active, 7 * 1024)
+            self.assertEqual(mem.inactive, 8 * 1024)
+            self.assertEqual(mem.slab, 22 * 1024)
+            self.assertEqual(mem.available, 3 * 1024)
+
+
+# =====================================================================
+# --- system swap memory
+# =====================================================================
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSystemSwapMemory(unittest.TestCase):
+
+    @staticmethod
+    def meminfo_has_swap_info():
+        """Return True if /proc/meminfo provides swap metrics."""
+        with open("/proc/meminfo") as f:
+            data = f.read()
+        return 'SwapTotal:' in data and 'SwapFree:' in data
+
+    def test_total(self):
+        free_value = free_swap().total
+        psutil_value = psutil.swap_memory().total
+        return self.assertAlmostEqual(
+            free_value, psutil_value, delta=MEMORY_TOLERANCE)
+
+    @retry_on_failure()
+    def test_used(self):
+        free_value = free_swap().used
+        psutil_value = psutil.swap_memory().used
+        return self.assertAlmostEqual(
+            free_value, psutil_value, delta=MEMORY_TOLERANCE)
+
+    @retry_on_failure()
+    def test_free(self):
+        free_value = free_swap().free
+        psutil_value = psutil.swap_memory().free
+        return self.assertAlmostEqual(
+            free_value, psutil_value, delta=MEMORY_TOLERANCE)
+
+    def test_missing_sin_sout(self):
+        with mock.patch('psutil._common.open', create=True) as m:
+            with warnings.catch_warnings(record=True) as ws:
+                warnings.simplefilter("always")
+                ret = psutil.swap_memory()
+                assert m.called
+                self.assertEqual(len(ws), 1)
+                w = ws[0]
+                assert w.filename.endswith('psutil/_pslinux.py')
+                self.assertIn(
+                    "'sin' and 'sout' swap memory stats couldn't "
+                    "be determined", str(w.message))
+                self.assertEqual(ret.sin, 0)
+                self.assertEqual(ret.sout, 0)
+
+    def test_no_vmstat_mocked(self):
+        # see https://github.com/giampaolo/psutil/issues/722
+        with mock_open_exception(
+                "/proc/vmstat",
+                IOError(errno.ENOENT, 'no such file or directory')) as m:
+            with warnings.catch_warnings(record=True) as ws:
+                warnings.simplefilter("always")
+                ret = psutil.swap_memory()
+                assert m.called
+                self.assertEqual(len(ws), 1)
+                w = ws[0]
+                assert w.filename.endswith('psutil/_pslinux.py')
+                self.assertIn(
+                    "'sin' and 'sout' swap memory stats couldn't "
+                    "be determined and were set to 0",
+                    str(w.message))
+                self.assertEqual(ret.sin, 0)
+                self.assertEqual(ret.sout, 0)
+
+    def test_meminfo_against_sysinfo(self):
+        # Make sure the content of /proc/meminfo about swap memory
+        # matches sysinfo() syscall, see:
+        # https://github.com/giampaolo/psutil/issues/1015
+        if not self.meminfo_has_swap_info():
+            return unittest.skip("/proc/meminfo has no swap metrics")
+        with mock.patch('psutil._pslinux.cext.linux_sysinfo') as m:
+            swap = psutil.swap_memory()
+        assert not m.called
+        import psutil._psutil_linux as cext
+        _, _, _, _, total, free, unit_multiplier = cext.linux_sysinfo()
+        total *= unit_multiplier
+        free *= unit_multiplier
+        self.assertEqual(swap.total, total)
+        self.assertAlmostEqual(swap.free, free, delta=MEMORY_TOLERANCE)
+
+    def test_emulate_meminfo_has_no_metrics(self):
+        # Emulate a case where /proc/meminfo provides no swap metrics
+        # in which case sysinfo() syscall is supposed to be used
+        # as a fallback.
+        with mock_open_content("/proc/meminfo", b"") as m:
+            psutil.swap_memory()
+            assert m.called
+
+
+# =====================================================================
+# --- system CPU
+# =====================================================================
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSystemCPUTimes(unittest.TestCase):
+
+    @unittest.skipIf(TRAVIS, "unknown failure on travis")
+    def test_fields(self):
+        fields = psutil.cpu_times()._fields
+        kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0]
+        kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
+        if kernel_ver_info >= (2, 6, 11):
+            self.assertIn('steal', fields)
+        else:
+            self.assertNotIn('steal', fields)
+        if kernel_ver_info >= (2, 6, 24):
+            self.assertIn('guest', fields)
+        else:
+            self.assertNotIn('guest', fields)
+        if kernel_ver_info >= (3, 2, 0):
+            self.assertIn('guest_nice', fields)
+        else:
+            self.assertNotIn('guest_nice', fields)
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSystemCPUCountLogical(unittest.TestCase):
+
+    @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"),
+                     "/sys/devices/system/cpu/online does not exist")
+    def test_against_sysdev_cpu_online(self):
+        with open("/sys/devices/system/cpu/online") as f:
+            value = f.read().strip()
+        if "-" in str(value):
+            value = int(value.split('-')[1]) + 1
+            self.assertEqual(psutil.cpu_count(), value)
+
+    @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu"),
+                     "/sys/devices/system/cpu does not exist")
+    def test_against_sysdev_cpu_num(self):
+        ls = os.listdir("/sys/devices/system/cpu")
+        count = len([x for x in ls if re.search(r"cpu\d+$", x) is not None])
+        self.assertEqual(psutil.cpu_count(), count)
+
+    @unittest.skipIf(not which("nproc"), "nproc utility not available")
+    def test_against_nproc(self):
+        num = int(sh("nproc --all"))
+        self.assertEqual(psutil.cpu_count(logical=True), num)
+
+    @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
+    def test_against_lscpu(self):
+        out = sh("lscpu -p")
+        num = len([x for x in out.split('\n') if not x.startswith('#')])
+        self.assertEqual(psutil.cpu_count(logical=True), num)
+
+    def test_emulate_fallbacks(self):
+        import psutil._pslinux
+        original = psutil._pslinux.cpu_count_logical()
+        # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
+        # order to cause the parsing of /proc/cpuinfo and /proc/stat.
+        with mock.patch(
+                'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
+            self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
+            assert m.called
+
+            # Let's have open() return emtpy data and make sure None is
+            # returned ('cause we mimick os.cpu_count()).
+            with mock.patch('psutil._common.open', create=True) as m:
+                self.assertIsNone(psutil._pslinux.cpu_count_logical())
+                self.assertEqual(m.call_count, 2)
+                # /proc/stat should be the last one
+                self.assertEqual(m.call_args[0][0], '/proc/stat')
+
+            # Let's push this a bit further and make sure /proc/cpuinfo
+            # parsing works as expected.
+            with open('/proc/cpuinfo', 'rb') as f:
+                cpuinfo_data = f.read()
+            fake_file = io.BytesIO(cpuinfo_data)
+            with mock.patch('psutil._common.open',
+                            return_value=fake_file, create=True) as m:
+                self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
+
+            # Finally, let's make /proc/cpuinfo return meaningless data;
+            # this way we'll fall back on relying on /proc/stat
+            with mock_open_content('/proc/cpuinfo', b"") as m:
+                self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
+                m.called
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSystemCPUCountPhysical(unittest.TestCase):
+
+    @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
+    def test_against_lscpu(self):
+        out = sh("lscpu -p")
+        core_ids = set()
+        for line in out.split('\n'):
+            if not line.startswith('#'):
+                fields = line.split(',')
+                core_ids.add(fields[1])
+        self.assertEqual(psutil.cpu_count(logical=False), len(core_ids))
+
+    def test_emulate_none(self):
+        with mock.patch('glob.glob', return_value=[]) as m1:
+            with mock.patch('psutil._common.open', create=True) as m2:
+                self.assertIsNone(psutil._pslinux.cpu_count_physical())
+        assert m1.called
+        assert m2.called
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSystemCPUFrequency(unittest.TestCase):
+
+    @unittest.skipIf(TRAVIS, "fails on Travis")
+    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
+    def test_emulate_use_second_file(self):
+        # https://github.com/giampaolo/psutil/issues/981
+        def path_exists_mock(path):
+            if path.startswith("/sys/devices/system/cpu/cpufreq/policy"):
+                return False
+            else:
+                return orig_exists(path)
+
+        orig_exists = os.path.exists
+        with mock.patch("os.path.exists", side_effect=path_exists_mock,
+                        create=True):
+            assert psutil.cpu_freq()
+
+    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
+    def test_emulate_use_cpuinfo(self):
+        # Emulate a case where /sys/devices/system/cpu/cpufreq* does not
+        # exist and /proc/cpuinfo is used instead.
+        def path_exists_mock(path):
+            if path.startswith('/sys/devices/system/cpu/'):
+                return False
+            else:
+                if path == "/proc/cpuinfo":
+                    flags.append(None)
+                return os_path_exists(path)
+
+        flags = []
+        os_path_exists = os.path.exists
+        try:
+            with mock.patch("os.path.exists", side_effect=path_exists_mock):
+                reload_module(psutil._pslinux)
+                ret = psutil.cpu_freq()
+                assert ret
+                assert flags
+                self.assertEqual(ret.max, 0.0)
+                self.assertEqual(ret.min, 0.0)
+                for freq in psutil.cpu_freq(percpu=True):
+                    self.assertEqual(ret.max, 0.0)
+                    self.assertEqual(ret.min, 0.0)
+        finally:
+            reload_module(psutil._pslinux)
+            reload_module(psutil)
+
+    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
+    def test_emulate_data(self):
+        def open_mock(name, *args, **kwargs):
+            if (name.endswith('/scaling_cur_freq') and
+                    name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
+                return io.BytesIO(b"500000")
+            elif (name.endswith('/scaling_min_freq') and
+                    name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
+                return io.BytesIO(b"600000")
+            elif (name.endswith('/scaling_max_freq') and
+                    name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
+                return io.BytesIO(b"700000")
+            elif name == '/proc/cpuinfo':
+                return io.BytesIO(b"cpu MHz		: 500")
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        orig_open = open
+        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+        with mock.patch(patch_point, side_effect=open_mock):
+            with mock.patch(
+                    'os.path.exists', return_value=True):
+                freq = psutil.cpu_freq()
+                self.assertEqual(freq.current, 500.0)
+                # when /proc/cpuinfo is used min and max frequencies are not
+                # available and are set to 0.
+                if freq.min != 0.0:
+                    self.assertEqual(freq.min, 600.0)
+                if freq.max != 0.0:
+                    self.assertEqual(freq.max, 700.0)
+
+    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
+    def test_emulate_multi_cpu(self):
+        def open_mock(name, *args, **kwargs):
+            n = name
+            if (n.endswith('/scaling_cur_freq') and
+                    n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
+                return io.BytesIO(b"100000")
+            elif (n.endswith('/scaling_min_freq') and
+                    n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
+                return io.BytesIO(b"200000")
+            elif (n.endswith('/scaling_max_freq') and
+                    n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
+                return io.BytesIO(b"300000")
+            elif (n.endswith('/scaling_cur_freq') and
+                    n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
+                return io.BytesIO(b"400000")
+            elif (n.endswith('/scaling_min_freq') and
+                    n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
+                return io.BytesIO(b"500000")
+            elif (n.endswith('/scaling_max_freq') and
+                    n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
+                return io.BytesIO(b"600000")
+            elif name == '/proc/cpuinfo':
+                return io.BytesIO(b"cpu MHz		: 100\n"
+                                  b"cpu MHz		: 400")
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        orig_open = open
+        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+        with mock.patch(patch_point, side_effect=open_mock):
+            with mock.patch('os.path.exists', return_value=True):
+                with mock.patch('psutil._pslinux.cpu_count_logical',
+                                return_value=2):
+                    freq = psutil.cpu_freq(percpu=True)
+                    self.assertEqual(freq[0].current, 100.0)
+                    if freq[0].min != 0.0:
+                        self.assertEqual(freq[0].min, 200.0)
+                    if freq[0].max != 0.0:
+                        self.assertEqual(freq[0].max, 300.0)
+                    self.assertEqual(freq[1].current, 400.0)
+                    if freq[1].min != 0.0:
+                        self.assertEqual(freq[1].min, 500.0)
+                    if freq[1].max != 0.0:
+                        self.assertEqual(freq[1].max, 600.0)
+
+    @unittest.skipIf(TRAVIS, "fails on Travis")
+    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
+    def test_emulate_no_scaling_cur_freq_file(self):
+        # See: https://github.com/giampaolo/psutil/issues/1071
+        def open_mock(name, *args, **kwargs):
+            if name.endswith('/scaling_cur_freq'):
+                raise IOError(errno.ENOENT, "")
+            elif name.endswith('/cpuinfo_cur_freq'):
+                return io.BytesIO(b"200000")
+            elif name == '/proc/cpuinfo':
+                return io.BytesIO(b"cpu MHz		: 200")
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        orig_open = open
+        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+        with mock.patch(patch_point, side_effect=open_mock):
+            with mock.patch('os.path.exists', return_value=True):
+                with mock.patch('psutil._pslinux.cpu_count_logical',
+                                return_value=1):
+                    freq = psutil.cpu_freq()
+                    self.assertEqual(freq.current, 200)
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSystemCPUStats(unittest.TestCase):
+
+    @unittest.skipIf(TRAVIS, "fails on Travis")
+    def test_ctx_switches(self):
+        vmstat_value = vmstat("context switches")
+        psutil_value = psutil.cpu_stats().ctx_switches
+        self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
+
+    @unittest.skipIf(TRAVIS, "fails on Travis")
+    def test_interrupts(self):
+        vmstat_value = vmstat("interrupts")
+        psutil_value = psutil.cpu_stats().interrupts
+        self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestLoadAvg(unittest.TestCase):
+
+    @unittest.skipIf(not HAS_GETLOADAVG, "not supported")
+    def test_getloadavg(self):
+        psutil_value = psutil.getloadavg()
+        with open("/proc/loadavg", "r") as f:
+            proc_value = f.read().split()
+
+        self.assertAlmostEqual(float(proc_value[0]), psutil_value[0], delta=1)
+        self.assertAlmostEqual(float(proc_value[1]), psutil_value[1], delta=1)
+        self.assertAlmostEqual(float(proc_value[2]), psutil_value[2], delta=1)
+
+
+# =====================================================================
+# --- system network
+# =====================================================================
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSystemNetIfAddrs(unittest.TestCase):
+
+    def test_ips(self):
+        for name, addrs in psutil.net_if_addrs().items():
+            for addr in addrs:
+                if addr.family == psutil.AF_LINK:
+                    self.assertEqual(addr.address, get_mac_address(name))
+                elif addr.family == socket.AF_INET:
+                    self.assertEqual(addr.address, get_ipv4_address(name))
+                # TODO: test for AF_INET6 family
+
+    # XXX - not reliable when having virtual NICs installed by Docker.
+    # @unittest.skipIf(not which('ip'), "'ip' utility not available")
+    # @unittest.skipIf(TRAVIS, "skipped on Travis")
+    # def test_net_if_names(self):
+    #     out = sh("ip addr").strip()
+    #     nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x]
+    #     found = 0
+    #     for line in out.split('\n'):
+    #         line = line.strip()
+    #         if re.search(r"^\d+:", line):
+    #             found += 1
+    #             name = line.split(':')[1].strip()
+    #             self.assertIn(name, nics)
+    #     self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
+    #         pprint.pformat(nics), out))
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSystemNetIfStats(unittest.TestCase):
+
+    def test_against_ifconfig(self):
+        for name, stats in psutil.net_if_stats().items():
+            try:
+                out = sh("ifconfig %s" % name)
+            except RuntimeError:
+                pass
+            else:
+                # Not always reliable.
+                # self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
+                self.assertEqual(stats.mtu,
+                                 int(re.findall(r'(?i)MTU[: ](\d+)', out)[0]))
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSystemNetIOCounters(unittest.TestCase):
+
+    @retry_on_failure()
+    def test_against_ifconfig(self):
+        def ifconfig(nic):
+            ret = {}
+            out = sh("ifconfig %s" % name)
+            ret['packets_recv'] = int(
+                re.findall(r'RX packets[: ](\d+)', out)[0])
+            ret['packets_sent'] = int(
+                re.findall(r'TX packets[: ](\d+)', out)[0])
+            ret['errin'] = int(re.findall(r'errors[: ](\d+)', out)[0])
+            ret['errout'] = int(re.findall(r'errors[: ](\d+)', out)[1])
+            ret['dropin'] = int(re.findall(r'dropped[: ](\d+)', out)[0])
+            ret['dropout'] = int(re.findall(r'dropped[: ](\d+)', out)[1])
+            ret['bytes_recv'] = int(
+                re.findall(r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
+            ret['bytes_sent'] = int(
+                re.findall(r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
+            return ret
+
+        nio = psutil.net_io_counters(pernic=True, nowrap=False)
+        for name, stats in nio.items():
+            try:
+                ifconfig_ret = ifconfig(name)
+            except RuntimeError:
+                continue
+            self.assertAlmostEqual(
+                stats.bytes_recv, ifconfig_ret['bytes_recv'], delta=1024 * 5)
+            self.assertAlmostEqual(
+                stats.bytes_sent, ifconfig_ret['bytes_sent'], delta=1024 * 5)
+            self.assertAlmostEqual(
+                stats.packets_recv, ifconfig_ret['packets_recv'], delta=1024)
+            self.assertAlmostEqual(
+                stats.packets_sent, ifconfig_ret['packets_sent'], delta=1024)
+            self.assertAlmostEqual(
+                stats.errin, ifconfig_ret['errin'], delta=10)
+            self.assertAlmostEqual(
+                stats.errout, ifconfig_ret['errout'], delta=10)
+            self.assertAlmostEqual(
+                stats.dropin, ifconfig_ret['dropin'], delta=10)
+            self.assertAlmostEqual(
+                stats.dropout, ifconfig_ret['dropout'], delta=10)
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSystemNetConnections(unittest.TestCase):
+
+    @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError)
+    @mock.patch('psutil._pslinux.supports_ipv6', return_value=False)
+    def test_emulate_ipv6_unsupported(self, supports_ipv6, inet_ntop):
+        # see: https://github.com/giampaolo/psutil/issues/623
+        try:
+            s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+            self.addCleanup(s.close)
+            s.bind(("::1", 0))
+        except socket.error:
+            pass
+        psutil.net_connections(kind='inet6')
+
+    def test_emulate_unix(self):
+        with mock_open_content(
+            '/proc/net/unix',
+            textwrap.dedent("""\
+                0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
+                0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
+                0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
+                000000000000000000000000000000000000000000000000000000
+                """)) as m:
+            psutil.net_connections(kind='unix')
+            assert m.called
+
+
+# =====================================================================
+# --- system disks
+# =====================================================================
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSystemDiskPartitions(unittest.TestCase):
+
+    @unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available")
+    @skip_on_not_implemented()
+    def test_against_df(self):
+        # test psutil.disk_usage() and psutil.disk_partitions()
+        # against "df -a"
+        def df(path):
+            out = sh('df -P -B 1 "%s"' % path).strip()
+            lines = out.split('\n')
+            lines.pop(0)
+            line = lines.pop(0)
+            dev, total, used, free = line.split()[:4]
+            if dev == 'none':
+                dev = ''
+            total, used, free = int(total), int(used), int(free)
+            return dev, total, used, free
+
+        for part in psutil.disk_partitions(all=False):
+            usage = psutil.disk_usage(part.mountpoint)
+            dev, total, used, free = df(part.mountpoint)
+            self.assertEqual(usage.total, total)
+            # 10 MB tollerance
+            if abs(usage.free - free) > 10 * 1024 * 1024:
+                self.fail("psutil=%s, df=%s" % (usage.free, free))
+            if abs(usage.used - used) > 10 * 1024 * 1024:
+                self.fail("psutil=%s, df=%s" % (usage.used, used))
+
+    def test_zfs_fs(self):
+        # Test that ZFS partitions are returned.
+        with open("/proc/filesystems", "r") as f:
+            data = f.read()
+        if 'zfs' in data:
+            for part in psutil.disk_partitions():
+                if part.fstype == 'zfs':
+                    break
+            else:
+                self.fail("couldn't find any ZFS partition")
+        else:
+            # No ZFS partitions on this system. Let's fake one.
+            fake_file = io.StringIO(u("nodev\tzfs\n"))
+            with mock.patch('psutil._common.open',
+                            return_value=fake_file, create=True) as m1:
+                with mock.patch(
+                        'psutil._pslinux.cext.disk_partitions',
+                        return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
+                    ret = psutil.disk_partitions()
+                    assert m1.called
+                    assert m2.called
+                    assert ret
+                    self.assertEqual(ret[0].fstype, 'zfs')
+
+    def test_emulate_realpath_fail(self):
+        # See: https://github.com/giampaolo/psutil/issues/1307
+        try:
+            with mock.patch('os.path.realpath',
+                            return_value='/non/existent') as m:
+                with self.assertRaises(FileNotFoundError):
+                    psutil.disk_partitions()
+                assert m.called
+        finally:
+            psutil.PROCFS_PATH = "/proc"
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSystemDiskIoCounters(unittest.TestCase):
+
+    def test_emulate_kernel_2_4(self):
+        # Tests /proc/diskstats parsing format for 2.4 kernels, see:
+        # https://github.com/giampaolo/psutil/issues/767
+        with mock_open_content(
+                '/proc/diskstats',
+                "   3     0   1 hda 2 3 4 5 6 7 8 9 10 11 12"):
+            with mock.patch('psutil._pslinux.is_storage_device',
+                            return_value=True):
+                ret = psutil.disk_io_counters(nowrap=False)
+                self.assertEqual(ret.read_count, 1)
+                self.assertEqual(ret.read_merged_count, 2)
+                self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
+                self.assertEqual(ret.read_time, 4)
+                self.assertEqual(ret.write_count, 5)
+                self.assertEqual(ret.write_merged_count, 6)
+                self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
+                self.assertEqual(ret.write_time, 8)
+                self.assertEqual(ret.busy_time, 10)
+
+    def test_emulate_kernel_2_6_full(self):
+        # Tests /proc/diskstats parsing format for 2.6 kernels,
+        # lines reporting all metrics:
+        # https://github.com/giampaolo/psutil/issues/767
+        with mock_open_content(
+                '/proc/diskstats',
+                "   3    0   hda 1 2 3 4 5 6 7 8 9 10 11"):
+            with mock.patch('psutil._pslinux.is_storage_device',
+                            return_value=True):
+                ret = psutil.disk_io_counters(nowrap=False)
+                self.assertEqual(ret.read_count, 1)
+                self.assertEqual(ret.read_merged_count, 2)
+                self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
+                self.assertEqual(ret.read_time, 4)
+                self.assertEqual(ret.write_count, 5)
+                self.assertEqual(ret.write_merged_count, 6)
+                self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
+                self.assertEqual(ret.write_time, 8)
+                self.assertEqual(ret.busy_time, 10)
+
+    def test_emulate_kernel_2_6_limited(self):
+        # Tests /proc/diskstats parsing format for 2.6 kernels,
+        # where one line of /proc/partitions return a limited
+        # amount of metrics when it bumps into a partition
+        # (instead of a disk). See:
+        # https://github.com/giampaolo/psutil/issues/767
+        with mock_open_content(
+                '/proc/diskstats',
+                "   3    1   hda 1 2 3 4"):
+            with mock.patch('psutil._pslinux.is_storage_device',
+                            return_value=True):
+                ret = psutil.disk_io_counters(nowrap=False)
+                self.assertEqual(ret.read_count, 1)
+                self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
+                self.assertEqual(ret.write_count, 3)
+                self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)
+
+                self.assertEqual(ret.read_merged_count, 0)
+                self.assertEqual(ret.read_time, 0)
+                self.assertEqual(ret.write_merged_count, 0)
+                self.assertEqual(ret.write_time, 0)
+                self.assertEqual(ret.busy_time, 0)
+
+    def test_emulate_include_partitions(self):
+        # Make sure that when perdisk=True disk partitions are returned,
+        # see:
+        # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
+        with mock_open_content(
+                '/proc/diskstats',
+                textwrap.dedent("""\
+                    3    0   nvme0n1 1 2 3 4 5 6 7 8 9 10 11
+                    3    0   nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
+                    """)):
+            with mock.patch('psutil._pslinux.is_storage_device',
+                            return_value=False):
+                ret = psutil.disk_io_counters(perdisk=True, nowrap=False)
+                self.assertEqual(len(ret), 2)
+                self.assertEqual(ret['nvme0n1'].read_count, 1)
+                self.assertEqual(ret['nvme0n1p1'].read_count, 1)
+                self.assertEqual(ret['nvme0n1'].write_count, 5)
+                self.assertEqual(ret['nvme0n1p1'].write_count, 5)
+
+    def test_emulate_exclude_partitions(self):
+        # Make sure that when perdisk=False partitions (e.g. 'sda1',
+        # 'nvme0n1p1') are skipped and not included in the total count.
+        # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
+        with mock_open_content(
+                '/proc/diskstats',
+                textwrap.dedent("""\
+                    3    0   nvme0n1 1 2 3 4 5 6 7 8 9 10 11
+                    3    0   nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
+                    """)):
+            with mock.patch('psutil._pslinux.is_storage_device',
+                            return_value=False):
+                ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
+                self.assertIsNone(ret)
+
+        #
+        def is_storage_device(name):
+            return name == 'nvme0n1'
+
+        with mock_open_content(
+                '/proc/diskstats',
+                textwrap.dedent("""\
+                    3    0   nvme0n1 1 2 3 4 5 6 7 8 9 10 11
+                    3    0   nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
+                    """)):
+            with mock.patch('psutil._pslinux.is_storage_device',
+                            create=True, side_effect=is_storage_device):
+                ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
+                self.assertEqual(ret.read_count, 1)
+                self.assertEqual(ret.write_count, 5)
+
+    def test_emulate_use_sysfs(self):
+        def exists(path):
+            if path == '/proc/diskstats':
+                return False
+            return True
+
+        wprocfs = psutil.disk_io_counters(perdisk=True)
+        with mock.patch('psutil._pslinux.os.path.exists',
+                        create=True, side_effect=exists):
+            wsysfs = psutil.disk_io_counters(perdisk=True)
+        self.assertEqual(len(wprocfs), len(wsysfs))
+
+    def test_emulate_not_impl(self):
+        def exists(path):
+            return False
+
+        with mock.patch('psutil._pslinux.os.path.exists',
+                        create=True, side_effect=exists):
+            self.assertRaises(NotImplementedError, psutil.disk_io_counters)
+
+
+# =====================================================================
+# --- misc
+# =====================================================================
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestMisc(unittest.TestCase):
+
+    def test_boot_time(self):
+        vmstat_value = vmstat('boot time')
+        psutil_value = psutil.boot_time()
+        self.assertEqual(int(vmstat_value), int(psutil_value))
+
+    def test_no_procfs_on_import(self):
+        my_procfs = tempfile.mkdtemp()
+
+        with open(os.path.join(my_procfs, 'stat'), 'w') as f:
+            f.write('cpu   0 0 0 0 0 0 0 0 0 0\n')
+            f.write('cpu0  0 0 0 0 0 0 0 0 0 0\n')
+            f.write('cpu1  0 0 0 0 0 0 0 0 0 0\n')
+
+        try:
+            orig_open = open
+
+            def open_mock(name, *args, **kwargs):
+                if name.startswith('/proc'):
+                    raise IOError(errno.ENOENT, 'rejecting access for test')
+                return orig_open(name, *args, **kwargs)
+
+            patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+            with mock.patch(patch_point, side_effect=open_mock):
+                reload_module(psutil)
+
+                self.assertRaises(IOError, psutil.cpu_times)
+                self.assertRaises(IOError, psutil.cpu_times, percpu=True)
+                self.assertRaises(IOError, psutil.cpu_percent)
+                self.assertRaises(IOError, psutil.cpu_percent, percpu=True)
+                self.assertRaises(IOError, psutil.cpu_times_percent)
+                self.assertRaises(
+                    IOError, psutil.cpu_times_percent, percpu=True)
+
+                psutil.PROCFS_PATH = my_procfs
+
+                self.assertEqual(psutil.cpu_percent(), 0)
+                self.assertEqual(sum(psutil.cpu_times_percent()), 0)
+
+                # since we don't know the number of CPUs at import time,
+                # we awkwardly say there are none until the second call
+                per_cpu_percent = psutil.cpu_percent(percpu=True)
+                self.assertEqual(sum(per_cpu_percent), 0)
+
+                # ditto awkward length
+                per_cpu_times_percent = psutil.cpu_times_percent(percpu=True)
+                self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0)
+
+                # much user, very busy
+                with open(os.path.join(my_procfs, 'stat'), 'w') as f:
+                    f.write('cpu   1 0 0 0 0 0 0 0 0 0\n')
+                    f.write('cpu0  1 0 0 0 0 0 0 0 0 0\n')
+                    f.write('cpu1  1 0 0 0 0 0 0 0 0 0\n')
+
+                self.assertNotEqual(psutil.cpu_percent(), 0)
+                self.assertNotEqual(
+                    sum(psutil.cpu_percent(percpu=True)), 0)
+                self.assertNotEqual(sum(psutil.cpu_times_percent()), 0)
+                self.assertNotEqual(
+                    sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0)
+        finally:
+            shutil.rmtree(my_procfs)
+            reload_module(psutil)
+
+        self.assertEqual(psutil.PROCFS_PATH, '/proc')
+
+    def test_cpu_steal_decrease(self):
+        # Test cumulative cpu stats decrease. We should ignore this.
+        # See issue #1210.
+        with mock_open_content(
+            "/proc/stat",
+            textwrap.dedent("""\
+                cpu   0 0 0 0 0 0 0 1 0 0
+                cpu0  0 0 0 0 0 0 0 1 0 0
+                cpu1  0 0 0 0 0 0 0 1 0 0
+                """).encode()) as m:
+            # first call to "percent" functions should read the new stat file
+            # and compare to the "real" file read at import time - so the
+            # values are meaningless
+            psutil.cpu_percent()
+            assert m.called
+            psutil.cpu_percent(percpu=True)
+            psutil.cpu_times_percent()
+            psutil.cpu_times_percent(percpu=True)
+
+        with mock_open_content(
+            "/proc/stat",
+            textwrap.dedent("""\
+                cpu   1 0 0 0 0 0 0 0 0 0
+                cpu0  1 0 0 0 0 0 0 0 0 0
+                cpu1  1 0 0 0 0 0 0 0 0 0
+                """).encode()) as m:
+            # Increase "user" while steal goes "backwards" to zero.
+            cpu_percent = psutil.cpu_percent()
+            assert m.called
+            cpu_percent_percpu = psutil.cpu_percent(percpu=True)
+            cpu_times_percent = psutil.cpu_times_percent()
+            cpu_times_percent_percpu = psutil.cpu_times_percent(percpu=True)
+            self.assertNotEqual(cpu_percent, 0)
+            self.assertNotEqual(sum(cpu_percent_percpu), 0)
+            self.assertNotEqual(sum(cpu_times_percent), 0)
+            self.assertNotEqual(sum(cpu_times_percent), 100.0)
+            self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 0)
+            self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 100.0)
+            self.assertEqual(cpu_times_percent.steal, 0)
+            self.assertNotEqual(cpu_times_percent.user, 0)
+
+    def test_boot_time_mocked(self):
+        with mock.patch('psutil._common.open', create=True) as m:
+            self.assertRaises(
+                RuntimeError,
+                psutil._pslinux.boot_time)
+            assert m.called
+
+    def test_users_mocked(self):
+        # Make sure ':0' and ':0.0' (returned by C ext) are converted
+        # to 'localhost'.
+        with mock.patch('psutil._pslinux.cext.users',
+                        return_value=[('giampaolo', 'pts/2', ':0',
+                                       1436573184.0, True, 2)]) as m:
+            self.assertEqual(psutil.users()[0].host, 'localhost')
+            assert m.called
+        with mock.patch('psutil._pslinux.cext.users',
+                        return_value=[('giampaolo', 'pts/2', ':0.0',
+                                       1436573184.0, True, 2)]) as m:
+            self.assertEqual(psutil.users()[0].host, 'localhost')
+            assert m.called
+        # ...otherwise it should be returned as-is
+        with mock.patch('psutil._pslinux.cext.users',
+                        return_value=[('giampaolo', 'pts/2', 'foo',
+                                       1436573184.0, True, 2)]) as m:
+            self.assertEqual(psutil.users()[0].host, 'foo')
+            assert m.called
+
+    def test_procfs_path(self):
+        tdir = tempfile.mkdtemp()
+        try:
+            psutil.PROCFS_PATH = tdir
+            self.assertRaises(IOError, psutil.virtual_memory)
+            self.assertRaises(IOError, psutil.cpu_times)
+            self.assertRaises(IOError, psutil.cpu_times, percpu=True)
+            self.assertRaises(IOError, psutil.boot_time)
+            # self.assertRaises(IOError, psutil.pids)
+            self.assertRaises(IOError, psutil.net_connections)
+            self.assertRaises(IOError, psutil.net_io_counters)
+            self.assertRaises(IOError, psutil.net_if_stats)
+            # self.assertRaises(IOError, psutil.disk_io_counters)
+            self.assertRaises(IOError, psutil.disk_partitions)
+            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
+        finally:
+            psutil.PROCFS_PATH = "/proc"
+            os.rmdir(tdir)
+
+    def test_issue_687(self):
+        # In case of thread ID:
+        # - pid_exists() is supposed to return False
+        # - Process(tid) is supposed to work
+        # - pids() should not return the TID
+        # See: https://github.com/giampaolo/psutil/issues/687
+        t = ThreadTask()
+        t.start()
+        try:
+            p = psutil.Process()
+            tid = p.threads()[1].id
+            assert not psutil.pid_exists(tid), tid
+            pt = psutil.Process(tid)
+            pt.as_dict()
+            self.assertNotIn(tid, psutil.pids())
+        finally:
+            t.stop()
+
+    def test_pid_exists_no_proc_status(self):
+        # Internally pid_exists relies on /proc/{pid}/status.
+        # Emulate a case where this file is empty in which case
+        # psutil is supposed to fall back on using pids().
+        with mock_open_content("/proc/%s/status", "") as m:
+            assert psutil.pid_exists(os.getpid())
+            assert m.called
+
+
+# =====================================================================
+# --- sensors
+# =====================================================================
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+@unittest.skipIf(not HAS_BATTERY, "no battery")
+class TestSensorsBattery(unittest.TestCase):
+
+    @unittest.skipIf(not which("acpi"), "acpi utility not available")
+    def test_percent(self):
+        out = sh("acpi -b")
+        acpi_value = int(out.split(",")[1].strip().replace('%', ''))
+        psutil_value = psutil.sensors_battery().percent
+        self.assertAlmostEqual(acpi_value, psutil_value, delta=1)
+
+    @unittest.skipIf(not which("acpi"), "acpi utility not available")
+    def test_power_plugged(self):
+        out = sh("acpi -b")
+        if 'unknown' in out.lower():
+            return unittest.skip("acpi output not reliable")
+        if 'discharging at zero rate' in out:
+            plugged = True
+        else:
+            plugged = "Charging" in out.split('\n')[0]
+        self.assertEqual(psutil.sensors_battery().power_plugged, plugged)
+
+    def test_emulate_power_plugged(self):
+        # Pretend the AC power cable is connected.
+        def open_mock(name, *args, **kwargs):
+            if name.endswith("AC0/online") or name.endswith("AC/online"):
+                return io.BytesIO(b"1")
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        orig_open = open
+        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+        with mock.patch(patch_point, side_effect=open_mock) as m:
+            self.assertEqual(psutil.sensors_battery().power_plugged, True)
+            self.assertEqual(
+                psutil.sensors_battery().secsleft, psutil.POWER_TIME_UNLIMITED)
+            assert m.called
+
+    def test_emulate_power_plugged_2(self):
+        # Same as above but pretend /AC0/online does not exist in which
+        # case code relies on /status file.
+        def open_mock(name, *args, **kwargs):
+            if name.endswith("AC0/online") or name.endswith("AC/online"):
+                raise IOError(errno.ENOENT, "")
+            elif name.endswith("/status"):
+                return io.StringIO(u("charging"))
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        orig_open = open
+        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+        with mock.patch(patch_point, side_effect=open_mock) as m:
+            self.assertEqual(psutil.sensors_battery().power_plugged, True)
+            assert m.called
+
+    def test_emulate_power_not_plugged(self):
+        # Pretend the AC power cable is not connected.
+        def open_mock(name, *args, **kwargs):
+            if name.endswith("AC0/online") or name.endswith("AC/online"):
+                return io.BytesIO(b"0")
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        orig_open = open
+        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+        with mock.patch(patch_point, side_effect=open_mock) as m:
+            self.assertEqual(psutil.sensors_battery().power_plugged, False)
+            assert m.called
+
+    def test_emulate_power_not_plugged_2(self):
+        # Same as above but pretend /AC0/online does not exist in which
+        # case code relies on /status file.
+        def open_mock(name, *args, **kwargs):
+            if name.endswith("AC0/online") or name.endswith("AC/online"):
+                raise IOError(errno.ENOENT, "")
+            elif name.endswith("/status"):
+                return io.StringIO(u("discharging"))
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        orig_open = open
+        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+        with mock.patch(patch_point, side_effect=open_mock) as m:
+            self.assertEqual(psutil.sensors_battery().power_plugged, False)
+            assert m.called
+
+    def test_emulate_power_undetermined(self):
+        # Pretend we can't know whether the AC power cable not
+        # connected (assert fallback to False).
+        def open_mock(name, *args, **kwargs):
+            if name.startswith("/sys/class/power_supply/AC0/online") or \
+                    name.startswith("/sys/class/power_supply/AC/online"):
+                raise IOError(errno.ENOENT, "")
+            elif name.startswith("/sys/class/power_supply/BAT0/status"):
+                return io.BytesIO(b"???")
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        orig_open = open
+        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+        with mock.patch(patch_point, side_effect=open_mock) as m:
+            self.assertIsNone(psutil.sensors_battery().power_plugged)
+            assert m.called
+
+    def test_emulate_no_base_files(self):
+        # Emulate a case where base metrics files are not present,
+        # in which case we're supposed to get None.
+        with mock_open_exception(
+                "/sys/class/power_supply/BAT0/energy_now",
+                IOError(errno.ENOENT, "")):
+            with mock_open_exception(
+                    "/sys/class/power_supply/BAT0/charge_now",
+                    IOError(errno.ENOENT, "")):
+                self.assertIsNone(psutil.sensors_battery())
+
+    def test_emulate_energy_full_0(self):
+        # Emulate a case where energy_full files returns 0.
+        with mock_open_content(
+                "/sys/class/power_supply/BAT0/energy_full", b"0") as m:
+            self.assertEqual(psutil.sensors_battery().percent, 0)
+            assert m.called
+
+    def test_emulate_energy_full_not_avail(self):
+        # Emulate a case where energy_full file does not exist.
+        # Expected fallback on /capacity.
+        with mock_open_exception(
+                "/sys/class/power_supply/BAT0/energy_full",
+                IOError(errno.ENOENT, "")):
+            with mock_open_exception(
+                    "/sys/class/power_supply/BAT0/charge_full",
+                    IOError(errno.ENOENT, "")):
+                with mock_open_content(
+                        "/sys/class/power_supply/BAT0/capacity", b"88"):
+                    self.assertEqual(psutil.sensors_battery().percent, 88)
+
+    def test_emulate_no_power(self):
+        # Emulate a case where /AC0/online file nor /BAT0/status exist.
+        with mock_open_exception(
+                "/sys/class/power_supply/AC/online",
+                IOError(errno.ENOENT, "")):
+            with mock_open_exception(
+                    "/sys/class/power_supply/AC0/online",
+                    IOError(errno.ENOENT, "")):
+                with mock_open_exception(
+                        "/sys/class/power_supply/BAT0/status",
+                        IOError(errno.ENOENT, "")):
+                    self.assertIsNone(psutil.sensors_battery().power_plugged)
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSensorsTemperatures(unittest.TestCase):
+
+    def test_emulate_class_hwmon(self):
+        def open_mock(name, *args, **kwargs):
+            if name.endswith('/name'):
+                return io.StringIO(u("name"))
+            elif name.endswith('/temp1_label'):
+                return io.StringIO(u("label"))
+            elif name.endswith('/temp1_input'):
+                return io.BytesIO(b"30000")
+            elif name.endswith('/temp1_max'):
+                return io.BytesIO(b"40000")
+            elif name.endswith('/temp1_crit'):
+                return io.BytesIO(b"50000")
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        orig_open = open
+        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+        with mock.patch(patch_point, side_effect=open_mock):
+            # Test case with /sys/class/hwmon
+            with mock.patch('glob.glob',
+                            return_value=['/sys/class/hwmon/hwmon0/temp1']):
+                temp = psutil.sensors_temperatures()['name'][0]
+                self.assertEqual(temp.label, 'label')
+                self.assertEqual(temp.current, 30.0)
+                self.assertEqual(temp.high, 40.0)
+                self.assertEqual(temp.critical, 50.0)
+
+    def test_emulate_class_thermal(self):
+        def open_mock(name, *args, **kwargs):
+            if name.endswith('0_temp'):
+                return io.BytesIO(b"50000")
+            elif name.endswith('temp'):
+                return io.BytesIO(b"30000")
+            elif name.endswith('0_type'):
+                return io.StringIO(u("critical"))
+            elif name.endswith('type'):
+                return io.StringIO(u("name"))
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        def glob_mock(path):
+            if path == '/sys/class/hwmon/hwmon*/temp*_*':
+                return []
+            elif path == '/sys/class/hwmon/hwmon*/device/temp*_*':
+                return []
+            elif path == '/sys/class/thermal/thermal_zone*':
+                return ['/sys/class/thermal/thermal_zone0']
+            elif path == '/sys/class/thermal/thermal_zone0/trip_point*':
+                return ['/sys/class/thermal/thermal_zone1/trip_point_0_type',
+                        '/sys/class/thermal/thermal_zone1/trip_point_0_temp']
+            return []
+
+        orig_open = open
+        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+        with mock.patch(patch_point, side_effect=open_mock):
+            with mock.patch('glob.glob', create=True, side_effect=glob_mock):
+                temp = psutil.sensors_temperatures()['name'][0]
+                self.assertEqual(temp.label, '')
+                self.assertEqual(temp.current, 30.0)
+                self.assertEqual(temp.high, 50.0)
+                self.assertEqual(temp.critical, 50.0)
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestSensorsFans(unittest.TestCase):
+
+    def test_emulate_data(self):
+        def open_mock(name, *args, **kwargs):
+            if name.endswith('/name'):
+                return io.StringIO(u("name"))
+            elif name.endswith('/fan1_label'):
+                return io.StringIO(u("label"))
+            elif name.endswith('/fan1_input'):
+                return io.StringIO(u("2000"))
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        orig_open = open
+        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+        with mock.patch(patch_point, side_effect=open_mock):
+            with mock.patch('glob.glob',
+                            return_value=['/sys/class/hwmon/hwmon2/fan1']):
+                fan = psutil.sensors_fans()['name'][0]
+                self.assertEqual(fan.label, 'label')
+                self.assertEqual(fan.current, 2000)
+
+
+# =====================================================================
+# --- test process
+# =====================================================================
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestProcess(unittest.TestCase):
+
+    def setUp(self):
+        safe_rmpath(TESTFN)
+
+    tearDown = setUp
+
+    def test_memory_full_info(self):
+        src = textwrap.dedent("""
+            import time
+            with open("%s", "w") as f:
+                time.sleep(10)
+            """ % TESTFN)
+        sproc = pyrun(src)
+        self.addCleanup(reap_children)
+        call_until(lambda: os.listdir('.'), "'%s' not in ret" % TESTFN)
+        p = psutil.Process(sproc.pid)
+        time.sleep(.1)
+        mem = p.memory_full_info()
+        maps = p.memory_maps(grouped=False)
+        self.assertAlmostEqual(
+            mem.uss, sum([x.private_dirty + x.private_clean for x in maps]),
+            delta=4096)
+        self.assertAlmostEqual(
+            mem.pss, sum([x.pss for x in maps]), delta=4096)
+        self.assertAlmostEqual(
+            mem.swap, sum([x.swap for x in maps]), delta=4096)
+
+    def test_memory_full_info_mocked(self):
+        # See: https://github.com/giampaolo/psutil/issues/1222
+        with mock_open_content(
+            "/proc/%s/smaps" % os.getpid(),
+            textwrap.dedent("""\
+                fffff0 r-xp 00000000 00:00 0                  [vsyscall]
+                Size:                  1 kB
+                Rss:                   2 kB
+                Pss:                   3 kB
+                Shared_Clean:          4 kB
+                Shared_Dirty:          5 kB
+                Private_Clean:         6 kB
+                Private_Dirty:         7 kB
+                Referenced:            8 kB
+                Anonymous:             9 kB
+                LazyFree:              10 kB
+                AnonHugePages:         11 kB
+                ShmemPmdMapped:        12 kB
+                Shared_Hugetlb:        13 kB
+                Private_Hugetlb:       14 kB
+                Swap:                  15 kB
+                SwapPss:               16 kB
+                KernelPageSize:        17 kB
+                MMUPageSize:           18 kB
+                Locked:                19 kB
+                VmFlags: rd ex
+                """).encode()) as m:
+            p = psutil.Process()
+            mem = p.memory_full_info()
+            assert m.called
+            self.assertEqual(mem.uss, (6 + 7 + 14) * 1024)
+            self.assertEqual(mem.pss, 3 * 1024)
+            self.assertEqual(mem.swap, 15 * 1024)
+
+    # On PYPY file descriptors are not closed fast enough.
+    @unittest.skipIf(PYPY, "unreliable on PYPY")
+    def test_open_files_mode(self):
+        def get_test_file():
+            p = psutil.Process()
+            giveup_at = time.time() + 2
+            while True:
+                for file in p.open_files():
+                    if file.path == os.path.abspath(TESTFN):
+                        return file
+                    elif time.time() > giveup_at:
+                        break
+            raise RuntimeError("timeout looking for test file")
+
+        #
+        with open(TESTFN, "w"):
+            self.assertEqual(get_test_file().mode, "w")
+        with open(TESTFN, "r"):
+            self.assertEqual(get_test_file().mode, "r")
+        with open(TESTFN, "a"):
+            self.assertEqual(get_test_file().mode, "a")
+        #
+        with open(TESTFN, "r+"):
+            self.assertEqual(get_test_file().mode, "r+")
+        with open(TESTFN, "w+"):
+            self.assertEqual(get_test_file().mode, "r+")
+        with open(TESTFN, "a+"):
+            self.assertEqual(get_test_file().mode, "a+")
+        # note: "x" bit is not supported
+        if PY3:
+            safe_rmpath(TESTFN)
+            with open(TESTFN, "x"):
+                self.assertEqual(get_test_file().mode, "w")
+            safe_rmpath(TESTFN)
+            with open(TESTFN, "x+"):
+                self.assertEqual(get_test_file().mode, "r+")
+
+    def test_open_files_file_gone(self):
+        # simulates a file which gets deleted during open_files()
+        # execution
+        p = psutil.Process()
+        files = p.open_files()
+        with tempfile.NamedTemporaryFile():
+            # give the kernel some time to see the new file
+            call_until(p.open_files, "len(ret) != %i" % len(files))
+            with mock.patch('psutil._pslinux.os.readlink',
+                            side_effect=OSError(errno.ENOENT, "")) as m:
+                files = p.open_files()
+                assert not files
+                assert m.called
+            # also simulate the case where os.readlink() returns EINVAL
+            # in which case psutil is supposed to 'continue'
+            with mock.patch('psutil._pslinux.os.readlink',
+                            side_effect=OSError(errno.EINVAL, "")) as m:
+                self.assertEqual(p.open_files(), [])
+                assert m.called
+
+    def test_open_files_fd_gone(self):
+        # Simulate a case where /proc/{pid}/fdinfo/{fd} disappears
+        # while iterating through fds.
+        # https://travis-ci.org/giampaolo/psutil/jobs/225694530
+        p = psutil.Process()
+        files = p.open_files()
+        with tempfile.NamedTemporaryFile():
+            # give the kernel some time to see the new file
+            call_until(p.open_files, "len(ret) != %i" % len(files))
+            patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+            with mock.patch(patch_point,
+                            side_effect=IOError(errno.ENOENT, "")) as m:
+                files = p.open_files()
+                assert not files
+                assert m.called
+
+    # --- mocked tests
+
+    def test_terminal_mocked(self):
+        with mock.patch('psutil._pslinux._psposix.get_terminal_map',
+                        return_value={}) as m:
+            self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
+            assert m.called
+
+    # TODO: re-enable this test.
+    # def test_num_ctx_switches_mocked(self):
+    #     with mock.patch('psutil._common.open', create=True) as m:
+    #         self.assertRaises(
+    #             NotImplementedError,
+    #             psutil._pslinux.Process(os.getpid()).num_ctx_switches)
+    #         assert m.called
+
+    def test_cmdline_mocked(self):
+        # see: https://github.com/giampaolo/psutil/issues/639
+        p = psutil.Process()
+        fake_file = io.StringIO(u('foo\x00bar\x00'))
+        with mock.patch('psutil._common.open',
+                        return_value=fake_file, create=True) as m:
+            self.assertEqual(p.cmdline(), ['foo', 'bar'])
+            assert m.called
+        fake_file = io.StringIO(u('foo\x00bar\x00\x00'))
+        with mock.patch('psutil._common.open',
+                        return_value=fake_file, create=True) as m:
+            self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
+            assert m.called
+
+    def test_cmdline_spaces_mocked(self):
+        # see: https://github.com/giampaolo/psutil/issues/1179
+        p = psutil.Process()
+        fake_file = io.StringIO(u('foo bar '))
+        with mock.patch('psutil._common.open',
+                        return_value=fake_file, create=True) as m:
+            self.assertEqual(p.cmdline(), ['foo', 'bar'])
+            assert m.called
+        fake_file = io.StringIO(u('foo bar  '))
+        with mock.patch('psutil._common.open',
+                        return_value=fake_file, create=True) as m:
+            self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
+            assert m.called
+
+    def test_cmdline_mixed_separators(self):
+        # https://github.com/giampaolo/psutil/issues/
+        #    1179#issuecomment-552984549
+        p = psutil.Process()
+        fake_file = io.StringIO(u('foo\x20bar\x00'))
+        with mock.patch('psutil._common.open',
+                        return_value=fake_file, create=True) as m:
+            self.assertEqual(p.cmdline(), ['foo', 'bar'])
+            assert m.called
+
+    def test_readlink_path_deleted_mocked(self):
+        with mock.patch('psutil._pslinux.os.readlink',
+                        return_value='/home/foo (deleted)'):
+            self.assertEqual(psutil.Process().exe(), "/home/foo")
+            self.assertEqual(psutil.Process().cwd(), "/home/foo")
+
+    def test_threads_mocked(self):
+        # Test the case where os.listdir() returns a file (thread)
+        # which no longer exists by the time we open() it (race
+        # condition). threads() is supposed to ignore that instead
+        # of raising NSP.
+        def open_mock(name, *args, **kwargs):
+            if name.startswith('/proc/%s/task' % os.getpid()):
+                raise IOError(errno.ENOENT, "")
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        orig_open = open
+        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
+        with mock.patch(patch_point, side_effect=open_mock) as m:
+            ret = psutil.Process().threads()
+            assert m.called
+            self.assertEqual(ret, [])
+
+        # ...but if it bumps into something != ENOENT we want an
+        # exception.
+        def open_mock(name, *args, **kwargs):
+            if name.startswith('/proc/%s/task' % os.getpid()):
+                raise IOError(errno.EPERM, "")
+            else:
+                return orig_open(name, *args, **kwargs)
+
+        with mock.patch(patch_point, side_effect=open_mock):
+            self.assertRaises(psutil.AccessDenied, psutil.Process().threads)
+
+    def test_exe_mocked(self):
+        with mock.patch('psutil._pslinux.readlink',
+                        side_effect=OSError(errno.ENOENT, "")) as m1:
+            with mock.patch('psutil.Process.cmdline',
+                            side_effect=psutil.AccessDenied(0, "")) as m2:
+                # No such file error; might be raised also if /proc/pid/exe
+                # path actually exists for system processes with low pids
+                # (about 0-20). In this case psutil is supposed to return
+                # an empty string.
+                ret = psutil.Process().exe()
+                assert m1.called
+                assert m2.called
+                self.assertEqual(ret, "")
+
+                # ...but if /proc/pid no longer exist we're supposed to treat
+                # it as an alias for zombie process
+                with mock.patch('psutil._pslinux.os.path.lexists',
+                                return_value=False):
+                    self.assertRaises(
+                        psutil.ZombieProcess, psutil.Process().exe)
+
+    def test_issue_1014(self):
+        # Emulates a case where smaps file does not exist. In this case
+        # wrap_exception decorator should not raise NoSuchProcess.
+        with mock_open_exception(
+                '/proc/%s/smaps' % os.getpid(),
+                IOError(errno.ENOENT, "")) as m:
+            p = psutil.Process()
+            with self.assertRaises(FileNotFoundError):
+                p.memory_maps()
+            assert m.called
+
+    @unittest.skipIf(not HAS_RLIMIT, "not supported")
+    def test_rlimit_zombie(self):
+        # Emulate a case where rlimit() raises ENOSYS, which may
+        # happen in case of zombie process:
+        # https://travis-ci.org/giampaolo/psutil/jobs/51368273
+        with mock.patch("psutil._pslinux.cext.linux_prlimit",
+                        side_effect=OSError(errno.ENOSYS, "")) as m:
+            p = psutil.Process()
+            p.name()
+            with self.assertRaises(psutil.ZombieProcess) as exc:
+                p.rlimit(psutil.RLIMIT_NOFILE)
+            assert m.called
+        self.assertEqual(exc.exception.pid, p.pid)
+        self.assertEqual(exc.exception.name, p.name())
+
+    def test_cwd_zombie(self):
+        with mock.patch("psutil._pslinux.os.readlink",
+                        side_effect=OSError(errno.ENOENT, "")) as m:
+            p = psutil.Process()
+            p.name()
+            with self.assertRaises(psutil.ZombieProcess) as exc:
+                p.cwd()
+            assert m.called
+        self.assertEqual(exc.exception.pid, p.pid)
+        self.assertEqual(exc.exception.name, p.name())
+
+    def test_stat_file_parsing(self):
+        from psutil._pslinux import CLOCK_TICKS
+
+        args = [
+            "0",      # pid
+            "(cat)",  # name
+            "Z",      # status
+            "1",      # ppid
+            "0",      # pgrp
+            "0",      # session
+            "0",      # tty
+            "0",      # tpgid
+            "0",      # flags
+            "0",      # minflt
+            "0",      # cminflt
+            "0",      # majflt
+            "0",      # cmajflt
+            "2",      # utime
+            "3",      # stime
+            "4",      # cutime
+            "5",      # cstime
+            "0",      # priority
+            "0",      # nice
+            "0",      # num_threads
+            "0",      # itrealvalue
+            "6",      # starttime
+            "0",      # vsize
+            "0",      # rss
+            "0",      # rsslim
+            "0",      # startcode
+            "0",      # endcode
+            "0",      # startstack
+            "0",      # kstkesp
+            "0",      # kstkeip
+            "0",      # signal
+            "0",      # blocked
+            "0",      # sigignore
+            "0",      # sigcatch
+            "0",      # wchan
+            "0",      # nswap
+            "0",      # cnswap
+            "0",      # exit_signal
+            "6",      # processor
+            "0",      # rt priority
+            "0",      # policy
+            "7",      # delayacct_blkio_ticks
+        ]
+        content = " ".join(args).encode()
+        with mock_open_content('/proc/%s/stat' % os.getpid(), content):
+            p = psutil.Process()
+            self.assertEqual(p.name(), 'cat')
+            self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
+            self.assertEqual(p.ppid(), 1)
+            self.assertEqual(
+                p.create_time(), 6 / CLOCK_TICKS + psutil.boot_time())
+            cpu = p.cpu_times()
+            self.assertEqual(cpu.user, 2 / CLOCK_TICKS)
+            self.assertEqual(cpu.system, 3 / CLOCK_TICKS)
+            self.assertEqual(cpu.children_user, 4 / CLOCK_TICKS)
+            self.assertEqual(cpu.children_system, 5 / CLOCK_TICKS)
+            self.assertEqual(cpu.iowait, 7 / CLOCK_TICKS)
+            self.assertEqual(p.cpu_num(), 6)
+
+    def test_status_file_parsing(self):
+        with mock_open_content(
+            '/proc/%s/status' % os.getpid(),
+            textwrap.dedent("""\
+                Uid:\t1000\t1001\t1002\t1003
+                Gid:\t1004\t1005\t1006\t1007
+                Threads:\t66
+                Cpus_allowed:\tf
+                Cpus_allowed_list:\t0-7
+                voluntary_ctxt_switches:\t12
+                nonvoluntary_ctxt_switches:\t13""").encode()):
+            p = psutil.Process()
+            self.assertEqual(p.num_ctx_switches().voluntary, 12)
+            self.assertEqual(p.num_ctx_switches().involuntary, 13)
+            self.assertEqual(p.num_threads(), 66)
+            uids = p.uids()
+            self.assertEqual(uids.real, 1000)
+            self.assertEqual(uids.effective, 1001)
+            self.assertEqual(uids.saved, 1002)
+            gids = p.gids()
+            self.assertEqual(gids.real, 1004)
+            self.assertEqual(gids.effective, 1005)
+            self.assertEqual(gids.saved, 1006)
+            self.assertEqual(p._proc._get_eligible_cpus(), list(range(0, 8)))
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestProcessAgainstStatus(unittest.TestCase):
+    """/proc/pid/stat and /proc/pid/status have many values in common.
+    Whenever possible, psutil uses /proc/pid/stat (it's faster).
+    For all those cases we check that the value found in
+    /proc/pid/stat (by psutil) matches the one found in
+    /proc/pid/status.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.proc = psutil.Process()
+
+    def read_status_file(self, linestart):
+        with psutil._psplatform.open_text(
+                '/proc/%s/status' % self.proc.pid) as f:
+            for line in f:
+                line = line.strip()
+                if line.startswith(linestart):
+                    value = line.partition('\t')[2]
+                    try:
+                        return int(value)
+                    except ValueError:
+                        return value
+            raise ValueError("can't find %r" % linestart)
+
+    def test_name(self):
+        value = self.read_status_file("Name:")
+        self.assertEqual(self.proc.name(), value)
+
+    def test_status(self):
+        value = self.read_status_file("State:")
+        value = value[value.find('(') + 1:value.rfind(')')]
+        value = value.replace(' ', '-')
+        self.assertEqual(self.proc.status(), value)
+
+    def test_ppid(self):
+        value = self.read_status_file("PPid:")
+        self.assertEqual(self.proc.ppid(), value)
+
+    def test_num_threads(self):
+        value = self.read_status_file("Threads:")
+        self.assertEqual(self.proc.num_threads(), value)
+
+    def test_uids(self):
+        value = self.read_status_file("Uid:")
+        value = tuple(map(int, value.split()[1:4]))
+        self.assertEqual(self.proc.uids(), value)
+
+    def test_gids(self):
+        value = self.read_status_file("Gid:")
+        value = tuple(map(int, value.split()[1:4]))
+        self.assertEqual(self.proc.gids(), value)
+
+    @retry_on_failure()
+    def test_num_ctx_switches(self):
+        value = self.read_status_file("voluntary_ctxt_switches:")
+        self.assertEqual(self.proc.num_ctx_switches().voluntary, value)
+        value = self.read_status_file("nonvoluntary_ctxt_switches:")
+        self.assertEqual(self.proc.num_ctx_switches().involuntary, value)
+
+    def test_cpu_affinity(self):
+        value = self.read_status_file("Cpus_allowed_list:")
+        if '-' in str(value):
+            min_, max_ = map(int, value.split('-'))
+            self.assertEqual(
+                self.proc.cpu_affinity(), list(range(min_, max_ + 1)))
+
+    def test_cpu_affinity_eligible_cpus(self):
+        value = self.read_status_file("Cpus_allowed_list:")
+        with mock.patch("psutil._pslinux.per_cpu_times") as m:
+            self.proc._proc._get_eligible_cpus()
+        if '-' in str(value):
+            assert not m.called
+        else:
+            assert m.called
+
+
+# =====================================================================
+# --- test utils
+# =====================================================================
+
+
+@unittest.skipIf(not LINUX, "LINUX only")
+class TestUtils(unittest.TestCase):
+
+    def test_readlink(self):
+        with mock.patch("os.readlink", return_value="foo (deleted)") as m:
+            self.assertEqual(psutil._psplatform.readlink("bar"), "foo")
+            assert m.called
+
+    def test_cat(self):
+        fname = os.path.abspath(TESTFN)
+        with open(fname, "wt") as f:
+            f.write("foo ")
+        self.assertEqual(psutil._psplatform.cat(TESTFN, binary=False), "foo")
+        self.assertEqual(psutil._psplatform.cat(TESTFN, binary=True), b"foo")
+        self.assertEqual(
+            psutil._psplatform.cat(TESTFN + '??', fallback="bar"), "bar")
+
+
+if __name__ == '__main__':
+    from psutil.tests.runner import run
+    run(__file__)