Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/psutil/tests/test_posix.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
| author | shellac |
|---|---|
| date | Thu, 14 May 2020 14:56:58 -0400 |
| parents | 26e78fe6e8c4 |
| children |
comparison
equal
deleted
inserted
replaced
| 1:75ca89e9b81c | 2:6af9afd405e9 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 # -*- coding: utf-8 -*- | |
| 3 | |
| 4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 5 # Use of this source code is governed by a BSD-style license that can be | |
| 6 # found in the LICENSE file. | |
| 7 | |
| 8 """POSIX specific tests.""" | |
| 9 | |
| 10 import datetime | |
| 11 import errno | |
| 12 import os | |
| 13 import re | |
| 14 import subprocess | |
| 15 import time | |
| 16 | |
| 17 import psutil | |
| 18 from psutil import AIX | |
| 19 from psutil import BSD | |
| 20 from psutil import LINUX | |
| 21 from psutil import MACOS | |
| 22 from psutil import OPENBSD | |
| 23 from psutil import POSIX | |
| 24 from psutil import SUNOS | |
| 25 from psutil.tests import CI_TESTING | |
| 26 from psutil.tests import get_kernel_version | |
| 27 from psutil.tests import get_test_subprocess | |
| 28 from psutil.tests import HAS_NET_IO_COUNTERS | |
| 29 from psutil.tests import mock | |
| 30 from psutil.tests import PYTHON_EXE | |
| 31 from psutil.tests import reap_children | |
| 32 from psutil.tests import retry_on_failure | |
| 33 from psutil.tests import sh | |
| 34 from psutil.tests import skip_on_access_denied | |
| 35 from psutil.tests import TRAVIS | |
| 36 from psutil.tests import unittest | |
| 37 from psutil.tests import wait_for_pid | |
| 38 from psutil.tests import which | |
| 39 | |
| 40 | |
def ps(fmt, pid=None):
    """
    Wrapper for calling the ps command with a little bit of cross-platform
    support for a narrow range of features.

    *fmt* is the single column name handed to ``ps -o``.  When *pid* is
    given only that process is queried (``-p``) and the first output row
    is returned; otherwise every process is listed and a list with one
    entry per row is returned.  Each row is stripped and converted to
    int when it parses as one, else it is kept as a string.
    """
    cmd = ['ps']

    if LINUX:
        cmd.append('--no-headers')

    if pid is not None:
        cmd.extend(['-p', str(pid)])
    elif SUNOS or AIX:
        cmd.append('-A')
    else:
        cmd.append('ax')

    if SUNOS:
        # Translate column names to the ones used on SUNOS.  This has to
        # be a mapping: the previous set() had no .get() method and
        # raised AttributeError on this code path.
        fmt_map = {'command': 'comm', 'start': 'stime'}
        fmt = fmt_map.get(fmt, fmt)

    cmd.extend(['-o', fmt])

    lines = sh(cmd).splitlines()
    if not LINUX:
        # Only Linux was asked to suppress the header (--no-headers);
        # everywhere else the first line is the column title.
        lines = lines[1:]

    all_output = []
    for line in lines:
        line = line.strip()
        try:
            line = int(line)
        except ValueError:
            pass
        all_output.append(line)

    if pid is None:
        return all_output
    return all_output[0]
| 88 | |
| 89 # ps "-o" field names differ wildly between platforms. | |
| 90 # "comm" means "only executable name" but is not available on BSD platforms. | |
| 91 # "args" means "command with all its arguments", and is also not available | |
| 92 # on BSD platforms. | |
| 93 # "command" is like "args" on most platforms, but like "comm" on AIX, | |
| 94 # and not available on SUNOS. | |
| 95 # so for the executable name we can use "comm" on Solaris and split "command" | |
| 96 # on other platforms. | |
| 97 # to get the cmdline (with args) we have to use "args" on AIX and | |
| 98 # Solaris, and can use "command" on all others. | |
| 99 | |
| 100 | |
def ps_name(pid):
    """Return the first whitespace-separated token ps prints for *pid*.

    The "comm" column is queried on SUNOS, "command" everywhere else.
    """
    column = "comm" if SUNOS else "command"
    tokens = ps(column, pid).split()
    return tokens[0]
| 106 | |
| 107 | |
def ps_args(pid):
    """Return what ps prints for *pid* in its command-line column.

    The column is "args" on AIX and SUNOS, "command" on other platforms.
    """
    if AIX or SUNOS:
        return ps("args", pid)
    return ps("command", pid)
| 113 | |
| 114 | |
def ps_rss(pid):
    """Return the ps "rss" column ("rssize" on AIX) for *pid*."""
    column = "rssize" if AIX else "rss"
    return ps(column, pid)
| 120 | |
| 121 | |
def ps_vsz(pid):
    """Return the ps "vsz" column ("vsize" on AIX) for *pid*."""
    if AIX:
        return ps("vsize", pid)
    return ps("vsz", pid)
| 127 | |
| 128 | |
@unittest.skipIf(not POSIX, "POSIX only")
class TestProcess(unittest.TestCase):
    """Compare psutil results against 'ps' command line utility (mainly)."""

    @classmethod
    def setUpClass(cls):
        # A single child interpreter is spawned once for the whole class;
        # most tests below query it through cls.pid.
        cls.pid = get_test_subprocess([PYTHON_EXE, "-E", "-O"],
                                      stdin=subprocess.PIPE).pid
        wait_for_pid(cls.pid)

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def test_ppid(self):
        ppid_ps = ps('ppid', self.pid)
        ppid_psutil = psutil.Process(self.pid).ppid()
        self.assertEqual(ppid_ps, ppid_psutil)

    def test_uid(self):
        uid_ps = ps('uid', self.pid)
        uid_psutil = psutil.Process(self.pid).uids().real
        self.assertEqual(uid_ps, uid_psutil)

    def test_gid(self):
        gid_ps = ps('rgid', self.pid)
        gid_psutil = psutil.Process(self.pid).gids().real
        self.assertEqual(gid_ps, gid_psutil)

    def test_username(self):
        username_ps = ps('user', self.pid)
        username_psutil = psutil.Process(self.pid).username()
        self.assertEqual(username_ps, username_psutil)

    def test_username_no_resolution(self):
        # Emulate a case where the system can't resolve the uid to
        # a username in which case psutil is supposed to return
        # the stringified uid.
        p = psutil.Process()
        with mock.patch("psutil.pwd.getpwuid", side_effect=KeyError) as fun:
            self.assertEqual(p.username(), str(p.uids().real))
            # assertTrue instead of a bare assert, which is stripped
            # when the interpreter runs with -O.
            self.assertTrue(fun.called)

    @skip_on_access_denied()
    @retry_on_failure()
    def test_rss_memory(self):
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        rss_ps = ps_rss(self.pid)
        rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
        self.assertEqual(rss_ps, rss_psutil)

    @skip_on_access_denied()
    @retry_on_failure()
    def test_vsz_memory(self):
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        vsz_ps = ps_vsz(self.pid)
        vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
        self.assertEqual(vsz_ps, vsz_psutil)

    def test_name(self):
        name_ps = ps_name(self.pid)
        # remove path if there is any, from the command
        name_ps = os.path.basename(name_ps).lower()
        name_psutil = psutil.Process(self.pid).name().lower()
        # ...because of how we calculate PYTHON_EXE; on MACOS this may
        # be "pythonX.Y".  The dot is escaped so that only a literal
        # "X.Y" version suffix is removed.
        name_ps = re.sub(r"\d\.\d", "", name_ps)
        name_psutil = re.sub(r"\d\.\d", "", name_psutil)
        # ...may also be "python.X"
        name_ps = re.sub(r"\d", "", name_ps)
        name_psutil = re.sub(r"\d", "", name_psutil)
        self.assertEqual(name_ps, name_psutil)

    def test_name_long(self):
        # On UNIX the kernel truncates the name to the first 15
        # characters. In such a case psutil tries to determine the
        # full name from the cmdline.
        name = "long-program-name"
        cmdline = ["long-program-name-extended", "foo", "bar"]
        with mock.patch("psutil._psplatform.Process.name",
                        return_value=name):
            with mock.patch("psutil._psplatform.Process.cmdline",
                            return_value=cmdline):
                p = psutil.Process()
                self.assertEqual(p.name(), "long-program-name-extended")

    def test_name_long_cmdline_ad_exc(self):
        # Same as above but emulates a case where cmdline() raises
        # AccessDenied in which case psutil is supposed to return
        # the truncated name instead of crashing.
        name = "long-program-name"
        with mock.patch("psutil._psplatform.Process.name",
                        return_value=name):
            with mock.patch("psutil._psplatform.Process.cmdline",
                            side_effect=psutil.AccessDenied(0, "")):
                p = psutil.Process()
                self.assertEqual(p.name(), "long-program-name")

    def test_name_long_cmdline_nsp_exc(self):
        # Same as above but emulates a case where cmdline() raises NSP
        # which is supposed to propagate.
        name = "long-program-name"
        with mock.patch("psutil._psplatform.Process.name",
                        return_value=name):
            with mock.patch("psutil._psplatform.Process.cmdline",
                            side_effect=psutil.NoSuchProcess(0, "")):
                p = psutil.Process()
                self.assertRaises(psutil.NoSuchProcess, p.name)

    @unittest.skipIf(MACOS or BSD, 'ps -o start not available')
    def test_create_time(self):
        time_ps = ps('start', self.pid)
        time_psutil = psutil.Process(self.pid).create_time()
        time_psutil_tstamp = datetime.datetime.fromtimestamp(
            time_psutil).strftime("%H:%M:%S")
        # sometimes ps shows the time rounded up instead of down, so we check
        # for both possible values
        round_time_psutil = round(time_psutil)
        round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
            round_time_psutil).strftime("%H:%M:%S")
        self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])

    def test_exe(self):
        ps_pathname = ps_name(self.pid)
        psutil_pathname = psutil.Process(self.pid).exe()
        try:
            self.assertEqual(ps_pathname, psutil_pathname)
        except AssertionError:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error, so it is enough for ps' path to be a prefix of
            # the one psutil reports.  (The previous code compared
            # ps_pathname with a slice of itself, which could never fail.)
            adjusted_psutil_pathname = psutil_pathname[:len(ps_pathname)]
            self.assertEqual(ps_pathname, adjusted_psutil_pathname)

    def test_cmdline(self):
        ps_cmdline = ps_args(self.pid)
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
        self.assertEqual(ps_cmdline, psutil_cmdline)

    # On SUNOS "ps" reads niceness /proc/pid/psinfo which returns an
    # incorrect value (20); the real deal is getpriority(2) which
    # returns 0; psutil relies on it, see:
    # https://github.com/giampaolo/psutil/issues/1082
    # AIX has the same issue
    @unittest.skipIf(SUNOS, "not reliable on SUNOS")
    @unittest.skipIf(AIX, "not reliable on AIX")
    def test_nice(self):
        ps_nice = ps('nice', self.pid)
        # Query the same process ps was asked about, like the other tests.
        psutil_nice = psutil.Process(self.pid).nice()
        self.assertEqual(ps_nice, psutil_nice)

    def test_num_fds(self):
        # Note: this fails from time to time; I'm keen on thinking
        # it doesn't mean something is broken
        def call(p, name):
            # Invoke the Process attribute called `name` if it is callable;
            # rlimit() is the only one that needs an argument here.
            attr = getattr(p, name, None)
            if attr is not None and callable(attr):
                args = (psutil.RLIMIT_NOFILE,) if name == 'rlimit' else ()
                attr(*args)

        p = psutil.Process(os.getpid())
        failures = []
        ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
                         'send_signal', 'wait', 'children', 'as_dict',
                         'memory_info_ex', 'parent', 'parents']
        if LINUX and get_kernel_version() < (2, 6, 36):
            ignored_names.append('rlimit')
        if LINUX and get_kernel_version() < (2, 6, 23):
            ignored_names.append('num_ctx_switches')
        for name in dir(psutil.Process):
            if name.startswith('_') or name in ignored_names:
                continue
            try:
                num1 = p.num_fds()
                for _ in range(2):
                    call(p, name)
                num2 = p.num_fds()
            except psutil.AccessDenied:
                pass
            else:
                # Tolerate a drift of one descriptor between the two reads.
                if abs(num2 - num1) > 1:
                    fail = "failure while processing Process.%s method " \
                           "(before=%s, after=%s)" % (name, num1, num2)
                    failures.append(fail)
        if failures:
            self.fail('\n' + '\n'.join(failures))
| 327 | |
| 328 | |
@unittest.skipIf(not POSIX, "POSIX only")
class TestSystemAPIs(unittest.TestCase):
    """Test some system APIs."""

    @retry_on_failure()
    def test_pids(self):
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        pids_ps = sorted(ps("pid"))
        pids_psutil = psutil.pids()

        # on MACOS and OPENBSD ps doesn't show pid 0.  The parentheses
        # matter: without them "and" binds tighter than "or" and pid 0
        # was inserted on MACOS even when already listed.
        if (MACOS or OPENBSD) and 0 not in pids_ps:
            pids_ps.insert(0, 0)

        # There will often be one more process in pids_ps for ps itself
        if len(pids_ps) - len(pids_psutil) > 1:
            difference = [x for x in pids_psutil if x not in pids_ps] + \
                         [x for x in pids_ps if x not in pids_psutil]
            self.fail("difference: " + str(difference))

    # for some reason ifconfig -a does not report all interfaces
    # returned by psutil
    @unittest.skipIf(SUNOS, "unreliable on SUNOS")
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    @unittest.skipIf(not which('ifconfig'), "no ifconfig cmd")
    @unittest.skipIf(not HAS_NET_IO_COUNTERS, "not supported")
    def test_nic_names(self):
        output = sh("ifconfig -a")
        for nic in psutil.net_io_counters(pernic=True).keys():
            # output.split() yields whitespace-separated words, not lines.
            for word in output.split():
                if word.startswith(nic):
                    break
            else:
                self.fail(
                    "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
                        nic, output))

    @unittest.skipIf(CI_TESTING and not psutil.users(), "unreliable on CI")
    @retry_on_failure()
    def test_users(self):
        out = sh("who")
        # str.split('\n') never returns an empty list, so emptiness has to
        # be checked on the raw output; otherwise an empty "who" listing
        # ended in IndexError below instead of a skip.
        if not out.strip():
            self.skipTest("no users on this system")
        lines = [x for x in out.split('\n') if x.strip()]
        users = [x.split()[0] for x in lines]
        terminals = [x.split()[1] for x in lines]
        self.assertEqual(len(users), len(psutil.users()))
        for u in psutil.users():
            self.assertIn(u.name, users)
            self.assertIn(u.terminal, terminals)

    def test_pid_exists_let_raise(self):
        # According to "man 2 kill" possible error values for kill
        # are (EINVAL, EPERM, ESRCH). Test that any other errno
        # results in an exception.
        with mock.patch("psutil._psposix.os.kill",
                        side_effect=OSError(errno.EBADF, "")) as m:
            self.assertRaises(OSError, psutil._psposix.pid_exists, os.getpid())
            self.assertTrue(m.called)

    def test_os_waitpid_let_raise(self):
        # os.waitpid() is supposed to catch EINTR and ECHILD only.
        # Test that any other errno results in an exception.
        with mock.patch("psutil._psposix.os.waitpid",
                        side_effect=OSError(errno.EBADF, "")) as m:
            self.assertRaises(OSError, psutil._psposix.wait_pid, os.getpid())
            self.assertTrue(m.called)

    def test_os_waitpid_eintr(self):
        # os.waitpid() is supposed to "retry" on EINTR.
        with mock.patch("psutil._psposix.os.waitpid",
                        side_effect=OSError(errno.EINTR, "")) as m:
            self.assertRaises(
                psutil._psposix.TimeoutExpired,
                psutil._psposix.wait_pid, os.getpid(), timeout=0.01)
            self.assertTrue(m.called)

    def test_os_waitpid_bad_ret_status(self):
        # Simulate os.waitpid() returning a bad status.
        with mock.patch("psutil._psposix.os.waitpid",
                        return_value=(1, -1)) as m:
            self.assertRaises(ValueError,
                              psutil._psposix.wait_pid, os.getpid())
            self.assertTrue(m.called)

    # AIX can return '-' in df output instead of numbers, e.g. for /proc
    @unittest.skipIf(AIX, "unreliable on AIX")
    def test_disk_usage(self):
        def df(device):
            # Return (total, used, free, percent) parsed from the second
            # line of "df -k <device>"; the size columns are multiplied
            # by 1024.  An argv list is passed so the device name is not
            # interpolated into a shell command string.
            out = sh(["df", "-k", device]).strip()
            line = out.split('\n')[1]
            fields = line.split()
            total = int(fields[1]) * 1024
            used = int(fields[2]) * 1024
            free = int(fields[3]) * 1024
            percent = float(fields[4].replace('%', ''))
            return (total, used, free, percent)

        tolerance = 4 * 1024 * 1024  # 4MB
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            try:
                total, used, free, percent = df(part.device)
            except RuntimeError as err:
                # see:
                # https://travis-ci.org/giampaolo/psutil/jobs/138338464
                # https://travis-ci.org/giampaolo/psutil/jobs/138343361
                msg = str(err).lower()
                if "no such file or directory" in msg or \
                        "raw devices not supported" in msg or \
                        "permission denied" in msg:
                    continue
                raise
            else:
                self.assertAlmostEqual(usage.total, total, delta=tolerance)
                self.assertAlmostEqual(usage.used, used, delta=tolerance)
                self.assertAlmostEqual(usage.free, free, delta=tolerance)
                self.assertAlmostEqual(usage.percent, percent, delta=1)
| 449 | |
| 450 | |
if __name__ == '__main__':
    # Executed as a script: hand this file to psutil's test runner.
    from psutil.tests import runner
    runner.run(__file__)
