Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_posix.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 #!/usr/bin/env python3 | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
5 # Use of this source code is governed by a BSD-style license that can be | |
6 # found in the LICENSE file. | |
7 | |
8 """POSIX specific tests.""" | |
9 | |
10 import datetime | |
11 import errno | |
12 import os | |
13 import re | |
14 import subprocess | |
15 import time | |
16 | |
17 import psutil | |
18 from psutil import AIX | |
19 from psutil import BSD | |
20 from psutil import LINUX | |
21 from psutil import MACOS | |
22 from psutil import OPENBSD | |
23 from psutil import POSIX | |
24 from psutil import SUNOS | |
25 from psutil.tests import CI_TESTING | |
26 from psutil.tests import spawn_testproc | |
27 from psutil.tests import HAS_NET_IO_COUNTERS | |
28 from psutil.tests import mock | |
29 from psutil.tests import PsutilTestCase | |
30 from psutil.tests import PYTHON_EXE | |
31 from psutil.tests import retry_on_failure | |
32 from psutil.tests import sh | |
33 from psutil.tests import skip_on_access_denied | |
34 from psutil.tests import terminate | |
35 from psutil.tests import TRAVIS | |
36 from psutil.tests import unittest | |
37 from psutil.tests import which | |
38 | |
39 | |
def ps(fmt, pid=None):
    """
    Wrapper for calling the ps command with a little bit of cross-platform
    support for a narrow range of features.

    `fmt` is the field name passed to "ps -o". If `pid` is given only
    that process is queried and a single value is returned; otherwise
    all processes are listed and a list with one value per output line
    is returned. Values which parse as integers are returned as int,
    everything else as a stripped string.
    """
    cmd = ['ps']

    if LINUX:
        cmd.append('--no-headers')

    if pid is not None:
        cmd.extend(['-p', str(pid)])
    elif SUNOS or AIX:
        cmd.append('-A')
    else:
        cmd.append('ax')

    if SUNOS:
        # Translate field names for Solaris ps. This must be a dict: a set
        # has no get() method and would raise AttributeError here.
        # NOTE(review): the command->comm / start->stime pairing is
        # presumed from the field names; confirm on a SUNOS box.
        fmt_map = {'command': 'comm', 'start': 'stime'}
        fmt = fmt_map.get(fmt, fmt)

    cmd.extend(['-o', fmt])

    output = sh(cmd)

    # Only on Linux is ps asked to omit the header line (--no-headers);
    # on every other platform the first line is dropped here.
    lines = output.splitlines()
    if not LINUX:
        lines = lines[1:]

    all_output = []
    for line in lines:
        line = line.strip()

        try:
            line = int(line)
        except ValueError:
            # not a number: keep the stripped string as is
            pass

        all_output.append(line)

    if pid is None:
        return all_output
    return all_output[0]
87 | |
88 # ps "-o" field names differ wildly between platforms. | |
89 # "comm" means "only executable name" but is not available on BSD platforms. | |
90 # "args" means "command with all its arguments", and is also not available | |
91 # on BSD platforms. | |
92 # "command" is like "args" on most platforms, but like "comm" on AIX, | |
93 # and not available on SUNOS. | |
94 # so for the executable name we can use "comm" on Solaris and split "command" | |
95 # on other platforms. | |
96 # to get the cmdline (with args) we have to use "args" on AIX and | |
97 # Solaris, and can use "command" on all others. | |
98 | |
99 | |
def ps_name(pid):
    """Return the first whitespace-separated token that ps reports for
    the executable of `pid` ("comm" on SUNOS, "command" elsewhere).
    """
    field = "comm" if SUNOS else "command"
    ps_value = ps(field, pid)
    return ps_value.split()[0]
105 | |
106 | |
def ps_args(pid):
    """Return the command line of `pid` as reported by ps ("args" on
    AIX and SUNOS, "command" elsewhere).
    """
    if AIX or SUNOS:
        return ps("args", pid)
    return ps("command", pid)
112 | |
113 | |
def ps_rss(pid):
    """Return the value ps reports for the "rss" field of `pid`
    (the field is called "rssize" on AIX).
    """
    if AIX:
        return ps("rssize", pid)
    return ps("rss", pid)
119 | |
120 | |
def ps_vsz(pid):
    """Return the value ps reports for the "vsz" field of `pid`
    (the field is called "vsize" on AIX).
    """
    if AIX:
        return ps("vsize", pid)
    return ps("vsz", pid)
126 | |
127 | |
@unittest.skipIf(not POSIX, "POSIX only")
class TestProcess(PsutilTestCase):
    """Compare psutil results against 'ps' command line utility (mainly)."""

    @classmethod
    def setUpClass(cls):
        # A long-lived python subprocess shared by all tests of this class;
        # its pid is what both ps and psutil are queried about.
        cls.pid = spawn_testproc([PYTHON_EXE, "-E", "-O"],
                                 stdin=subprocess.PIPE).pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    def test_ppid(self):
        ppid_ps = ps('ppid', self.pid)
        ppid_psutil = psutil.Process(self.pid).ppid()
        self.assertEqual(ppid_ps, ppid_psutil)

    def test_uid(self):
        uid_ps = ps('uid', self.pid)
        uid_psutil = psutil.Process(self.pid).uids().real
        self.assertEqual(uid_ps, uid_psutil)

    def test_gid(self):
        gid_ps = ps('rgid', self.pid)
        gid_psutil = psutil.Process(self.pid).gids().real
        self.assertEqual(gid_ps, gid_psutil)

    def test_username(self):
        username_ps = ps('user', self.pid)
        username_psutil = psutil.Process(self.pid).username()
        self.assertEqual(username_ps, username_psutil)

    def test_username_no_resolution(self):
        # Emulate a case where the system can't resolve the uid to
        # a username in which case psutil is supposed to return
        # the stringified uid.
        p = psutil.Process()
        with mock.patch("psutil.pwd.getpwuid", side_effect=KeyError) as fun:
            self.assertEqual(p.username(), str(p.uids().real))
            assert fun.called

    @skip_on_access_denied()
    @retry_on_failure()
    def test_rss_memory(self):
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        rss_ps = ps_rss(self.pid)
        rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
        self.assertEqual(rss_ps, rss_psutil)

    @skip_on_access_denied()
    @retry_on_failure()
    def test_vsz_memory(self):
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        vsz_ps = ps_vsz(self.pid)
        vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
        self.assertEqual(vsz_ps, vsz_psutil)

    def test_name(self):
        name_ps = ps_name(self.pid)
        # remove path if there is any, from the command
        name_ps = os.path.basename(name_ps).lower()
        name_psutil = psutil.Process(self.pid).name().lower()
        # ...because of how we calculate PYTHON_EXE; on MACOS this may
        # be "pythonX.Y".
        name_ps = re.sub(r"\d.\d", "", name_ps)
        name_psutil = re.sub(r"\d.\d", "", name_psutil)
        # ...may also be "python.X"
        name_ps = re.sub(r"\d", "", name_ps)
        name_psutil = re.sub(r"\d", "", name_psutil)
        self.assertEqual(name_ps, name_psutil)

    def test_name_long(self):
        # On UNIX the kernel truncates the name to the first 15
        # characters. In such a case psutil tries to determine the
        # full name from the cmdline.
        name = "long-program-name"
        cmdline = ["long-program-name-extended", "foo", "bar"]
        with mock.patch("psutil._psplatform.Process.name",
                        return_value=name):
            with mock.patch("psutil._psplatform.Process.cmdline",
                            return_value=cmdline):
                p = psutil.Process()
                self.assertEqual(p.name(), "long-program-name-extended")

    def test_name_long_cmdline_ad_exc(self):
        # Same as above but emulates a case where cmdline() raises
        # AccessDenied in which case psutil is supposed to return
        # the truncated name instead of crashing.
        name = "long-program-name"
        with mock.patch("psutil._psplatform.Process.name",
                        return_value=name):
            with mock.patch("psutil._psplatform.Process.cmdline",
                            side_effect=psutil.AccessDenied(0, "")):
                p = psutil.Process()
                self.assertEqual(p.name(), "long-program-name")

    def test_name_long_cmdline_nsp_exc(self):
        # Same as above but emulates a case where cmdline() raises NSP
        # which is supposed to propagate.
        name = "long-program-name"
        with mock.patch("psutil._psplatform.Process.name",
                        return_value=name):
            with mock.patch("psutil._psplatform.Process.cmdline",
                            side_effect=psutil.NoSuchProcess(0, "")):
                p = psutil.Process()
                self.assertRaises(psutil.NoSuchProcess, p.name)

    @unittest.skipIf(MACOS or BSD, 'ps -o start not available')
    def test_create_time(self):
        time_ps = ps('start', self.pid)
        time_psutil = psutil.Process(self.pid).create_time()
        time_psutil_tstamp = datetime.datetime.fromtimestamp(
            time_psutil).strftime("%H:%M:%S")
        # sometimes ps shows the time rounded up instead of down, so we check
        # for both possible values
        round_time_psutil = round(time_psutil)
        round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
            round_time_psutil).strftime("%H:%M:%S")
        self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])

    def test_exe(self):
        ps_pathname = ps_name(self.pid)
        psutil_pathname = psutil.Process(self.pid).exe()
        try:
            self.assertEqual(ps_pathname, psutil_pathname)
        except AssertionError:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error. Truncate the psutil path to the length of the ps
            # one so that only the common prefix is compared; slicing
            # ps_pathname by its own length would compare it to itself
            # and could never fail.
            adjusted_psutil_pathname = psutil_pathname[:len(ps_pathname)]
            self.assertEqual(ps_pathname, adjusted_psutil_pathname)

    def test_cmdline(self):
        ps_cmdline = ps_args(self.pid)
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
        self.assertEqual(ps_cmdline, psutil_cmdline)

    # On SUNOS "ps" reads niceness /proc/pid/psinfo which returns an
    # incorrect value (20); the real deal is getpriority(2) which
    # returns 0; psutil relies on it, see:
    # https://github.com/giampaolo/psutil/issues/1082
    # AIX has the same issue
    @unittest.skipIf(SUNOS, "not reliable on SUNOS")
    @unittest.skipIf(AIX, "not reliable on AIX")
    def test_nice(self):
        ps_nice = ps('nice', self.pid)
        # Query the same process ps was asked about, not the test runner.
        psutil_nice = psutil.Process(self.pid).nice()
        self.assertEqual(ps_nice, psutil_nice)
285 | |
@unittest.skipIf(not POSIX, "POSIX only")
class TestSystemAPIs(PsutilTestCase):
    """Test some system APIs."""

    @retry_on_failure()
    def test_pids(self):
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        pids_ps = sorted(ps("pid"))
        pids_psutil = psutil.pids()

        # on MACOS and OPENBSD ps doesn't show pid 0
        # (parenthesized: "and" binds tighter than "or", so without the
        # parentheses pid 0 would be inserted unconditionally on MACOS)
        if (MACOS or OPENBSD) and 0 not in pids_ps:
            pids_ps.insert(0, 0)

        # There will often be one more process in pids_ps for ps itself
        if len(pids_ps) - len(pids_psutil) > 1:
            difference = [x for x in pids_psutil if x not in pids_ps] + \
                         [x for x in pids_ps if x not in pids_psutil]
            self.fail("difference: " + str(difference))

    # for some reason ifconfig -a does not report all interfaces
    # returned by psutil
    @unittest.skipIf(SUNOS, "unreliable on SUNOS")
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    @unittest.skipIf(not which('ifconfig'), "no ifconfig cmd")
    @unittest.skipIf(not HAS_NET_IO_COUNTERS, "not supported")
    def test_nic_names(self):
        output = sh("ifconfig -a")
        for nic in psutil.net_io_counters(pernic=True).keys():
            for line in output.split():
                if line.startswith(nic):
                    break
            else:
                # inner loop finished without a match for this nic
                self.fail(
                    "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
                        nic, output))

    @unittest.skipIf(CI_TESTING and not psutil.users(), "unreliable on CI")
    @retry_on_failure()
    def test_users(self):
        out = sh("who")
        if not out.strip():
            # skipTest() raises SkipTest by itself
            self.skipTest("no users on this system")
        # Ignore blank lines so that indexing the split fields below
        # cannot raise IndexError.
        lines = [x for x in out.split('\n') if x.strip()]
        users = [x.split()[0] for x in lines]
        terminals = [x.split()[1] for x in lines]
        self.assertEqual(len(users), len(psutil.users()))
        for u in psutil.users():
            self.assertIn(u.name, users)
            self.assertIn(u.terminal, terminals)

    def test_pid_exists_let_raise(self):
        # According to "man 2 kill" possible error values for kill
        # are (EINVAL, EPERM, ESRCH). Test that any other errno
        # results in an exception.
        with mock.patch("psutil._psposix.os.kill",
                        side_effect=OSError(errno.EBADF, "")) as m:
            self.assertRaises(OSError, psutil._psposix.pid_exists, os.getpid())
            assert m.called

    def test_os_waitpid_let_raise(self):
        # os.waitpid() is supposed to catch EINTR and ECHILD only.
        # Test that any other errno results in an exception.
        with mock.patch("psutil._psposix.os.waitpid",
                        side_effect=OSError(errno.EBADF, "")) as m:
            self.assertRaises(OSError, psutil._psposix.wait_pid, os.getpid())
            assert m.called

    def test_os_waitpid_eintr(self):
        # os.waitpid() is supposed to "retry" on EINTR.
        with mock.patch("psutil._psposix.os.waitpid",
                        side_effect=OSError(errno.EINTR, "")) as m:
            self.assertRaises(
                psutil._psposix.TimeoutExpired,
                psutil._psposix.wait_pid, os.getpid(), timeout=0.01)
            assert m.called

    def test_os_waitpid_bad_ret_status(self):
        # Simulate os.waitpid() returning a bad status.
        with mock.patch("psutil._psposix.os.waitpid",
                        return_value=(1, -1)) as m:
            self.assertRaises(ValueError,
                              psutil._psposix.wait_pid, os.getpid())
            assert m.called

    # AIX can return '-' in df output instead of numbers, e.g. for /proc
    @unittest.skipIf(AIX, "unreliable on AIX")
    def test_disk_usage(self):
        def df(device):
            # Parse the second line of "df -k" output (sizes in KiB)
            # into (total, used, free) bytes plus the percentage.
            out = sh("df -k %s" % device).strip()
            line = out.split('\n')[1]
            fields = line.split()
            total = int(fields[1]) * 1024
            used = int(fields[2]) * 1024
            free = int(fields[3]) * 1024
            percent = float(fields[4].replace('%', ''))
            return (total, used, free, percent)

        tolerance = 4 * 1024 * 1024  # 4MB
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            try:
                total, used, free, percent = df(part.device)
            except RuntimeError as err:
                # see:
                # https://travis-ci.org/giampaolo/psutil/jobs/138338464
                # https://travis-ci.org/giampaolo/psutil/jobs/138343361
                err = str(err).lower()
                if "no such file or directory" in err or \
                        "raw devices not supported" in err or \
                        "permission denied" in err:
                    continue
                else:
                    raise
            else:
                self.assertAlmostEqual(usage.total, total, delta=tolerance)
                self.assertAlmostEqual(usage.used, used, delta=tolerance)
                self.assertAlmostEqual(usage.free, free, delta=tolerance)
                self.assertAlmostEqual(usage.percent, percent, delta=1)
406 | |
407 | |
if __name__ == "__main__":
    # When executed as a script, run this file's tests via psutil's runner.
    from psutil.tests.runner import run_from_name as _run_from_name

    _run_from_name(__file__)