comparison planemo/lib/python3.7/site-packages/psutil/tests/test_posix.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
5 # Use of this source code is governed by a BSD-style license that can be
6 # found in the LICENSE file.
7
8 """POSIX specific tests."""
9
10 import datetime
11 import errno
12 import os
13 import re
14 import subprocess
15 import time
16
17 import psutil
18 from psutil import AIX
19 from psutil import BSD
20 from psutil import LINUX
21 from psutil import MACOS
22 from psutil import OPENBSD
23 from psutil import POSIX
24 from psutil import SUNOS
25 from psutil.tests import CI_TESTING
26 from psutil.tests import spawn_testproc
27 from psutil.tests import HAS_NET_IO_COUNTERS
28 from psutil.tests import mock
29 from psutil.tests import PsutilTestCase
30 from psutil.tests import PYTHON_EXE
31 from psutil.tests import retry_on_failure
32 from psutil.tests import sh
33 from psutil.tests import skip_on_access_denied
34 from psutil.tests import terminate
35 from psutil.tests import TRAVIS
36 from psutil.tests import unittest
37 from psutil.tests import which
38
39
40 def ps(fmt, pid=None):
41 """
42 Wrapper for calling the ps command with a little bit of cross-platform
43 support for a narrow range of features.
44 """
45
46 cmd = ['ps']
47
48 if LINUX:
49 cmd.append('--no-headers')
50
51 if pid is not None:
52 cmd.extend(['-p', str(pid)])
53 else:
54 if SUNOS or AIX:
55 cmd.append('-A')
56 else:
57 cmd.append('ax')
58
59 if SUNOS:
60 fmt_map = set(('command', 'comm', 'start', 'stime'))
61 fmt = fmt_map.get(fmt, fmt)
62
63 cmd.extend(['-o', fmt])
64
65 output = sh(cmd)
66
67 if LINUX:
68 output = output.splitlines()
69 else:
70 output = output.splitlines()[1:]
71
72 all_output = []
73 for line in output:
74 line = line.strip()
75
76 try:
77 line = int(line)
78 except ValueError:
79 pass
80
81 all_output.append(line)
82
83 if pid is None:
84 return all_output
85 else:
86 return all_output[0]
87
88 # ps "-o" field names differ wildly between platforms.
89 # "comm" means "only executable name" but is not available on BSD platforms.
90 # "args" means "command with all its arguments", and is also not available
91 # on BSD platforms.
92 # "command" is like "args" on most platforms, but like "comm" on AIX,
93 # and not available on SUNOS.
94 # so for the executable name we can use "comm" on Solaris and split "command"
95 # on other platforms.
96 # to get the cmdline (with args) we have to use "args" on AIX and
97 # Solaris, and can use "command" on all others.
98
99
100 def ps_name(pid):
101 field = "command"
102 if SUNOS:
103 field = "comm"
104 return ps(field, pid).split()[0]
105
106
107 def ps_args(pid):
108 field = "command"
109 if AIX or SUNOS:
110 field = "args"
111 return ps(field, pid)
112
113
114 def ps_rss(pid):
115 field = "rss"
116 if AIX:
117 field = "rssize"
118 return ps(field, pid)
119
120
121 def ps_vsz(pid):
122 field = "vsz"
123 if AIX:
124 field = "vsize"
125 return ps(field, pid)
126
127
128 @unittest.skipIf(not POSIX, "POSIX only")
129 class TestProcess(PsutilTestCase):
130 """Compare psutil results against 'ps' command line utility (mainly)."""
131
132 @classmethod
133 def setUpClass(cls):
134 cls.pid = spawn_testproc([PYTHON_EXE, "-E", "-O"],
135 stdin=subprocess.PIPE).pid
136
137 @classmethod
138 def tearDownClass(cls):
139 terminate(cls.pid)
140
141 def test_ppid(self):
142 ppid_ps = ps('ppid', self.pid)
143 ppid_psutil = psutil.Process(self.pid).ppid()
144 self.assertEqual(ppid_ps, ppid_psutil)
145
146 def test_uid(self):
147 uid_ps = ps('uid', self.pid)
148 uid_psutil = psutil.Process(self.pid).uids().real
149 self.assertEqual(uid_ps, uid_psutil)
150
151 def test_gid(self):
152 gid_ps = ps('rgid', self.pid)
153 gid_psutil = psutil.Process(self.pid).gids().real
154 self.assertEqual(gid_ps, gid_psutil)
155
156 def test_username(self):
157 username_ps = ps('user', self.pid)
158 username_psutil = psutil.Process(self.pid).username()
159 self.assertEqual(username_ps, username_psutil)
160
161 def test_username_no_resolution(self):
162 # Emulate a case where the system can't resolve the uid to
163 # a username in which case psutil is supposed to return
164 # the stringified uid.
165 p = psutil.Process()
166 with mock.patch("psutil.pwd.getpwuid", side_effect=KeyError) as fun:
167 self.assertEqual(p.username(), str(p.uids().real))
168 assert fun.called
169
170 @skip_on_access_denied()
171 @retry_on_failure()
172 def test_rss_memory(self):
173 # give python interpreter some time to properly initialize
174 # so that the results are the same
175 time.sleep(0.1)
176 rss_ps = ps_rss(self.pid)
177 rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
178 self.assertEqual(rss_ps, rss_psutil)
179
180 @skip_on_access_denied()
181 @retry_on_failure()
182 def test_vsz_memory(self):
183 # give python interpreter some time to properly initialize
184 # so that the results are the same
185 time.sleep(0.1)
186 vsz_ps = ps_vsz(self.pid)
187 vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
188 self.assertEqual(vsz_ps, vsz_psutil)
189
190 def test_name(self):
191 name_ps = ps_name(self.pid)
192 # remove path if there is any, from the command
193 name_ps = os.path.basename(name_ps).lower()
194 name_psutil = psutil.Process(self.pid).name().lower()
195 # ...because of how we calculate PYTHON_EXE; on MACOS this may
196 # be "pythonX.Y".
197 name_ps = re.sub(r"\d.\d", "", name_ps)
198 name_psutil = re.sub(r"\d.\d", "", name_psutil)
199 # ...may also be "python.X"
200 name_ps = re.sub(r"\d", "", name_ps)
201 name_psutil = re.sub(r"\d", "", name_psutil)
202 self.assertEqual(name_ps, name_psutil)
203
204 def test_name_long(self):
205 # On UNIX the kernel truncates the name to the first 15
206 # characters. In such a case psutil tries to determine the
207 # full name from the cmdline.
208 name = "long-program-name"
209 cmdline = ["long-program-name-extended", "foo", "bar"]
210 with mock.patch("psutil._psplatform.Process.name",
211 return_value=name):
212 with mock.patch("psutil._psplatform.Process.cmdline",
213 return_value=cmdline):
214 p = psutil.Process()
215 self.assertEqual(p.name(), "long-program-name-extended")
216
217 def test_name_long_cmdline_ad_exc(self):
218 # Same as above but emulates a case where cmdline() raises
219 # AccessDenied in which case psutil is supposed to return
220 # the truncated name instead of crashing.
221 name = "long-program-name"
222 with mock.patch("psutil._psplatform.Process.name",
223 return_value=name):
224 with mock.patch("psutil._psplatform.Process.cmdline",
225 side_effect=psutil.AccessDenied(0, "")):
226 p = psutil.Process()
227 self.assertEqual(p.name(), "long-program-name")
228
229 def test_name_long_cmdline_nsp_exc(self):
230 # Same as above but emulates a case where cmdline() raises NSP
231 # which is supposed to propagate.
232 name = "long-program-name"
233 with mock.patch("psutil._psplatform.Process.name",
234 return_value=name):
235 with mock.patch("psutil._psplatform.Process.cmdline",
236 side_effect=psutil.NoSuchProcess(0, "")):
237 p = psutil.Process()
238 self.assertRaises(psutil.NoSuchProcess, p.name)
239
240 @unittest.skipIf(MACOS or BSD, 'ps -o start not available')
241 def test_create_time(self):
242 time_ps = ps('start', self.pid)
243 time_psutil = psutil.Process(self.pid).create_time()
244 time_psutil_tstamp = datetime.datetime.fromtimestamp(
245 time_psutil).strftime("%H:%M:%S")
246 # sometimes ps shows the time rounded up instead of down, so we check
247 # for both possible values
248 round_time_psutil = round(time_psutil)
249 round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
250 round_time_psutil).strftime("%H:%M:%S")
251 self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])
252
253 def test_exe(self):
254 ps_pathname = ps_name(self.pid)
255 psutil_pathname = psutil.Process(self.pid).exe()
256 try:
257 self.assertEqual(ps_pathname, psutil_pathname)
258 except AssertionError:
259 # certain platforms such as BSD are more accurate returning:
260 # "/usr/local/bin/python2.7"
261 # ...instead of:
262 # "/usr/local/bin/python"
263 # We do not want to consider this difference in accuracy
264 # an error.
265 adjusted_ps_pathname = ps_pathname[:len(ps_pathname)]
266 self.assertEqual(ps_pathname, adjusted_ps_pathname)
267
268 def test_cmdline(self):
269 ps_cmdline = ps_args(self.pid)
270 psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
271 self.assertEqual(ps_cmdline, psutil_cmdline)
272
273 # On SUNOS "ps" reads niceness /proc/pid/psinfo which returns an
274 # incorrect value (20); the real deal is getpriority(2) which
275 # returns 0; psutil relies on it, see:
276 # https://github.com/giampaolo/psutil/issues/1082
277 # AIX has the same issue
278 @unittest.skipIf(SUNOS, "not reliable on SUNOS")
279 @unittest.skipIf(AIX, "not reliable on AIX")
280 def test_nice(self):
281 ps_nice = ps('nice', self.pid)
282 psutil_nice = psutil.Process().nice()
283 self.assertEqual(ps_nice, psutil_nice)
284
285
286 @unittest.skipIf(not POSIX, "POSIX only")
287 class TestSystemAPIs(PsutilTestCase):
288 """Test some system APIs."""
289
290 @retry_on_failure()
291 def test_pids(self):
292 # Note: this test might fail if the OS is starting/killing
293 # other processes in the meantime
294 pids_ps = sorted(ps("pid"))
295 pids_psutil = psutil.pids()
296
297 # on MACOS and OPENBSD ps doesn't show pid 0
298 if MACOS or OPENBSD and 0 not in pids_ps:
299 pids_ps.insert(0, 0)
300
301 # There will often be one more process in pids_ps for ps itself
302 if len(pids_ps) - len(pids_psutil) > 1:
303 difference = [x for x in pids_psutil if x not in pids_ps] + \
304 [x for x in pids_ps if x not in pids_psutil]
305 self.fail("difference: " + str(difference))
306
307 # for some reason ifconfig -a does not report all interfaces
308 # returned by psutil
309 @unittest.skipIf(SUNOS, "unreliable on SUNOS")
310 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
311 @unittest.skipIf(not which('ifconfig'), "no ifconfig cmd")
312 @unittest.skipIf(not HAS_NET_IO_COUNTERS, "not supported")
313 def test_nic_names(self):
314 output = sh("ifconfig -a")
315 for nic in psutil.net_io_counters(pernic=True).keys():
316 for line in output.split():
317 if line.startswith(nic):
318 break
319 else:
320 self.fail(
321 "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
322 nic, output))
323
324 @unittest.skipIf(CI_TESTING and not psutil.users(), "unreliable on CI")
325 @retry_on_failure()
326 def test_users(self):
327 out = sh("who")
328 if not out.strip():
329 raise self.skipTest("no users on this system")
330 lines = out.split('\n')
331 users = [x.split()[0] for x in lines]
332 terminals = [x.split()[1] for x in lines]
333 self.assertEqual(len(users), len(psutil.users()))
334 for u in psutil.users():
335 self.assertIn(u.name, users)
336 self.assertIn(u.terminal, terminals)
337
338 def test_pid_exists_let_raise(self):
339 # According to "man 2 kill" possible error values for kill
340 # are (EINVAL, EPERM, ESRCH). Test that any other errno
341 # results in an exception.
342 with mock.patch("psutil._psposix.os.kill",
343 side_effect=OSError(errno.EBADF, "")) as m:
344 self.assertRaises(OSError, psutil._psposix.pid_exists, os.getpid())
345 assert m.called
346
347 def test_os_waitpid_let_raise(self):
348 # os.waitpid() is supposed to catch EINTR and ECHILD only.
349 # Test that any other errno results in an exception.
350 with mock.patch("psutil._psposix.os.waitpid",
351 side_effect=OSError(errno.EBADF, "")) as m:
352 self.assertRaises(OSError, psutil._psposix.wait_pid, os.getpid())
353 assert m.called
354
355 def test_os_waitpid_eintr(self):
356 # os.waitpid() is supposed to "retry" on EINTR.
357 with mock.patch("psutil._psposix.os.waitpid",
358 side_effect=OSError(errno.EINTR, "")) as m:
359 self.assertRaises(
360 psutil._psposix.TimeoutExpired,
361 psutil._psposix.wait_pid, os.getpid(), timeout=0.01)
362 assert m.called
363
364 def test_os_waitpid_bad_ret_status(self):
365 # Simulate os.waitpid() returning a bad status.
366 with mock.patch("psutil._psposix.os.waitpid",
367 return_value=(1, -1)) as m:
368 self.assertRaises(ValueError,
369 psutil._psposix.wait_pid, os.getpid())
370 assert m.called
371
372 # AIX can return '-' in df output instead of numbers, e.g. for /proc
373 @unittest.skipIf(AIX, "unreliable on AIX")
374 def test_disk_usage(self):
375 def df(device):
376 out = sh("df -k %s" % device).strip()
377 line = out.split('\n')[1]
378 fields = line.split()
379 total = int(fields[1]) * 1024
380 used = int(fields[2]) * 1024
381 free = int(fields[3]) * 1024
382 percent = float(fields[4].replace('%', ''))
383 return (total, used, free, percent)
384
385 tolerance = 4 * 1024 * 1024 # 4MB
386 for part in psutil.disk_partitions(all=False):
387 usage = psutil.disk_usage(part.mountpoint)
388 try:
389 total, used, free, percent = df(part.device)
390 except RuntimeError as err:
391 # see:
392 # https://travis-ci.org/giampaolo/psutil/jobs/138338464
393 # https://travis-ci.org/giampaolo/psutil/jobs/138343361
394 err = str(err).lower()
395 if "no such file or directory" in err or \
396 "raw devices not supported" in err or \
397 "permission denied" in err:
398 continue
399 else:
400 raise
401 else:
402 self.assertAlmostEqual(usage.total, total, delta=tolerance)
403 self.assertAlmostEqual(usage.used, used, delta=tolerance)
404 self.assertAlmostEqual(usage.free, free, delta=tolerance)
405 self.assertAlmostEqual(usage.percent, percent, delta=1)
406
407
408 if __name__ == '__main__':
409 from psutil.tests.runner import run_from_name
410 run_from_name(__file__)