Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_system.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
4 # Use of this source code is governed by a BSD-style license that can be | |
5 # found in the LICENSE file. | |
6 | |
7 """Tests for system APIS.""" | |
8 | |
9 import contextlib | |
10 import datetime | |
11 import errno | |
12 import os | |
13 import pprint | |
14 import shutil | |
15 import signal | |
16 import socket | |
17 import sys | |
18 import time | |
19 | |
20 import psutil | |
21 from psutil import AIX | |
22 from psutil import BSD | |
23 from psutil import FREEBSD | |
24 from psutil import LINUX | |
25 from psutil import MACOS | |
26 from psutil import NETBSD | |
27 from psutil import OPENBSD | |
28 from psutil import POSIX | |
29 from psutil import SUNOS | |
30 from psutil import WINDOWS | |
31 from psutil._compat import FileNotFoundError | |
32 from psutil._compat import long | |
33 from psutil.tests import ASCII_FS | |
34 from psutil.tests import check_net_address | |
35 from psutil.tests import CI_TESTING | |
36 from psutil.tests import DEVNULL | |
37 from psutil.tests import enum | |
38 from psutil.tests import GLOBAL_TIMEOUT | |
39 from psutil.tests import HAS_BATTERY | |
40 from psutil.tests import HAS_CPU_FREQ | |
41 from psutil.tests import HAS_GETLOADAVG | |
42 from psutil.tests import HAS_NET_IO_COUNTERS | |
43 from psutil.tests import HAS_SENSORS_BATTERY | |
44 from psutil.tests import HAS_SENSORS_FANS | |
45 from psutil.tests import HAS_SENSORS_TEMPERATURES | |
46 from psutil.tests import IS_64BIT | |
47 from psutil.tests import mock | |
48 from psutil.tests import PsutilTestCase | |
49 from psutil.tests import PYPY | |
50 from psutil.tests import retry_on_failure | |
51 from psutil.tests import TRAVIS | |
52 from psutil.tests import GITHUB_WHEELS | |
53 from psutil.tests import UNICODE_SUFFIX | |
54 from psutil.tests import unittest | |
55 | |
56 | |
57 # =================================================================== | |
58 # --- System-related API tests | |
59 # =================================================================== | |
60 | |
61 | |
62 class TestProcessAPIs(PsutilTestCase): | |
63 | |
64 def test_process_iter(self): | |
65 self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()]) | |
66 sproc = self.spawn_testproc() | |
67 self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()]) | |
68 p = psutil.Process(sproc.pid) | |
69 p.kill() | |
70 p.wait() | |
71 self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()]) | |
72 | |
73 with mock.patch('psutil.Process', | |
74 side_effect=psutil.NoSuchProcess(os.getpid())): | |
75 self.assertEqual(list(psutil.process_iter()), []) | |
76 with mock.patch('psutil.Process', | |
77 side_effect=psutil.AccessDenied(os.getpid())): | |
78 with self.assertRaises(psutil.AccessDenied): | |
79 list(psutil.process_iter()) | |
80 | |
81 def test_prcess_iter_w_attrs(self): | |
82 for p in psutil.process_iter(attrs=['pid']): | |
83 self.assertEqual(list(p.info.keys()), ['pid']) | |
84 with self.assertRaises(ValueError): | |
85 list(psutil.process_iter(attrs=['foo'])) | |
86 with mock.patch("psutil._psplatform.Process.cpu_times", | |
87 side_effect=psutil.AccessDenied(0, "")) as m: | |
88 for p in psutil.process_iter(attrs=["pid", "cpu_times"]): | |
89 self.assertIsNone(p.info['cpu_times']) | |
90 self.assertGreaterEqual(p.info['pid'], 0) | |
91 assert m.called | |
92 with mock.patch("psutil._psplatform.Process.cpu_times", | |
93 side_effect=psutil.AccessDenied(0, "")) as m: | |
94 flag = object() | |
95 for p in psutil.process_iter( | |
96 attrs=["pid", "cpu_times"], ad_value=flag): | |
97 self.assertIs(p.info['cpu_times'], flag) | |
98 self.assertGreaterEqual(p.info['pid'], 0) | |
99 assert m.called | |
100 | |
101 @unittest.skipIf(PYPY and WINDOWS, | |
102 "spawn_testproc() unreliable on PYPY + WINDOWS") | |
103 def test_wait_procs(self): | |
104 def callback(p): | |
105 pids.append(p.pid) | |
106 | |
107 pids = [] | |
108 sproc1 = self.spawn_testproc() | |
109 sproc2 = self.spawn_testproc() | |
110 sproc3 = self.spawn_testproc() | |
111 procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] | |
112 self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1) | |
113 self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1) | |
114 t = time.time() | |
115 gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback) | |
116 | |
117 self.assertLess(time.time() - t, 0.5) | |
118 self.assertEqual(gone, []) | |
119 self.assertEqual(len(alive), 3) | |
120 self.assertEqual(pids, []) | |
121 for p in alive: | |
122 self.assertFalse(hasattr(p, 'returncode')) | |
123 | |
124 @retry_on_failure(30) | |
125 def test(procs, callback): | |
126 gone, alive = psutil.wait_procs(procs, timeout=0.03, | |
127 callback=callback) | |
128 self.assertEqual(len(gone), 1) | |
129 self.assertEqual(len(alive), 2) | |
130 return gone, alive | |
131 | |
132 sproc3.terminate() | |
133 gone, alive = test(procs, callback) | |
134 self.assertIn(sproc3.pid, [x.pid for x in gone]) | |
135 if POSIX: | |
136 self.assertEqual(gone.pop().returncode, -signal.SIGTERM) | |
137 else: | |
138 self.assertEqual(gone.pop().returncode, 1) | |
139 self.assertEqual(pids, [sproc3.pid]) | |
140 for p in alive: | |
141 self.assertFalse(hasattr(p, 'returncode')) | |
142 | |
143 @retry_on_failure(30) | |
144 def test(procs, callback): | |
145 gone, alive = psutil.wait_procs(procs, timeout=0.03, | |
146 callback=callback) | |
147 self.assertEqual(len(gone), 3) | |
148 self.assertEqual(len(alive), 0) | |
149 return gone, alive | |
150 | |
151 sproc1.terminate() | |
152 sproc2.terminate() | |
153 gone, alive = test(procs, callback) | |
154 self.assertEqual(set(pids), set([sproc1.pid, sproc2.pid, sproc3.pid])) | |
155 for p in gone: | |
156 self.assertTrue(hasattr(p, 'returncode')) | |
157 | |
158 @unittest.skipIf(PYPY and WINDOWS, | |
159 "spawn_testproc() unreliable on PYPY + WINDOWS") | |
160 def test_wait_procs_no_timeout(self): | |
161 sproc1 = self.spawn_testproc() | |
162 sproc2 = self.spawn_testproc() | |
163 sproc3 = self.spawn_testproc() | |
164 procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] | |
165 for p in procs: | |
166 p.terminate() | |
167 gone, alive = psutil.wait_procs(procs) | |
168 | |
169 def test_pid_exists(self): | |
170 sproc = self.spawn_testproc() | |
171 self.assertTrue(psutil.pid_exists(sproc.pid)) | |
172 p = psutil.Process(sproc.pid) | |
173 p.kill() | |
174 p.wait() | |
175 self.assertFalse(psutil.pid_exists(sproc.pid)) | |
176 self.assertFalse(psutil.pid_exists(-1)) | |
177 self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids()) | |
178 | |
179 def test_pid_exists_2(self): | |
180 pids = psutil.pids() | |
181 for pid in pids: | |
182 try: | |
183 assert psutil.pid_exists(pid) | |
184 except AssertionError: | |
185 # in case the process disappeared in meantime fail only | |
186 # if it is no longer in psutil.pids() | |
187 time.sleep(.1) | |
188 if pid in psutil.pids(): | |
189 self.fail(pid) | |
190 pids = range(max(pids) + 5000, max(pids) + 6000) | |
191 for pid in pids: | |
192 self.assertFalse(psutil.pid_exists(pid), msg=pid) | |
193 | |
194 | |
195 class TestMiscAPIs(PsutilTestCase): | |
196 | |
197 def test_boot_time(self): | |
198 bt = psutil.boot_time() | |
199 self.assertIsInstance(bt, float) | |
200 self.assertGreater(bt, 0) | |
201 self.assertLess(bt, time.time()) | |
202 | |
203 @unittest.skipIf(CI_TESTING and not psutil.users(), "unreliable on CI") | |
204 def test_users(self): | |
205 users = psutil.users() | |
206 self.assertNotEqual(users, []) | |
207 for user in users: | |
208 assert user.name, user | |
209 self.assertIsInstance(user.name, str) | |
210 self.assertIsInstance(user.terminal, (str, type(None))) | |
211 if user.host is not None: | |
212 self.assertIsInstance(user.host, (str, type(None))) | |
213 user.terminal | |
214 user.host | |
215 assert user.started > 0.0, user | |
216 datetime.datetime.fromtimestamp(user.started) | |
217 if WINDOWS or OPENBSD: | |
218 self.assertIsNone(user.pid) | |
219 else: | |
220 psutil.Process(user.pid) | |
221 | |
222 @unittest.skipIf(not POSIX, 'POSIX only') | |
223 def test_PAGESIZE(self): | |
224 # pagesize is used internally to perform different calculations | |
225 # and it's determined by using SC_PAGE_SIZE; make sure | |
226 # getpagesize() returns the same value. | |
227 import resource | |
228 self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize()) | |
229 | |
230 def test_test(self): | |
231 # test for psutil.test() function | |
232 stdout = sys.stdout | |
233 sys.stdout = DEVNULL | |
234 try: | |
235 psutil.test() | |
236 finally: | |
237 sys.stdout = stdout | |
238 | |
239 def test_os_constants(self): | |
240 names = ["POSIX", "WINDOWS", "LINUX", "MACOS", "FREEBSD", "OPENBSD", | |
241 "NETBSD", "BSD", "SUNOS"] | |
242 for name in names: | |
243 self.assertIsInstance(getattr(psutil, name), bool, msg=name) | |
244 | |
245 if os.name == 'posix': | |
246 assert psutil.POSIX | |
247 assert not psutil.WINDOWS | |
248 names.remove("POSIX") | |
249 if "linux" in sys.platform.lower(): | |
250 assert psutil.LINUX | |
251 names.remove("LINUX") | |
252 elif "bsd" in sys.platform.lower(): | |
253 assert psutil.BSD | |
254 self.assertEqual([psutil.FREEBSD, psutil.OPENBSD, | |
255 psutil.NETBSD].count(True), 1) | |
256 names.remove("BSD") | |
257 names.remove("FREEBSD") | |
258 names.remove("OPENBSD") | |
259 names.remove("NETBSD") | |
260 elif "sunos" in sys.platform.lower() or \ | |
261 "solaris" in sys.platform.lower(): | |
262 assert psutil.SUNOS | |
263 names.remove("SUNOS") | |
264 elif "darwin" in sys.platform.lower(): | |
265 assert psutil.MACOS | |
266 names.remove("MACOS") | |
267 else: | |
268 assert psutil.WINDOWS | |
269 assert not psutil.POSIX | |
270 names.remove("WINDOWS") | |
271 | |
272 # assert all other constants are set to False | |
273 for name in names: | |
274 self.assertIs(getattr(psutil, name), False, msg=name) | |
275 | |
276 | |
277 class TestMemoryAPIs(PsutilTestCase): | |
278 | |
279 def test_virtual_memory(self): | |
280 mem = psutil.virtual_memory() | |
281 assert mem.total > 0, mem | |
282 assert mem.available > 0, mem | |
283 assert 0 <= mem.percent <= 100, mem | |
284 assert mem.used > 0, mem | |
285 assert mem.free >= 0, mem | |
286 for name in mem._fields: | |
287 value = getattr(mem, name) | |
288 if name != 'percent': | |
289 self.assertIsInstance(value, (int, long)) | |
290 if name != 'total': | |
291 if not value >= 0: | |
292 self.fail("%r < 0 (%s)" % (name, value)) | |
293 if value > mem.total: | |
294 self.fail("%r > total (total=%s, %s=%s)" | |
295 % (name, mem.total, name, value)) | |
296 | |
297 def test_swap_memory(self): | |
298 mem = psutil.swap_memory() | |
299 self.assertEqual( | |
300 mem._fields, ('total', 'used', 'free', 'percent', 'sin', 'sout')) | |
301 | |
302 assert mem.total >= 0, mem | |
303 assert mem.used >= 0, mem | |
304 if mem.total > 0: | |
305 # likely a system with no swap partition | |
306 assert mem.free > 0, mem | |
307 else: | |
308 assert mem.free == 0, mem | |
309 assert 0 <= mem.percent <= 100, mem | |
310 assert mem.sin >= 0, mem | |
311 assert mem.sout >= 0, mem | |
312 | |
313 | |
314 class TestCpuAPIs(PsutilTestCase): | |
315 | |
316 def test_cpu_count_logical(self): | |
317 logical = psutil.cpu_count() | |
318 self.assertIsNotNone(logical) | |
319 self.assertEqual(logical, len(psutil.cpu_times(percpu=True))) | |
320 self.assertGreaterEqual(logical, 1) | |
321 # | |
322 if os.path.exists("/proc/cpuinfo"): | |
323 with open("/proc/cpuinfo") as fd: | |
324 cpuinfo_data = fd.read() | |
325 if "physical id" not in cpuinfo_data: | |
326 raise unittest.SkipTest("cpuinfo doesn't include physical id") | |
327 | |
328 def test_cpu_count_physical(self): | |
329 logical = psutil.cpu_count() | |
330 physical = psutil.cpu_count(logical=False) | |
331 if physical is None: | |
332 raise self.skipTest("physical cpu_count() is None") | |
333 if WINDOWS and sys.getwindowsversion()[:2] <= (6, 1): # <= Vista | |
334 self.assertIsNone(physical) | |
335 else: | |
336 self.assertGreaterEqual(physical, 1) | |
337 self.assertGreaterEqual(logical, physical) | |
338 | |
339 def test_cpu_count_none(self): | |
340 # https://github.com/giampaolo/psutil/issues/1085 | |
341 for val in (-1, 0, None): | |
342 with mock.patch('psutil._psplatform.cpu_count_logical', | |
343 return_value=val) as m: | |
344 self.assertIsNone(psutil.cpu_count()) | |
345 assert m.called | |
346 with mock.patch('psutil._psplatform.cpu_count_physical', | |
347 return_value=val) as m: | |
348 self.assertIsNone(psutil.cpu_count(logical=False)) | |
349 assert m.called | |
350 | |
351 def test_cpu_times(self): | |
352 # Check type, value >= 0, str(). | |
353 total = 0 | |
354 times = psutil.cpu_times() | |
355 sum(times) | |
356 for cp_time in times: | |
357 self.assertIsInstance(cp_time, float) | |
358 self.assertGreaterEqual(cp_time, 0.0) | |
359 total += cp_time | |
360 self.assertEqual(total, sum(times)) | |
361 str(times) | |
362 # CPU times are always supposed to increase over time | |
363 # or at least remain the same and that's because time | |
364 # cannot go backwards. | |
365 # Surprisingly sometimes this might not be the case (at | |
366 # least on Windows and Linux), see: | |
367 # https://github.com/giampaolo/psutil/issues/392 | |
368 # https://github.com/giampaolo/psutil/issues/645 | |
369 # if not WINDOWS: | |
370 # last = psutil.cpu_times() | |
371 # for x in range(100): | |
372 # new = psutil.cpu_times() | |
373 # for field in new._fields: | |
374 # new_t = getattr(new, field) | |
375 # last_t = getattr(last, field) | |
376 # self.assertGreaterEqual(new_t, last_t, | |
377 # msg="%s %s" % (new_t, last_t)) | |
378 # last = new | |
379 | |
380 def test_cpu_times_time_increases(self): | |
381 # Make sure time increases between calls. | |
382 t1 = sum(psutil.cpu_times()) | |
383 stop_at = time.time() + GLOBAL_TIMEOUT | |
384 while time.time() < stop_at: | |
385 t2 = sum(psutil.cpu_times()) | |
386 if t2 > t1: | |
387 return | |
388 self.fail("time remained the same") | |
389 | |
390 def test_per_cpu_times(self): | |
391 # Check type, value >= 0, str(). | |
392 for times in psutil.cpu_times(percpu=True): | |
393 total = 0 | |
394 sum(times) | |
395 for cp_time in times: | |
396 self.assertIsInstance(cp_time, float) | |
397 self.assertGreaterEqual(cp_time, 0.0) | |
398 total += cp_time | |
399 self.assertEqual(total, sum(times)) | |
400 str(times) | |
401 self.assertEqual(len(psutil.cpu_times(percpu=True)[0]), | |
402 len(psutil.cpu_times(percpu=False))) | |
403 | |
404 # Note: in theory CPU times are always supposed to increase over | |
405 # time or remain the same but never go backwards. In practice | |
406 # sometimes this is not the case. | |
407 # This issue seemd to be afflict Windows: | |
408 # https://github.com/giampaolo/psutil/issues/392 | |
409 # ...but it turns out also Linux (rarely) behaves the same. | |
410 # last = psutil.cpu_times(percpu=True) | |
411 # for x in range(100): | |
412 # new = psutil.cpu_times(percpu=True) | |
413 # for index in range(len(new)): | |
414 # newcpu = new[index] | |
415 # lastcpu = last[index] | |
416 # for field in newcpu._fields: | |
417 # new_t = getattr(newcpu, field) | |
418 # last_t = getattr(lastcpu, field) | |
419 # self.assertGreaterEqual( | |
420 # new_t, last_t, msg="%s %s" % (lastcpu, newcpu)) | |
421 # last = new | |
422 | |
423 def test_per_cpu_times_2(self): | |
424 # Simulate some work load then make sure time have increased | |
425 # between calls. | |
426 tot1 = psutil.cpu_times(percpu=True) | |
427 giveup_at = time.time() + GLOBAL_TIMEOUT | |
428 while True: | |
429 if time.time() >= giveup_at: | |
430 return self.fail("timeout") | |
431 tot2 = psutil.cpu_times(percpu=True) | |
432 for t1, t2 in zip(tot1, tot2): | |
433 t1, t2 = psutil._cpu_busy_time(t1), psutil._cpu_busy_time(t2) | |
434 difference = t2 - t1 | |
435 if difference >= 0.05: | |
436 return | |
437 | |
438 def test_cpu_times_comparison(self): | |
439 # Make sure the sum of all per cpu times is almost equal to | |
440 # base "one cpu" times. | |
441 base = psutil.cpu_times() | |
442 per_cpu = psutil.cpu_times(percpu=True) | |
443 summed_values = base._make([sum(num) for num in zip(*per_cpu)]) | |
444 for field in base._fields: | |
445 self.assertAlmostEqual( | |
446 getattr(base, field), getattr(summed_values, field), delta=1) | |
447 | |
448 def _test_cpu_percent(self, percent, last_ret, new_ret): | |
449 try: | |
450 self.assertIsInstance(percent, float) | |
451 self.assertGreaterEqual(percent, 0.0) | |
452 self.assertIsNot(percent, -0.0) | |
453 self.assertLessEqual(percent, 100.0 * psutil.cpu_count()) | |
454 except AssertionError as err: | |
455 raise AssertionError("\n%s\nlast=%s\nnew=%s" % ( | |
456 err, pprint.pformat(last_ret), pprint.pformat(new_ret))) | |
457 | |
458 def test_cpu_percent(self): | |
459 last = psutil.cpu_percent(interval=0.001) | |
460 for x in range(100): | |
461 new = psutil.cpu_percent(interval=None) | |
462 self._test_cpu_percent(new, last, new) | |
463 last = new | |
464 with self.assertRaises(ValueError): | |
465 psutil.cpu_percent(interval=-1) | |
466 | |
467 def test_per_cpu_percent(self): | |
468 last = psutil.cpu_percent(interval=0.001, percpu=True) | |
469 self.assertEqual(len(last), psutil.cpu_count()) | |
470 for x in range(100): | |
471 new = psutil.cpu_percent(interval=None, percpu=True) | |
472 for percent in new: | |
473 self._test_cpu_percent(percent, last, new) | |
474 last = new | |
475 with self.assertRaises(ValueError): | |
476 psutil.cpu_percent(interval=-1, percpu=True) | |
477 | |
478 def test_cpu_times_percent(self): | |
479 last = psutil.cpu_times_percent(interval=0.001) | |
480 for x in range(100): | |
481 new = psutil.cpu_times_percent(interval=None) | |
482 for percent in new: | |
483 self._test_cpu_percent(percent, last, new) | |
484 self._test_cpu_percent(sum(new), last, new) | |
485 last = new | |
486 | |
487 def test_per_cpu_times_percent(self): | |
488 last = psutil.cpu_times_percent(interval=0.001, percpu=True) | |
489 self.assertEqual(len(last), psutil.cpu_count()) | |
490 for x in range(100): | |
491 new = psutil.cpu_times_percent(interval=None, percpu=True) | |
492 for cpu in new: | |
493 for percent in cpu: | |
494 self._test_cpu_percent(percent, last, new) | |
495 self._test_cpu_percent(sum(cpu), last, new) | |
496 last = new | |
497 | |
498 def test_per_cpu_times_percent_negative(self): | |
499 # see: https://github.com/giampaolo/psutil/issues/645 | |
500 psutil.cpu_times_percent(percpu=True) | |
501 zero_times = [x._make([0 for x in range(len(x._fields))]) | |
502 for x in psutil.cpu_times(percpu=True)] | |
503 with mock.patch('psutil.cpu_times', return_value=zero_times): | |
504 for cpu in psutil.cpu_times_percent(percpu=True): | |
505 for percent in cpu: | |
506 self._test_cpu_percent(percent, None, None) | |
507 | |
508 def test_cpu_stats(self): | |
509 # Tested more extensively in per-platform test modules. | |
510 infos = psutil.cpu_stats() | |
511 self.assertEqual( | |
512 infos._fields, | |
513 ('ctx_switches', 'interrupts', 'soft_interrupts', 'syscalls')) | |
514 for name in infos._fields: | |
515 value = getattr(infos, name) | |
516 self.assertGreaterEqual(value, 0) | |
517 # on AIX, ctx_switches is always 0 | |
518 if not AIX and name in ('ctx_switches', 'interrupts'): | |
519 self.assertGreater(value, 0) | |
520 | |
521 @unittest.skipIf(not HAS_CPU_FREQ, "not suported") | |
522 def test_cpu_freq(self): | |
523 def check_ls(ls): | |
524 for nt in ls: | |
525 self.assertEqual(nt._fields, ('current', 'min', 'max')) | |
526 if nt.max != 0.0: | |
527 self.assertLessEqual(nt.current, nt.max) | |
528 for name in nt._fields: | |
529 value = getattr(nt, name) | |
530 self.assertIsInstance(value, (int, long, float)) | |
531 self.assertGreaterEqual(value, 0) | |
532 | |
533 ls = psutil.cpu_freq(percpu=True) | |
534 if TRAVIS and not ls: | |
535 raise self.skipTest("skipped on Travis") | |
536 if FREEBSD and not ls: | |
537 raise self.skipTest("returns empty list on FreeBSD") | |
538 | |
539 assert ls, ls | |
540 check_ls([psutil.cpu_freq(percpu=False)]) | |
541 | |
542 if LINUX: | |
543 self.assertEqual(len(ls), psutil.cpu_count()) | |
544 | |
545 @unittest.skipIf(not HAS_GETLOADAVG, "not supported") | |
546 def test_getloadavg(self): | |
547 loadavg = psutil.getloadavg() | |
548 self.assertEqual(len(loadavg), 3) | |
549 for load in loadavg: | |
550 self.assertIsInstance(load, float) | |
551 self.assertGreaterEqual(load, 0.0) | |
552 | |
553 | |
554 class TestDiskAPIs(PsutilTestCase): | |
555 | |
556 @unittest.skipIf(PYPY and not IS_64BIT, "unreliable on PYPY32 + 32BIT") | |
557 def test_disk_usage(self): | |
558 usage = psutil.disk_usage(os.getcwd()) | |
559 self.assertEqual(usage._fields, ('total', 'used', 'free', 'percent')) | |
560 | |
561 assert usage.total > 0, usage | |
562 assert usage.used > 0, usage | |
563 assert usage.free > 0, usage | |
564 assert usage.total > usage.used, usage | |
565 assert usage.total > usage.free, usage | |
566 assert 0 <= usage.percent <= 100, usage.percent | |
567 if hasattr(shutil, 'disk_usage'): | |
568 # py >= 3.3, see: http://bugs.python.org/issue12442 | |
569 shutil_usage = shutil.disk_usage(os.getcwd()) | |
570 tolerance = 5 * 1024 * 1024 # 5MB | |
571 self.assertEqual(usage.total, shutil_usage.total) | |
572 self.assertAlmostEqual(usage.free, shutil_usage.free, | |
573 delta=tolerance) | |
574 self.assertAlmostEqual(usage.used, shutil_usage.used, | |
575 delta=tolerance) | |
576 | |
577 # if path does not exist OSError ENOENT is expected across | |
578 # all platforms | |
579 fname = self.get_testfn() | |
580 with self.assertRaises(FileNotFoundError): | |
581 psutil.disk_usage(fname) | |
582 | |
583 @unittest.skipIf(not ASCII_FS, "not an ASCII fs") | |
584 def test_disk_usage_unicode(self): | |
585 # See: https://github.com/giampaolo/psutil/issues/416 | |
586 with self.assertRaises(UnicodeEncodeError): | |
587 psutil.disk_usage(UNICODE_SUFFIX) | |
588 | |
589 def test_disk_usage_bytes(self): | |
590 psutil.disk_usage(b'.') | |
591 | |
592 def test_disk_partitions(self): | |
593 # all = False | |
594 ls = psutil.disk_partitions(all=False) | |
595 # on travis we get: | |
596 # self.assertEqual(p.cpu_affinity(), [n]) | |
597 # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0] | |
598 self.assertTrue(ls, msg=ls) | |
599 for disk in ls: | |
600 self.assertIsInstance(disk.device, str) | |
601 self.assertIsInstance(disk.mountpoint, str) | |
602 self.assertIsInstance(disk.fstype, str) | |
603 self.assertIsInstance(disk.opts, str) | |
604 if WINDOWS and 'cdrom' in disk.opts: | |
605 continue | |
606 if not POSIX: | |
607 assert os.path.exists(disk.device), disk | |
608 else: | |
609 # we cannot make any assumption about this, see: | |
610 # http://goo.gl/p9c43 | |
611 disk.device | |
612 # on modern systems mount points can also be files | |
613 assert os.path.exists(disk.mountpoint), disk | |
614 assert disk.fstype, disk | |
615 | |
616 # all = True | |
617 ls = psutil.disk_partitions(all=True) | |
618 self.assertTrue(ls, msg=ls) | |
619 for disk in psutil.disk_partitions(all=True): | |
620 if not WINDOWS and disk.mountpoint: | |
621 try: | |
622 os.stat(disk.mountpoint) | |
623 except OSError as err: | |
624 if (GITHUB_WHEELS or TRAVIS) and \ | |
625 MACOS and err.errno == errno.EIO: | |
626 continue | |
627 # http://mail.python.org/pipermail/python-dev/ | |
628 # 2012-June/120787.html | |
629 if err.errno not in (errno.EPERM, errno.EACCES): | |
630 raise | |
631 else: | |
632 assert os.path.exists(disk.mountpoint), disk | |
633 self.assertIsInstance(disk.fstype, str) | |
634 self.assertIsInstance(disk.opts, str) | |
635 | |
636 def find_mount_point(path): | |
637 path = os.path.abspath(path) | |
638 while not os.path.ismount(path): | |
639 path = os.path.dirname(path) | |
640 return path.lower() | |
641 | |
642 mount = find_mount_point(__file__) | |
643 mounts = [x.mountpoint.lower() for x in | |
644 psutil.disk_partitions(all=True) if x.mountpoint] | |
645 self.assertIn(mount, mounts) | |
646 psutil.disk_usage(mount) | |
647 | |
648 @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'), | |
649 '/proc/diskstats not available on this linux version') | |
650 @unittest.skipIf(CI_TESTING and not psutil.disk_io_counters(), | |
651 "unreliable on CI") # no visible disks | |
652 def test_disk_io_counters(self): | |
653 def check_ntuple(nt): | |
654 self.assertEqual(nt[0], nt.read_count) | |
655 self.assertEqual(nt[1], nt.write_count) | |
656 self.assertEqual(nt[2], nt.read_bytes) | |
657 self.assertEqual(nt[3], nt.write_bytes) | |
658 if not (OPENBSD or NETBSD): | |
659 self.assertEqual(nt[4], nt.read_time) | |
660 self.assertEqual(nt[5], nt.write_time) | |
661 if LINUX: | |
662 self.assertEqual(nt[6], nt.read_merged_count) | |
663 self.assertEqual(nt[7], nt.write_merged_count) | |
664 self.assertEqual(nt[8], nt.busy_time) | |
665 elif FREEBSD: | |
666 self.assertEqual(nt[6], nt.busy_time) | |
667 for name in nt._fields: | |
668 assert getattr(nt, name) >= 0, nt | |
669 | |
670 ret = psutil.disk_io_counters(perdisk=False) | |
671 assert ret is not None, "no disks on this system?" | |
672 check_ntuple(ret) | |
673 ret = psutil.disk_io_counters(perdisk=True) | |
674 # make sure there are no duplicates | |
675 self.assertEqual(len(ret), len(set(ret))) | |
676 for key in ret: | |
677 assert key, key | |
678 check_ntuple(ret[key]) | |
679 | |
680 def test_disk_io_counters_no_disks(self): | |
681 # Emulate a case where no disks are installed, see: | |
682 # https://github.com/giampaolo/psutil/issues/1062 | |
683 with mock.patch('psutil._psplatform.disk_io_counters', | |
684 return_value={}) as m: | |
685 self.assertIsNone(psutil.disk_io_counters(perdisk=False)) | |
686 self.assertEqual(psutil.disk_io_counters(perdisk=True), {}) | |
687 assert m.called | |
688 | |
689 | |
690 class TestNetAPIs(PsutilTestCase): | |
691 | |
692 @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported') | |
693 def test_net_io_counters(self): | |
694 def check_ntuple(nt): | |
695 self.assertEqual(nt[0], nt.bytes_sent) | |
696 self.assertEqual(nt[1], nt.bytes_recv) | |
697 self.assertEqual(nt[2], nt.packets_sent) | |
698 self.assertEqual(nt[3], nt.packets_recv) | |
699 self.assertEqual(nt[4], nt.errin) | |
700 self.assertEqual(nt[5], nt.errout) | |
701 self.assertEqual(nt[6], nt.dropin) | |
702 self.assertEqual(nt[7], nt.dropout) | |
703 assert nt.bytes_sent >= 0, nt | |
704 assert nt.bytes_recv >= 0, nt | |
705 assert nt.packets_sent >= 0, nt | |
706 assert nt.packets_recv >= 0, nt | |
707 assert nt.errin >= 0, nt | |
708 assert nt.errout >= 0, nt | |
709 assert nt.dropin >= 0, nt | |
710 assert nt.dropout >= 0, nt | |
711 | |
712 ret = psutil.net_io_counters(pernic=False) | |
713 check_ntuple(ret) | |
714 ret = psutil.net_io_counters(pernic=True) | |
715 self.assertNotEqual(ret, []) | |
716 for key in ret: | |
717 self.assertTrue(key) | |
718 self.assertIsInstance(key, str) | |
719 check_ntuple(ret[key]) | |
720 | |
721 @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported') | |
722 def test_net_io_counters_no_nics(self): | |
723 # Emulate a case where no NICs are installed, see: | |
724 # https://github.com/giampaolo/psutil/issues/1062 | |
725 with mock.patch('psutil._psplatform.net_io_counters', | |
726 return_value={}) as m: | |
727 self.assertIsNone(psutil.net_io_counters(pernic=False)) | |
728 self.assertEqual(psutil.net_io_counters(pernic=True), {}) | |
729 assert m.called | |
730 | |
731 def test_net_if_addrs(self): | |
732 nics = psutil.net_if_addrs() | |
733 assert nics, nics | |
734 | |
735 nic_stats = psutil.net_if_stats() | |
736 | |
737 # Not reliable on all platforms (net_if_addrs() reports more | |
738 # interfaces). | |
739 # self.assertEqual(sorted(nics.keys()), | |
740 # sorted(psutil.net_io_counters(pernic=True).keys())) | |
741 | |
742 families = set([socket.AF_INET, socket.AF_INET6, psutil.AF_LINK]) | |
743 for nic, addrs in nics.items(): | |
744 self.assertIsInstance(nic, str) | |
745 self.assertEqual(len(set(addrs)), len(addrs)) | |
746 for addr in addrs: | |
747 self.assertIsInstance(addr.family, int) | |
748 self.assertIsInstance(addr.address, str) | |
749 self.assertIsInstance(addr.netmask, (str, type(None))) | |
750 self.assertIsInstance(addr.broadcast, (str, type(None))) | |
751 self.assertIn(addr.family, families) | |
752 if sys.version_info >= (3, 4) and not PYPY: | |
753 self.assertIsInstance(addr.family, enum.IntEnum) | |
754 if nic_stats[nic].isup: | |
755 # Do not test binding to addresses of interfaces | |
756 # that are down | |
757 if addr.family == socket.AF_INET: | |
758 s = socket.socket(addr.family) | |
759 with contextlib.closing(s): | |
760 s.bind((addr.address, 0)) | |
761 elif addr.family == socket.AF_INET6: | |
762 info = socket.getaddrinfo( | |
763 addr.address, 0, socket.AF_INET6, | |
764 socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0] | |
765 af, socktype, proto, canonname, sa = info | |
766 s = socket.socket(af, socktype, proto) | |
767 with contextlib.closing(s): | |
768 s.bind(sa) | |
769 for ip in (addr.address, addr.netmask, addr.broadcast, | |
770 addr.ptp): | |
771 if ip is not None: | |
772 # TODO: skip AF_INET6 for now because I get: | |
773 # AddressValueError: Only hex digits permitted in | |
774 # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0' | |
775 if addr.family != socket.AF_INET6: | |
776 check_net_address(ip, addr.family) | |
777 # broadcast and ptp addresses are mutually exclusive | |
778 if addr.broadcast: | |
779 self.assertIsNone(addr.ptp) | |
780 elif addr.ptp: | |
781 self.assertIsNone(addr.broadcast) | |
782 | |
783 if BSD or MACOS or SUNOS: | |
784 if hasattr(socket, "AF_LINK"): | |
785 self.assertEqual(psutil.AF_LINK, socket.AF_LINK) | |
786 elif LINUX: | |
787 self.assertEqual(psutil.AF_LINK, socket.AF_PACKET) | |
788 elif WINDOWS: | |
789 self.assertEqual(psutil.AF_LINK, -1) | |
790 | |
791 def test_net_if_addrs_mac_null_bytes(self): | |
792 # Simulate that the underlying C function returns an incomplete | |
793 # MAC address. psutil is supposed to fill it with null bytes. | |
794 # https://github.com/giampaolo/psutil/issues/786 | |
795 if POSIX: | |
796 ret = [('em1', psutil.AF_LINK, '06:3d:29', None, None, None)] | |
797 else: | |
798 ret = [('em1', -1, '06-3d-29', None, None, None)] | |
799 with mock.patch('psutil._psplatform.net_if_addrs', | |
800 return_value=ret) as m: | |
801 addr = psutil.net_if_addrs()['em1'][0] | |
802 assert m.called | |
803 if POSIX: | |
804 self.assertEqual(addr.address, '06:3d:29:00:00:00') | |
805 else: | |
806 self.assertEqual(addr.address, '06-3d-29-00-00-00') | |
807 | |
808 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") # raises EPERM | |
809 def test_net_if_stats(self): | |
810 nics = psutil.net_if_stats() | |
811 assert nics, nics | |
812 all_duplexes = (psutil.NIC_DUPLEX_FULL, | |
813 psutil.NIC_DUPLEX_HALF, | |
814 psutil.NIC_DUPLEX_UNKNOWN) | |
815 for name, stats in nics.items(): | |
816 self.assertIsInstance(name, str) | |
817 isup, duplex, speed, mtu = stats | |
818 self.assertIsInstance(isup, bool) | |
819 self.assertIn(duplex, all_duplexes) | |
820 self.assertIn(duplex, all_duplexes) | |
821 self.assertGreaterEqual(speed, 0) | |
822 self.assertGreaterEqual(mtu, 0) | |
823 | |
824 @unittest.skipIf(not (LINUX or BSD or MACOS), | |
825 "LINUX or BSD or MACOS specific") | |
826 def test_net_if_stats_enodev(self): | |
827 # See: https://github.com/giampaolo/psutil/issues/1279 | |
828 with mock.patch('psutil._psutil_posix.net_if_mtu', | |
829 side_effect=OSError(errno.ENODEV, "")) as m: | |
830 ret = psutil.net_if_stats() | |
831 self.assertEqual(ret, {}) | |
832 assert m.called | |
833 | |
834 | |
835 class TestSensorsAPIs(PsutilTestCase): | |
836 | |
837 @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported") | |
838 def test_sensors_temperatures(self): | |
839 temps = psutil.sensors_temperatures() | |
840 for name, entries in temps.items(): | |
841 self.assertIsInstance(name, str) | |
842 for entry in entries: | |
843 self.assertIsInstance(entry.label, str) | |
844 if entry.current is not None: | |
845 self.assertGreaterEqual(entry.current, 0) | |
846 if entry.high is not None: | |
847 self.assertGreaterEqual(entry.high, 0) | |
848 if entry.critical is not None: | |
849 self.assertGreaterEqual(entry.critical, 0) | |
850 | |
851 @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported") | |
852 def test_sensors_temperatures_fahreneit(self): | |
853 d = {'coretemp': [('label', 50.0, 60.0, 70.0)]} | |
854 with mock.patch("psutil._psplatform.sensors_temperatures", | |
855 return_value=d) as m: | |
856 temps = psutil.sensors_temperatures( | |
857 fahrenheit=True)['coretemp'][0] | |
858 assert m.called | |
859 self.assertEqual(temps.current, 122.0) | |
860 self.assertEqual(temps.high, 140.0) | |
861 self.assertEqual(temps.critical, 158.0) | |
862 | |
863 @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported") | |
864 @unittest.skipIf(not HAS_BATTERY, "no battery") | |
865 def test_sensors_battery(self): | |
866 ret = psutil.sensors_battery() | |
867 self.assertGreaterEqual(ret.percent, 0) | |
868 self.assertLessEqual(ret.percent, 100) | |
869 if ret.secsleft not in (psutil.POWER_TIME_UNKNOWN, | |
870 psutil.POWER_TIME_UNLIMITED): | |
871 self.assertGreaterEqual(ret.secsleft, 0) | |
872 else: | |
873 if ret.secsleft == psutil.POWER_TIME_UNLIMITED: | |
874 self.assertTrue(ret.power_plugged) | |
875 self.assertIsInstance(ret.power_plugged, bool) | |
876 | |
877 @unittest.skipIf(not HAS_SENSORS_FANS, "not supported") | |
878 def test_sensors_fans(self): | |
879 fans = psutil.sensors_fans() | |
880 for name, entries in fans.items(): | |
881 self.assertIsInstance(name, str) | |
882 for entry in entries: | |
883 self.assertIsInstance(entry.label, str) | |
884 self.assertIsInstance(entry.current, (int, long)) | |
885 self.assertGreaterEqual(entry.current, 0) | |
886 | |
887 | |
888 if __name__ == '__main__': | |
889 from psutil.tests.runner import run_from_name | |
890 run_from_name(__file__) |