comparison env/lib/python3.7/site-packages/psutil/tests/test_posix.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
5 # Use of this source code is governed by a BSD-style license that can be
6 # found in the LICENSE file.
7
8 """POSIX specific tests."""
9
10 import datetime
11 import errno
12 import os
13 import re
14 import subprocess
15 import time
16
17 import psutil
18 from psutil import AIX
19 from psutil import BSD
20 from psutil import LINUX
21 from psutil import MACOS
22 from psutil import OPENBSD
23 from psutil import POSIX
24 from psutil import SUNOS
25 from psutil.tests import CI_TESTING
26 from psutil.tests import get_kernel_version
27 from psutil.tests import get_test_subprocess
28 from psutil.tests import HAS_NET_IO_COUNTERS
29 from psutil.tests import mock
30 from psutil.tests import PYTHON_EXE
31 from psutil.tests import reap_children
32 from psutil.tests import retry_on_failure
33 from psutil.tests import sh
34 from psutil.tests import skip_on_access_denied
35 from psutil.tests import TRAVIS
36 from psutil.tests import unittest
37 from psutil.tests import wait_for_pid
38 from psutil.tests import which
39
40
def ps(fmt, pid=None):
    """
    Wrapper for calling the ps command with a little bit of cross-platform
    support for a narrow range of features.

    ``fmt`` is the single field name passed to ``ps -o``. When ``pid`` is
    given only that process is queried ("-p PID") and the value of its
    first output row is returned; otherwise all processes are listed
    ("-A" on SUNOS / AIX, "ax" elsewhere) and a list with one value per
    row is returned.

    Each value is stripped of surrounding whitespace and converted to
    int when it parses as one; otherwise it is left as a string.
    """
    cmd = ['ps']

    if LINUX:
        cmd.append('--no-headers')

    if pid is not None:
        cmd.extend(['-p', str(pid)])
    elif SUNOS or AIX:
        cmd.append('-A')
    else:
        cmd.append('ax')

    if SUNOS:
        # Translate the field names which are spelled differently on
        # SUNOS. This must be a mapping: a set has no get() method and
        # would raise AttributeError here.
        fmt_map = {'command': 'comm', 'start': 'stime'}
        fmt = fmt_map.get(fmt, fmt)

    cmd.extend(['-o', fmt])

    lines = sh(cmd).splitlines()
    if not LINUX:
        # "--no-headers" was not passed, so discard the first line.
        lines = lines[1:]

    all_output = []
    for line in lines:
        line = line.strip()
        try:
            line = int(line)
        except ValueError:
            # Not a number (e.g. a user name or a command line).
            pass
        all_output.append(line)

    if pid is None:
        return all_output
    return all_output[0]
88
# ps "-o" field names differ wildly between platforms.
# "comm" means "only executable name" but is not available on BSD platforms.
# "args" means "command with all its arguments", and is also not available
# on BSD platforms.
# "command" is like "args" on most platforms, but like "comm" on AIX,
# and not available on SUNOS.
# So for the executable name we can use "comm" on Solaris and split "command"
# on other platforms.
# To get the cmdline (with args) we have to use "args" on AIX and
# Solaris, and can use "command" on all others.
99
100
def ps_name(pid):
    """Return the first whitespace-separated token ps prints for `pid`.

    The "comm" field is queried on SUNOS and "command" everywhere else.
    """
    column = "comm" if SUNOS else "command"
    ps_output = ps(column, pid)
    return ps_output.split()[0]
106
107
def ps_args(pid):
    """Return what ps prints as the command line of `pid`.

    The "args" field is queried on AIX and SUNOS, "command" elsewhere.
    """
    column = "args" if (AIX or SUNOS) else "command"
    return ps(column, pid)
113
114
def ps_rss(pid):
    """Return the RSS value ps prints for `pid`.

    The field is named "rssize" on AIX and "rss" elsewhere.
    """
    column = "rssize" if AIX else "rss"
    return ps(column, pid)
120
121
def ps_vsz(pid):
    """Return the VSZ value ps prints for `pid`.

    The field is named "vsize" on AIX and "vsz" elsewhere.
    """
    column = "vsize" if AIX else "vsz"
    return ps(column, pid)
127
128
@unittest.skipIf(not POSIX, "POSIX only")
class TestProcess(unittest.TestCase):
    """Compare psutil results against 'ps' command line utility (mainly).

    One Python subprocess is spawned for the whole class; most tests ask
    both ps and psutil about that PID and compare the answers.
    """

    @classmethod
    def setUpClass(cls):
        # Spawn the child process shared by the tests and wait for its
        # PID to show up.
        cls.pid = get_test_subprocess([PYTHON_EXE, "-E", "-O"],
                                      stdin=subprocess.PIPE).pid
        wait_for_pid(cls.pid)

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def test_ppid(self):
        ppid_ps = ps('ppid', self.pid)
        ppid_psutil = psutil.Process(self.pid).ppid()
        self.assertEqual(ppid_ps, ppid_psutil)

    def test_uid(self):
        uid_ps = ps('uid', self.pid)
        uid_psutil = psutil.Process(self.pid).uids().real
        self.assertEqual(uid_ps, uid_psutil)

    def test_gid(self):
        gid_ps = ps('rgid', self.pid)
        gid_psutil = psutil.Process(self.pid).gids().real
        self.assertEqual(gid_ps, gid_psutil)

    def test_username(self):
        username_ps = ps('user', self.pid)
        username_psutil = psutil.Process(self.pid).username()
        self.assertEqual(username_ps, username_psutil)

    def test_username_no_resolution(self):
        # Emulate a case where the system can't resolve the uid to
        # a username in which case psutil is supposed to return
        # the stringified uid.
        p = psutil.Process()
        with mock.patch("psutil.pwd.getpwuid", side_effect=KeyError) as fun:
            self.assertEqual(p.username(), str(p.uids().real))
            # assertTrue instead of a bare assert, which "python -O"
            # would strip.
            self.assertTrue(fun.called)

    @skip_on_access_denied()
    @retry_on_failure()
    def test_rss_memory(self):
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        rss_ps = ps_rss(self.pid)
        rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
        self.assertEqual(rss_ps, rss_psutil)

    @skip_on_access_denied()
    @retry_on_failure()
    def test_vsz_memory(self):
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        vsz_ps = ps_vsz(self.pid)
        vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
        self.assertEqual(vsz_ps, vsz_psutil)

    def test_name(self):
        name_ps = ps_name(self.pid)
        # remove path if there is any, from the command
        name_ps = os.path.basename(name_ps).lower()
        name_psutil = psutil.Process(self.pid).name().lower()
        # ...because of how we calculate PYTHON_EXE; on MACOS this may
        # be "pythonX.Y". The dot is escaped so that only a literal
        # "X.Y" version suffix is matched.
        name_ps = re.sub(r"\d\.\d", "", name_ps)
        name_psutil = re.sub(r"\d\.\d", "", name_psutil)
        # ...may also be "python.X"
        name_ps = re.sub(r"\d", "", name_ps)
        name_psutil = re.sub(r"\d", "", name_psutil)
        self.assertEqual(name_ps, name_psutil)

    def test_name_long(self):
        # On UNIX the kernel truncates the name to the first 15
        # characters. In such a case psutil tries to determine the
        # full name from the cmdline.
        name = "long-program-name"
        cmdline = ["long-program-name-extended", "foo", "bar"]
        with mock.patch("psutil._psplatform.Process.name",
                        return_value=name):
            with mock.patch("psutil._psplatform.Process.cmdline",
                            return_value=cmdline):
                p = psutil.Process()
                self.assertEqual(p.name(), "long-program-name-extended")

    def test_name_long_cmdline_ad_exc(self):
        # Same as above but emulates a case where cmdline() raises
        # AccessDenied in which case psutil is supposed to return
        # the truncated name instead of crashing.
        name = "long-program-name"
        with mock.patch("psutil._psplatform.Process.name",
                        return_value=name):
            with mock.patch("psutil._psplatform.Process.cmdline",
                            side_effect=psutil.AccessDenied(0, "")):
                p = psutil.Process()
                self.assertEqual(p.name(), "long-program-name")

    def test_name_long_cmdline_nsp_exc(self):
        # Same as above but emulates a case where cmdline() raises NSP
        # which is supposed to propagate.
        name = "long-program-name"
        with mock.patch("psutil._psplatform.Process.name",
                        return_value=name):
            with mock.patch("psutil._psplatform.Process.cmdline",
                            side_effect=psutil.NoSuchProcess(0, "")):
                p = psutil.Process()
                self.assertRaises(psutil.NoSuchProcess, p.name)

    @unittest.skipIf(MACOS or BSD, 'ps -o start not available')
    def test_create_time(self):
        time_ps = ps('start', self.pid)
        time_psutil = psutil.Process(self.pid).create_time()
        time_psutil_tstamp = datetime.datetime.fromtimestamp(
            time_psutil).strftime("%H:%M:%S")
        # sometimes ps shows the time rounded up instead of down, so we check
        # for both possible values
        round_time_psutil = round(time_psutil)
        round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
            round_time_psutil).strftime("%H:%M:%S")
        self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])

    def test_exe(self):
        ps_pathname = ps_name(self.pid)
        psutil_pathname = psutil.Process(self.pid).exe()
        try:
            self.assertEqual(ps_pathname, psutil_pathname)
        except AssertionError:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error, so only the leading part of psutil's path (as
            # long as the path printed by ps) has to match. Comparing
            # ps_pathname with a slice of itself would never fail.
            adjusted_psutil_pathname = psutil_pathname[:len(ps_pathname)]
            self.assertEqual(ps_pathname, adjusted_psutil_pathname)

    def test_cmdline(self):
        ps_cmdline = ps_args(self.pid)
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
        self.assertEqual(ps_cmdline, psutil_cmdline)

    # On SUNOS "ps" reads niceness /proc/pid/psinfo which returns an
    # incorrect value (20); the real deal is getpriority(2) which
    # returns 0; psutil relies on it, see:
    # https://github.com/giampaolo/psutil/issues/1082
    # AIX has the same issue
    @unittest.skipIf(SUNOS, "not reliable on SUNOS")
    @unittest.skipIf(AIX, "not reliable on AIX")
    def test_nice(self):
        ps_nice = ps('nice', self.pid)
        # Query the same process ps was asked about, not the test runner.
        psutil_nice = psutil.Process(self.pid).nice()
        self.assertEqual(ps_nice, psutil_nice)

    def test_num_fds(self):
        # Note: this fails from time to time; I'm keen on thinking
        # it doesn't mean something is broken
        def call(p, attr_name):
            # Fetch Process.<attr_name> and, if it is callable, invoke
            # it. The name comes from the parameter rather than from the
            # enclosing loop variable.
            args = ()
            attr = getattr(p, attr_name, None)
            if attr is not None and callable(attr):
                if attr_name == 'rlimit':
                    args = (psutil.RLIMIT_NOFILE,)
                attr(*args)

        p = psutil.Process(os.getpid())
        failures = []
        ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
                         'send_signal', 'wait', 'children', 'as_dict',
                         'memory_info_ex', 'parent', 'parents']
        if LINUX and get_kernel_version() < (2, 6, 36):
            ignored_names.append('rlimit')
        if LINUX and get_kernel_version() < (2, 6, 23):
            ignored_names.append('num_ctx_switches')
        for name in dir(psutil.Process):
            if name.startswith('_') or name in ignored_names:
                continue
            try:
                # Count the open fds before and after using the API twice.
                num1 = p.num_fds()
                for _ in range(2):
                    call(p, name)
                num2 = p.num_fds()
            except psutil.AccessDenied:
                pass
            else:
                if abs(num2 - num1) > 1:
                    fail = "failure while processing Process.%s method " \
                           "(before=%s, after=%s)" % (name, num1, num2)
                    failures.append(fail)
        if failures:
            self.fail('\n' + '\n'.join(failures))
327
328
@unittest.skipIf(not POSIX, "POSIX only")
class TestSystemAPIs(unittest.TestCase):
    """Test some system APIs.

    Results of psutil are compared against the output of command line
    tools (ps, ifconfig, who, df), and some error paths of
    psutil._psposix are exercised by patching os.kill / os.waitpid.
    """

    @retry_on_failure()
    def test_pids(self):
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        pids_ps = sorted(ps("pid"))
        pids_psutil = psutil.pids()

        # on MACOS and OPENBSD ps doesn't show pid 0.
        # The parentheses matter: "and" binds tighter than "or", so
        # without them pid 0 would be inserted on MACOS even when ps
        # already listed it.
        if (MACOS or OPENBSD) and 0 not in pids_ps:
            pids_ps.insert(0, 0)

        # There will often be one more process in pids_ps for ps itself
        if len(pids_ps) - len(pids_psutil) > 1:
            difference = [x for x in pids_psutil if x not in pids_ps] + \
                         [x for x in pids_ps if x not in pids_psutil]
            self.fail("difference: " + str(difference))

    # for some reason ifconfig -a does not report all interfaces
    # returned by psutil
    @unittest.skipIf(SUNOS, "unreliable on SUNOS")
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    @unittest.skipIf(not which('ifconfig'), "no ifconfig cmd")
    @unittest.skipIf(not HAS_NET_IO_COUNTERS, "not supported")
    def test_nic_names(self):
        output = sh("ifconfig -a")
        for nic in psutil.net_io_counters(pernic=True).keys():
            # Look for a whitespace-separated token of the ifconfig
            # output which starts with the NIC name.
            for token in output.split():
                if token.startswith(nic):
                    break
            else:
                self.fail(
                    "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
                        nic, output))

    @unittest.skipIf(CI_TESTING and not psutil.users(), "unreliable on CI")
    @retry_on_failure()
    def test_users(self):
        out = sh("who")
        # str.split('\n') returns [''] for empty output, never [], so
        # blank lines are dropped before deciding whether to skip.
        lines = [x for x in out.split('\n') if x.strip()]
        if not lines:
            self.skipTest("no users on this system")
        fields = [x.split() for x in lines]
        users = [x[0] for x in fields]
        terminals = [x[1] for x in fields]
        self.assertEqual(len(users), len(psutil.users()))
        for u in psutil.users():
            self.assertIn(u.name, users)
            self.assertIn(u.terminal, terminals)

    def test_pid_exists_let_raise(self):
        # According to "man 2 kill" possible error values for kill
        # are (EINVAL, EPERM, ESRCH). Test that any other errno
        # results in an exception.
        with mock.patch("psutil._psposix.os.kill",
                        side_effect=OSError(errno.EBADF, "")) as m:
            self.assertRaises(OSError, psutil._psposix.pid_exists, os.getpid())
            self.assertTrue(m.called)

    def test_os_waitpid_let_raise(self):
        # os.waitpid() is supposed to catch EINTR and ECHILD only.
        # Test that any other errno results in an exception.
        with mock.patch("psutil._psposix.os.waitpid",
                        side_effect=OSError(errno.EBADF, "")) as m:
            self.assertRaises(OSError, psutil._psposix.wait_pid, os.getpid())
            self.assertTrue(m.called)

    def test_os_waitpid_eintr(self):
        # os.waitpid() is supposed to "retry" on EINTR.
        with mock.patch("psutil._psposix.os.waitpid",
                        side_effect=OSError(errno.EINTR, "")) as m:
            self.assertRaises(
                psutil._psposix.TimeoutExpired,
                psutil._psposix.wait_pid, os.getpid(), timeout=0.01)
            self.assertTrue(m.called)

    def test_os_waitpid_bad_ret_status(self):
        # Simulate os.waitpid() returning a bad status.
        with mock.patch("psutil._psposix.os.waitpid",
                        return_value=(1, -1)) as m:
            self.assertRaises(ValueError,
                              psutil._psposix.wait_pid, os.getpid())
            self.assertTrue(m.called)

    # AIX can return '-' in df output instead of numbers, e.g. for /proc
    @unittest.skipIf(AIX, "unreliable on AIX")
    def test_disk_usage(self):
        def df(device):
            # Parse the second line of "df -k <device>": fields 1-3 are
            # multiplied by 1024 (df -k reports kilobytes) and field 4
            # is the usage percentage.
            out = sh("df -k %s" % device).strip()
            line = out.split('\n')[1]
            fields = line.split()
            total = int(fields[1]) * 1024
            used = int(fields[2]) * 1024
            free = int(fields[3]) * 1024
            percent = float(fields[4].replace('%', ''))
            return (total, used, free, percent)

        tolerance = 4 * 1024 * 1024  # 4MB
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            try:
                total, used, free, percent = df(part.device)
            except RuntimeError as err:
                # see:
                # https://travis-ci.org/giampaolo/psutil/jobs/138338464
                # https://travis-ci.org/giampaolo/psutil/jobs/138343361
                err = str(err).lower()
                if "no such file or directory" in err or \
                        "raw devices not supported" in err or \
                        "permission denied" in err:
                    continue
                else:
                    raise
            else:
                self.assertAlmostEqual(usage.total, total, delta=tolerance)
                self.assertAlmostEqual(usage.used, used, delta=tolerance)
                self.assertAlmostEqual(usage.free, free, delta=tolerance)
                self.assertAlmostEqual(usage.percent, percent, delta=1)
449
450
if __name__ == '__main__':
    # When executed as a script, hand this file to psutil's test runner.
    from psutil.tests.runner import run as run_tests
    run_tests(__file__)