Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/psutil/tests/test_osx.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/psutil/tests/test_osx.py Fri Jul 31 00:32:28 2020 -0400 @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""MACOS specific tests.""" + +import os +import re +import time + +import psutil +from psutil import MACOS +from psutil.tests import HAS_BATTERY +from psutil.tests import PsutilTestCase +from psutil.tests import retry_on_failure +from psutil.tests import sh +from psutil.tests import spawn_testproc +from psutil.tests import terminate +from psutil.tests import TOLERANCE_DISK_USAGE +from psutil.tests import TOLERANCE_SYS_MEM +from psutil.tests import unittest + + +PAGESIZE = os.sysconf("SC_PAGE_SIZE") if MACOS else None + + +def sysctl(cmdline): + """Expects a sysctl command with an argument and parse the result + returning only the value of interest. + """ + out = sh(cmdline) + result = out.split()[1] + try: + return int(result) + except ValueError: + return result + + +def vm_stat(field): + """Wrapper around 'vm_stat' cmdline utility.""" + out = sh('vm_stat') + for line in out.split('\n'): + if field in line: + break + else: + raise ValueError("line not found") + return int(re.search(r'\d+', line).group(0)) * PAGESIZE + + +# http://code.activestate.com/recipes/578019/ +def human2bytes(s): + SYMBOLS = { + 'customary': ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'), + } + init = s + num = "" + while s and s[0:1].isdigit() or s[0:1] == '.': + num += s[0] + s = s[1:] + num = float(num) + letter = s.strip() + for name, sset in SYMBOLS.items(): + if letter in sset: + break + else: + if letter == 'k': + sset = SYMBOLS['customary'] + letter = letter.upper() + else: + raise ValueError("can't interpret %r" % init) + prefix = {sset[0]: 1} + for i, s in enumerate(sset[1:]): + prefix[s] = 1 << (i + 1) * 10 + return int(num * prefix[letter]) + + +@unittest.skipIf(not MACOS, "MACOS only") +class TestProcess(PsutilTestCase): + + @classmethod + def setUpClass(cls): + cls.pid = spawn_testproc().pid + + @classmethod + def tearDownClass(cls): + terminate(cls.pid) + + def test_process_create_time(self): + output = sh("ps -o lstart -p %s" % self.pid) + start_ps = output.replace('STARTED', '').strip() + hhmmss = start_ps.split(' ')[-2] + year = start_ps.split(' ')[-1] + start_psutil = psutil.Process(self.pid).create_time() + self.assertEqual( + hhmmss, + time.strftime("%H:%M:%S", time.localtime(start_psutil))) + self.assertEqual( + year, + time.strftime("%Y", time.localtime(start_psutil))) + + +@unittest.skipIf(not MACOS, "MACOS only") +class TestSystemAPIs(PsutilTestCase): + + # --- disk + + @retry_on_failure() + def test_disks(self): + # test psutil.disk_usage() and psutil.disk_partitions() + # against "df -a" + def df(path): + out = sh('df -k "%s"' % path).strip() + lines = out.split('\n') + lines.pop(0) + line = lines.pop(0) + dev, total, used, free = line.split()[:4] + if dev == 'none': + dev = '' + total = int(total) * 1024 + used = int(used) * 1024 + free = int(free) * 1024 + return dev, total, used, free + + for part in psutil.disk_partitions(all=False): + usage = psutil.disk_usage(part.mountpoint) + dev, total, used, free = df(part.mountpoint) + self.assertEqual(part.device, dev) + self.assertEqual(usage.total, total) + self.assertAlmostEqual(usage.free, free, + delta=TOLERANCE_DISK_USAGE) + self.assertAlmostEqual(usage.used, used, + delta=TOLERANCE_DISK_USAGE) + + # --- cpu + + def test_cpu_count_logical(self): + num = sysctl("sysctl hw.logicalcpu") + self.assertEqual(num, psutil.cpu_count(logical=True)) + + def test_cpu_count_physical(self): + num = sysctl("sysctl hw.physicalcpu") + self.assertEqual(num, psutil.cpu_count(logical=False)) + + def test_cpu_freq(self): + freq = psutil.cpu_freq() + self.assertEqual( + freq.current * 1000 * 1000, sysctl("sysctl hw.cpufrequency")) + self.assertEqual( + freq.min * 1000 * 1000, sysctl("sysctl hw.cpufrequency_min")) + self.assertEqual( + freq.max * 1000 * 1000, sysctl("sysctl hw.cpufrequency_max")) + + # --- virtual mem + + def test_vmem_total(self): + sysctl_hwphymem = sysctl('sysctl hw.memsize') + self.assertEqual(sysctl_hwphymem, psutil.virtual_memory().total) + + @retry_on_failure() + def test_vmem_free(self): + vmstat_val = vm_stat("free") + psutil_val = psutil.virtual_memory().free + self.assertAlmostEqual(psutil_val, vmstat_val, delta=TOLERANCE_SYS_MEM) + + @retry_on_failure() + def test_vmem_active(self): + vmstat_val = vm_stat("active") + psutil_val = psutil.virtual_memory().active + self.assertAlmostEqual(psutil_val, vmstat_val, delta=TOLERANCE_SYS_MEM) + + @retry_on_failure() + def test_vmem_inactive(self): + vmstat_val = vm_stat("inactive") + psutil_val = psutil.virtual_memory().inactive + self.assertAlmostEqual(psutil_val, vmstat_val, delta=TOLERANCE_SYS_MEM) + + @retry_on_failure() + def test_vmem_wired(self): + vmstat_val = vm_stat("wired") + psutil_val = psutil.virtual_memory().wired + self.assertAlmostEqual(psutil_val, vmstat_val, delta=TOLERANCE_SYS_MEM) + + # --- swap mem + + @retry_on_failure() + def test_swapmem_sin(self): + vmstat_val = vm_stat("Pageins") + psutil_val = psutil.swap_memory().sin + self.assertEqual(psutil_val, vmstat_val) + + @retry_on_failure() + def test_swapmem_sout(self): + vmstat_val = vm_stat("Pageout") + psutil_val = psutil.swap_memory().sout + self.assertEqual(psutil_val, vmstat_val) + + # Not very reliable. + # def test_swapmem_total(self): + # out = sh('sysctl vm.swapusage') + # out = out.replace('vm.swapusage: ', '') + # total, used, free = re.findall('\d+.\d+\w', out) + # psutil_smem = psutil.swap_memory() + # self.assertEqual(psutil_smem.total, human2bytes(total)) + # self.assertEqual(psutil_smem.used, human2bytes(used)) + # self.assertEqual(psutil_smem.free, human2bytes(free)) + + # --- network + + def test_net_if_stats(self): + for name, stats in psutil.net_if_stats().items(): + try: + out = sh("ifconfig %s" % name) + except RuntimeError: + pass + else: + self.assertEqual(stats.isup, 'RUNNING' in out, msg=out) + self.assertEqual(stats.mtu, + int(re.findall(r'mtu (\d+)', out)[0])) + + # --- sensors_battery + + @unittest.skipIf(not HAS_BATTERY, "no battery") + def test_sensors_battery(self): + out = sh("pmset -g batt") + percent = re.search(r"(\d+)%", out).group(1) + drawing_from = re.search("Now drawing from '([^']+)'", out).group(1) + power_plugged = drawing_from == "AC Power" + psutil_result = psutil.sensors_battery() + self.assertEqual(psutil_result.power_plugged, power_plugged) + self.assertEqual(psutil_result.percent, int(percent)) + + +if __name__ == '__main__': + from psutil.tests.runner import run_from_name + run_from_name(__file__)