Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_osx.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:32:28 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 0:d30785e31577 | 1:56ad4e20f292 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 4 # Use of this source code is governed by a BSD-style license that can be | |
| 5 # found in the LICENSE file. | |
| 6 | |
| 7 """MACOS specific tests.""" | |
| 8 | |
| 9 import os | |
| 10 import re | |
| 11 import time | |
| 12 | |
| 13 import psutil | |
| 14 from psutil import MACOS | |
| 15 from psutil.tests import HAS_BATTERY | |
| 16 from psutil.tests import PsutilTestCase | |
| 17 from psutil.tests import retry_on_failure | |
| 18 from psutil.tests import sh | |
| 19 from psutil.tests import spawn_testproc | |
| 20 from psutil.tests import terminate | |
| 21 from psutil.tests import TOLERANCE_DISK_USAGE | |
| 22 from psutil.tests import TOLERANCE_SYS_MEM | |
| 23 from psutil.tests import unittest | |
| 24 | |
| 25 | |
# Value of the SC_PAGE_SIZE sysconf setting; only queried on macOS,
# everywhere else the constant is left as None.
if MACOS:
    PAGESIZE = os.sysconf("SC_PAGE_SIZE")
else:
    PAGESIZE = None
| 27 | |
| 28 | |
def sysctl(cmdline):
    """Run a sysctl command line and return the value it prints.

    The second whitespace-separated token of the command output is
    taken as the value. It is returned as an int when it parses as
    one, otherwise as the raw string.
    """
    value = sh(cmdline).split()[1]
    try:
        return int(value)
    except ValueError:
        return value
| 39 | |
| 40 | |
def vm_stat(field):
    """Return a figure from the 'vm_stat' cmdline utility.

    The first output line containing *field* is selected, the first
    run of digits on it is read as an integer and the result is
    multiplied by PAGESIZE. ValueError is raised if no line contains
    *field*.
    """
    for row in sh('vm_stat').split('\n'):
        if field in row:
            count = int(re.search(r'\d+', row).group(0))
            return count * PAGESIZE
    raise ValueError("line not found")
| 50 | |
| 51 | |
# http://code.activestate.com/recipes/578019/
def human2bytes(s):
    """Convert a human-readable size string into an integer byte count.

    The string is a decimal number (digits and dots) followed by one
    of the unit symbols B, K, M, G, T, P, E, Z, Y, where every step
    is a factor of 1024. Whitespace around the symbol is ignored and
    a lower-case 'k' is accepted as 'K'.

    >>> human2bytes('1K')
    1024
    >>> human2bytes('1.5 M')
    1572864

    Raises ValueError if either the number or the unit symbol can't
    be interpreted.
    """
    symbols = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # Find where the leading numeric part ends. The bounds check is
    # explicit so an empty or all-numeric string can't index past the
    # end of the input.
    pos = 0
    while pos < len(s) and (s[pos].isdigit() or s[pos] == '.'):
        pos += 1
    letter = s[pos:].strip()
    if letter == 'k':
        letter = 'K'
    if letter not in symbols:
        raise ValueError("can't interpret %r" % s)
    try:
        num = float(s[:pos])
    except ValueError:
        # Missing or malformed number ('K', '1.2.3K'): report it with
        # the same message used for a bad unit symbol.
        raise ValueError("can't interpret %r" % s)
    # 'B' is a factor of 1; each following symbol adds another 2**10.
    return int(num * (1 << (symbols.index(letter) * 10)))
| 77 | |
| 78 | |
@unittest.skipIf(not MACOS, "MACOS only")
class TestProcess(PsutilTestCase):
    """Compare psutil.Process results against the 'ps' cmdline utility."""

    @classmethod
    def setUpClass(cls):
        # A single helper process is shared by all tests of the class.
        testproc = spawn_testproc()
        cls.pid = testproc.pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    def test_process_create_time(self):
        # The last two space-separated fields of "ps -o lstart" (after
        # the STARTED header is removed) are compared with the time of
        # day and the year of psutil's create_time().
        ps_out = sh("ps -o lstart -p %s" % self.pid)
        fields = ps_out.replace('STARTED', '').strip().split(' ')
        ps_hhmmss = fields[-2]
        ps_year = fields[-1]
        created = time.localtime(psutil.Process(self.pid).create_time())
        self.assertEqual(ps_hhmmss, time.strftime("%H:%M:%S", created))
        self.assertEqual(ps_year, time.strftime("%Y", created))
| 102 | |
| 103 | |
@unittest.skipIf(not MACOS, "MACOS only")
class TestSystemAPIs(PsutilTestCase):
    """Compare psutil system-wide APIs against cmdline utilities
    (df, sysctl, vm_stat, ifconfig, pmset).
    """

    # --- disk

    @retry_on_failure()
    def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -k"
        def df(path):
            # Parse the second line of the output (the one following
            # the header) and scale the three figures by 1024.
            rows = sh('df -k "%s"' % path).strip().split('\n')
            dev, total, used, free = rows[1].split()[:4]
            if dev == 'none':
                dev = ''
            return dev, int(total) * 1024, int(used) * 1024, int(free) * 1024

        for partition in psutil.disk_partitions(all=False):
            psutil_usage = psutil.disk_usage(partition.mountpoint)
            df_dev, df_total, df_used, df_free = df(partition.mountpoint)
            self.assertEqual(partition.device, df_dev)
            self.assertEqual(psutil_usage.total, df_total)
            self.assertAlmostEqual(
                psutil_usage.free, df_free, delta=TOLERANCE_DISK_USAGE)
            self.assertAlmostEqual(
                psutil_usage.used, df_used, delta=TOLERANCE_DISK_USAGE)

    # --- cpu

    def test_cpu_count_logical(self):
        expected = sysctl("sysctl hw.logicalcpu")
        self.assertEqual(expected, psutil.cpu_count(logical=True))

    def test_cpu_count_physical(self):
        expected = sysctl("sysctl hw.physicalcpu")
        self.assertEqual(expected, psutil.cpu_count(logical=False))

    def test_cpu_freq(self):
        # Each psutil figure is scaled by 1000 * 1000 before being
        # compared with the matching sysctl value.
        freq = psutil.cpu_freq()
        checks = (
            (freq.current, "sysctl hw.cpufrequency"),
            (freq.min, "sysctl hw.cpufrequency_min"),
            (freq.max, "sysctl hw.cpufrequency_max"),
        )
        for value, cmdline in checks:
            self.assertEqual(value * 1000 * 1000, sysctl(cmdline))

    # --- virtual mem

    def _assert_vmem_matches(self, vm_stat_field, attr):
        # Shared body of the vmem tests: read the vm_stat figure first,
        # then the psutil one, and compare them within TOLERANCE_SYS_MEM.
        expected = vm_stat(vm_stat_field)
        actual = getattr(psutil.virtual_memory(), attr)
        self.assertAlmostEqual(actual, expected, delta=TOLERANCE_SYS_MEM)

    def test_vmem_total(self):
        expected = sysctl('sysctl hw.memsize')
        self.assertEqual(expected, psutil.virtual_memory().total)

    @retry_on_failure()
    def test_vmem_free(self):
        self._assert_vmem_matches("free", "free")

    @retry_on_failure()
    def test_vmem_active(self):
        self._assert_vmem_matches("active", "active")

    @retry_on_failure()
    def test_vmem_inactive(self):
        self._assert_vmem_matches("inactive", "inactive")

    @retry_on_failure()
    def test_vmem_wired(self):
        self._assert_vmem_matches("wired", "wired")

    # --- swap mem

    @retry_on_failure()
    def test_swapmem_sin(self):
        expected = vm_stat("Pageins")
        self.assertEqual(psutil.swap_memory().sin, expected)

    @retry_on_failure()
    def test_swapmem_sout(self):
        expected = vm_stat("Pageout")
        self.assertEqual(psutil.swap_memory().sout, expected)

    # Not very reliable.
    # def test_swapmem_total(self):
    #     out = sh('sysctl vm.swapusage')
    #     out = out.replace('vm.swapusage: ', '')
    #     total, used, free = re.findall('\d+.\d+\w', out)
    #     psutil_smem = psutil.swap_memory()
    #     self.assertEqual(psutil_smem.total, human2bytes(total))
    #     self.assertEqual(psutil_smem.used, human2bytes(used))
    #     self.assertEqual(psutil_smem.free, human2bytes(free))

    # --- network

    def test_net_if_stats(self):
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # Presumably ifconfig failed for this interface (verify
                # against sh()); there is nothing to compare, move on.
                continue
            self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
            ifconfig_mtu = int(re.findall(r'mtu (\d+)', out)[0])
            self.assertEqual(stats.mtu, ifconfig_mtu)

    # --- sensors_battery

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_sensors_battery(self):
        out = sh("pmset -g batt")
        percent = int(re.search(r"(\d+)%", out).group(1))
        source = re.search("Now drawing from '([^']+)'", out).group(1)
        battery = psutil.sensors_battery()
        self.assertEqual(battery.power_plugged, source == "AC Power")
        self.assertEqual(battery.percent, percent)
| 233 | |
| 234 | |
if __name__ == '__main__':
    # Executed as a script: hand this file over to psutil's test runner.
    from psutil.tests.runner import run_from_name

    run_from_name(__file__)
