Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_linux.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
4 # Use of this source code is governed by a BSD-style license that can be | |
5 # found in the LICENSE file. | |
6 | |
7 """Linux specific tests.""" | |
8 | |
9 from __future__ import division | |
10 import collections | |
11 import contextlib | |
12 import errno | |
13 import glob | |
14 import io | |
15 import os | |
16 import re | |
17 import shutil | |
18 import socket | |
19 import struct | |
20 import textwrap | |
21 import time | |
22 import warnings | |
23 | |
24 import psutil | |
25 from psutil import LINUX | |
26 from psutil._compat import basestring | |
27 from psutil._compat import FileNotFoundError | |
28 from psutil._compat import PY3 | |
29 from psutil._compat import u | |
30 from psutil.tests import call_until | |
31 from psutil.tests import GLOBAL_TIMEOUT | |
32 from psutil.tests import HAS_BATTERY | |
33 from psutil.tests import HAS_CPU_FREQ | |
34 from psutil.tests import HAS_GETLOADAVG | |
35 from psutil.tests import HAS_RLIMIT | |
36 from psutil.tests import mock | |
37 from psutil.tests import PsutilTestCase | |
38 from psutil.tests import PYPY | |
39 from psutil.tests import reload_module | |
40 from psutil.tests import retry_on_failure | |
41 from psutil.tests import safe_rmpath | |
42 from psutil.tests import sh | |
43 from psutil.tests import skip_on_not_implemented | |
44 from psutil.tests import ThreadTask | |
45 from psutil.tests import TOLERANCE_DISK_USAGE | |
46 from psutil.tests import TOLERANCE_SYS_MEM | |
47 from psutil.tests import TRAVIS | |
48 from psutil.tests import unittest | |
49 from psutil.tests import which | |
50 | |
51 | |
52 HERE = os.path.abspath(os.path.dirname(__file__)) | |
53 SIOCGIFADDR = 0x8915 | |
54 SIOCGIFCONF = 0x8912 | |
55 SIOCGIFHWADDR = 0x8927 | |
56 if LINUX: | |
57 SECTOR_SIZE = 512 | |
58 EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*') | |
59 | |
60 # ===================================================================== | |
61 # --- utils | |
62 # ===================================================================== | |
63 | |
64 | |
65 def get_ipv4_address(ifname): | |
66 import fcntl | |
67 ifname = ifname[:15] | |
68 if PY3: | |
69 ifname = bytes(ifname, 'ascii') | |
70 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
71 with contextlib.closing(s): | |
72 return socket.inet_ntoa( | |
73 fcntl.ioctl(s.fileno(), | |
74 SIOCGIFADDR, | |
75 struct.pack('256s', ifname))[20:24]) | |
76 | |
77 | |
78 def get_mac_address(ifname): | |
79 import fcntl | |
80 ifname = ifname[:15] | |
81 if PY3: | |
82 ifname = bytes(ifname, 'ascii') | |
83 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
84 with contextlib.closing(s): | |
85 info = fcntl.ioctl( | |
86 s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname)) | |
87 if PY3: | |
88 def ord(x): | |
89 return x | |
90 else: | |
91 import __builtin__ | |
92 ord = __builtin__.ord | |
93 return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1] | |
94 | |
95 | |
96 def free_swap(): | |
97 """Parse 'free' cmd and return swap memory's s total, used and free | |
98 values. | |
99 """ | |
100 out = sh('free -b', env={"LANG": "C.UTF-8"}) | |
101 lines = out.split('\n') | |
102 for line in lines: | |
103 if line.startswith('Swap'): | |
104 _, total, used, free = line.split() | |
105 nt = collections.namedtuple('free', 'total used free') | |
106 return nt(int(total), int(used), int(free)) | |
107 raise ValueError( | |
108 "can't find 'Swap' in 'free' output:\n%s" % '\n'.join(lines)) | |
109 | |
110 | |
111 def free_physmem(): | |
112 """Parse 'free' cmd and return physical memory's total, used | |
113 and free values. | |
114 """ | |
115 # Note: free can have 2 different formats, invalidating 'shared' | |
116 # and 'cached' memory which may have different positions so we | |
117 # do not return them. | |
118 # https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946 | |
119 out = sh('free -b', env={"LANG": "C.UTF-8"}) | |
120 lines = out.split('\n') | |
121 for line in lines: | |
122 if line.startswith('Mem'): | |
123 total, used, free, shared = \ | |
124 [int(x) for x in line.split()[1:5]] | |
125 nt = collections.namedtuple( | |
126 'free', 'total used free shared output') | |
127 return nt(total, used, free, shared, out) | |
128 raise ValueError( | |
129 "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(lines)) | |
130 | |
131 | |
132 def vmstat(stat): | |
133 out = sh("vmstat -s", env={"LANG": "C.UTF-8"}) | |
134 for line in out.split("\n"): | |
135 line = line.strip() | |
136 if stat in line: | |
137 return int(line.split(' ')[0]) | |
138 raise ValueError("can't find %r in 'vmstat' output" % stat) | |
139 | |
140 | |
141 def get_free_version_info(): | |
142 out = sh("free -V").strip() | |
143 if 'UNKNOWN' in out: | |
144 raise unittest.SkipTest("can't determine free version") | |
145 return tuple(map(int, out.split()[-1].split('.'))) | |
146 | |
147 | |
148 @contextlib.contextmanager | |
149 def mock_open_content(for_path, content): | |
150 """Mock open() builtin and forces it to return a certain `content` | |
151 on read() if the path being opened matches `for_path`. | |
152 """ | |
153 def open_mock(name, *args, **kwargs): | |
154 if name == for_path: | |
155 if PY3: | |
156 if isinstance(content, basestring): | |
157 return io.StringIO(content) | |
158 else: | |
159 return io.BytesIO(content) | |
160 else: | |
161 return io.BytesIO(content) | |
162 else: | |
163 return orig_open(name, *args, **kwargs) | |
164 | |
165 orig_open = open | |
166 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
167 with mock.patch(patch_point, create=True, side_effect=open_mock) as m: | |
168 yield m | |
169 | |
170 | |
171 @contextlib.contextmanager | |
172 def mock_open_exception(for_path, exc): | |
173 """Mock open() builtin and raises `exc` if the path being opened | |
174 matches `for_path`. | |
175 """ | |
176 def open_mock(name, *args, **kwargs): | |
177 if name == for_path: | |
178 raise exc | |
179 else: | |
180 return orig_open(name, *args, **kwargs) | |
181 | |
182 orig_open = open | |
183 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
184 with mock.patch(patch_point, create=True, side_effect=open_mock) as m: | |
185 yield m | |
186 | |
187 | |
188 # ===================================================================== | |
189 # --- system virtual memory | |
190 # ===================================================================== | |
191 | |
192 | |
193 @unittest.skipIf(not LINUX, "LINUX only") | |
194 class TestSystemVirtualMemory(PsutilTestCase): | |
195 | |
196 def test_total(self): | |
197 # free_value = free_physmem().total | |
198 # psutil_value = psutil.virtual_memory().total | |
199 # self.assertEqual(free_value, psutil_value) | |
200 vmstat_value = vmstat('total memory') * 1024 | |
201 psutil_value = psutil.virtual_memory().total | |
202 self.assertAlmostEqual(vmstat_value, psutil_value) | |
203 | |
204 @retry_on_failure() | |
205 def test_used(self): | |
206 # Older versions of procps used slab memory to calculate used memory. | |
207 # This got changed in: | |
208 # https://gitlab.com/procps-ng/procps/commit/ | |
209 # 05d751c4f076a2f0118b914c5e51cfbb4762ad8e | |
210 if get_free_version_info() < (3, 3, 12): | |
211 raise self.skipTest("old free version") | |
212 free = free_physmem() | |
213 free_value = free.used | |
214 psutil_value = psutil.virtual_memory().used | |
215 self.assertAlmostEqual( | |
216 free_value, psutil_value, delta=TOLERANCE_SYS_MEM, | |
217 msg='%s %s \n%s' % (free_value, psutil_value, free.output)) | |
218 | |
219 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") | |
220 @retry_on_failure() | |
221 def test_free(self): | |
222 vmstat_value = vmstat('free memory') * 1024 | |
223 psutil_value = psutil.virtual_memory().free | |
224 self.assertAlmostEqual( | |
225 vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM) | |
226 | |
227 @retry_on_failure() | |
228 def test_buffers(self): | |
229 vmstat_value = vmstat('buffer memory') * 1024 | |
230 psutil_value = psutil.virtual_memory().buffers | |
231 self.assertAlmostEqual( | |
232 vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM) | |
233 | |
234 # https://travis-ci.org/giampaolo/psutil/jobs/226719664 | |
235 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") | |
236 @retry_on_failure() | |
237 def test_active(self): | |
238 vmstat_value = vmstat('active memory') * 1024 | |
239 psutil_value = psutil.virtual_memory().active | |
240 self.assertAlmostEqual( | |
241 vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM) | |
242 | |
243 # https://travis-ci.org/giampaolo/psutil/jobs/227242952 | |
244 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") | |
245 @retry_on_failure() | |
246 def test_inactive(self): | |
247 vmstat_value = vmstat('inactive memory') * 1024 | |
248 psutil_value = psutil.virtual_memory().inactive | |
249 self.assertAlmostEqual( | |
250 vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM) | |
251 | |
252 @retry_on_failure() | |
253 def test_shared(self): | |
254 free = free_physmem() | |
255 free_value = free.shared | |
256 if free_value == 0: | |
257 raise unittest.SkipTest("free does not support 'shared' column") | |
258 psutil_value = psutil.virtual_memory().shared | |
259 self.assertAlmostEqual( | |
260 free_value, psutil_value, delta=TOLERANCE_SYS_MEM, | |
261 msg='%s %s \n%s' % (free_value, psutil_value, free.output)) | |
262 | |
263 @retry_on_failure() | |
264 def test_available(self): | |
265 # "free" output format has changed at some point: | |
266 # https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098 | |
267 out = sh("free -b") | |
268 lines = out.split('\n') | |
269 if 'available' not in lines[0]: | |
270 raise unittest.SkipTest("free does not support 'available' column") | |
271 else: | |
272 free_value = int(lines[1].split()[-1]) | |
273 psutil_value = psutil.virtual_memory().available | |
274 self.assertAlmostEqual( | |
275 free_value, psutil_value, delta=TOLERANCE_SYS_MEM, | |
276 msg='%s %s \n%s' % (free_value, psutil_value, out)) | |
277 | |
278 def test_warnings_on_misses(self): | |
279 # Emulate a case where /proc/meminfo provides few info. | |
280 # psutil is supposed to set the missing fields to 0 and | |
281 # raise a warning. | |
282 with mock_open_content( | |
283 '/proc/meminfo', | |
284 textwrap.dedent("""\ | |
285 Active(anon): 6145416 kB | |
286 Active(file): 2950064 kB | |
287 Inactive(anon): 574764 kB | |
288 Inactive(file): 1567648 kB | |
289 MemAvailable: -1 kB | |
290 MemFree: 2057400 kB | |
291 MemTotal: 16325648 kB | |
292 SReclaimable: 346648 kB | |
293 """).encode()) as m: | |
294 with warnings.catch_warnings(record=True) as ws: | |
295 warnings.simplefilter("always") | |
296 ret = psutil.virtual_memory() | |
297 assert m.called | |
298 self.assertEqual(len(ws), 1) | |
299 w = ws[0] | |
300 assert w.filename.endswith('psutil/_pslinux.py') | |
301 self.assertIn( | |
302 "memory stats couldn't be determined", str(w.message)) | |
303 self.assertIn("cached", str(w.message)) | |
304 self.assertIn("shared", str(w.message)) | |
305 self.assertIn("active", str(w.message)) | |
306 self.assertIn("inactive", str(w.message)) | |
307 self.assertIn("buffers", str(w.message)) | |
308 self.assertIn("available", str(w.message)) | |
309 self.assertEqual(ret.cached, 0) | |
310 self.assertEqual(ret.active, 0) | |
311 self.assertEqual(ret.inactive, 0) | |
312 self.assertEqual(ret.shared, 0) | |
313 self.assertEqual(ret.buffers, 0) | |
314 self.assertEqual(ret.available, 0) | |
315 self.assertEqual(ret.slab, 0) | |
316 | |
317 @retry_on_failure() | |
318 def test_avail_old_percent(self): | |
319 # Make sure that our calculation of avail mem for old kernels | |
320 # is off by max 15%. | |
321 from psutil._pslinux import calculate_avail_vmem | |
322 from psutil._pslinux import open_binary | |
323 | |
324 mems = {} | |
325 with open_binary('/proc/meminfo') as f: | |
326 for line in f: | |
327 fields = line.split() | |
328 mems[fields[0]] = int(fields[1]) * 1024 | |
329 | |
330 a = calculate_avail_vmem(mems) | |
331 if b'MemAvailable:' in mems: | |
332 b = mems[b'MemAvailable:'] | |
333 diff_percent = abs(a - b) / a * 100 | |
334 self.assertLess(diff_percent, 15) | |
335 | |
336 def test_avail_old_comes_from_kernel(self): | |
337 # Make sure "MemAvailable:" coluimn is used instead of relying | |
338 # on our internal algorithm to calculate avail mem. | |
339 with mock_open_content( | |
340 '/proc/meminfo', | |
341 textwrap.dedent("""\ | |
342 Active: 9444728 kB | |
343 Active(anon): 6145416 kB | |
344 Active(file): 2950064 kB | |
345 Buffers: 287952 kB | |
346 Cached: 4818144 kB | |
347 Inactive(file): 1578132 kB | |
348 Inactive(anon): 574764 kB | |
349 Inactive(file): 1567648 kB | |
350 MemAvailable: 6574984 kB | |
351 MemFree: 2057400 kB | |
352 MemTotal: 16325648 kB | |
353 Shmem: 577588 kB | |
354 SReclaimable: 346648 kB | |
355 """).encode()) as m: | |
356 with warnings.catch_warnings(record=True) as ws: | |
357 ret = psutil.virtual_memory() | |
358 assert m.called | |
359 self.assertEqual(ret.available, 6574984 * 1024) | |
360 w = ws[0] | |
361 self.assertIn( | |
362 "inactive memory stats couldn't be determined", str(w.message)) | |
363 | |
364 def test_avail_old_missing_fields(self): | |
365 # Remove Active(file), Inactive(file) and SReclaimable | |
366 # from /proc/meminfo and make sure the fallback is used | |
367 # (free + cached), | |
368 with mock_open_content( | |
369 "/proc/meminfo", | |
370 textwrap.dedent("""\ | |
371 Active: 9444728 kB | |
372 Active(anon): 6145416 kB | |
373 Buffers: 287952 kB | |
374 Cached: 4818144 kB | |
375 Inactive(file): 1578132 kB | |
376 Inactive(anon): 574764 kB | |
377 MemFree: 2057400 kB | |
378 MemTotal: 16325648 kB | |
379 Shmem: 577588 kB | |
380 """).encode()) as m: | |
381 with warnings.catch_warnings(record=True) as ws: | |
382 ret = psutil.virtual_memory() | |
383 assert m.called | |
384 self.assertEqual(ret.available, 2057400 * 1024 + 4818144 * 1024) | |
385 w = ws[0] | |
386 self.assertIn( | |
387 "inactive memory stats couldn't be determined", str(w.message)) | |
388 | |
389 def test_avail_old_missing_zoneinfo(self): | |
390 # Remove /proc/zoneinfo file. Make sure fallback is used | |
391 # (free + cached). | |
392 with mock_open_content( | |
393 "/proc/meminfo", | |
394 textwrap.dedent("""\ | |
395 Active: 9444728 kB | |
396 Active(anon): 6145416 kB | |
397 Active(file): 2950064 kB | |
398 Buffers: 287952 kB | |
399 Cached: 4818144 kB | |
400 Inactive(file): 1578132 kB | |
401 Inactive(anon): 574764 kB | |
402 Inactive(file): 1567648 kB | |
403 MemFree: 2057400 kB | |
404 MemTotal: 16325648 kB | |
405 Shmem: 577588 kB | |
406 SReclaimable: 346648 kB | |
407 """).encode()): | |
408 with mock_open_exception( | |
409 "/proc/zoneinfo", | |
410 IOError(errno.ENOENT, 'no such file or directory')): | |
411 with warnings.catch_warnings(record=True) as ws: | |
412 ret = psutil.virtual_memory() | |
413 self.assertEqual( | |
414 ret.available, 2057400 * 1024 + 4818144 * 1024) | |
415 w = ws[0] | |
416 self.assertIn( | |
417 "inactive memory stats couldn't be determined", | |
418 str(w.message)) | |
419 | |
420 def test_virtual_memory_mocked(self): | |
421 # Emulate /proc/meminfo because neither vmstat nor free return slab. | |
422 def open_mock(name, *args, **kwargs): | |
423 if name == '/proc/meminfo': | |
424 return io.BytesIO(textwrap.dedent("""\ | |
425 MemTotal: 100 kB | |
426 MemFree: 2 kB | |
427 MemAvailable: 3 kB | |
428 Buffers: 4 kB | |
429 Cached: 5 kB | |
430 SwapCached: 6 kB | |
431 Active: 7 kB | |
432 Inactive: 8 kB | |
433 Active(anon): 9 kB | |
434 Inactive(anon): 10 kB | |
435 Active(file): 11 kB | |
436 Inactive(file): 12 kB | |
437 Unevictable: 13 kB | |
438 Mlocked: 14 kB | |
439 SwapTotal: 15 kB | |
440 SwapFree: 16 kB | |
441 Dirty: 17 kB | |
442 Writeback: 18 kB | |
443 AnonPages: 19 kB | |
444 Mapped: 20 kB | |
445 Shmem: 21 kB | |
446 Slab: 22 kB | |
447 SReclaimable: 23 kB | |
448 SUnreclaim: 24 kB | |
449 KernelStack: 25 kB | |
450 PageTables: 26 kB | |
451 NFS_Unstable: 27 kB | |
452 Bounce: 28 kB | |
453 WritebackTmp: 29 kB | |
454 CommitLimit: 30 kB | |
455 Committed_AS: 31 kB | |
456 VmallocTotal: 32 kB | |
457 VmallocUsed: 33 kB | |
458 VmallocChunk: 34 kB | |
459 HardwareCorrupted: 35 kB | |
460 AnonHugePages: 36 kB | |
461 ShmemHugePages: 37 kB | |
462 ShmemPmdMapped: 38 kB | |
463 CmaTotal: 39 kB | |
464 CmaFree: 40 kB | |
465 HugePages_Total: 41 kB | |
466 HugePages_Free: 42 kB | |
467 HugePages_Rsvd: 43 kB | |
468 HugePages_Surp: 44 kB | |
469 Hugepagesize: 45 kB | |
470 DirectMap46k: 46 kB | |
471 DirectMap47M: 47 kB | |
472 DirectMap48G: 48 kB | |
473 """).encode()) | |
474 else: | |
475 return orig_open(name, *args, **kwargs) | |
476 | |
477 orig_open = open | |
478 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
479 with mock.patch(patch_point, create=True, side_effect=open_mock) as m: | |
480 mem = psutil.virtual_memory() | |
481 assert m.called | |
482 self.assertEqual(mem.total, 100 * 1024) | |
483 self.assertEqual(mem.free, 2 * 1024) | |
484 self.assertEqual(mem.buffers, 4 * 1024) | |
485 # cached mem also includes reclaimable memory | |
486 self.assertEqual(mem.cached, (5 + 23) * 1024) | |
487 self.assertEqual(mem.shared, 21 * 1024) | |
488 self.assertEqual(mem.active, 7 * 1024) | |
489 self.assertEqual(mem.inactive, 8 * 1024) | |
490 self.assertEqual(mem.slab, 22 * 1024) | |
491 self.assertEqual(mem.available, 3 * 1024) | |
492 | |
493 | |
494 # ===================================================================== | |
495 # --- system swap memory | |
496 # ===================================================================== | |
497 | |
498 | |
499 @unittest.skipIf(not LINUX, "LINUX only") | |
500 class TestSystemSwapMemory(PsutilTestCase): | |
501 | |
502 @staticmethod | |
503 def meminfo_has_swap_info(): | |
504 """Return True if /proc/meminfo provides swap metrics.""" | |
505 with open("/proc/meminfo") as f: | |
506 data = f.read() | |
507 return 'SwapTotal:' in data and 'SwapFree:' in data | |
508 | |
509 def test_total(self): | |
510 free_value = free_swap().total | |
511 psutil_value = psutil.swap_memory().total | |
512 return self.assertAlmostEqual( | |
513 free_value, psutil_value, delta=TOLERANCE_SYS_MEM) | |
514 | |
515 @retry_on_failure() | |
516 def test_used(self): | |
517 free_value = free_swap().used | |
518 psutil_value = psutil.swap_memory().used | |
519 return self.assertAlmostEqual( | |
520 free_value, psutil_value, delta=TOLERANCE_SYS_MEM) | |
521 | |
522 @retry_on_failure() | |
523 def test_free(self): | |
524 free_value = free_swap().free | |
525 psutil_value = psutil.swap_memory().free | |
526 return self.assertAlmostEqual( | |
527 free_value, psutil_value, delta=TOLERANCE_SYS_MEM) | |
528 | |
529 def test_missing_sin_sout(self): | |
530 with mock.patch('psutil._common.open', create=True) as m: | |
531 with warnings.catch_warnings(record=True) as ws: | |
532 warnings.simplefilter("always") | |
533 ret = psutil.swap_memory() | |
534 assert m.called | |
535 self.assertEqual(len(ws), 1) | |
536 w = ws[0] | |
537 assert w.filename.endswith('psutil/_pslinux.py') | |
538 self.assertIn( | |
539 "'sin' and 'sout' swap memory stats couldn't " | |
540 "be determined", str(w.message)) | |
541 self.assertEqual(ret.sin, 0) | |
542 self.assertEqual(ret.sout, 0) | |
543 | |
544 def test_no_vmstat_mocked(self): | |
545 # see https://github.com/giampaolo/psutil/issues/722 | |
546 with mock_open_exception( | |
547 "/proc/vmstat", | |
548 IOError(errno.ENOENT, 'no such file or directory')) as m: | |
549 with warnings.catch_warnings(record=True) as ws: | |
550 warnings.simplefilter("always") | |
551 ret = psutil.swap_memory() | |
552 assert m.called | |
553 self.assertEqual(len(ws), 1) | |
554 w = ws[0] | |
555 assert w.filename.endswith('psutil/_pslinux.py') | |
556 self.assertIn( | |
557 "'sin' and 'sout' swap memory stats couldn't " | |
558 "be determined and were set to 0", | |
559 str(w.message)) | |
560 self.assertEqual(ret.sin, 0) | |
561 self.assertEqual(ret.sout, 0) | |
562 | |
563 def test_meminfo_against_sysinfo(self): | |
564 # Make sure the content of /proc/meminfo about swap memory | |
565 # matches sysinfo() syscall, see: | |
566 # https://github.com/giampaolo/psutil/issues/1015 | |
567 if not self.meminfo_has_swap_info(): | |
568 return unittest.skip("/proc/meminfo has no swap metrics") | |
569 with mock.patch('psutil._pslinux.cext.linux_sysinfo') as m: | |
570 swap = psutil.swap_memory() | |
571 assert not m.called | |
572 import psutil._psutil_linux as cext | |
573 _, _, _, _, total, free, unit_multiplier = cext.linux_sysinfo() | |
574 total *= unit_multiplier | |
575 free *= unit_multiplier | |
576 self.assertEqual(swap.total, total) | |
577 self.assertAlmostEqual(swap.free, free, delta=TOLERANCE_SYS_MEM) | |
578 | |
579 def test_emulate_meminfo_has_no_metrics(self): | |
580 # Emulate a case where /proc/meminfo provides no swap metrics | |
581 # in which case sysinfo() syscall is supposed to be used | |
582 # as a fallback. | |
583 with mock_open_content("/proc/meminfo", b"") as m: | |
584 psutil.swap_memory() | |
585 assert m.called | |
586 | |
587 | |
588 # ===================================================================== | |
589 # --- system CPU | |
590 # ===================================================================== | |
591 | |
592 | |
593 @unittest.skipIf(not LINUX, "LINUX only") | |
594 class TestSystemCPUTimes(PsutilTestCase): | |
595 | |
596 @unittest.skipIf(TRAVIS, "unknown failure on travis") | |
597 def test_fields(self): | |
598 fields = psutil.cpu_times()._fields | |
599 kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0] | |
600 kernel_ver_info = tuple(map(int, kernel_ver.split('.'))) | |
601 if kernel_ver_info >= (2, 6, 11): | |
602 self.assertIn('steal', fields) | |
603 else: | |
604 self.assertNotIn('steal', fields) | |
605 if kernel_ver_info >= (2, 6, 24): | |
606 self.assertIn('guest', fields) | |
607 else: | |
608 self.assertNotIn('guest', fields) | |
609 if kernel_ver_info >= (3, 2, 0): | |
610 self.assertIn('guest_nice', fields) | |
611 else: | |
612 self.assertNotIn('guest_nice', fields) | |
613 | |
614 | |
615 @unittest.skipIf(not LINUX, "LINUX only") | |
616 class TestSystemCPUCountLogical(PsutilTestCase): | |
617 | |
618 @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"), | |
619 "/sys/devices/system/cpu/online does not exist") | |
620 def test_against_sysdev_cpu_online(self): | |
621 with open("/sys/devices/system/cpu/online") as f: | |
622 value = f.read().strip() | |
623 if "-" in str(value): | |
624 value = int(value.split('-')[1]) + 1 | |
625 self.assertEqual(psutil.cpu_count(), value) | |
626 | |
627 @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu"), | |
628 "/sys/devices/system/cpu does not exist") | |
629 def test_against_sysdev_cpu_num(self): | |
630 ls = os.listdir("/sys/devices/system/cpu") | |
631 count = len([x for x in ls if re.search(r"cpu\d+$", x) is not None]) | |
632 self.assertEqual(psutil.cpu_count(), count) | |
633 | |
634 @unittest.skipIf(not which("nproc"), "nproc utility not available") | |
635 def test_against_nproc(self): | |
636 num = int(sh("nproc --all")) | |
637 self.assertEqual(psutil.cpu_count(logical=True), num) | |
638 | |
639 @unittest.skipIf(not which("lscpu"), "lscpu utility not available") | |
640 def test_against_lscpu(self): | |
641 out = sh("lscpu -p") | |
642 num = len([x for x in out.split('\n') if not x.startswith('#')]) | |
643 self.assertEqual(psutil.cpu_count(logical=True), num) | |
644 | |
645 def test_emulate_fallbacks(self): | |
646 import psutil._pslinux | |
647 original = psutil._pslinux.cpu_count_logical() | |
648 # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in | |
649 # order to cause the parsing of /proc/cpuinfo and /proc/stat. | |
650 with mock.patch( | |
651 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m: | |
652 self.assertEqual(psutil._pslinux.cpu_count_logical(), original) | |
653 assert m.called | |
654 | |
655 # Let's have open() return emtpy data and make sure None is | |
656 # returned ('cause we mimick os.cpu_count()). | |
657 with mock.patch('psutil._common.open', create=True) as m: | |
658 self.assertIsNone(psutil._pslinux.cpu_count_logical()) | |
659 self.assertEqual(m.call_count, 2) | |
660 # /proc/stat should be the last one | |
661 self.assertEqual(m.call_args[0][0], '/proc/stat') | |
662 | |
663 # Let's push this a bit further and make sure /proc/cpuinfo | |
664 # parsing works as expected. | |
665 with open('/proc/cpuinfo', 'rb') as f: | |
666 cpuinfo_data = f.read() | |
667 fake_file = io.BytesIO(cpuinfo_data) | |
668 with mock.patch('psutil._common.open', | |
669 return_value=fake_file, create=True) as m: | |
670 self.assertEqual(psutil._pslinux.cpu_count_logical(), original) | |
671 | |
672 # Finally, let's make /proc/cpuinfo return meaningless data; | |
673 # this way we'll fall back on relying on /proc/stat | |
674 with mock_open_content('/proc/cpuinfo', b"") as m: | |
675 self.assertEqual(psutil._pslinux.cpu_count_logical(), original) | |
676 m.called | |
677 | |
678 | |
679 @unittest.skipIf(not LINUX, "LINUX only") | |
680 class TestSystemCPUCountPhysical(PsutilTestCase): | |
681 | |
682 @unittest.skipIf(not which("lscpu"), "lscpu utility not available") | |
683 def test_against_lscpu(self): | |
684 out = sh("lscpu -p") | |
685 core_ids = set() | |
686 for line in out.split('\n'): | |
687 if not line.startswith('#'): | |
688 fields = line.split(',') | |
689 core_ids.add(fields[1]) | |
690 self.assertEqual(psutil.cpu_count(logical=False), len(core_ids)) | |
691 | |
692 def test_emulate_none(self): | |
693 with mock.patch('glob.glob', return_value=[]) as m1: | |
694 with mock.patch('psutil._common.open', create=True) as m2: | |
695 self.assertIsNone(psutil._pslinux.cpu_count_physical()) | |
696 assert m1.called | |
697 assert m2.called | |
698 | |
699 | |
700 @unittest.skipIf(not LINUX, "LINUX only") | |
701 class TestSystemCPUFrequency(PsutilTestCase): | |
702 | |
703 @unittest.skipIf(TRAVIS, "fails on Travis") | |
704 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
705 def test_emulate_use_second_file(self): | |
706 # https://github.com/giampaolo/psutil/issues/981 | |
707 def path_exists_mock(path): | |
708 if path.startswith("/sys/devices/system/cpu/cpufreq/policy"): | |
709 return False | |
710 else: | |
711 return orig_exists(path) | |
712 | |
713 orig_exists = os.path.exists | |
714 with mock.patch("os.path.exists", side_effect=path_exists_mock, | |
715 create=True): | |
716 assert psutil.cpu_freq() | |
717 | |
718 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
719 def test_emulate_use_cpuinfo(self): | |
720 # Emulate a case where /sys/devices/system/cpu/cpufreq* does not | |
721 # exist and /proc/cpuinfo is used instead. | |
722 def path_exists_mock(path): | |
723 if path.startswith('/sys/devices/system/cpu/'): | |
724 return False | |
725 else: | |
726 if path == "/proc/cpuinfo": | |
727 flags.append(None) | |
728 return os_path_exists(path) | |
729 | |
730 flags = [] | |
731 os_path_exists = os.path.exists | |
732 try: | |
733 with mock.patch("os.path.exists", side_effect=path_exists_mock): | |
734 reload_module(psutil._pslinux) | |
735 ret = psutil.cpu_freq() | |
736 assert ret | |
737 assert flags | |
738 self.assertEqual(ret.max, 0.0) | |
739 self.assertEqual(ret.min, 0.0) | |
740 for freq in psutil.cpu_freq(percpu=True): | |
741 self.assertEqual(ret.max, 0.0) | |
742 self.assertEqual(ret.min, 0.0) | |
743 finally: | |
744 reload_module(psutil._pslinux) | |
745 reload_module(psutil) | |
746 | |
747 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
748 def test_emulate_data(self): | |
749 def open_mock(name, *args, **kwargs): | |
750 if (name.endswith('/scaling_cur_freq') and | |
751 name.startswith("/sys/devices/system/cpu/cpufreq/policy")): | |
752 return io.BytesIO(b"500000") | |
753 elif (name.endswith('/scaling_min_freq') and | |
754 name.startswith("/sys/devices/system/cpu/cpufreq/policy")): | |
755 return io.BytesIO(b"600000") | |
756 elif (name.endswith('/scaling_max_freq') and | |
757 name.startswith("/sys/devices/system/cpu/cpufreq/policy")): | |
758 return io.BytesIO(b"700000") | |
759 elif name == '/proc/cpuinfo': | |
760 return io.BytesIO(b"cpu MHz : 500") | |
761 else: | |
762 return orig_open(name, *args, **kwargs) | |
763 | |
764 orig_open = open | |
765 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
766 with mock.patch(patch_point, side_effect=open_mock): | |
767 with mock.patch( | |
768 'os.path.exists', return_value=True): | |
769 freq = psutil.cpu_freq() | |
770 self.assertEqual(freq.current, 500.0) | |
771 # when /proc/cpuinfo is used min and max frequencies are not | |
772 # available and are set to 0. | |
773 if freq.min != 0.0: | |
774 self.assertEqual(freq.min, 600.0) | |
775 if freq.max != 0.0: | |
776 self.assertEqual(freq.max, 700.0) | |
777 | |
778 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
779 def test_emulate_multi_cpu(self): | |
780 def open_mock(name, *args, **kwargs): | |
781 n = name | |
782 if (n.endswith('/scaling_cur_freq') and | |
783 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")): | |
784 return io.BytesIO(b"100000") | |
785 elif (n.endswith('/scaling_min_freq') and | |
786 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")): | |
787 return io.BytesIO(b"200000") | |
788 elif (n.endswith('/scaling_max_freq') and | |
789 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")): | |
790 return io.BytesIO(b"300000") | |
791 elif (n.endswith('/scaling_cur_freq') and | |
792 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")): | |
793 return io.BytesIO(b"400000") | |
794 elif (n.endswith('/scaling_min_freq') and | |
795 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")): | |
796 return io.BytesIO(b"500000") | |
797 elif (n.endswith('/scaling_max_freq') and | |
798 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")): | |
799 return io.BytesIO(b"600000") | |
800 elif name == '/proc/cpuinfo': | |
801 return io.BytesIO(b"cpu MHz : 100\n" | |
802 b"cpu MHz : 400") | |
803 else: | |
804 return orig_open(name, *args, **kwargs) | |
805 | |
806 orig_open = open | |
807 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
808 with mock.patch(patch_point, side_effect=open_mock): | |
809 with mock.patch('os.path.exists', return_value=True): | |
810 with mock.patch('psutil._pslinux.cpu_count_logical', | |
811 return_value=2): | |
812 freq = psutil.cpu_freq(percpu=True) | |
813 self.assertEqual(freq[0].current, 100.0) | |
814 if freq[0].min != 0.0: | |
815 self.assertEqual(freq[0].min, 200.0) | |
816 if freq[0].max != 0.0: | |
817 self.assertEqual(freq[0].max, 300.0) | |
818 self.assertEqual(freq[1].current, 400.0) | |
819 if freq[1].min != 0.0: | |
820 self.assertEqual(freq[1].min, 500.0) | |
821 if freq[1].max != 0.0: | |
822 self.assertEqual(freq[1].max, 600.0) | |
823 | |
824 @unittest.skipIf(TRAVIS, "fails on Travis") | |
825 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
826 def test_emulate_no_scaling_cur_freq_file(self): | |
827 # See: https://github.com/giampaolo/psutil/issues/1071 | |
828 def open_mock(name, *args, **kwargs): | |
829 if name.endswith('/scaling_cur_freq'): | |
830 raise IOError(errno.ENOENT, "") | |
831 elif name.endswith('/cpuinfo_cur_freq'): | |
832 return io.BytesIO(b"200000") | |
833 elif name == '/proc/cpuinfo': | |
834 return io.BytesIO(b"cpu MHz : 200") | |
835 else: | |
836 return orig_open(name, *args, **kwargs) | |
837 | |
838 orig_open = open | |
839 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
840 with mock.patch(patch_point, side_effect=open_mock): | |
841 with mock.patch('os.path.exists', return_value=True): | |
842 with mock.patch('psutil._pslinux.cpu_count_logical', | |
843 return_value=1): | |
844 freq = psutil.cpu_freq() | |
845 self.assertEqual(freq.current, 200) | |
846 | |
847 | |
848 @unittest.skipIf(not LINUX, "LINUX only") | |
849 class TestSystemCPUStats(PsutilTestCase): | |
850 | |
851 @unittest.skipIf(TRAVIS, "fails on Travis") | |
852 def test_ctx_switches(self): | |
853 vmstat_value = vmstat("context switches") | |
854 psutil_value = psutil.cpu_stats().ctx_switches | |
855 self.assertAlmostEqual(vmstat_value, psutil_value, delta=500) | |
856 | |
857 @unittest.skipIf(TRAVIS, "fails on Travis") | |
858 def test_interrupts(self): | |
859 vmstat_value = vmstat("interrupts") | |
860 psutil_value = psutil.cpu_stats().interrupts | |
861 self.assertAlmostEqual(vmstat_value, psutil_value, delta=500) | |
862 | |
863 | |
864 @unittest.skipIf(not LINUX, "LINUX only") | |
865 class TestLoadAvg(PsutilTestCase): | |
866 | |
867 @unittest.skipIf(not HAS_GETLOADAVG, "not supported") | |
868 def test_getloadavg(self): | |
869 psutil_value = psutil.getloadavg() | |
870 with open("/proc/loadavg", "r") as f: | |
871 proc_value = f.read().split() | |
872 | |
873 self.assertAlmostEqual(float(proc_value[0]), psutil_value[0], delta=1) | |
874 self.assertAlmostEqual(float(proc_value[1]), psutil_value[1], delta=1) | |
875 self.assertAlmostEqual(float(proc_value[2]), psutil_value[2], delta=1) | |
876 | |
877 | |
878 # ===================================================================== | |
879 # --- system network | |
880 # ===================================================================== | |
881 | |
882 | |
883 @unittest.skipIf(not LINUX, "LINUX only") | |
884 class TestSystemNetIfAddrs(PsutilTestCase): | |
885 | |
886 def test_ips(self): | |
887 for name, addrs in psutil.net_if_addrs().items(): | |
888 for addr in addrs: | |
889 if addr.family == psutil.AF_LINK: | |
890 self.assertEqual(addr.address, get_mac_address(name)) | |
891 elif addr.family == socket.AF_INET: | |
892 self.assertEqual(addr.address, get_ipv4_address(name)) | |
893 # TODO: test for AF_INET6 family | |
894 | |
895 # XXX - not reliable when having virtual NICs installed by Docker. | |
896 # @unittest.skipIf(not which('ip'), "'ip' utility not available") | |
897 # @unittest.skipIf(TRAVIS, "skipped on Travis") | |
898 # def test_net_if_names(self): | |
899 # out = sh("ip addr").strip() | |
900 # nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x] | |
901 # found = 0 | |
902 # for line in out.split('\n'): | |
903 # line = line.strip() | |
904 # if re.search(r"^\d+:", line): | |
905 # found += 1 | |
906 # name = line.split(':')[1].strip() | |
907 # self.assertIn(name, nics) | |
908 # self.assertEqual(len(nics), found, msg="%s\n---\n%s" % ( | |
909 # pprint.pformat(nics), out)) | |
910 | |
911 | |
912 @unittest.skipIf(not LINUX, "LINUX only") | |
913 class TestSystemNetIfStats(PsutilTestCase): | |
914 | |
915 def test_against_ifconfig(self): | |
916 for name, stats in psutil.net_if_stats().items(): | |
917 try: | |
918 out = sh("ifconfig %s" % name) | |
919 except RuntimeError: | |
920 pass | |
921 else: | |
922 # Not always reliable. | |
923 # self.assertEqual(stats.isup, 'RUNNING' in out, msg=out) | |
924 self.assertEqual(stats.mtu, | |
925 int(re.findall(r'(?i)MTU[: ](\d+)', out)[0])) | |
926 | |
927 | |
928 @unittest.skipIf(not LINUX, "LINUX only") | |
929 class TestSystemNetIOCounters(PsutilTestCase): | |
930 | |
931 @retry_on_failure() | |
932 def test_against_ifconfig(self): | |
933 def ifconfig(nic): | |
934 ret = {} | |
935 out = sh("ifconfig %s" % name) | |
936 ret['packets_recv'] = int( | |
937 re.findall(r'RX packets[: ](\d+)', out)[0]) | |
938 ret['packets_sent'] = int( | |
939 re.findall(r'TX packets[: ](\d+)', out)[0]) | |
940 ret['errin'] = int(re.findall(r'errors[: ](\d+)', out)[0]) | |
941 ret['errout'] = int(re.findall(r'errors[: ](\d+)', out)[1]) | |
942 ret['dropin'] = int(re.findall(r'dropped[: ](\d+)', out)[0]) | |
943 ret['dropout'] = int(re.findall(r'dropped[: ](\d+)', out)[1]) | |
944 ret['bytes_recv'] = int( | |
945 re.findall(r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)[0]) | |
946 ret['bytes_sent'] = int( | |
947 re.findall(r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)[0]) | |
948 return ret | |
949 | |
950 nio = psutil.net_io_counters(pernic=True, nowrap=False) | |
951 for name, stats in nio.items(): | |
952 try: | |
953 ifconfig_ret = ifconfig(name) | |
954 except RuntimeError: | |
955 continue | |
956 self.assertAlmostEqual( | |
957 stats.bytes_recv, ifconfig_ret['bytes_recv'], delta=1024 * 5) | |
958 self.assertAlmostEqual( | |
959 stats.bytes_sent, ifconfig_ret['bytes_sent'], delta=1024 * 5) | |
960 self.assertAlmostEqual( | |
961 stats.packets_recv, ifconfig_ret['packets_recv'], delta=1024) | |
962 self.assertAlmostEqual( | |
963 stats.packets_sent, ifconfig_ret['packets_sent'], delta=1024) | |
964 self.assertAlmostEqual( | |
965 stats.errin, ifconfig_ret['errin'], delta=10) | |
966 self.assertAlmostEqual( | |
967 stats.errout, ifconfig_ret['errout'], delta=10) | |
968 self.assertAlmostEqual( | |
969 stats.dropin, ifconfig_ret['dropin'], delta=10) | |
970 self.assertAlmostEqual( | |
971 stats.dropout, ifconfig_ret['dropout'], delta=10) | |
972 | |
973 | |
974 @unittest.skipIf(not LINUX, "LINUX only") | |
975 class TestSystemNetConnections(PsutilTestCase): | |
976 | |
977 @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError) | |
978 @mock.patch('psutil._pslinux.supports_ipv6', return_value=False) | |
979 def test_emulate_ipv6_unsupported(self, supports_ipv6, inet_ntop): | |
980 # see: https://github.com/giampaolo/psutil/issues/623 | |
981 try: | |
982 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) | |
983 self.addCleanup(s.close) | |
984 s.bind(("::1", 0)) | |
985 except socket.error: | |
986 pass | |
987 psutil.net_connections(kind='inet6') | |
988 | |
989 def test_emulate_unix(self): | |
990 with mock_open_content( | |
991 '/proc/net/unix', | |
992 textwrap.dedent("""\ | |
993 0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n | |
994 0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ | |
995 0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O | |
996 000000000000000000000000000000000000000000000000000000 | |
997 """)) as m: | |
998 psutil.net_connections(kind='unix') | |
999 assert m.called | |
1000 | |
1001 | |
1002 # ===================================================================== | |
1003 # --- system disks | |
1004 # ===================================================================== | |
1005 | |
1006 | |
1007 @unittest.skipIf(not LINUX, "LINUX only") | |
1008 class TestSystemDiskPartitions(PsutilTestCase): | |
1009 | |
1010 @unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available") | |
1011 @skip_on_not_implemented() | |
1012 def test_against_df(self): | |
1013 # test psutil.disk_usage() and psutil.disk_partitions() | |
1014 # against "df -a" | |
1015 def df(path): | |
1016 out = sh('df -P -B 1 "%s"' % path).strip() | |
1017 lines = out.split('\n') | |
1018 lines.pop(0) | |
1019 line = lines.pop(0) | |
1020 dev, total, used, free = line.split()[:4] | |
1021 if dev == 'none': | |
1022 dev = '' | |
1023 total, used, free = int(total), int(used), int(free) | |
1024 return dev, total, used, free | |
1025 | |
1026 for part in psutil.disk_partitions(all=False): | |
1027 usage = psutil.disk_usage(part.mountpoint) | |
1028 dev, total, used, free = df(part.mountpoint) | |
1029 self.assertEqual(usage.total, total) | |
1030 self.assertAlmostEqual(usage.free, free, | |
1031 delta=TOLERANCE_DISK_USAGE) | |
1032 self.assertAlmostEqual(usage.used, used, | |
1033 delta=TOLERANCE_DISK_USAGE) | |
1034 | |
1035 def test_zfs_fs(self): | |
1036 # Test that ZFS partitions are returned. | |
1037 with open("/proc/filesystems", "r") as f: | |
1038 data = f.read() | |
1039 if 'zfs' in data: | |
1040 for part in psutil.disk_partitions(): | |
1041 if part.fstype == 'zfs': | |
1042 break | |
1043 else: | |
1044 self.fail("couldn't find any ZFS partition") | |
1045 else: | |
1046 # No ZFS partitions on this system. Let's fake one. | |
1047 fake_file = io.StringIO(u("nodev\tzfs\n")) | |
1048 with mock.patch('psutil._common.open', | |
1049 return_value=fake_file, create=True) as m1: | |
1050 with mock.patch( | |
1051 'psutil._pslinux.cext.disk_partitions', | |
1052 return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2: | |
1053 ret = psutil.disk_partitions() | |
1054 assert m1.called | |
1055 assert m2.called | |
1056 assert ret | |
1057 self.assertEqual(ret[0].fstype, 'zfs') | |
1058 | |
1059 def test_emulate_realpath_fail(self): | |
1060 # See: https://github.com/giampaolo/psutil/issues/1307 | |
1061 try: | |
1062 with mock.patch('os.path.realpath', | |
1063 return_value='/non/existent') as m: | |
1064 with self.assertRaises(FileNotFoundError): | |
1065 psutil.disk_partitions() | |
1066 assert m.called | |
1067 finally: | |
1068 psutil.PROCFS_PATH = "/proc" | |
1069 | |
1070 | |
1071 @unittest.skipIf(not LINUX, "LINUX only") | |
1072 class TestSystemDiskIoCounters(PsutilTestCase): | |
1073 | |
1074 def test_emulate_kernel_2_4(self): | |
1075 # Tests /proc/diskstats parsing format for 2.4 kernels, see: | |
1076 # https://github.com/giampaolo/psutil/issues/767 | |
1077 with mock_open_content( | |
1078 '/proc/diskstats', | |
1079 " 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"): | |
1080 with mock.patch('psutil._pslinux.is_storage_device', | |
1081 return_value=True): | |
1082 ret = psutil.disk_io_counters(nowrap=False) | |
1083 self.assertEqual(ret.read_count, 1) | |
1084 self.assertEqual(ret.read_merged_count, 2) | |
1085 self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE) | |
1086 self.assertEqual(ret.read_time, 4) | |
1087 self.assertEqual(ret.write_count, 5) | |
1088 self.assertEqual(ret.write_merged_count, 6) | |
1089 self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE) | |
1090 self.assertEqual(ret.write_time, 8) | |
1091 self.assertEqual(ret.busy_time, 10) | |
1092 | |
1093 def test_emulate_kernel_2_6_full(self): | |
1094 # Tests /proc/diskstats parsing format for 2.6 kernels, | |
1095 # lines reporting all metrics: | |
1096 # https://github.com/giampaolo/psutil/issues/767 | |
1097 with mock_open_content( | |
1098 '/proc/diskstats', | |
1099 " 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"): | |
1100 with mock.patch('psutil._pslinux.is_storage_device', | |
1101 return_value=True): | |
1102 ret = psutil.disk_io_counters(nowrap=False) | |
1103 self.assertEqual(ret.read_count, 1) | |
1104 self.assertEqual(ret.read_merged_count, 2) | |
1105 self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE) | |
1106 self.assertEqual(ret.read_time, 4) | |
1107 self.assertEqual(ret.write_count, 5) | |
1108 self.assertEqual(ret.write_merged_count, 6) | |
1109 self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE) | |
1110 self.assertEqual(ret.write_time, 8) | |
1111 self.assertEqual(ret.busy_time, 10) | |
1112 | |
1113 def test_emulate_kernel_2_6_limited(self): | |
1114 # Tests /proc/diskstats parsing format for 2.6 kernels, | |
1115 # where one line of /proc/partitions return a limited | |
1116 # amount of metrics when it bumps into a partition | |
1117 # (instead of a disk). See: | |
1118 # https://github.com/giampaolo/psutil/issues/767 | |
1119 with mock_open_content( | |
1120 '/proc/diskstats', | |
1121 " 3 1 hda 1 2 3 4"): | |
1122 with mock.patch('psutil._pslinux.is_storage_device', | |
1123 return_value=True): | |
1124 ret = psutil.disk_io_counters(nowrap=False) | |
1125 self.assertEqual(ret.read_count, 1) | |
1126 self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE) | |
1127 self.assertEqual(ret.write_count, 3) | |
1128 self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE) | |
1129 | |
1130 self.assertEqual(ret.read_merged_count, 0) | |
1131 self.assertEqual(ret.read_time, 0) | |
1132 self.assertEqual(ret.write_merged_count, 0) | |
1133 self.assertEqual(ret.write_time, 0) | |
1134 self.assertEqual(ret.busy_time, 0) | |
1135 | |
1136 def test_emulate_include_partitions(self): | |
1137 # Make sure that when perdisk=True disk partitions are returned, | |
1138 # see: | |
1139 # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842 | |
1140 with mock_open_content( | |
1141 '/proc/diskstats', | |
1142 textwrap.dedent("""\ | |
1143 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11 | |
1144 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11 | |
1145 """)): | |
1146 with mock.patch('psutil._pslinux.is_storage_device', | |
1147 return_value=False): | |
1148 ret = psutil.disk_io_counters(perdisk=True, nowrap=False) | |
1149 self.assertEqual(len(ret), 2) | |
1150 self.assertEqual(ret['nvme0n1'].read_count, 1) | |
1151 self.assertEqual(ret['nvme0n1p1'].read_count, 1) | |
1152 self.assertEqual(ret['nvme0n1'].write_count, 5) | |
1153 self.assertEqual(ret['nvme0n1p1'].write_count, 5) | |
1154 | |
1155 def test_emulate_exclude_partitions(self): | |
1156 # Make sure that when perdisk=False partitions (e.g. 'sda1', | |
1157 # 'nvme0n1p1') are skipped and not included in the total count. | |
1158 # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842 | |
1159 with mock_open_content( | |
1160 '/proc/diskstats', | |
1161 textwrap.dedent("""\ | |
1162 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11 | |
1163 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11 | |
1164 """)): | |
1165 with mock.patch('psutil._pslinux.is_storage_device', | |
1166 return_value=False): | |
1167 ret = psutil.disk_io_counters(perdisk=False, nowrap=False) | |
1168 self.assertIsNone(ret) | |
1169 | |
1170 # | |
1171 def is_storage_device(name): | |
1172 return name == 'nvme0n1' | |
1173 | |
1174 with mock_open_content( | |
1175 '/proc/diskstats', | |
1176 textwrap.dedent("""\ | |
1177 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11 | |
1178 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11 | |
1179 """)): | |
1180 with mock.patch('psutil._pslinux.is_storage_device', | |
1181 create=True, side_effect=is_storage_device): | |
1182 ret = psutil.disk_io_counters(perdisk=False, nowrap=False) | |
1183 self.assertEqual(ret.read_count, 1) | |
1184 self.assertEqual(ret.write_count, 5) | |
1185 | |
1186 def test_emulate_use_sysfs(self): | |
1187 def exists(path): | |
1188 if path == '/proc/diskstats': | |
1189 return False | |
1190 return True | |
1191 | |
1192 wprocfs = psutil.disk_io_counters(perdisk=True) | |
1193 with mock.patch('psutil._pslinux.os.path.exists', | |
1194 create=True, side_effect=exists): | |
1195 wsysfs = psutil.disk_io_counters(perdisk=True) | |
1196 self.assertEqual(len(wprocfs), len(wsysfs)) | |
1197 | |
1198 def test_emulate_not_impl(self): | |
1199 def exists(path): | |
1200 return False | |
1201 | |
1202 with mock.patch('psutil._pslinux.os.path.exists', | |
1203 create=True, side_effect=exists): | |
1204 self.assertRaises(NotImplementedError, psutil.disk_io_counters) | |
1205 | |
1206 | |
1207 # ===================================================================== | |
1208 # --- misc | |
1209 # ===================================================================== | |
1210 | |
1211 | |
1212 @unittest.skipIf(not LINUX, "LINUX only") | |
1213 class TestMisc(PsutilTestCase): | |
1214 | |
1215 def test_boot_time(self): | |
1216 vmstat_value = vmstat('boot time') | |
1217 psutil_value = psutil.boot_time() | |
1218 self.assertEqual(int(vmstat_value), int(psutil_value)) | |
1219 | |
1220 def test_no_procfs_on_import(self): | |
1221 my_procfs = self.get_testfn() | |
1222 os.mkdir(my_procfs) | |
1223 | |
1224 with open(os.path.join(my_procfs, 'stat'), 'w') as f: | |
1225 f.write('cpu 0 0 0 0 0 0 0 0 0 0\n') | |
1226 f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n') | |
1227 f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n') | |
1228 | |
1229 try: | |
1230 orig_open = open | |
1231 | |
1232 def open_mock(name, *args, **kwargs): | |
1233 if name.startswith('/proc'): | |
1234 raise IOError(errno.ENOENT, 'rejecting access for test') | |
1235 return orig_open(name, *args, **kwargs) | |
1236 | |
1237 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
1238 with mock.patch(patch_point, side_effect=open_mock): | |
1239 reload_module(psutil) | |
1240 | |
1241 self.assertRaises(IOError, psutil.cpu_times) | |
1242 self.assertRaises(IOError, psutil.cpu_times, percpu=True) | |
1243 self.assertRaises(IOError, psutil.cpu_percent) | |
1244 self.assertRaises(IOError, psutil.cpu_percent, percpu=True) | |
1245 self.assertRaises(IOError, psutil.cpu_times_percent) | |
1246 self.assertRaises( | |
1247 IOError, psutil.cpu_times_percent, percpu=True) | |
1248 | |
1249 psutil.PROCFS_PATH = my_procfs | |
1250 | |
1251 self.assertEqual(psutil.cpu_percent(), 0) | |
1252 self.assertEqual(sum(psutil.cpu_times_percent()), 0) | |
1253 | |
1254 # since we don't know the number of CPUs at import time, | |
1255 # we awkwardly say there are none until the second call | |
1256 per_cpu_percent = psutil.cpu_percent(percpu=True) | |
1257 self.assertEqual(sum(per_cpu_percent), 0) | |
1258 | |
1259 # ditto awkward length | |
1260 per_cpu_times_percent = psutil.cpu_times_percent(percpu=True) | |
1261 self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0) | |
1262 | |
1263 # much user, very busy | |
1264 with open(os.path.join(my_procfs, 'stat'), 'w') as f: | |
1265 f.write('cpu 1 0 0 0 0 0 0 0 0 0\n') | |
1266 f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n') | |
1267 f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n') | |
1268 | |
1269 self.assertNotEqual(psutil.cpu_percent(), 0) | |
1270 self.assertNotEqual( | |
1271 sum(psutil.cpu_percent(percpu=True)), 0) | |
1272 self.assertNotEqual(sum(psutil.cpu_times_percent()), 0) | |
1273 self.assertNotEqual( | |
1274 sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0) | |
1275 finally: | |
1276 shutil.rmtree(my_procfs) | |
1277 reload_module(psutil) | |
1278 | |
1279 self.assertEqual(psutil.PROCFS_PATH, '/proc') | |
1280 | |
1281 def test_cpu_steal_decrease(self): | |
1282 # Test cumulative cpu stats decrease. We should ignore this. | |
1283 # See issue #1210. | |
1284 with mock_open_content( | |
1285 "/proc/stat", | |
1286 textwrap.dedent("""\ | |
1287 cpu 0 0 0 0 0 0 0 1 0 0 | |
1288 cpu0 0 0 0 0 0 0 0 1 0 0 | |
1289 cpu1 0 0 0 0 0 0 0 1 0 0 | |
1290 """).encode()) as m: | |
1291 # first call to "percent" functions should read the new stat file | |
1292 # and compare to the "real" file read at import time - so the | |
1293 # values are meaningless | |
1294 psutil.cpu_percent() | |
1295 assert m.called | |
1296 psutil.cpu_percent(percpu=True) | |
1297 psutil.cpu_times_percent() | |
1298 psutil.cpu_times_percent(percpu=True) | |
1299 | |
1300 with mock_open_content( | |
1301 "/proc/stat", | |
1302 textwrap.dedent("""\ | |
1303 cpu 1 0 0 0 0 0 0 0 0 0 | |
1304 cpu0 1 0 0 0 0 0 0 0 0 0 | |
1305 cpu1 1 0 0 0 0 0 0 0 0 0 | |
1306 """).encode()) as m: | |
1307 # Increase "user" while steal goes "backwards" to zero. | |
1308 cpu_percent = psutil.cpu_percent() | |
1309 assert m.called | |
1310 cpu_percent_percpu = psutil.cpu_percent(percpu=True) | |
1311 cpu_times_percent = psutil.cpu_times_percent() | |
1312 cpu_times_percent_percpu = psutil.cpu_times_percent(percpu=True) | |
1313 self.assertNotEqual(cpu_percent, 0) | |
1314 self.assertNotEqual(sum(cpu_percent_percpu), 0) | |
1315 self.assertNotEqual(sum(cpu_times_percent), 0) | |
1316 self.assertNotEqual(sum(cpu_times_percent), 100.0) | |
1317 self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 0) | |
1318 self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 100.0) | |
1319 self.assertEqual(cpu_times_percent.steal, 0) | |
1320 self.assertNotEqual(cpu_times_percent.user, 0) | |
1321 | |
1322 def test_boot_time_mocked(self): | |
1323 with mock.patch('psutil._common.open', create=True) as m: | |
1324 self.assertRaises( | |
1325 RuntimeError, | |
1326 psutil._pslinux.boot_time) | |
1327 assert m.called | |
1328 | |
1329 def test_users_mocked(self): | |
1330 # Make sure ':0' and ':0.0' (returned by C ext) are converted | |
1331 # to 'localhost'. | |
1332 with mock.patch('psutil._pslinux.cext.users', | |
1333 return_value=[('giampaolo', 'pts/2', ':0', | |
1334 1436573184.0, True, 2)]) as m: | |
1335 self.assertEqual(psutil.users()[0].host, 'localhost') | |
1336 assert m.called | |
1337 with mock.patch('psutil._pslinux.cext.users', | |
1338 return_value=[('giampaolo', 'pts/2', ':0.0', | |
1339 1436573184.0, True, 2)]) as m: | |
1340 self.assertEqual(psutil.users()[0].host, 'localhost') | |
1341 assert m.called | |
1342 # ...otherwise it should be returned as-is | |
1343 with mock.patch('psutil._pslinux.cext.users', | |
1344 return_value=[('giampaolo', 'pts/2', 'foo', | |
1345 1436573184.0, True, 2)]) as m: | |
1346 self.assertEqual(psutil.users()[0].host, 'foo') | |
1347 assert m.called | |
1348 | |
1349 def test_procfs_path(self): | |
1350 tdir = self.get_testfn() | |
1351 os.mkdir(tdir) | |
1352 try: | |
1353 psutil.PROCFS_PATH = tdir | |
1354 self.assertRaises(IOError, psutil.virtual_memory) | |
1355 self.assertRaises(IOError, psutil.cpu_times) | |
1356 self.assertRaises(IOError, psutil.cpu_times, percpu=True) | |
1357 self.assertRaises(IOError, psutil.boot_time) | |
1358 # self.assertRaises(IOError, psutil.pids) | |
1359 self.assertRaises(IOError, psutil.net_connections) | |
1360 self.assertRaises(IOError, psutil.net_io_counters) | |
1361 self.assertRaises(IOError, psutil.net_if_stats) | |
1362 # self.assertRaises(IOError, psutil.disk_io_counters) | |
1363 self.assertRaises(IOError, psutil.disk_partitions) | |
1364 self.assertRaises(psutil.NoSuchProcess, psutil.Process) | |
1365 finally: | |
1366 psutil.PROCFS_PATH = "/proc" | |
1367 | |
1368 @retry_on_failure() | |
1369 def test_issue_687(self): | |
1370 # In case of thread ID: | |
1371 # - pid_exists() is supposed to return False | |
1372 # - Process(tid) is supposed to work | |
1373 # - pids() should not return the TID | |
1374 # See: https://github.com/giampaolo/psutil/issues/687 | |
1375 t = ThreadTask() | |
1376 t.start() | |
1377 try: | |
1378 p = psutil.Process() | |
1379 threads = p.threads() | |
1380 self.assertEqual(len(threads), 2) | |
1381 tid = sorted(threads, key=lambda x: x.id)[1].id | |
1382 self.assertNotEqual(p.pid, tid) | |
1383 pt = psutil.Process(tid) | |
1384 pt.as_dict() | |
1385 self.assertNotIn(tid, psutil.pids()) | |
1386 finally: | |
1387 t.stop() | |
1388 | |
1389 def test_pid_exists_no_proc_status(self): | |
1390 # Internally pid_exists relies on /proc/{pid}/status. | |
1391 # Emulate a case where this file is empty in which case | |
1392 # psutil is supposed to fall back on using pids(). | |
1393 with mock_open_content("/proc/%s/status", "") as m: | |
1394 assert psutil.pid_exists(os.getpid()) | |
1395 assert m.called | |
1396 | |
1397 | |
1398 # ===================================================================== | |
1399 # --- sensors | |
1400 # ===================================================================== | |
1401 | |
1402 | |
1403 @unittest.skipIf(not LINUX, "LINUX only") | |
1404 @unittest.skipIf(not HAS_BATTERY, "no battery") | |
1405 class TestSensorsBattery(PsutilTestCase): | |
1406 | |
1407 @unittest.skipIf(not which("acpi"), "acpi utility not available") | |
1408 def test_percent(self): | |
1409 out = sh("acpi -b") | |
1410 acpi_value = int(out.split(",")[1].strip().replace('%', '')) | |
1411 psutil_value = psutil.sensors_battery().percent | |
1412 self.assertAlmostEqual(acpi_value, psutil_value, delta=1) | |
1413 | |
1414 @unittest.skipIf(not which("acpi"), "acpi utility not available") | |
1415 def test_power_plugged(self): | |
1416 out = sh("acpi -b") | |
1417 if 'unknown' in out.lower(): | |
1418 return unittest.skip("acpi output not reliable") | |
1419 if 'discharging at zero rate' in out: | |
1420 plugged = True | |
1421 else: | |
1422 plugged = "Charging" in out.split('\n')[0] | |
1423 self.assertEqual(psutil.sensors_battery().power_plugged, plugged) | |
1424 | |
1425 def test_emulate_power_plugged(self): | |
1426 # Pretend the AC power cable is connected. | |
1427 def open_mock(name, *args, **kwargs): | |
1428 if name.endswith("AC0/online") or name.endswith("AC/online"): | |
1429 return io.BytesIO(b"1") | |
1430 else: | |
1431 return orig_open(name, *args, **kwargs) | |
1432 | |
1433 orig_open = open | |
1434 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
1435 with mock.patch(patch_point, side_effect=open_mock) as m: | |
1436 self.assertEqual(psutil.sensors_battery().power_plugged, True) | |
1437 self.assertEqual( | |
1438 psutil.sensors_battery().secsleft, psutil.POWER_TIME_UNLIMITED) | |
1439 assert m.called | |
1440 | |
1441 def test_emulate_power_plugged_2(self): | |
1442 # Same as above but pretend /AC0/online does not exist in which | |
1443 # case code relies on /status file. | |
1444 def open_mock(name, *args, **kwargs): | |
1445 if name.endswith("AC0/online") or name.endswith("AC/online"): | |
1446 raise IOError(errno.ENOENT, "") | |
1447 elif name.endswith("/status"): | |
1448 return io.StringIO(u("charging")) | |
1449 else: | |
1450 return orig_open(name, *args, **kwargs) | |
1451 | |
1452 orig_open = open | |
1453 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
1454 with mock.patch(patch_point, side_effect=open_mock) as m: | |
1455 self.assertEqual(psutil.sensors_battery().power_plugged, True) | |
1456 assert m.called | |
1457 | |
1458 def test_emulate_power_not_plugged(self): | |
1459 # Pretend the AC power cable is not connected. | |
1460 def open_mock(name, *args, **kwargs): | |
1461 if name.endswith("AC0/online") or name.endswith("AC/online"): | |
1462 return io.BytesIO(b"0") | |
1463 else: | |
1464 return orig_open(name, *args, **kwargs) | |
1465 | |
1466 orig_open = open | |
1467 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
1468 with mock.patch(patch_point, side_effect=open_mock) as m: | |
1469 self.assertEqual(psutil.sensors_battery().power_plugged, False) | |
1470 assert m.called | |
1471 | |
1472 def test_emulate_power_not_plugged_2(self): | |
1473 # Same as above but pretend /AC0/online does not exist in which | |
1474 # case code relies on /status file. | |
1475 def open_mock(name, *args, **kwargs): | |
1476 if name.endswith("AC0/online") or name.endswith("AC/online"): | |
1477 raise IOError(errno.ENOENT, "") | |
1478 elif name.endswith("/status"): | |
1479 return io.StringIO(u("discharging")) | |
1480 else: | |
1481 return orig_open(name, *args, **kwargs) | |
1482 | |
1483 orig_open = open | |
1484 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
1485 with mock.patch(patch_point, side_effect=open_mock) as m: | |
1486 self.assertEqual(psutil.sensors_battery().power_plugged, False) | |
1487 assert m.called | |
1488 | |
1489 def test_emulate_power_undetermined(self): | |
1490 # Pretend we can't know whether the AC power cable not | |
1491 # connected (assert fallback to False). | |
1492 def open_mock(name, *args, **kwargs): | |
1493 if name.startswith("/sys/class/power_supply/AC0/online") or \ | |
1494 name.startswith("/sys/class/power_supply/AC/online"): | |
1495 raise IOError(errno.ENOENT, "") | |
1496 elif name.startswith("/sys/class/power_supply/BAT0/status"): | |
1497 return io.BytesIO(b"???") | |
1498 else: | |
1499 return orig_open(name, *args, **kwargs) | |
1500 | |
1501 orig_open = open | |
1502 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
1503 with mock.patch(patch_point, side_effect=open_mock) as m: | |
1504 self.assertIsNone(psutil.sensors_battery().power_plugged) | |
1505 assert m.called | |
1506 | |
1507 def test_emulate_no_base_files(self): | |
1508 # Emulate a case where base metrics files are not present, | |
1509 # in which case we're supposed to get None. | |
1510 with mock_open_exception( | |
1511 "/sys/class/power_supply/BAT0/energy_now", | |
1512 IOError(errno.ENOENT, "")): | |
1513 with mock_open_exception( | |
1514 "/sys/class/power_supply/BAT0/charge_now", | |
1515 IOError(errno.ENOENT, "")): | |
1516 self.assertIsNone(psutil.sensors_battery()) | |
1517 | |
1518 def test_emulate_energy_full_0(self): | |
1519 # Emulate a case where energy_full files returns 0. | |
1520 with mock_open_content( | |
1521 "/sys/class/power_supply/BAT0/energy_full", b"0") as m: | |
1522 self.assertEqual(psutil.sensors_battery().percent, 0) | |
1523 assert m.called | |
1524 | |
1525 def test_emulate_energy_full_not_avail(self): | |
1526 # Emulate a case where energy_full file does not exist. | |
1527 # Expected fallback on /capacity. | |
1528 with mock_open_exception( | |
1529 "/sys/class/power_supply/BAT0/energy_full", | |
1530 IOError(errno.ENOENT, "")): | |
1531 with mock_open_exception( | |
1532 "/sys/class/power_supply/BAT0/charge_full", | |
1533 IOError(errno.ENOENT, "")): | |
1534 with mock_open_content( | |
1535 "/sys/class/power_supply/BAT0/capacity", b"88"): | |
1536 self.assertEqual(psutil.sensors_battery().percent, 88) | |
1537 | |
1538 def test_emulate_no_power(self): | |
1539 # Emulate a case where /AC0/online file nor /BAT0/status exist. | |
1540 with mock_open_exception( | |
1541 "/sys/class/power_supply/AC/online", | |
1542 IOError(errno.ENOENT, "")): | |
1543 with mock_open_exception( | |
1544 "/sys/class/power_supply/AC0/online", | |
1545 IOError(errno.ENOENT, "")): | |
1546 with mock_open_exception( | |
1547 "/sys/class/power_supply/BAT0/status", | |
1548 IOError(errno.ENOENT, "")): | |
1549 self.assertIsNone(psutil.sensors_battery().power_plugged) | |
1550 | |
1551 | |
1552 @unittest.skipIf(not LINUX, "LINUX only") | |
1553 class TestSensorsTemperatures(PsutilTestCase): | |
1554 | |
1555 def test_emulate_class_hwmon(self): | |
1556 def open_mock(name, *args, **kwargs): | |
1557 if name.endswith('/name'): | |
1558 return io.StringIO(u("name")) | |
1559 elif name.endswith('/temp1_label'): | |
1560 return io.StringIO(u("label")) | |
1561 elif name.endswith('/temp1_input'): | |
1562 return io.BytesIO(b"30000") | |
1563 elif name.endswith('/temp1_max'): | |
1564 return io.BytesIO(b"40000") | |
1565 elif name.endswith('/temp1_crit'): | |
1566 return io.BytesIO(b"50000") | |
1567 else: | |
1568 return orig_open(name, *args, **kwargs) | |
1569 | |
1570 orig_open = open | |
1571 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
1572 with mock.patch(patch_point, side_effect=open_mock): | |
1573 # Test case with /sys/class/hwmon | |
1574 with mock.patch('glob.glob', | |
1575 return_value=['/sys/class/hwmon/hwmon0/temp1']): | |
1576 temp = psutil.sensors_temperatures()['name'][0] | |
1577 self.assertEqual(temp.label, 'label') | |
1578 self.assertEqual(temp.current, 30.0) | |
1579 self.assertEqual(temp.high, 40.0) | |
1580 self.assertEqual(temp.critical, 50.0) | |
1581 | |
1582 def test_emulate_class_thermal(self): | |
1583 def open_mock(name, *args, **kwargs): | |
1584 if name.endswith('0_temp'): | |
1585 return io.BytesIO(b"50000") | |
1586 elif name.endswith('temp'): | |
1587 return io.BytesIO(b"30000") | |
1588 elif name.endswith('0_type'): | |
1589 return io.StringIO(u("critical")) | |
1590 elif name.endswith('type'): | |
1591 return io.StringIO(u("name")) | |
1592 else: | |
1593 return orig_open(name, *args, **kwargs) | |
1594 | |
1595 def glob_mock(path): | |
1596 if path == '/sys/class/hwmon/hwmon*/temp*_*': | |
1597 return [] | |
1598 elif path == '/sys/class/hwmon/hwmon*/device/temp*_*': | |
1599 return [] | |
1600 elif path == '/sys/class/thermal/thermal_zone*': | |
1601 return ['/sys/class/thermal/thermal_zone0'] | |
1602 elif path == '/sys/class/thermal/thermal_zone0/trip_point*': | |
1603 return ['/sys/class/thermal/thermal_zone1/trip_point_0_type', | |
1604 '/sys/class/thermal/thermal_zone1/trip_point_0_temp'] | |
1605 return [] | |
1606 | |
1607 orig_open = open | |
1608 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
1609 with mock.patch(patch_point, side_effect=open_mock): | |
1610 with mock.patch('glob.glob', create=True, side_effect=glob_mock): | |
1611 temp = psutil.sensors_temperatures()['name'][0] | |
1612 self.assertEqual(temp.label, '') | |
1613 self.assertEqual(temp.current, 30.0) | |
1614 self.assertEqual(temp.high, 50.0) | |
1615 self.assertEqual(temp.critical, 50.0) | |
1616 | |
1617 | |
1618 @unittest.skipIf(not LINUX, "LINUX only") | |
1619 class TestSensorsFans(PsutilTestCase): | |
1620 | |
1621 def test_emulate_data(self): | |
1622 def open_mock(name, *args, **kwargs): | |
1623 if name.endswith('/name'): | |
1624 return io.StringIO(u("name")) | |
1625 elif name.endswith('/fan1_label'): | |
1626 return io.StringIO(u("label")) | |
1627 elif name.endswith('/fan1_input'): | |
1628 return io.StringIO(u("2000")) | |
1629 else: | |
1630 return orig_open(name, *args, **kwargs) | |
1631 | |
1632 orig_open = open | |
1633 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
1634 with mock.patch(patch_point, side_effect=open_mock): | |
1635 with mock.patch('glob.glob', | |
1636 return_value=['/sys/class/hwmon/hwmon2/fan1']): | |
1637 fan = psutil.sensors_fans()['name'][0] | |
1638 self.assertEqual(fan.label, 'label') | |
1639 self.assertEqual(fan.current, 2000) | |
1640 | |
1641 | |
1642 # ===================================================================== | |
1643 # --- test process | |
1644 # ===================================================================== | |
1645 | |
1646 | |
1647 @unittest.skipIf(not LINUX, "LINUX only") | |
1648 class TestProcess(PsutilTestCase): | |
1649 | |
1650 @retry_on_failure() | |
1651 def test_memory_full_info(self): | |
1652 testfn = self.get_testfn() | |
1653 src = textwrap.dedent(""" | |
1654 import time | |
1655 with open("%s", "w") as f: | |
1656 time.sleep(10) | |
1657 """ % testfn) | |
1658 sproc = self.pyrun(src) | |
1659 call_until(lambda: os.listdir('.'), "'%s' not in ret" % testfn) | |
1660 p = psutil.Process(sproc.pid) | |
1661 time.sleep(.1) | |
1662 mem = p.memory_full_info() | |
1663 maps = p.memory_maps(grouped=False) | |
1664 self.assertAlmostEqual( | |
1665 mem.uss, sum([x.private_dirty + x.private_clean for x in maps]), | |
1666 delta=4096) | |
1667 self.assertAlmostEqual( | |
1668 mem.pss, sum([x.pss for x in maps]), delta=4096) | |
1669 self.assertAlmostEqual( | |
1670 mem.swap, sum([x.swap for x in maps]), delta=4096) | |
1671 | |
1672 def test_memory_full_info_mocked(self): | |
1673 # See: https://github.com/giampaolo/psutil/issues/1222 | |
1674 with mock_open_content( | |
1675 "/proc/%s/smaps" % os.getpid(), | |
1676 textwrap.dedent("""\ | |
1677 fffff0 r-xp 00000000 00:00 0 [vsyscall] | |
1678 Size: 1 kB | |
1679 Rss: 2 kB | |
1680 Pss: 3 kB | |
1681 Shared_Clean: 4 kB | |
1682 Shared_Dirty: 5 kB | |
1683 Private_Clean: 6 kB | |
1684 Private_Dirty: 7 kB | |
1685 Referenced: 8 kB | |
1686 Anonymous: 9 kB | |
1687 LazyFree: 10 kB | |
1688 AnonHugePages: 11 kB | |
1689 ShmemPmdMapped: 12 kB | |
1690 Shared_Hugetlb: 13 kB | |
1691 Private_Hugetlb: 14 kB | |
1692 Swap: 15 kB | |
1693 SwapPss: 16 kB | |
1694 KernelPageSize: 17 kB | |
1695 MMUPageSize: 18 kB | |
1696 Locked: 19 kB | |
1697 VmFlags: rd ex | |
1698 """).encode()) as m: | |
1699 p = psutil.Process() | |
1700 mem = p.memory_full_info() | |
1701 assert m.called | |
1702 self.assertEqual(mem.uss, (6 + 7 + 14) * 1024) | |
1703 self.assertEqual(mem.pss, 3 * 1024) | |
1704 self.assertEqual(mem.swap, 15 * 1024) | |
1705 | |
1706 # On PYPY file descriptors are not closed fast enough. | |
1707 @unittest.skipIf(PYPY, "unreliable on PYPY") | |
1708 def test_open_files_mode(self): | |
1709 def get_test_file(fname): | |
1710 p = psutil.Process() | |
1711 giveup_at = time.time() + GLOBAL_TIMEOUT | |
1712 while True: | |
1713 for file in p.open_files(): | |
1714 if file.path == os.path.abspath(fname): | |
1715 return file | |
1716 elif time.time() > giveup_at: | |
1717 break | |
1718 raise RuntimeError("timeout looking for test file") | |
1719 | |
1720 # | |
1721 testfn = self.get_testfn() | |
1722 with open(testfn, "w"): | |
1723 self.assertEqual(get_test_file(testfn).mode, "w") | |
1724 with open(testfn, "r"): | |
1725 self.assertEqual(get_test_file(testfn).mode, "r") | |
1726 with open(testfn, "a"): | |
1727 self.assertEqual(get_test_file(testfn).mode, "a") | |
1728 # | |
1729 with open(testfn, "r+"): | |
1730 self.assertEqual(get_test_file(testfn).mode, "r+") | |
1731 with open(testfn, "w+"): | |
1732 self.assertEqual(get_test_file(testfn).mode, "r+") | |
1733 with open(testfn, "a+"): | |
1734 self.assertEqual(get_test_file(testfn).mode, "a+") | |
1735 # note: "x" bit is not supported | |
1736 if PY3: | |
1737 safe_rmpath(testfn) | |
1738 with open(testfn, "x"): | |
1739 self.assertEqual(get_test_file(testfn).mode, "w") | |
1740 safe_rmpath(testfn) | |
1741 with open(testfn, "x+"): | |
1742 self.assertEqual(get_test_file(testfn).mode, "r+") | |
1743 | |
1744 def test_open_files_file_gone(self): | |
1745 # simulates a file which gets deleted during open_files() | |
1746 # execution | |
1747 p = psutil.Process() | |
1748 files = p.open_files() | |
1749 with open(self.get_testfn(), 'w'): | |
1750 # give the kernel some time to see the new file | |
1751 call_until(p.open_files, "len(ret) != %i" % len(files)) | |
1752 with mock.patch('psutil._pslinux.os.readlink', | |
1753 side_effect=OSError(errno.ENOENT, "")) as m: | |
1754 files = p.open_files() | |
1755 assert not files | |
1756 assert m.called | |
1757 # also simulate the case where os.readlink() returns EINVAL | |
1758 # in which case psutil is supposed to 'continue' | |
1759 with mock.patch('psutil._pslinux.os.readlink', | |
1760 side_effect=OSError(errno.EINVAL, "")) as m: | |
1761 self.assertEqual(p.open_files(), []) | |
1762 assert m.called | |
1763 | |
1764 def test_open_files_fd_gone(self): | |
1765 # Simulate a case where /proc/{pid}/fdinfo/{fd} disappears | |
1766 # while iterating through fds. | |
1767 # https://travis-ci.org/giampaolo/psutil/jobs/225694530 | |
1768 p = psutil.Process() | |
1769 files = p.open_files() | |
1770 with open(self.get_testfn(), 'w'): | |
1771 # give the kernel some time to see the new file | |
1772 call_until(p.open_files, "len(ret) != %i" % len(files)) | |
1773 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
1774 with mock.patch(patch_point, | |
1775 side_effect=IOError(errno.ENOENT, "")) as m: | |
1776 files = p.open_files() | |
1777 assert not files | |
1778 assert m.called | |
1779 | |
1780 # --- mocked tests | |
1781 | |
1782 def test_terminal_mocked(self): | |
1783 with mock.patch('psutil._pslinux._psposix.get_terminal_map', | |
1784 return_value={}) as m: | |
1785 self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal()) | |
1786 assert m.called | |
1787 | |
1788 # TODO: re-enable this test. | |
1789 # def test_num_ctx_switches_mocked(self): | |
1790 # with mock.patch('psutil._common.open', create=True) as m: | |
1791 # self.assertRaises( | |
1792 # NotImplementedError, | |
1793 # psutil._pslinux.Process(os.getpid()).num_ctx_switches) | |
1794 # assert m.called | |
1795 | |
1796 def test_cmdline_mocked(self): | |
1797 # see: https://github.com/giampaolo/psutil/issues/639 | |
1798 p = psutil.Process() | |
1799 fake_file = io.StringIO(u('foo\x00bar\x00')) | |
1800 with mock.patch('psutil._common.open', | |
1801 return_value=fake_file, create=True) as m: | |
1802 self.assertEqual(p.cmdline(), ['foo', 'bar']) | |
1803 assert m.called | |
1804 fake_file = io.StringIO(u('foo\x00bar\x00\x00')) | |
1805 with mock.patch('psutil._common.open', | |
1806 return_value=fake_file, create=True) as m: | |
1807 self.assertEqual(p.cmdline(), ['foo', 'bar', '']) | |
1808 assert m.called | |
1809 | |
1810 def test_cmdline_spaces_mocked(self): | |
1811 # see: https://github.com/giampaolo/psutil/issues/1179 | |
1812 p = psutil.Process() | |
1813 fake_file = io.StringIO(u('foo bar ')) | |
1814 with mock.patch('psutil._common.open', | |
1815 return_value=fake_file, create=True) as m: | |
1816 self.assertEqual(p.cmdline(), ['foo', 'bar']) | |
1817 assert m.called | |
1818 fake_file = io.StringIO(u('foo bar ')) | |
1819 with mock.patch('psutil._common.open', | |
1820 return_value=fake_file, create=True) as m: | |
1821 self.assertEqual(p.cmdline(), ['foo', 'bar', '']) | |
1822 assert m.called | |
1823 | |
1824 def test_cmdline_mixed_separators(self): | |
1825 # https://github.com/giampaolo/psutil/issues/ | |
1826 # 1179#issuecomment-552984549 | |
1827 p = psutil.Process() | |
1828 fake_file = io.StringIO(u('foo\x20bar\x00')) | |
1829 with mock.patch('psutil._common.open', | |
1830 return_value=fake_file, create=True) as m: | |
1831 self.assertEqual(p.cmdline(), ['foo', 'bar']) | |
1832 assert m.called | |
1833 | |
1834 def test_readlink_path_deleted_mocked(self): | |
1835 with mock.patch('psutil._pslinux.os.readlink', | |
1836 return_value='/home/foo (deleted)'): | |
1837 self.assertEqual(psutil.Process().exe(), "/home/foo") | |
1838 self.assertEqual(psutil.Process().cwd(), "/home/foo") | |
1839 | |
1840 def test_threads_mocked(self): | |
1841 # Test the case where os.listdir() returns a file (thread) | |
1842 # which no longer exists by the time we open() it (race | |
1843 # condition). threads() is supposed to ignore that instead | |
1844 # of raising NSP. | |
1845 def open_mock(name, *args, **kwargs): | |
1846 if name.startswith('/proc/%s/task' % os.getpid()): | |
1847 raise IOError(errno.ENOENT, "") | |
1848 else: | |
1849 return orig_open(name, *args, **kwargs) | |
1850 | |
1851 orig_open = open | |
1852 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
1853 with mock.patch(patch_point, side_effect=open_mock) as m: | |
1854 ret = psutil.Process().threads() | |
1855 assert m.called | |
1856 self.assertEqual(ret, []) | |
1857 | |
1858 # ...but if it bumps into something != ENOENT we want an | |
1859 # exception. | |
1860 def open_mock(name, *args, **kwargs): | |
1861 if name.startswith('/proc/%s/task' % os.getpid()): | |
1862 raise IOError(errno.EPERM, "") | |
1863 else: | |
1864 return orig_open(name, *args, **kwargs) | |
1865 | |
1866 with mock.patch(patch_point, side_effect=open_mock): | |
1867 self.assertRaises(psutil.AccessDenied, psutil.Process().threads) | |
1868 | |
1869 def test_exe_mocked(self): | |
1870 with mock.patch('psutil._pslinux.readlink', | |
1871 side_effect=OSError(errno.ENOENT, "")) as m1: | |
1872 with mock.patch('psutil.Process.cmdline', | |
1873 side_effect=psutil.AccessDenied(0, "")) as m2: | |
1874 # No such file error; might be raised also if /proc/pid/exe | |
1875 # path actually exists for system processes with low pids | |
1876 # (about 0-20). In this case psutil is supposed to return | |
1877 # an empty string. | |
1878 ret = psutil.Process().exe() | |
1879 assert m1.called | |
1880 assert m2.called | |
1881 self.assertEqual(ret, "") | |
1882 | |
1883 # ...but if /proc/pid no longer exist we're supposed to treat | |
1884 # it as an alias for zombie process | |
1885 with mock.patch('psutil._pslinux.os.path.lexists', | |
1886 return_value=False): | |
1887 self.assertRaises( | |
1888 psutil.ZombieProcess, psutil.Process().exe) | |
1889 | |
1890 def test_issue_1014(self): | |
1891 # Emulates a case where smaps file does not exist. In this case | |
1892 # wrap_exception decorator should not raise NoSuchProcess. | |
1893 with mock_open_exception( | |
1894 '/proc/%s/smaps' % os.getpid(), | |
1895 IOError(errno.ENOENT, "")) as m: | |
1896 p = psutil.Process() | |
1897 with self.assertRaises(FileNotFoundError): | |
1898 p.memory_maps() | |
1899 assert m.called | |
1900 | |
1901 @unittest.skipIf(not HAS_RLIMIT, "not supported") | |
1902 def test_rlimit_zombie(self): | |
1903 # Emulate a case where rlimit() raises ENOSYS, which may | |
1904 # happen in case of zombie process: | |
1905 # https://travis-ci.org/giampaolo/psutil/jobs/51368273 | |
1906 with mock.patch("psutil._pslinux.cext.linux_prlimit", | |
1907 side_effect=OSError(errno.ENOSYS, "")) as m: | |
1908 p = psutil.Process() | |
1909 p.name() | |
1910 with self.assertRaises(psutil.ZombieProcess) as exc: | |
1911 p.rlimit(psutil.RLIMIT_NOFILE) | |
1912 assert m.called | |
1913 self.assertEqual(exc.exception.pid, p.pid) | |
1914 self.assertEqual(exc.exception.name, p.name()) | |
1915 | |
1916 def test_cwd_zombie(self): | |
1917 with mock.patch("psutil._pslinux.os.readlink", | |
1918 side_effect=OSError(errno.ENOENT, "")) as m: | |
1919 p = psutil.Process() | |
1920 p.name() | |
1921 with self.assertRaises(psutil.ZombieProcess) as exc: | |
1922 p.cwd() | |
1923 assert m.called | |
1924 self.assertEqual(exc.exception.pid, p.pid) | |
1925 self.assertEqual(exc.exception.name, p.name()) | |
1926 | |
1927 def test_stat_file_parsing(self): | |
1928 from psutil._pslinux import CLOCK_TICKS | |
1929 | |
1930 args = [ | |
1931 "0", # pid | |
1932 "(cat)", # name | |
1933 "Z", # status | |
1934 "1", # ppid | |
1935 "0", # pgrp | |
1936 "0", # session | |
1937 "0", # tty | |
1938 "0", # tpgid | |
1939 "0", # flags | |
1940 "0", # minflt | |
1941 "0", # cminflt | |
1942 "0", # majflt | |
1943 "0", # cmajflt | |
1944 "2", # utime | |
1945 "3", # stime | |
1946 "4", # cutime | |
1947 "5", # cstime | |
1948 "0", # priority | |
1949 "0", # nice | |
1950 "0", # num_threads | |
1951 "0", # itrealvalue | |
1952 "6", # starttime | |
1953 "0", # vsize | |
1954 "0", # rss | |
1955 "0", # rsslim | |
1956 "0", # startcode | |
1957 "0", # endcode | |
1958 "0", # startstack | |
1959 "0", # kstkesp | |
1960 "0", # kstkeip | |
1961 "0", # signal | |
1962 "0", # blocked | |
1963 "0", # sigignore | |
1964 "0", # sigcatch | |
1965 "0", # wchan | |
1966 "0", # nswap | |
1967 "0", # cnswap | |
1968 "0", # exit_signal | |
1969 "6", # processor | |
1970 "0", # rt priority | |
1971 "0", # policy | |
1972 "7", # delayacct_blkio_ticks | |
1973 ] | |
1974 content = " ".join(args).encode() | |
1975 with mock_open_content('/proc/%s/stat' % os.getpid(), content): | |
1976 p = psutil.Process() | |
1977 self.assertEqual(p.name(), 'cat') | |
1978 self.assertEqual(p.status(), psutil.STATUS_ZOMBIE) | |
1979 self.assertEqual(p.ppid(), 1) | |
1980 self.assertEqual( | |
1981 p.create_time(), 6 / CLOCK_TICKS + psutil.boot_time()) | |
1982 cpu = p.cpu_times() | |
1983 self.assertEqual(cpu.user, 2 / CLOCK_TICKS) | |
1984 self.assertEqual(cpu.system, 3 / CLOCK_TICKS) | |
1985 self.assertEqual(cpu.children_user, 4 / CLOCK_TICKS) | |
1986 self.assertEqual(cpu.children_system, 5 / CLOCK_TICKS) | |
1987 self.assertEqual(cpu.iowait, 7 / CLOCK_TICKS) | |
1988 self.assertEqual(p.cpu_num(), 6) | |
1989 | |
1990 def test_status_file_parsing(self): | |
1991 with mock_open_content( | |
1992 '/proc/%s/status' % os.getpid(), | |
1993 textwrap.dedent("""\ | |
1994 Uid:\t1000\t1001\t1002\t1003 | |
1995 Gid:\t1004\t1005\t1006\t1007 | |
1996 Threads:\t66 | |
1997 Cpus_allowed:\tf | |
1998 Cpus_allowed_list:\t0-7 | |
1999 voluntary_ctxt_switches:\t12 | |
2000 nonvoluntary_ctxt_switches:\t13""").encode()): | |
2001 p = psutil.Process() | |
2002 self.assertEqual(p.num_ctx_switches().voluntary, 12) | |
2003 self.assertEqual(p.num_ctx_switches().involuntary, 13) | |
2004 self.assertEqual(p.num_threads(), 66) | |
2005 uids = p.uids() | |
2006 self.assertEqual(uids.real, 1000) | |
2007 self.assertEqual(uids.effective, 1001) | |
2008 self.assertEqual(uids.saved, 1002) | |
2009 gids = p.gids() | |
2010 self.assertEqual(gids.real, 1004) | |
2011 self.assertEqual(gids.effective, 1005) | |
2012 self.assertEqual(gids.saved, 1006) | |
2013 self.assertEqual(p._proc._get_eligible_cpus(), list(range(0, 8))) | |
2014 | |
2015 | |
2016 @unittest.skipIf(not LINUX, "LINUX only") | |
2017 class TestProcessAgainstStatus(PsutilTestCase): | |
2018 """/proc/pid/stat and /proc/pid/status have many values in common. | |
2019 Whenever possible, psutil uses /proc/pid/stat (it's faster). | |
2020 For all those cases we check that the value found in | |
2021 /proc/pid/stat (by psutil) matches the one found in | |
2022 /proc/pid/status. | |
2023 """ | |
2024 | |
2025 @classmethod | |
2026 def setUpClass(cls): | |
2027 cls.proc = psutil.Process() | |
2028 | |
2029 def read_status_file(self, linestart): | |
2030 with psutil._psplatform.open_text( | |
2031 '/proc/%s/status' % self.proc.pid) as f: | |
2032 for line in f: | |
2033 line = line.strip() | |
2034 if line.startswith(linestart): | |
2035 value = line.partition('\t')[2] | |
2036 try: | |
2037 return int(value) | |
2038 except ValueError: | |
2039 return value | |
2040 raise ValueError("can't find %r" % linestart) | |
2041 | |
2042 def test_name(self): | |
2043 value = self.read_status_file("Name:") | |
2044 self.assertEqual(self.proc.name(), value) | |
2045 | |
2046 def test_status(self): | |
2047 value = self.read_status_file("State:") | |
2048 value = value[value.find('(') + 1:value.rfind(')')] | |
2049 value = value.replace(' ', '-') | |
2050 self.assertEqual(self.proc.status(), value) | |
2051 | |
2052 def test_ppid(self): | |
2053 value = self.read_status_file("PPid:") | |
2054 self.assertEqual(self.proc.ppid(), value) | |
2055 | |
2056 def test_num_threads(self): | |
2057 value = self.read_status_file("Threads:") | |
2058 self.assertEqual(self.proc.num_threads(), value) | |
2059 | |
2060 def test_uids(self): | |
2061 value = self.read_status_file("Uid:") | |
2062 value = tuple(map(int, value.split()[1:4])) | |
2063 self.assertEqual(self.proc.uids(), value) | |
2064 | |
2065 def test_gids(self): | |
2066 value = self.read_status_file("Gid:") | |
2067 value = tuple(map(int, value.split()[1:4])) | |
2068 self.assertEqual(self.proc.gids(), value) | |
2069 | |
2070 @retry_on_failure() | |
2071 def test_num_ctx_switches(self): | |
2072 value = self.read_status_file("voluntary_ctxt_switches:") | |
2073 self.assertEqual(self.proc.num_ctx_switches().voluntary, value) | |
2074 value = self.read_status_file("nonvoluntary_ctxt_switches:") | |
2075 self.assertEqual(self.proc.num_ctx_switches().involuntary, value) | |
2076 | |
2077 def test_cpu_affinity(self): | |
2078 value = self.read_status_file("Cpus_allowed_list:") | |
2079 if '-' in str(value): | |
2080 min_, max_ = map(int, value.split('-')) | |
2081 self.assertEqual( | |
2082 self.proc.cpu_affinity(), list(range(min_, max_ + 1))) | |
2083 | |
2084 def test_cpu_affinity_eligible_cpus(self): | |
2085 value = self.read_status_file("Cpus_allowed_list:") | |
2086 with mock.patch("psutil._pslinux.per_cpu_times") as m: | |
2087 self.proc._proc._get_eligible_cpus() | |
2088 if '-' in str(value): | |
2089 assert not m.called | |
2090 else: | |
2091 assert m.called | |
2092 | |
2093 | |
2094 # ===================================================================== | |
2095 # --- test utils | |
2096 # ===================================================================== | |
2097 | |
2098 | |
2099 @unittest.skipIf(not LINUX, "LINUX only") | |
2100 class TestUtils(PsutilTestCase): | |
2101 | |
2102 def test_readlink(self): | |
2103 with mock.patch("os.readlink", return_value="foo (deleted)") as m: | |
2104 self.assertEqual(psutil._psplatform.readlink("bar"), "foo") | |
2105 assert m.called | |
2106 | |
2107 def test_cat(self): | |
2108 testfn = self.get_testfn() | |
2109 with open(testfn, "wt") as f: | |
2110 f.write("foo ") | |
2111 self.assertEqual(psutil._psplatform.cat(testfn, binary=False), "foo") | |
2112 self.assertEqual(psutil._psplatform.cat(testfn, binary=True), b"foo") | |
2113 self.assertEqual( | |
2114 psutil._psplatform.cat(testfn + '??', fallback="bar"), "bar") | |
2115 | |
2116 | |
2117 if __name__ == '__main__': | |
2118 from psutil.tests.runner import run_from_name | |
2119 run_from_name(__file__) |