comparison planemo/lib/python3.7/site-packages/psutil/tests/test_linux.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
6
7 """Linux specific tests."""
8
9 from __future__ import division
10 import collections
11 import contextlib
12 import errno
13 import glob
14 import io
15 import os
16 import re
17 import shutil
18 import socket
19 import struct
20 import textwrap
21 import time
22 import warnings
23
24 import psutil
25 from psutil import LINUX
26 from psutil._compat import basestring
27 from psutil._compat import FileNotFoundError
28 from psutil._compat import PY3
29 from psutil._compat import u
30 from psutil.tests import call_until
31 from psutil.tests import GLOBAL_TIMEOUT
32 from psutil.tests import HAS_BATTERY
33 from psutil.tests import HAS_CPU_FREQ
34 from psutil.tests import HAS_GETLOADAVG
35 from psutil.tests import HAS_RLIMIT
36 from psutil.tests import mock
37 from psutil.tests import PsutilTestCase
38 from psutil.tests import PYPY
39 from psutil.tests import reload_module
40 from psutil.tests import retry_on_failure
41 from psutil.tests import safe_rmpath
42 from psutil.tests import sh
43 from psutil.tests import skip_on_not_implemented
44 from psutil.tests import ThreadTask
45 from psutil.tests import TOLERANCE_DISK_USAGE
46 from psutil.tests import TOLERANCE_SYS_MEM
47 from psutil.tests import TRAVIS
48 from psutil.tests import unittest
49 from psutil.tests import which
50
51
52 HERE = os.path.abspath(os.path.dirname(__file__))
53 SIOCGIFADDR = 0x8915
54 SIOCGIFCONF = 0x8912
55 SIOCGIFHWADDR = 0x8927
56 if LINUX:
57 SECTOR_SIZE = 512
58 EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*')
59
60 # =====================================================================
61 # --- utils
62 # =====================================================================
63
64
65 def get_ipv4_address(ifname):
66 import fcntl
67 ifname = ifname[:15]
68 if PY3:
69 ifname = bytes(ifname, 'ascii')
70 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
71 with contextlib.closing(s):
72 return socket.inet_ntoa(
73 fcntl.ioctl(s.fileno(),
74 SIOCGIFADDR,
75 struct.pack('256s', ifname))[20:24])
76
77
78 def get_mac_address(ifname):
79 import fcntl
80 ifname = ifname[:15]
81 if PY3:
82 ifname = bytes(ifname, 'ascii')
83 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
84 with contextlib.closing(s):
85 info = fcntl.ioctl(
86 s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname))
87 if PY3:
88 def ord(x):
89 return x
90 else:
91 import __builtin__
92 ord = __builtin__.ord
93 return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
94
95
96 def free_swap():
97 """Parse 'free' cmd and return swap memory's s total, used and free
98 values.
99 """
100 out = sh('free -b', env={"LANG": "C.UTF-8"})
101 lines = out.split('\n')
102 for line in lines:
103 if line.startswith('Swap'):
104 _, total, used, free = line.split()
105 nt = collections.namedtuple('free', 'total used free')
106 return nt(int(total), int(used), int(free))
107 raise ValueError(
108 "can't find 'Swap' in 'free' output:\n%s" % '\n'.join(lines))
109
110
111 def free_physmem():
112 """Parse 'free' cmd and return physical memory's total, used
113 and free values.
114 """
115 # Note: free can have 2 different formats, invalidating 'shared'
116 # and 'cached' memory which may have different positions so we
117 # do not return them.
118 # https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946
119 out = sh('free -b', env={"LANG": "C.UTF-8"})
120 lines = out.split('\n')
121 for line in lines:
122 if line.startswith('Mem'):
123 total, used, free, shared = \
124 [int(x) for x in line.split()[1:5]]
125 nt = collections.namedtuple(
126 'free', 'total used free shared output')
127 return nt(total, used, free, shared, out)
128 raise ValueError(
129 "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(lines))
130
131
132 def vmstat(stat):
133 out = sh("vmstat -s", env={"LANG": "C.UTF-8"})
134 for line in out.split("\n"):
135 line = line.strip()
136 if stat in line:
137 return int(line.split(' ')[0])
138 raise ValueError("can't find %r in 'vmstat' output" % stat)
139
140
141 def get_free_version_info():
142 out = sh("free -V").strip()
143 if 'UNKNOWN' in out:
144 raise unittest.SkipTest("can't determine free version")
145 return tuple(map(int, out.split()[-1].split('.')))
146
147
148 @contextlib.contextmanager
149 def mock_open_content(for_path, content):
150 """Mock open() builtin and forces it to return a certain `content`
151 on read() if the path being opened matches `for_path`.
152 """
153 def open_mock(name, *args, **kwargs):
154 if name == for_path:
155 if PY3:
156 if isinstance(content, basestring):
157 return io.StringIO(content)
158 else:
159 return io.BytesIO(content)
160 else:
161 return io.BytesIO(content)
162 else:
163 return orig_open(name, *args, **kwargs)
164
165 orig_open = open
166 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
167 with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
168 yield m
169
170
171 @contextlib.contextmanager
172 def mock_open_exception(for_path, exc):
173 """Mock open() builtin and raises `exc` if the path being opened
174 matches `for_path`.
175 """
176 def open_mock(name, *args, **kwargs):
177 if name == for_path:
178 raise exc
179 else:
180 return orig_open(name, *args, **kwargs)
181
182 orig_open = open
183 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
184 with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
185 yield m
186
187
188 # =====================================================================
189 # --- system virtual memory
190 # =====================================================================
191
192
193 @unittest.skipIf(not LINUX, "LINUX only")
194 class TestSystemVirtualMemory(PsutilTestCase):
195
196 def test_total(self):
197 # free_value = free_physmem().total
198 # psutil_value = psutil.virtual_memory().total
199 # self.assertEqual(free_value, psutil_value)
200 vmstat_value = vmstat('total memory') * 1024
201 psutil_value = psutil.virtual_memory().total
202 self.assertAlmostEqual(vmstat_value, psutil_value)
203
204 @retry_on_failure()
205 def test_used(self):
206 # Older versions of procps used slab memory to calculate used memory.
207 # This got changed in:
208 # https://gitlab.com/procps-ng/procps/commit/
209 # 05d751c4f076a2f0118b914c5e51cfbb4762ad8e
210 if get_free_version_info() < (3, 3, 12):
211 raise self.skipTest("old free version")
212 free = free_physmem()
213 free_value = free.used
214 psutil_value = psutil.virtual_memory().used
215 self.assertAlmostEqual(
216 free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
217 msg='%s %s \n%s' % (free_value, psutil_value, free.output))
218
219 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
220 @retry_on_failure()
221 def test_free(self):
222 vmstat_value = vmstat('free memory') * 1024
223 psutil_value = psutil.virtual_memory().free
224 self.assertAlmostEqual(
225 vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
226
227 @retry_on_failure()
228 def test_buffers(self):
229 vmstat_value = vmstat('buffer memory') * 1024
230 psutil_value = psutil.virtual_memory().buffers
231 self.assertAlmostEqual(
232 vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
233
234 # https://travis-ci.org/giampaolo/psutil/jobs/226719664
235 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
236 @retry_on_failure()
237 def test_active(self):
238 vmstat_value = vmstat('active memory') * 1024
239 psutil_value = psutil.virtual_memory().active
240 self.assertAlmostEqual(
241 vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
242
243 # https://travis-ci.org/giampaolo/psutil/jobs/227242952
244 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
245 @retry_on_failure()
246 def test_inactive(self):
247 vmstat_value = vmstat('inactive memory') * 1024
248 psutil_value = psutil.virtual_memory().inactive
249 self.assertAlmostEqual(
250 vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
251
252 @retry_on_failure()
253 def test_shared(self):
254 free = free_physmem()
255 free_value = free.shared
256 if free_value == 0:
257 raise unittest.SkipTest("free does not support 'shared' column")
258 psutil_value = psutil.virtual_memory().shared
259 self.assertAlmostEqual(
260 free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
261 msg='%s %s \n%s' % (free_value, psutil_value, free.output))
262
263 @retry_on_failure()
264 def test_available(self):
265 # "free" output format has changed at some point:
266 # https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098
267 out = sh("free -b")
268 lines = out.split('\n')
269 if 'available' not in lines[0]:
270 raise unittest.SkipTest("free does not support 'available' column")
271 else:
272 free_value = int(lines[1].split()[-1])
273 psutil_value = psutil.virtual_memory().available
274 self.assertAlmostEqual(
275 free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
276 msg='%s %s \n%s' % (free_value, psutil_value, out))
277
278 def test_warnings_on_misses(self):
279 # Emulate a case where /proc/meminfo provides few info.
280 # psutil is supposed to set the missing fields to 0 and
281 # raise a warning.
282 with mock_open_content(
283 '/proc/meminfo',
284 textwrap.dedent("""\
285 Active(anon): 6145416 kB
286 Active(file): 2950064 kB
287 Inactive(anon): 574764 kB
288 Inactive(file): 1567648 kB
289 MemAvailable: -1 kB
290 MemFree: 2057400 kB
291 MemTotal: 16325648 kB
292 SReclaimable: 346648 kB
293 """).encode()) as m:
294 with warnings.catch_warnings(record=True) as ws:
295 warnings.simplefilter("always")
296 ret = psutil.virtual_memory()
297 assert m.called
298 self.assertEqual(len(ws), 1)
299 w = ws[0]
300 assert w.filename.endswith('psutil/_pslinux.py')
301 self.assertIn(
302 "memory stats couldn't be determined", str(w.message))
303 self.assertIn("cached", str(w.message))
304 self.assertIn("shared", str(w.message))
305 self.assertIn("active", str(w.message))
306 self.assertIn("inactive", str(w.message))
307 self.assertIn("buffers", str(w.message))
308 self.assertIn("available", str(w.message))
309 self.assertEqual(ret.cached, 0)
310 self.assertEqual(ret.active, 0)
311 self.assertEqual(ret.inactive, 0)
312 self.assertEqual(ret.shared, 0)
313 self.assertEqual(ret.buffers, 0)
314 self.assertEqual(ret.available, 0)
315 self.assertEqual(ret.slab, 0)
316
317 @retry_on_failure()
318 def test_avail_old_percent(self):
319 # Make sure that our calculation of avail mem for old kernels
320 # is off by max 15%.
321 from psutil._pslinux import calculate_avail_vmem
322 from psutil._pslinux import open_binary
323
324 mems = {}
325 with open_binary('/proc/meminfo') as f:
326 for line in f:
327 fields = line.split()
328 mems[fields[0]] = int(fields[1]) * 1024
329
330 a = calculate_avail_vmem(mems)
331 if b'MemAvailable:' in mems:
332 b = mems[b'MemAvailable:']
333 diff_percent = abs(a - b) / a * 100
334 self.assertLess(diff_percent, 15)
335
336 def test_avail_old_comes_from_kernel(self):
337 # Make sure "MemAvailable:" coluimn is used instead of relying
338 # on our internal algorithm to calculate avail mem.
339 with mock_open_content(
340 '/proc/meminfo',
341 textwrap.dedent("""\
342 Active: 9444728 kB
343 Active(anon): 6145416 kB
344 Active(file): 2950064 kB
345 Buffers: 287952 kB
346 Cached: 4818144 kB
347 Inactive(file): 1578132 kB
348 Inactive(anon): 574764 kB
349 Inactive(file): 1567648 kB
350 MemAvailable: 6574984 kB
351 MemFree: 2057400 kB
352 MemTotal: 16325648 kB
353 Shmem: 577588 kB
354 SReclaimable: 346648 kB
355 """).encode()) as m:
356 with warnings.catch_warnings(record=True) as ws:
357 ret = psutil.virtual_memory()
358 assert m.called
359 self.assertEqual(ret.available, 6574984 * 1024)
360 w = ws[0]
361 self.assertIn(
362 "inactive memory stats couldn't be determined", str(w.message))
363
364 def test_avail_old_missing_fields(self):
365 # Remove Active(file), Inactive(file) and SReclaimable
366 # from /proc/meminfo and make sure the fallback is used
367 # (free + cached),
368 with mock_open_content(
369 "/proc/meminfo",
370 textwrap.dedent("""\
371 Active: 9444728 kB
372 Active(anon): 6145416 kB
373 Buffers: 287952 kB
374 Cached: 4818144 kB
375 Inactive(file): 1578132 kB
376 Inactive(anon): 574764 kB
377 MemFree: 2057400 kB
378 MemTotal: 16325648 kB
379 Shmem: 577588 kB
380 """).encode()) as m:
381 with warnings.catch_warnings(record=True) as ws:
382 ret = psutil.virtual_memory()
383 assert m.called
384 self.assertEqual(ret.available, 2057400 * 1024 + 4818144 * 1024)
385 w = ws[0]
386 self.assertIn(
387 "inactive memory stats couldn't be determined", str(w.message))
388
389 def test_avail_old_missing_zoneinfo(self):
390 # Remove /proc/zoneinfo file. Make sure fallback is used
391 # (free + cached).
392 with mock_open_content(
393 "/proc/meminfo",
394 textwrap.dedent("""\
395 Active: 9444728 kB
396 Active(anon): 6145416 kB
397 Active(file): 2950064 kB
398 Buffers: 287952 kB
399 Cached: 4818144 kB
400 Inactive(file): 1578132 kB
401 Inactive(anon): 574764 kB
402 Inactive(file): 1567648 kB
403 MemFree: 2057400 kB
404 MemTotal: 16325648 kB
405 Shmem: 577588 kB
406 SReclaimable: 346648 kB
407 """).encode()):
408 with mock_open_exception(
409 "/proc/zoneinfo",
410 IOError(errno.ENOENT, 'no such file or directory')):
411 with warnings.catch_warnings(record=True) as ws:
412 ret = psutil.virtual_memory()
413 self.assertEqual(
414 ret.available, 2057400 * 1024 + 4818144 * 1024)
415 w = ws[0]
416 self.assertIn(
417 "inactive memory stats couldn't be determined",
418 str(w.message))
419
420 def test_virtual_memory_mocked(self):
421 # Emulate /proc/meminfo because neither vmstat nor free return slab.
422 def open_mock(name, *args, **kwargs):
423 if name == '/proc/meminfo':
424 return io.BytesIO(textwrap.dedent("""\
425 MemTotal: 100 kB
426 MemFree: 2 kB
427 MemAvailable: 3 kB
428 Buffers: 4 kB
429 Cached: 5 kB
430 SwapCached: 6 kB
431 Active: 7 kB
432 Inactive: 8 kB
433 Active(anon): 9 kB
434 Inactive(anon): 10 kB
435 Active(file): 11 kB
436 Inactive(file): 12 kB
437 Unevictable: 13 kB
438 Mlocked: 14 kB
439 SwapTotal: 15 kB
440 SwapFree: 16 kB
441 Dirty: 17 kB
442 Writeback: 18 kB
443 AnonPages: 19 kB
444 Mapped: 20 kB
445 Shmem: 21 kB
446 Slab: 22 kB
447 SReclaimable: 23 kB
448 SUnreclaim: 24 kB
449 KernelStack: 25 kB
450 PageTables: 26 kB
451 NFS_Unstable: 27 kB
452 Bounce: 28 kB
453 WritebackTmp: 29 kB
454 CommitLimit: 30 kB
455 Committed_AS: 31 kB
456 VmallocTotal: 32 kB
457 VmallocUsed: 33 kB
458 VmallocChunk: 34 kB
459 HardwareCorrupted: 35 kB
460 AnonHugePages: 36 kB
461 ShmemHugePages: 37 kB
462 ShmemPmdMapped: 38 kB
463 CmaTotal: 39 kB
464 CmaFree: 40 kB
465 HugePages_Total: 41 kB
466 HugePages_Free: 42 kB
467 HugePages_Rsvd: 43 kB
468 HugePages_Surp: 44 kB
469 Hugepagesize: 45 kB
470 DirectMap46k: 46 kB
471 DirectMap47M: 47 kB
472 DirectMap48G: 48 kB
473 """).encode())
474 else:
475 return orig_open(name, *args, **kwargs)
476
477 orig_open = open
478 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
479 with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
480 mem = psutil.virtual_memory()
481 assert m.called
482 self.assertEqual(mem.total, 100 * 1024)
483 self.assertEqual(mem.free, 2 * 1024)
484 self.assertEqual(mem.buffers, 4 * 1024)
485 # cached mem also includes reclaimable memory
486 self.assertEqual(mem.cached, (5 + 23) * 1024)
487 self.assertEqual(mem.shared, 21 * 1024)
488 self.assertEqual(mem.active, 7 * 1024)
489 self.assertEqual(mem.inactive, 8 * 1024)
490 self.assertEqual(mem.slab, 22 * 1024)
491 self.assertEqual(mem.available, 3 * 1024)
492
493
494 # =====================================================================
495 # --- system swap memory
496 # =====================================================================
497
498
499 @unittest.skipIf(not LINUX, "LINUX only")
500 class TestSystemSwapMemory(PsutilTestCase):
501
502 @staticmethod
503 def meminfo_has_swap_info():
504 """Return True if /proc/meminfo provides swap metrics."""
505 with open("/proc/meminfo") as f:
506 data = f.read()
507 return 'SwapTotal:' in data and 'SwapFree:' in data
508
509 def test_total(self):
510 free_value = free_swap().total
511 psutil_value = psutil.swap_memory().total
512 return self.assertAlmostEqual(
513 free_value, psutil_value, delta=TOLERANCE_SYS_MEM)
514
515 @retry_on_failure()
516 def test_used(self):
517 free_value = free_swap().used
518 psutil_value = psutil.swap_memory().used
519 return self.assertAlmostEqual(
520 free_value, psutil_value, delta=TOLERANCE_SYS_MEM)
521
522 @retry_on_failure()
523 def test_free(self):
524 free_value = free_swap().free
525 psutil_value = psutil.swap_memory().free
526 return self.assertAlmostEqual(
527 free_value, psutil_value, delta=TOLERANCE_SYS_MEM)
528
529 def test_missing_sin_sout(self):
530 with mock.patch('psutil._common.open', create=True) as m:
531 with warnings.catch_warnings(record=True) as ws:
532 warnings.simplefilter("always")
533 ret = psutil.swap_memory()
534 assert m.called
535 self.assertEqual(len(ws), 1)
536 w = ws[0]
537 assert w.filename.endswith('psutil/_pslinux.py')
538 self.assertIn(
539 "'sin' and 'sout' swap memory stats couldn't "
540 "be determined", str(w.message))
541 self.assertEqual(ret.sin, 0)
542 self.assertEqual(ret.sout, 0)
543
544 def test_no_vmstat_mocked(self):
545 # see https://github.com/giampaolo/psutil/issues/722
546 with mock_open_exception(
547 "/proc/vmstat",
548 IOError(errno.ENOENT, 'no such file or directory')) as m:
549 with warnings.catch_warnings(record=True) as ws:
550 warnings.simplefilter("always")
551 ret = psutil.swap_memory()
552 assert m.called
553 self.assertEqual(len(ws), 1)
554 w = ws[0]
555 assert w.filename.endswith('psutil/_pslinux.py')
556 self.assertIn(
557 "'sin' and 'sout' swap memory stats couldn't "
558 "be determined and were set to 0",
559 str(w.message))
560 self.assertEqual(ret.sin, 0)
561 self.assertEqual(ret.sout, 0)
562
563 def test_meminfo_against_sysinfo(self):
564 # Make sure the content of /proc/meminfo about swap memory
565 # matches sysinfo() syscall, see:
566 # https://github.com/giampaolo/psutil/issues/1015
567 if not self.meminfo_has_swap_info():
568 return unittest.skip("/proc/meminfo has no swap metrics")
569 with mock.patch('psutil._pslinux.cext.linux_sysinfo') as m:
570 swap = psutil.swap_memory()
571 assert not m.called
572 import psutil._psutil_linux as cext
573 _, _, _, _, total, free, unit_multiplier = cext.linux_sysinfo()
574 total *= unit_multiplier
575 free *= unit_multiplier
576 self.assertEqual(swap.total, total)
577 self.assertAlmostEqual(swap.free, free, delta=TOLERANCE_SYS_MEM)
578
579 def test_emulate_meminfo_has_no_metrics(self):
580 # Emulate a case where /proc/meminfo provides no swap metrics
581 # in which case sysinfo() syscall is supposed to be used
582 # as a fallback.
583 with mock_open_content("/proc/meminfo", b"") as m:
584 psutil.swap_memory()
585 assert m.called
586
587
588 # =====================================================================
589 # --- system CPU
590 # =====================================================================
591
592
593 @unittest.skipIf(not LINUX, "LINUX only")
594 class TestSystemCPUTimes(PsutilTestCase):
595
596 @unittest.skipIf(TRAVIS, "unknown failure on travis")
597 def test_fields(self):
598 fields = psutil.cpu_times()._fields
599 kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0]
600 kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
601 if kernel_ver_info >= (2, 6, 11):
602 self.assertIn('steal', fields)
603 else:
604 self.assertNotIn('steal', fields)
605 if kernel_ver_info >= (2, 6, 24):
606 self.assertIn('guest', fields)
607 else:
608 self.assertNotIn('guest', fields)
609 if kernel_ver_info >= (3, 2, 0):
610 self.assertIn('guest_nice', fields)
611 else:
612 self.assertNotIn('guest_nice', fields)
613
614
615 @unittest.skipIf(not LINUX, "LINUX only")
616 class TestSystemCPUCountLogical(PsutilTestCase):
617
618 @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"),
619 "/sys/devices/system/cpu/online does not exist")
620 def test_against_sysdev_cpu_online(self):
621 with open("/sys/devices/system/cpu/online") as f:
622 value = f.read().strip()
623 if "-" in str(value):
624 value = int(value.split('-')[1]) + 1
625 self.assertEqual(psutil.cpu_count(), value)
626
627 @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu"),
628 "/sys/devices/system/cpu does not exist")
629 def test_against_sysdev_cpu_num(self):
630 ls = os.listdir("/sys/devices/system/cpu")
631 count = len([x for x in ls if re.search(r"cpu\d+$", x) is not None])
632 self.assertEqual(psutil.cpu_count(), count)
633
634 @unittest.skipIf(not which("nproc"), "nproc utility not available")
635 def test_against_nproc(self):
636 num = int(sh("nproc --all"))
637 self.assertEqual(psutil.cpu_count(logical=True), num)
638
639 @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
640 def test_against_lscpu(self):
641 out = sh("lscpu -p")
642 num = len([x for x in out.split('\n') if not x.startswith('#')])
643 self.assertEqual(psutil.cpu_count(logical=True), num)
644
645 def test_emulate_fallbacks(self):
646 import psutil._pslinux
647 original = psutil._pslinux.cpu_count_logical()
648 # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
649 # order to cause the parsing of /proc/cpuinfo and /proc/stat.
650 with mock.patch(
651 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
652 self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
653 assert m.called
654
655 # Let's have open() return emtpy data and make sure None is
656 # returned ('cause we mimick os.cpu_count()).
657 with mock.patch('psutil._common.open', create=True) as m:
658 self.assertIsNone(psutil._pslinux.cpu_count_logical())
659 self.assertEqual(m.call_count, 2)
660 # /proc/stat should be the last one
661 self.assertEqual(m.call_args[0][0], '/proc/stat')
662
663 # Let's push this a bit further and make sure /proc/cpuinfo
664 # parsing works as expected.
665 with open('/proc/cpuinfo', 'rb') as f:
666 cpuinfo_data = f.read()
667 fake_file = io.BytesIO(cpuinfo_data)
668 with mock.patch('psutil._common.open',
669 return_value=fake_file, create=True) as m:
670 self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
671
672 # Finally, let's make /proc/cpuinfo return meaningless data;
673 # this way we'll fall back on relying on /proc/stat
674 with mock_open_content('/proc/cpuinfo', b"") as m:
675 self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
676 m.called
677
678
679 @unittest.skipIf(not LINUX, "LINUX only")
680 class TestSystemCPUCountPhysical(PsutilTestCase):
681
682 @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
683 def test_against_lscpu(self):
684 out = sh("lscpu -p")
685 core_ids = set()
686 for line in out.split('\n'):
687 if not line.startswith('#'):
688 fields = line.split(',')
689 core_ids.add(fields[1])
690 self.assertEqual(psutil.cpu_count(logical=False), len(core_ids))
691
692 def test_emulate_none(self):
693 with mock.patch('glob.glob', return_value=[]) as m1:
694 with mock.patch('psutil._common.open', create=True) as m2:
695 self.assertIsNone(psutil._pslinux.cpu_count_physical())
696 assert m1.called
697 assert m2.called
698
699
700 @unittest.skipIf(not LINUX, "LINUX only")
701 class TestSystemCPUFrequency(PsutilTestCase):
702
703 @unittest.skipIf(TRAVIS, "fails on Travis")
704 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
705 def test_emulate_use_second_file(self):
706 # https://github.com/giampaolo/psutil/issues/981
707 def path_exists_mock(path):
708 if path.startswith("/sys/devices/system/cpu/cpufreq/policy"):
709 return False
710 else:
711 return orig_exists(path)
712
713 orig_exists = os.path.exists
714 with mock.patch("os.path.exists", side_effect=path_exists_mock,
715 create=True):
716 assert psutil.cpu_freq()
717
718 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
719 def test_emulate_use_cpuinfo(self):
720 # Emulate a case where /sys/devices/system/cpu/cpufreq* does not
721 # exist and /proc/cpuinfo is used instead.
722 def path_exists_mock(path):
723 if path.startswith('/sys/devices/system/cpu/'):
724 return False
725 else:
726 if path == "/proc/cpuinfo":
727 flags.append(None)
728 return os_path_exists(path)
729
730 flags = []
731 os_path_exists = os.path.exists
732 try:
733 with mock.patch("os.path.exists", side_effect=path_exists_mock):
734 reload_module(psutil._pslinux)
735 ret = psutil.cpu_freq()
736 assert ret
737 assert flags
738 self.assertEqual(ret.max, 0.0)
739 self.assertEqual(ret.min, 0.0)
740 for freq in psutil.cpu_freq(percpu=True):
741 self.assertEqual(ret.max, 0.0)
742 self.assertEqual(ret.min, 0.0)
743 finally:
744 reload_module(psutil._pslinux)
745 reload_module(psutil)
746
747 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
748 def test_emulate_data(self):
749 def open_mock(name, *args, **kwargs):
750 if (name.endswith('/scaling_cur_freq') and
751 name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
752 return io.BytesIO(b"500000")
753 elif (name.endswith('/scaling_min_freq') and
754 name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
755 return io.BytesIO(b"600000")
756 elif (name.endswith('/scaling_max_freq') and
757 name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
758 return io.BytesIO(b"700000")
759 elif name == '/proc/cpuinfo':
760 return io.BytesIO(b"cpu MHz : 500")
761 else:
762 return orig_open(name, *args, **kwargs)
763
764 orig_open = open
765 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
766 with mock.patch(patch_point, side_effect=open_mock):
767 with mock.patch(
768 'os.path.exists', return_value=True):
769 freq = psutil.cpu_freq()
770 self.assertEqual(freq.current, 500.0)
771 # when /proc/cpuinfo is used min and max frequencies are not
772 # available and are set to 0.
773 if freq.min != 0.0:
774 self.assertEqual(freq.min, 600.0)
775 if freq.max != 0.0:
776 self.assertEqual(freq.max, 700.0)
777
778 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
779 def test_emulate_multi_cpu(self):
780 def open_mock(name, *args, **kwargs):
781 n = name
782 if (n.endswith('/scaling_cur_freq') and
783 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
784 return io.BytesIO(b"100000")
785 elif (n.endswith('/scaling_min_freq') and
786 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
787 return io.BytesIO(b"200000")
788 elif (n.endswith('/scaling_max_freq') and
789 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
790 return io.BytesIO(b"300000")
791 elif (n.endswith('/scaling_cur_freq') and
792 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
793 return io.BytesIO(b"400000")
794 elif (n.endswith('/scaling_min_freq') and
795 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
796 return io.BytesIO(b"500000")
797 elif (n.endswith('/scaling_max_freq') and
798 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
799 return io.BytesIO(b"600000")
800 elif name == '/proc/cpuinfo':
801 return io.BytesIO(b"cpu MHz : 100\n"
802 b"cpu MHz : 400")
803 else:
804 return orig_open(name, *args, **kwargs)
805
806 orig_open = open
807 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
808 with mock.patch(patch_point, side_effect=open_mock):
809 with mock.patch('os.path.exists', return_value=True):
810 with mock.patch('psutil._pslinux.cpu_count_logical',
811 return_value=2):
812 freq = psutil.cpu_freq(percpu=True)
813 self.assertEqual(freq[0].current, 100.0)
814 if freq[0].min != 0.0:
815 self.assertEqual(freq[0].min, 200.0)
816 if freq[0].max != 0.0:
817 self.assertEqual(freq[0].max, 300.0)
818 self.assertEqual(freq[1].current, 400.0)
819 if freq[1].min != 0.0:
820 self.assertEqual(freq[1].min, 500.0)
821 if freq[1].max != 0.0:
822 self.assertEqual(freq[1].max, 600.0)
823
824 @unittest.skipIf(TRAVIS, "fails on Travis")
825 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
826 def test_emulate_no_scaling_cur_freq_file(self):
827 # See: https://github.com/giampaolo/psutil/issues/1071
828 def open_mock(name, *args, **kwargs):
829 if name.endswith('/scaling_cur_freq'):
830 raise IOError(errno.ENOENT, "")
831 elif name.endswith('/cpuinfo_cur_freq'):
832 return io.BytesIO(b"200000")
833 elif name == '/proc/cpuinfo':
834 return io.BytesIO(b"cpu MHz : 200")
835 else:
836 return orig_open(name, *args, **kwargs)
837
838 orig_open = open
839 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
840 with mock.patch(patch_point, side_effect=open_mock):
841 with mock.patch('os.path.exists', return_value=True):
842 with mock.patch('psutil._pslinux.cpu_count_logical',
843 return_value=1):
844 freq = psutil.cpu_freq()
845 self.assertEqual(freq.current, 200)
846
847
848 @unittest.skipIf(not LINUX, "LINUX only")
849 class TestSystemCPUStats(PsutilTestCase):
850
851 @unittest.skipIf(TRAVIS, "fails on Travis")
852 def test_ctx_switches(self):
853 vmstat_value = vmstat("context switches")
854 psutil_value = psutil.cpu_stats().ctx_switches
855 self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
856
857 @unittest.skipIf(TRAVIS, "fails on Travis")
858 def test_interrupts(self):
859 vmstat_value = vmstat("interrupts")
860 psutil_value = psutil.cpu_stats().interrupts
861 self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
862
863
864 @unittest.skipIf(not LINUX, "LINUX only")
865 class TestLoadAvg(PsutilTestCase):
866
867 @unittest.skipIf(not HAS_GETLOADAVG, "not supported")
868 def test_getloadavg(self):
869 psutil_value = psutil.getloadavg()
870 with open("/proc/loadavg", "r") as f:
871 proc_value = f.read().split()
872
873 self.assertAlmostEqual(float(proc_value[0]), psutil_value[0], delta=1)
874 self.assertAlmostEqual(float(proc_value[1]), psutil_value[1], delta=1)
875 self.assertAlmostEqual(float(proc_value[2]), psutil_value[2], delta=1)
876
877
878 # =====================================================================
879 # --- system network
880 # =====================================================================
881
882
883 @unittest.skipIf(not LINUX, "LINUX only")
884 class TestSystemNetIfAddrs(PsutilTestCase):
885
886 def test_ips(self):
887 for name, addrs in psutil.net_if_addrs().items():
888 for addr in addrs:
889 if addr.family == psutil.AF_LINK:
890 self.assertEqual(addr.address, get_mac_address(name))
891 elif addr.family == socket.AF_INET:
892 self.assertEqual(addr.address, get_ipv4_address(name))
893 # TODO: test for AF_INET6 family
894
895 # XXX - not reliable when having virtual NICs installed by Docker.
896 # @unittest.skipIf(not which('ip'), "'ip' utility not available")
897 # @unittest.skipIf(TRAVIS, "skipped on Travis")
898 # def test_net_if_names(self):
899 # out = sh("ip addr").strip()
900 # nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x]
901 # found = 0
902 # for line in out.split('\n'):
903 # line = line.strip()
904 # if re.search(r"^\d+:", line):
905 # found += 1
906 # name = line.split(':')[1].strip()
907 # self.assertIn(name, nics)
908 # self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
909 # pprint.pformat(nics), out))
910
911
912 @unittest.skipIf(not LINUX, "LINUX only")
913 class TestSystemNetIfStats(PsutilTestCase):
914
915 def test_against_ifconfig(self):
916 for name, stats in psutil.net_if_stats().items():
917 try:
918 out = sh("ifconfig %s" % name)
919 except RuntimeError:
920 pass
921 else:
922 # Not always reliable.
923 # self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
924 self.assertEqual(stats.mtu,
925 int(re.findall(r'(?i)MTU[: ](\d+)', out)[0]))
926
927
928 @unittest.skipIf(not LINUX, "LINUX only")
929 class TestSystemNetIOCounters(PsutilTestCase):
930
931 @retry_on_failure()
932 def test_against_ifconfig(self):
933 def ifconfig(nic):
934 ret = {}
935 out = sh("ifconfig %s" % name)
936 ret['packets_recv'] = int(
937 re.findall(r'RX packets[: ](\d+)', out)[0])
938 ret['packets_sent'] = int(
939 re.findall(r'TX packets[: ](\d+)', out)[0])
940 ret['errin'] = int(re.findall(r'errors[: ](\d+)', out)[0])
941 ret['errout'] = int(re.findall(r'errors[: ](\d+)', out)[1])
942 ret['dropin'] = int(re.findall(r'dropped[: ](\d+)', out)[0])
943 ret['dropout'] = int(re.findall(r'dropped[: ](\d+)', out)[1])
944 ret['bytes_recv'] = int(
945 re.findall(r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
946 ret['bytes_sent'] = int(
947 re.findall(r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
948 return ret
949
950 nio = psutil.net_io_counters(pernic=True, nowrap=False)
951 for name, stats in nio.items():
952 try:
953 ifconfig_ret = ifconfig(name)
954 except RuntimeError:
955 continue
956 self.assertAlmostEqual(
957 stats.bytes_recv, ifconfig_ret['bytes_recv'], delta=1024 * 5)
958 self.assertAlmostEqual(
959 stats.bytes_sent, ifconfig_ret['bytes_sent'], delta=1024 * 5)
960 self.assertAlmostEqual(
961 stats.packets_recv, ifconfig_ret['packets_recv'], delta=1024)
962 self.assertAlmostEqual(
963 stats.packets_sent, ifconfig_ret['packets_sent'], delta=1024)
964 self.assertAlmostEqual(
965 stats.errin, ifconfig_ret['errin'], delta=10)
966 self.assertAlmostEqual(
967 stats.errout, ifconfig_ret['errout'], delta=10)
968 self.assertAlmostEqual(
969 stats.dropin, ifconfig_ret['dropin'], delta=10)
970 self.assertAlmostEqual(
971 stats.dropout, ifconfig_ret['dropout'], delta=10)
972
973
974 @unittest.skipIf(not LINUX, "LINUX only")
975 class TestSystemNetConnections(PsutilTestCase):
976
977 @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError)
978 @mock.patch('psutil._pslinux.supports_ipv6', return_value=False)
979 def test_emulate_ipv6_unsupported(self, supports_ipv6, inet_ntop):
980 # see: https://github.com/giampaolo/psutil/issues/623
981 try:
982 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
983 self.addCleanup(s.close)
984 s.bind(("::1", 0))
985 except socket.error:
986 pass
987 psutil.net_connections(kind='inet6')
988
989 def test_emulate_unix(self):
990 with mock_open_content(
991 '/proc/net/unix',
992 textwrap.dedent("""\
993 0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
994 0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
995 0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
996 000000000000000000000000000000000000000000000000000000
997 """)) as m:
998 psutil.net_connections(kind='unix')
999 assert m.called
1000
1001
1002 # =====================================================================
1003 # --- system disks
1004 # =====================================================================
1005
1006
1007 @unittest.skipIf(not LINUX, "LINUX only")
1008 class TestSystemDiskPartitions(PsutilTestCase):
1009
1010 @unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available")
1011 @skip_on_not_implemented()
1012 def test_against_df(self):
1013 # test psutil.disk_usage() and psutil.disk_partitions()
1014 # against "df -a"
1015 def df(path):
1016 out = sh('df -P -B 1 "%s"' % path).strip()
1017 lines = out.split('\n')
1018 lines.pop(0)
1019 line = lines.pop(0)
1020 dev, total, used, free = line.split()[:4]
1021 if dev == 'none':
1022 dev = ''
1023 total, used, free = int(total), int(used), int(free)
1024 return dev, total, used, free
1025
1026 for part in psutil.disk_partitions(all=False):
1027 usage = psutil.disk_usage(part.mountpoint)
1028 dev, total, used, free = df(part.mountpoint)
1029 self.assertEqual(usage.total, total)
1030 self.assertAlmostEqual(usage.free, free,
1031 delta=TOLERANCE_DISK_USAGE)
1032 self.assertAlmostEqual(usage.used, used,
1033 delta=TOLERANCE_DISK_USAGE)
1034
1035 def test_zfs_fs(self):
1036 # Test that ZFS partitions are returned.
1037 with open("/proc/filesystems", "r") as f:
1038 data = f.read()
1039 if 'zfs' in data:
1040 for part in psutil.disk_partitions():
1041 if part.fstype == 'zfs':
1042 break
1043 else:
1044 self.fail("couldn't find any ZFS partition")
1045 else:
1046 # No ZFS partitions on this system. Let's fake one.
1047 fake_file = io.StringIO(u("nodev\tzfs\n"))
1048 with mock.patch('psutil._common.open',
1049 return_value=fake_file, create=True) as m1:
1050 with mock.patch(
1051 'psutil._pslinux.cext.disk_partitions',
1052 return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
1053 ret = psutil.disk_partitions()
1054 assert m1.called
1055 assert m2.called
1056 assert ret
1057 self.assertEqual(ret[0].fstype, 'zfs')
1058
1059 def test_emulate_realpath_fail(self):
1060 # See: https://github.com/giampaolo/psutil/issues/1307
1061 try:
1062 with mock.patch('os.path.realpath',
1063 return_value='/non/existent') as m:
1064 with self.assertRaises(FileNotFoundError):
1065 psutil.disk_partitions()
1066 assert m.called
1067 finally:
1068 psutil.PROCFS_PATH = "/proc"
1069
1070
1071 @unittest.skipIf(not LINUX, "LINUX only")
1072 class TestSystemDiskIoCounters(PsutilTestCase):
1073
1074 def test_emulate_kernel_2_4(self):
1075 # Tests /proc/diskstats parsing format for 2.4 kernels, see:
1076 # https://github.com/giampaolo/psutil/issues/767
1077 with mock_open_content(
1078 '/proc/diskstats',
1079 " 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"):
1080 with mock.patch('psutil._pslinux.is_storage_device',
1081 return_value=True):
1082 ret = psutil.disk_io_counters(nowrap=False)
1083 self.assertEqual(ret.read_count, 1)
1084 self.assertEqual(ret.read_merged_count, 2)
1085 self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
1086 self.assertEqual(ret.read_time, 4)
1087 self.assertEqual(ret.write_count, 5)
1088 self.assertEqual(ret.write_merged_count, 6)
1089 self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
1090 self.assertEqual(ret.write_time, 8)
1091 self.assertEqual(ret.busy_time, 10)
1092
1093 def test_emulate_kernel_2_6_full(self):
1094 # Tests /proc/diskstats parsing format for 2.6 kernels,
1095 # lines reporting all metrics:
1096 # https://github.com/giampaolo/psutil/issues/767
1097 with mock_open_content(
1098 '/proc/diskstats',
1099 " 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"):
1100 with mock.patch('psutil._pslinux.is_storage_device',
1101 return_value=True):
1102 ret = psutil.disk_io_counters(nowrap=False)
1103 self.assertEqual(ret.read_count, 1)
1104 self.assertEqual(ret.read_merged_count, 2)
1105 self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
1106 self.assertEqual(ret.read_time, 4)
1107 self.assertEqual(ret.write_count, 5)
1108 self.assertEqual(ret.write_merged_count, 6)
1109 self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
1110 self.assertEqual(ret.write_time, 8)
1111 self.assertEqual(ret.busy_time, 10)
1112
1113 def test_emulate_kernel_2_6_limited(self):
1114 # Tests /proc/diskstats parsing format for 2.6 kernels,
1115 # where one line of /proc/partitions return a limited
1116 # amount of metrics when it bumps into a partition
1117 # (instead of a disk). See:
1118 # https://github.com/giampaolo/psutil/issues/767
1119 with mock_open_content(
1120 '/proc/diskstats',
1121 " 3 1 hda 1 2 3 4"):
1122 with mock.patch('psutil._pslinux.is_storage_device',
1123 return_value=True):
1124 ret = psutil.disk_io_counters(nowrap=False)
1125 self.assertEqual(ret.read_count, 1)
1126 self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
1127 self.assertEqual(ret.write_count, 3)
1128 self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)
1129
1130 self.assertEqual(ret.read_merged_count, 0)
1131 self.assertEqual(ret.read_time, 0)
1132 self.assertEqual(ret.write_merged_count, 0)
1133 self.assertEqual(ret.write_time, 0)
1134 self.assertEqual(ret.busy_time, 0)
1135
1136 def test_emulate_include_partitions(self):
1137 # Make sure that when perdisk=True disk partitions are returned,
1138 # see:
1139 # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
1140 with mock_open_content(
1141 '/proc/diskstats',
1142 textwrap.dedent("""\
1143 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
1144 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
1145 """)):
1146 with mock.patch('psutil._pslinux.is_storage_device',
1147 return_value=False):
1148 ret = psutil.disk_io_counters(perdisk=True, nowrap=False)
1149 self.assertEqual(len(ret), 2)
1150 self.assertEqual(ret['nvme0n1'].read_count, 1)
1151 self.assertEqual(ret['nvme0n1p1'].read_count, 1)
1152 self.assertEqual(ret['nvme0n1'].write_count, 5)
1153 self.assertEqual(ret['nvme0n1p1'].write_count, 5)
1154
1155 def test_emulate_exclude_partitions(self):
1156 # Make sure that when perdisk=False partitions (e.g. 'sda1',
1157 # 'nvme0n1p1') are skipped and not included in the total count.
1158 # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
1159 with mock_open_content(
1160 '/proc/diskstats',
1161 textwrap.dedent("""\
1162 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
1163 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
1164 """)):
1165 with mock.patch('psutil._pslinux.is_storage_device',
1166 return_value=False):
1167 ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
1168 self.assertIsNone(ret)
1169
1170 #
1171 def is_storage_device(name):
1172 return name == 'nvme0n1'
1173
1174 with mock_open_content(
1175 '/proc/diskstats',
1176 textwrap.dedent("""\
1177 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
1178 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
1179 """)):
1180 with mock.patch('psutil._pslinux.is_storage_device',
1181 create=True, side_effect=is_storage_device):
1182 ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
1183 self.assertEqual(ret.read_count, 1)
1184 self.assertEqual(ret.write_count, 5)
1185
1186 def test_emulate_use_sysfs(self):
1187 def exists(path):
1188 if path == '/proc/diskstats':
1189 return False
1190 return True
1191
1192 wprocfs = psutil.disk_io_counters(perdisk=True)
1193 with mock.patch('psutil._pslinux.os.path.exists',
1194 create=True, side_effect=exists):
1195 wsysfs = psutil.disk_io_counters(perdisk=True)
1196 self.assertEqual(len(wprocfs), len(wsysfs))
1197
1198 def test_emulate_not_impl(self):
1199 def exists(path):
1200 return False
1201
1202 with mock.patch('psutil._pslinux.os.path.exists',
1203 create=True, side_effect=exists):
1204 self.assertRaises(NotImplementedError, psutil.disk_io_counters)
1205
1206
1207 # =====================================================================
1208 # --- misc
1209 # =====================================================================
1210
1211
1212 @unittest.skipIf(not LINUX, "LINUX only")
1213 class TestMisc(PsutilTestCase):
1214
1215 def test_boot_time(self):
1216 vmstat_value = vmstat('boot time')
1217 psutil_value = psutil.boot_time()
1218 self.assertEqual(int(vmstat_value), int(psutil_value))
1219
1220 def test_no_procfs_on_import(self):
1221 my_procfs = self.get_testfn()
1222 os.mkdir(my_procfs)
1223
1224 with open(os.path.join(my_procfs, 'stat'), 'w') as f:
1225 f.write('cpu 0 0 0 0 0 0 0 0 0 0\n')
1226 f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n')
1227 f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n')
1228
1229 try:
1230 orig_open = open
1231
1232 def open_mock(name, *args, **kwargs):
1233 if name.startswith('/proc'):
1234 raise IOError(errno.ENOENT, 'rejecting access for test')
1235 return orig_open(name, *args, **kwargs)
1236
1237 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1238 with mock.patch(patch_point, side_effect=open_mock):
1239 reload_module(psutil)
1240
1241 self.assertRaises(IOError, psutil.cpu_times)
1242 self.assertRaises(IOError, psutil.cpu_times, percpu=True)
1243 self.assertRaises(IOError, psutil.cpu_percent)
1244 self.assertRaises(IOError, psutil.cpu_percent, percpu=True)
1245 self.assertRaises(IOError, psutil.cpu_times_percent)
1246 self.assertRaises(
1247 IOError, psutil.cpu_times_percent, percpu=True)
1248
1249 psutil.PROCFS_PATH = my_procfs
1250
1251 self.assertEqual(psutil.cpu_percent(), 0)
1252 self.assertEqual(sum(psutil.cpu_times_percent()), 0)
1253
1254 # since we don't know the number of CPUs at import time,
1255 # we awkwardly say there are none until the second call
1256 per_cpu_percent = psutil.cpu_percent(percpu=True)
1257 self.assertEqual(sum(per_cpu_percent), 0)
1258
1259 # ditto awkward length
1260 per_cpu_times_percent = psutil.cpu_times_percent(percpu=True)
1261 self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0)
1262
1263 # much user, very busy
1264 with open(os.path.join(my_procfs, 'stat'), 'w') as f:
1265 f.write('cpu 1 0 0 0 0 0 0 0 0 0\n')
1266 f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n')
1267 f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n')
1268
1269 self.assertNotEqual(psutil.cpu_percent(), 0)
1270 self.assertNotEqual(
1271 sum(psutil.cpu_percent(percpu=True)), 0)
1272 self.assertNotEqual(sum(psutil.cpu_times_percent()), 0)
1273 self.assertNotEqual(
1274 sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0)
1275 finally:
1276 shutil.rmtree(my_procfs)
1277 reload_module(psutil)
1278
1279 self.assertEqual(psutil.PROCFS_PATH, '/proc')
1280
1281 def test_cpu_steal_decrease(self):
1282 # Test cumulative cpu stats decrease. We should ignore this.
1283 # See issue #1210.
1284 with mock_open_content(
1285 "/proc/stat",
1286 textwrap.dedent("""\
1287 cpu 0 0 0 0 0 0 0 1 0 0
1288 cpu0 0 0 0 0 0 0 0 1 0 0
1289 cpu1 0 0 0 0 0 0 0 1 0 0
1290 """).encode()) as m:
1291 # first call to "percent" functions should read the new stat file
1292 # and compare to the "real" file read at import time - so the
1293 # values are meaningless
1294 psutil.cpu_percent()
1295 assert m.called
1296 psutil.cpu_percent(percpu=True)
1297 psutil.cpu_times_percent()
1298 psutil.cpu_times_percent(percpu=True)
1299
1300 with mock_open_content(
1301 "/proc/stat",
1302 textwrap.dedent("""\
1303 cpu 1 0 0 0 0 0 0 0 0 0
1304 cpu0 1 0 0 0 0 0 0 0 0 0
1305 cpu1 1 0 0 0 0 0 0 0 0 0
1306 """).encode()) as m:
1307 # Increase "user" while steal goes "backwards" to zero.
1308 cpu_percent = psutil.cpu_percent()
1309 assert m.called
1310 cpu_percent_percpu = psutil.cpu_percent(percpu=True)
1311 cpu_times_percent = psutil.cpu_times_percent()
1312 cpu_times_percent_percpu = psutil.cpu_times_percent(percpu=True)
1313 self.assertNotEqual(cpu_percent, 0)
1314 self.assertNotEqual(sum(cpu_percent_percpu), 0)
1315 self.assertNotEqual(sum(cpu_times_percent), 0)
1316 self.assertNotEqual(sum(cpu_times_percent), 100.0)
1317 self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 0)
1318 self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 100.0)
1319 self.assertEqual(cpu_times_percent.steal, 0)
1320 self.assertNotEqual(cpu_times_percent.user, 0)
1321
1322 def test_boot_time_mocked(self):
1323 with mock.patch('psutil._common.open', create=True) as m:
1324 self.assertRaises(
1325 RuntimeError,
1326 psutil._pslinux.boot_time)
1327 assert m.called
1328
1329 def test_users_mocked(self):
1330 # Make sure ':0' and ':0.0' (returned by C ext) are converted
1331 # to 'localhost'.
1332 with mock.patch('psutil._pslinux.cext.users',
1333 return_value=[('giampaolo', 'pts/2', ':0',
1334 1436573184.0, True, 2)]) as m:
1335 self.assertEqual(psutil.users()[0].host, 'localhost')
1336 assert m.called
1337 with mock.patch('psutil._pslinux.cext.users',
1338 return_value=[('giampaolo', 'pts/2', ':0.0',
1339 1436573184.0, True, 2)]) as m:
1340 self.assertEqual(psutil.users()[0].host, 'localhost')
1341 assert m.called
1342 # ...otherwise it should be returned as-is
1343 with mock.patch('psutil._pslinux.cext.users',
1344 return_value=[('giampaolo', 'pts/2', 'foo',
1345 1436573184.0, True, 2)]) as m:
1346 self.assertEqual(psutil.users()[0].host, 'foo')
1347 assert m.called
1348
1349 def test_procfs_path(self):
1350 tdir = self.get_testfn()
1351 os.mkdir(tdir)
1352 try:
1353 psutil.PROCFS_PATH = tdir
1354 self.assertRaises(IOError, psutil.virtual_memory)
1355 self.assertRaises(IOError, psutil.cpu_times)
1356 self.assertRaises(IOError, psutil.cpu_times, percpu=True)
1357 self.assertRaises(IOError, psutil.boot_time)
1358 # self.assertRaises(IOError, psutil.pids)
1359 self.assertRaises(IOError, psutil.net_connections)
1360 self.assertRaises(IOError, psutil.net_io_counters)
1361 self.assertRaises(IOError, psutil.net_if_stats)
1362 # self.assertRaises(IOError, psutil.disk_io_counters)
1363 self.assertRaises(IOError, psutil.disk_partitions)
1364 self.assertRaises(psutil.NoSuchProcess, psutil.Process)
1365 finally:
1366 psutil.PROCFS_PATH = "/proc"
1367
1368 @retry_on_failure()
1369 def test_issue_687(self):
1370 # In case of thread ID:
1371 # - pid_exists() is supposed to return False
1372 # - Process(tid) is supposed to work
1373 # - pids() should not return the TID
1374 # See: https://github.com/giampaolo/psutil/issues/687
1375 t = ThreadTask()
1376 t.start()
1377 try:
1378 p = psutil.Process()
1379 threads = p.threads()
1380 self.assertEqual(len(threads), 2)
1381 tid = sorted(threads, key=lambda x: x.id)[1].id
1382 self.assertNotEqual(p.pid, tid)
1383 pt = psutil.Process(tid)
1384 pt.as_dict()
1385 self.assertNotIn(tid, psutil.pids())
1386 finally:
1387 t.stop()
1388
1389 def test_pid_exists_no_proc_status(self):
1390 # Internally pid_exists relies on /proc/{pid}/status.
1391 # Emulate a case where this file is empty in which case
1392 # psutil is supposed to fall back on using pids().
1393 with mock_open_content("/proc/%s/status", "") as m:
1394 assert psutil.pid_exists(os.getpid())
1395 assert m.called
1396
1397
1398 # =====================================================================
1399 # --- sensors
1400 # =====================================================================
1401
1402
1403 @unittest.skipIf(not LINUX, "LINUX only")
1404 @unittest.skipIf(not HAS_BATTERY, "no battery")
1405 class TestSensorsBattery(PsutilTestCase):
1406
1407 @unittest.skipIf(not which("acpi"), "acpi utility not available")
1408 def test_percent(self):
1409 out = sh("acpi -b")
1410 acpi_value = int(out.split(",")[1].strip().replace('%', ''))
1411 psutil_value = psutil.sensors_battery().percent
1412 self.assertAlmostEqual(acpi_value, psutil_value, delta=1)
1413
1414 @unittest.skipIf(not which("acpi"), "acpi utility not available")
1415 def test_power_plugged(self):
1416 out = sh("acpi -b")
1417 if 'unknown' in out.lower():
1418 return unittest.skip("acpi output not reliable")
1419 if 'discharging at zero rate' in out:
1420 plugged = True
1421 else:
1422 plugged = "Charging" in out.split('\n')[0]
1423 self.assertEqual(psutil.sensors_battery().power_plugged, plugged)
1424
1425 def test_emulate_power_plugged(self):
1426 # Pretend the AC power cable is connected.
1427 def open_mock(name, *args, **kwargs):
1428 if name.endswith("AC0/online") or name.endswith("AC/online"):
1429 return io.BytesIO(b"1")
1430 else:
1431 return orig_open(name, *args, **kwargs)
1432
1433 orig_open = open
1434 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1435 with mock.patch(patch_point, side_effect=open_mock) as m:
1436 self.assertEqual(psutil.sensors_battery().power_plugged, True)
1437 self.assertEqual(
1438 psutil.sensors_battery().secsleft, psutil.POWER_TIME_UNLIMITED)
1439 assert m.called
1440
1441 def test_emulate_power_plugged_2(self):
1442 # Same as above but pretend /AC0/online does not exist in which
1443 # case code relies on /status file.
1444 def open_mock(name, *args, **kwargs):
1445 if name.endswith("AC0/online") or name.endswith("AC/online"):
1446 raise IOError(errno.ENOENT, "")
1447 elif name.endswith("/status"):
1448 return io.StringIO(u("charging"))
1449 else:
1450 return orig_open(name, *args, **kwargs)
1451
1452 orig_open = open
1453 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1454 with mock.patch(patch_point, side_effect=open_mock) as m:
1455 self.assertEqual(psutil.sensors_battery().power_plugged, True)
1456 assert m.called
1457
1458 def test_emulate_power_not_plugged(self):
1459 # Pretend the AC power cable is not connected.
1460 def open_mock(name, *args, **kwargs):
1461 if name.endswith("AC0/online") or name.endswith("AC/online"):
1462 return io.BytesIO(b"0")
1463 else:
1464 return orig_open(name, *args, **kwargs)
1465
1466 orig_open = open
1467 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1468 with mock.patch(patch_point, side_effect=open_mock) as m:
1469 self.assertEqual(psutil.sensors_battery().power_plugged, False)
1470 assert m.called
1471
1472 def test_emulate_power_not_plugged_2(self):
1473 # Same as above but pretend /AC0/online does not exist in which
1474 # case code relies on /status file.
1475 def open_mock(name, *args, **kwargs):
1476 if name.endswith("AC0/online") or name.endswith("AC/online"):
1477 raise IOError(errno.ENOENT, "")
1478 elif name.endswith("/status"):
1479 return io.StringIO(u("discharging"))
1480 else:
1481 return orig_open(name, *args, **kwargs)
1482
1483 orig_open = open
1484 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1485 with mock.patch(patch_point, side_effect=open_mock) as m:
1486 self.assertEqual(psutil.sensors_battery().power_plugged, False)
1487 assert m.called
1488
1489 def test_emulate_power_undetermined(self):
1490 # Pretend we can't know whether the AC power cable not
1491 # connected (assert fallback to False).
1492 def open_mock(name, *args, **kwargs):
1493 if name.startswith("/sys/class/power_supply/AC0/online") or \
1494 name.startswith("/sys/class/power_supply/AC/online"):
1495 raise IOError(errno.ENOENT, "")
1496 elif name.startswith("/sys/class/power_supply/BAT0/status"):
1497 return io.BytesIO(b"???")
1498 else:
1499 return orig_open(name, *args, **kwargs)
1500
1501 orig_open = open
1502 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1503 with mock.patch(patch_point, side_effect=open_mock) as m:
1504 self.assertIsNone(psutil.sensors_battery().power_plugged)
1505 assert m.called
1506
1507 def test_emulate_no_base_files(self):
1508 # Emulate a case where base metrics files are not present,
1509 # in which case we're supposed to get None.
1510 with mock_open_exception(
1511 "/sys/class/power_supply/BAT0/energy_now",
1512 IOError(errno.ENOENT, "")):
1513 with mock_open_exception(
1514 "/sys/class/power_supply/BAT0/charge_now",
1515 IOError(errno.ENOENT, "")):
1516 self.assertIsNone(psutil.sensors_battery())
1517
1518 def test_emulate_energy_full_0(self):
1519 # Emulate a case where energy_full files returns 0.
1520 with mock_open_content(
1521 "/sys/class/power_supply/BAT0/energy_full", b"0") as m:
1522 self.assertEqual(psutil.sensors_battery().percent, 0)
1523 assert m.called
1524
1525 def test_emulate_energy_full_not_avail(self):
1526 # Emulate a case where energy_full file does not exist.
1527 # Expected fallback on /capacity.
1528 with mock_open_exception(
1529 "/sys/class/power_supply/BAT0/energy_full",
1530 IOError(errno.ENOENT, "")):
1531 with mock_open_exception(
1532 "/sys/class/power_supply/BAT0/charge_full",
1533 IOError(errno.ENOENT, "")):
1534 with mock_open_content(
1535 "/sys/class/power_supply/BAT0/capacity", b"88"):
1536 self.assertEqual(psutil.sensors_battery().percent, 88)
1537
1538 def test_emulate_no_power(self):
1539 # Emulate a case where /AC0/online file nor /BAT0/status exist.
1540 with mock_open_exception(
1541 "/sys/class/power_supply/AC/online",
1542 IOError(errno.ENOENT, "")):
1543 with mock_open_exception(
1544 "/sys/class/power_supply/AC0/online",
1545 IOError(errno.ENOENT, "")):
1546 with mock_open_exception(
1547 "/sys/class/power_supply/BAT0/status",
1548 IOError(errno.ENOENT, "")):
1549 self.assertIsNone(psutil.sensors_battery().power_plugged)
1550
1551
1552 @unittest.skipIf(not LINUX, "LINUX only")
1553 class TestSensorsTemperatures(PsutilTestCase):
1554
1555 def test_emulate_class_hwmon(self):
1556 def open_mock(name, *args, **kwargs):
1557 if name.endswith('/name'):
1558 return io.StringIO(u("name"))
1559 elif name.endswith('/temp1_label'):
1560 return io.StringIO(u("label"))
1561 elif name.endswith('/temp1_input'):
1562 return io.BytesIO(b"30000")
1563 elif name.endswith('/temp1_max'):
1564 return io.BytesIO(b"40000")
1565 elif name.endswith('/temp1_crit'):
1566 return io.BytesIO(b"50000")
1567 else:
1568 return orig_open(name, *args, **kwargs)
1569
1570 orig_open = open
1571 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1572 with mock.patch(patch_point, side_effect=open_mock):
1573 # Test case with /sys/class/hwmon
1574 with mock.patch('glob.glob',
1575 return_value=['/sys/class/hwmon/hwmon0/temp1']):
1576 temp = psutil.sensors_temperatures()['name'][0]
1577 self.assertEqual(temp.label, 'label')
1578 self.assertEqual(temp.current, 30.0)
1579 self.assertEqual(temp.high, 40.0)
1580 self.assertEqual(temp.critical, 50.0)
1581
1582 def test_emulate_class_thermal(self):
1583 def open_mock(name, *args, **kwargs):
1584 if name.endswith('0_temp'):
1585 return io.BytesIO(b"50000")
1586 elif name.endswith('temp'):
1587 return io.BytesIO(b"30000")
1588 elif name.endswith('0_type'):
1589 return io.StringIO(u("critical"))
1590 elif name.endswith('type'):
1591 return io.StringIO(u("name"))
1592 else:
1593 return orig_open(name, *args, **kwargs)
1594
1595 def glob_mock(path):
1596 if path == '/sys/class/hwmon/hwmon*/temp*_*':
1597 return []
1598 elif path == '/sys/class/hwmon/hwmon*/device/temp*_*':
1599 return []
1600 elif path == '/sys/class/thermal/thermal_zone*':
1601 return ['/sys/class/thermal/thermal_zone0']
1602 elif path == '/sys/class/thermal/thermal_zone0/trip_point*':
1603 return ['/sys/class/thermal/thermal_zone1/trip_point_0_type',
1604 '/sys/class/thermal/thermal_zone1/trip_point_0_temp']
1605 return []
1606
1607 orig_open = open
1608 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1609 with mock.patch(patch_point, side_effect=open_mock):
1610 with mock.patch('glob.glob', create=True, side_effect=glob_mock):
1611 temp = psutil.sensors_temperatures()['name'][0]
1612 self.assertEqual(temp.label, '')
1613 self.assertEqual(temp.current, 30.0)
1614 self.assertEqual(temp.high, 50.0)
1615 self.assertEqual(temp.critical, 50.0)
1616
1617
1618 @unittest.skipIf(not LINUX, "LINUX only")
1619 class TestSensorsFans(PsutilTestCase):
1620
1621 def test_emulate_data(self):
1622 def open_mock(name, *args, **kwargs):
1623 if name.endswith('/name'):
1624 return io.StringIO(u("name"))
1625 elif name.endswith('/fan1_label'):
1626 return io.StringIO(u("label"))
1627 elif name.endswith('/fan1_input'):
1628 return io.StringIO(u("2000"))
1629 else:
1630 return orig_open(name, *args, **kwargs)
1631
1632 orig_open = open
1633 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1634 with mock.patch(patch_point, side_effect=open_mock):
1635 with mock.patch('glob.glob',
1636 return_value=['/sys/class/hwmon/hwmon2/fan1']):
1637 fan = psutil.sensors_fans()['name'][0]
1638 self.assertEqual(fan.label, 'label')
1639 self.assertEqual(fan.current, 2000)
1640
1641
1642 # =====================================================================
1643 # --- test process
1644 # =====================================================================
1645
1646
1647 @unittest.skipIf(not LINUX, "LINUX only")
1648 class TestProcess(PsutilTestCase):
1649
1650 @retry_on_failure()
1651 def test_memory_full_info(self):
1652 testfn = self.get_testfn()
1653 src = textwrap.dedent("""
1654 import time
1655 with open("%s", "w") as f:
1656 time.sleep(10)
1657 """ % testfn)
1658 sproc = self.pyrun(src)
1659 call_until(lambda: os.listdir('.'), "'%s' not in ret" % testfn)
1660 p = psutil.Process(sproc.pid)
1661 time.sleep(.1)
1662 mem = p.memory_full_info()
1663 maps = p.memory_maps(grouped=False)
1664 self.assertAlmostEqual(
1665 mem.uss, sum([x.private_dirty + x.private_clean for x in maps]),
1666 delta=4096)
1667 self.assertAlmostEqual(
1668 mem.pss, sum([x.pss for x in maps]), delta=4096)
1669 self.assertAlmostEqual(
1670 mem.swap, sum([x.swap for x in maps]), delta=4096)
1671
1672 def test_memory_full_info_mocked(self):
1673 # See: https://github.com/giampaolo/psutil/issues/1222
1674 with mock_open_content(
1675 "/proc/%s/smaps" % os.getpid(),
1676 textwrap.dedent("""\
1677 fffff0 r-xp 00000000 00:00 0 [vsyscall]
1678 Size: 1 kB
1679 Rss: 2 kB
1680 Pss: 3 kB
1681 Shared_Clean: 4 kB
1682 Shared_Dirty: 5 kB
1683 Private_Clean: 6 kB
1684 Private_Dirty: 7 kB
1685 Referenced: 8 kB
1686 Anonymous: 9 kB
1687 LazyFree: 10 kB
1688 AnonHugePages: 11 kB
1689 ShmemPmdMapped: 12 kB
1690 Shared_Hugetlb: 13 kB
1691 Private_Hugetlb: 14 kB
1692 Swap: 15 kB
1693 SwapPss: 16 kB
1694 KernelPageSize: 17 kB
1695 MMUPageSize: 18 kB
1696 Locked: 19 kB
1697 VmFlags: rd ex
1698 """).encode()) as m:
1699 p = psutil.Process()
1700 mem = p.memory_full_info()
1701 assert m.called
1702 self.assertEqual(mem.uss, (6 + 7 + 14) * 1024)
1703 self.assertEqual(mem.pss, 3 * 1024)
1704 self.assertEqual(mem.swap, 15 * 1024)
1705
1706 # On PYPY file descriptors are not closed fast enough.
1707 @unittest.skipIf(PYPY, "unreliable on PYPY")
1708 def test_open_files_mode(self):
1709 def get_test_file(fname):
1710 p = psutil.Process()
1711 giveup_at = time.time() + GLOBAL_TIMEOUT
1712 while True:
1713 for file in p.open_files():
1714 if file.path == os.path.abspath(fname):
1715 return file
1716 elif time.time() > giveup_at:
1717 break
1718 raise RuntimeError("timeout looking for test file")
1719
1720 #
1721 testfn = self.get_testfn()
1722 with open(testfn, "w"):
1723 self.assertEqual(get_test_file(testfn).mode, "w")
1724 with open(testfn, "r"):
1725 self.assertEqual(get_test_file(testfn).mode, "r")
1726 with open(testfn, "a"):
1727 self.assertEqual(get_test_file(testfn).mode, "a")
1728 #
1729 with open(testfn, "r+"):
1730 self.assertEqual(get_test_file(testfn).mode, "r+")
1731 with open(testfn, "w+"):
1732 self.assertEqual(get_test_file(testfn).mode, "r+")
1733 with open(testfn, "a+"):
1734 self.assertEqual(get_test_file(testfn).mode, "a+")
1735 # note: "x" bit is not supported
1736 if PY3:
1737 safe_rmpath(testfn)
1738 with open(testfn, "x"):
1739 self.assertEqual(get_test_file(testfn).mode, "w")
1740 safe_rmpath(testfn)
1741 with open(testfn, "x+"):
1742 self.assertEqual(get_test_file(testfn).mode, "r+")
1743
1744 def test_open_files_file_gone(self):
1745 # simulates a file which gets deleted during open_files()
1746 # execution
1747 p = psutil.Process()
1748 files = p.open_files()
1749 with open(self.get_testfn(), 'w'):
1750 # give the kernel some time to see the new file
1751 call_until(p.open_files, "len(ret) != %i" % len(files))
1752 with mock.patch('psutil._pslinux.os.readlink',
1753 side_effect=OSError(errno.ENOENT, "")) as m:
1754 files = p.open_files()
1755 assert not files
1756 assert m.called
1757 # also simulate the case where os.readlink() returns EINVAL
1758 # in which case psutil is supposed to 'continue'
1759 with mock.patch('psutil._pslinux.os.readlink',
1760 side_effect=OSError(errno.EINVAL, "")) as m:
1761 self.assertEqual(p.open_files(), [])
1762 assert m.called
1763
1764 def test_open_files_fd_gone(self):
1765 # Simulate a case where /proc/{pid}/fdinfo/{fd} disappears
1766 # while iterating through fds.
1767 # https://travis-ci.org/giampaolo/psutil/jobs/225694530
1768 p = psutil.Process()
1769 files = p.open_files()
1770 with open(self.get_testfn(), 'w'):
1771 # give the kernel some time to see the new file
1772 call_until(p.open_files, "len(ret) != %i" % len(files))
1773 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1774 with mock.patch(patch_point,
1775 side_effect=IOError(errno.ENOENT, "")) as m:
1776 files = p.open_files()
1777 assert not files
1778 assert m.called
1779
1780 # --- mocked tests
1781
1782 def test_terminal_mocked(self):
1783 with mock.patch('psutil._pslinux._psposix.get_terminal_map',
1784 return_value={}) as m:
1785 self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
1786 assert m.called
1787
1788 # TODO: re-enable this test.
1789 # def test_num_ctx_switches_mocked(self):
1790 # with mock.patch('psutil._common.open', create=True) as m:
1791 # self.assertRaises(
1792 # NotImplementedError,
1793 # psutil._pslinux.Process(os.getpid()).num_ctx_switches)
1794 # assert m.called
1795
1796 def test_cmdline_mocked(self):
1797 # see: https://github.com/giampaolo/psutil/issues/639
1798 p = psutil.Process()
1799 fake_file = io.StringIO(u('foo\x00bar\x00'))
1800 with mock.patch('psutil._common.open',
1801 return_value=fake_file, create=True) as m:
1802 self.assertEqual(p.cmdline(), ['foo', 'bar'])
1803 assert m.called
1804 fake_file = io.StringIO(u('foo\x00bar\x00\x00'))
1805 with mock.patch('psutil._common.open',
1806 return_value=fake_file, create=True) as m:
1807 self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
1808 assert m.called
1809
1810 def test_cmdline_spaces_mocked(self):
1811 # see: https://github.com/giampaolo/psutil/issues/1179
1812 p = psutil.Process()
1813 fake_file = io.StringIO(u('foo bar '))
1814 with mock.patch('psutil._common.open',
1815 return_value=fake_file, create=True) as m:
1816 self.assertEqual(p.cmdline(), ['foo', 'bar'])
1817 assert m.called
1818 fake_file = io.StringIO(u('foo bar '))
1819 with mock.patch('psutil._common.open',
1820 return_value=fake_file, create=True) as m:
1821 self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
1822 assert m.called
1823
1824 def test_cmdline_mixed_separators(self):
1825 # https://github.com/giampaolo/psutil/issues/
1826 # 1179#issuecomment-552984549
1827 p = psutil.Process()
1828 fake_file = io.StringIO(u('foo\x20bar\x00'))
1829 with mock.patch('psutil._common.open',
1830 return_value=fake_file, create=True) as m:
1831 self.assertEqual(p.cmdline(), ['foo', 'bar'])
1832 assert m.called
1833
1834 def test_readlink_path_deleted_mocked(self):
1835 with mock.patch('psutil._pslinux.os.readlink',
1836 return_value='/home/foo (deleted)'):
1837 self.assertEqual(psutil.Process().exe(), "/home/foo")
1838 self.assertEqual(psutil.Process().cwd(), "/home/foo")
1839
1840 def test_threads_mocked(self):
1841 # Test the case where os.listdir() returns a file (thread)
1842 # which no longer exists by the time we open() it (race
1843 # condition). threads() is supposed to ignore that instead
1844 # of raising NSP.
1845 def open_mock(name, *args, **kwargs):
1846 if name.startswith('/proc/%s/task' % os.getpid()):
1847 raise IOError(errno.ENOENT, "")
1848 else:
1849 return orig_open(name, *args, **kwargs)
1850
1851 orig_open = open
1852 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1853 with mock.patch(patch_point, side_effect=open_mock) as m:
1854 ret = psutil.Process().threads()
1855 assert m.called
1856 self.assertEqual(ret, [])
1857
1858 # ...but if it bumps into something != ENOENT we want an
1859 # exception.
1860 def open_mock(name, *args, **kwargs):
1861 if name.startswith('/proc/%s/task' % os.getpid()):
1862 raise IOError(errno.EPERM, "")
1863 else:
1864 return orig_open(name, *args, **kwargs)
1865
1866 with mock.patch(patch_point, side_effect=open_mock):
1867 self.assertRaises(psutil.AccessDenied, psutil.Process().threads)
1868
1869 def test_exe_mocked(self):
1870 with mock.patch('psutil._pslinux.readlink',
1871 side_effect=OSError(errno.ENOENT, "")) as m1:
1872 with mock.patch('psutil.Process.cmdline',
1873 side_effect=psutil.AccessDenied(0, "")) as m2:
1874 # No such file error; might be raised also if /proc/pid/exe
1875 # path actually exists for system processes with low pids
1876 # (about 0-20). In this case psutil is supposed to return
1877 # an empty string.
1878 ret = psutil.Process().exe()
1879 assert m1.called
1880 assert m2.called
1881 self.assertEqual(ret, "")
1882
1883 # ...but if /proc/pid no longer exist we're supposed to treat
1884 # it as an alias for zombie process
1885 with mock.patch('psutil._pslinux.os.path.lexists',
1886 return_value=False):
1887 self.assertRaises(
1888 psutil.ZombieProcess, psutil.Process().exe)
1889
1890 def test_issue_1014(self):
1891 # Emulates a case where smaps file does not exist. In this case
1892 # wrap_exception decorator should not raise NoSuchProcess.
1893 with mock_open_exception(
1894 '/proc/%s/smaps' % os.getpid(),
1895 IOError(errno.ENOENT, "")) as m:
1896 p = psutil.Process()
1897 with self.assertRaises(FileNotFoundError):
1898 p.memory_maps()
1899 assert m.called
1900
1901 @unittest.skipIf(not HAS_RLIMIT, "not supported")
1902 def test_rlimit_zombie(self):
1903 # Emulate a case where rlimit() raises ENOSYS, which may
1904 # happen in case of zombie process:
1905 # https://travis-ci.org/giampaolo/psutil/jobs/51368273
1906 with mock.patch("psutil._pslinux.cext.linux_prlimit",
1907 side_effect=OSError(errno.ENOSYS, "")) as m:
1908 p = psutil.Process()
1909 p.name()
1910 with self.assertRaises(psutil.ZombieProcess) as exc:
1911 p.rlimit(psutil.RLIMIT_NOFILE)
1912 assert m.called
1913 self.assertEqual(exc.exception.pid, p.pid)
1914 self.assertEqual(exc.exception.name, p.name())
1915
1916 def test_cwd_zombie(self):
1917 with mock.patch("psutil._pslinux.os.readlink",
1918 side_effect=OSError(errno.ENOENT, "")) as m:
1919 p = psutil.Process()
1920 p.name()
1921 with self.assertRaises(psutil.ZombieProcess) as exc:
1922 p.cwd()
1923 assert m.called
1924 self.assertEqual(exc.exception.pid, p.pid)
1925 self.assertEqual(exc.exception.name, p.name())
1926
1927 def test_stat_file_parsing(self):
1928 from psutil._pslinux import CLOCK_TICKS
1929
1930 args = [
1931 "0", # pid
1932 "(cat)", # name
1933 "Z", # status
1934 "1", # ppid
1935 "0", # pgrp
1936 "0", # session
1937 "0", # tty
1938 "0", # tpgid
1939 "0", # flags
1940 "0", # minflt
1941 "0", # cminflt
1942 "0", # majflt
1943 "0", # cmajflt
1944 "2", # utime
1945 "3", # stime
1946 "4", # cutime
1947 "5", # cstime
1948 "0", # priority
1949 "0", # nice
1950 "0", # num_threads
1951 "0", # itrealvalue
1952 "6", # starttime
1953 "0", # vsize
1954 "0", # rss
1955 "0", # rsslim
1956 "0", # startcode
1957 "0", # endcode
1958 "0", # startstack
1959 "0", # kstkesp
1960 "0", # kstkeip
1961 "0", # signal
1962 "0", # blocked
1963 "0", # sigignore
1964 "0", # sigcatch
1965 "0", # wchan
1966 "0", # nswap
1967 "0", # cnswap
1968 "0", # exit_signal
1969 "6", # processor
1970 "0", # rt priority
1971 "0", # policy
1972 "7", # delayacct_blkio_ticks
1973 ]
1974 content = " ".join(args).encode()
1975 with mock_open_content('/proc/%s/stat' % os.getpid(), content):
1976 p = psutil.Process()
1977 self.assertEqual(p.name(), 'cat')
1978 self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
1979 self.assertEqual(p.ppid(), 1)
1980 self.assertEqual(
1981 p.create_time(), 6 / CLOCK_TICKS + psutil.boot_time())
1982 cpu = p.cpu_times()
1983 self.assertEqual(cpu.user, 2 / CLOCK_TICKS)
1984 self.assertEqual(cpu.system, 3 / CLOCK_TICKS)
1985 self.assertEqual(cpu.children_user, 4 / CLOCK_TICKS)
1986 self.assertEqual(cpu.children_system, 5 / CLOCK_TICKS)
1987 self.assertEqual(cpu.iowait, 7 / CLOCK_TICKS)
1988 self.assertEqual(p.cpu_num(), 6)
1989
1990 def test_status_file_parsing(self):
1991 with mock_open_content(
1992 '/proc/%s/status' % os.getpid(),
1993 textwrap.dedent("""\
1994 Uid:\t1000\t1001\t1002\t1003
1995 Gid:\t1004\t1005\t1006\t1007
1996 Threads:\t66
1997 Cpus_allowed:\tf
1998 Cpus_allowed_list:\t0-7
1999 voluntary_ctxt_switches:\t12
2000 nonvoluntary_ctxt_switches:\t13""").encode()):
2001 p = psutil.Process()
2002 self.assertEqual(p.num_ctx_switches().voluntary, 12)
2003 self.assertEqual(p.num_ctx_switches().involuntary, 13)
2004 self.assertEqual(p.num_threads(), 66)
2005 uids = p.uids()
2006 self.assertEqual(uids.real, 1000)
2007 self.assertEqual(uids.effective, 1001)
2008 self.assertEqual(uids.saved, 1002)
2009 gids = p.gids()
2010 self.assertEqual(gids.real, 1004)
2011 self.assertEqual(gids.effective, 1005)
2012 self.assertEqual(gids.saved, 1006)
2013 self.assertEqual(p._proc._get_eligible_cpus(), list(range(0, 8)))
2014
2015
2016 @unittest.skipIf(not LINUX, "LINUX only")
2017 class TestProcessAgainstStatus(PsutilTestCase):
2018 """/proc/pid/stat and /proc/pid/status have many values in common.
2019 Whenever possible, psutil uses /proc/pid/stat (it's faster).
2020 For all those cases we check that the value found in
2021 /proc/pid/stat (by psutil) matches the one found in
2022 /proc/pid/status.
2023 """
2024
2025 @classmethod
2026 def setUpClass(cls):
2027 cls.proc = psutil.Process()
2028
2029 def read_status_file(self, linestart):
2030 with psutil._psplatform.open_text(
2031 '/proc/%s/status' % self.proc.pid) as f:
2032 for line in f:
2033 line = line.strip()
2034 if line.startswith(linestart):
2035 value = line.partition('\t')[2]
2036 try:
2037 return int(value)
2038 except ValueError:
2039 return value
2040 raise ValueError("can't find %r" % linestart)
2041
2042 def test_name(self):
2043 value = self.read_status_file("Name:")
2044 self.assertEqual(self.proc.name(), value)
2045
2046 def test_status(self):
2047 value = self.read_status_file("State:")
2048 value = value[value.find('(') + 1:value.rfind(')')]
2049 value = value.replace(' ', '-')
2050 self.assertEqual(self.proc.status(), value)
2051
2052 def test_ppid(self):
2053 value = self.read_status_file("PPid:")
2054 self.assertEqual(self.proc.ppid(), value)
2055
2056 def test_num_threads(self):
2057 value = self.read_status_file("Threads:")
2058 self.assertEqual(self.proc.num_threads(), value)
2059
2060 def test_uids(self):
2061 value = self.read_status_file("Uid:")
2062 value = tuple(map(int, value.split()[1:4]))
2063 self.assertEqual(self.proc.uids(), value)
2064
2065 def test_gids(self):
2066 value = self.read_status_file("Gid:")
2067 value = tuple(map(int, value.split()[1:4]))
2068 self.assertEqual(self.proc.gids(), value)
2069
2070 @retry_on_failure()
2071 def test_num_ctx_switches(self):
2072 value = self.read_status_file("voluntary_ctxt_switches:")
2073 self.assertEqual(self.proc.num_ctx_switches().voluntary, value)
2074 value = self.read_status_file("nonvoluntary_ctxt_switches:")
2075 self.assertEqual(self.proc.num_ctx_switches().involuntary, value)
2076
2077 def test_cpu_affinity(self):
2078 value = self.read_status_file("Cpus_allowed_list:")
2079 if '-' in str(value):
2080 min_, max_ = map(int, value.split('-'))
2081 self.assertEqual(
2082 self.proc.cpu_affinity(), list(range(min_, max_ + 1)))
2083
2084 def test_cpu_affinity_eligible_cpus(self):
2085 value = self.read_status_file("Cpus_allowed_list:")
2086 with mock.patch("psutil._pslinux.per_cpu_times") as m:
2087 self.proc._proc._get_eligible_cpus()
2088 if '-' in str(value):
2089 assert not m.called
2090 else:
2091 assert m.called
2092
2093
2094 # =====================================================================
2095 # --- test utils
2096 # =====================================================================
2097
2098
2099 @unittest.skipIf(not LINUX, "LINUX only")
2100 class TestUtils(PsutilTestCase):
2101
2102 def test_readlink(self):
2103 with mock.patch("os.readlink", return_value="foo (deleted)") as m:
2104 self.assertEqual(psutil._psplatform.readlink("bar"), "foo")
2105 assert m.called
2106
2107 def test_cat(self):
2108 testfn = self.get_testfn()
2109 with open(testfn, "wt") as f:
2110 f.write("foo ")
2111 self.assertEqual(psutil._psplatform.cat(testfn, binary=False), "foo")
2112 self.assertEqual(psutil._psplatform.cat(testfn, binary=True), b"foo")
2113 self.assertEqual(
2114 psutil._psplatform.cat(testfn + '??', fallback="bar"), "bar")
2115
2116
2117 if __name__ == '__main__':
2118 from psutil.tests.runner import run_from_name
2119 run_from_name(__file__)