Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_contracts.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
4 # Use of this source code is governed by a BSD-style license that can be | |
5 # found in the LICENSE file. | |
6 | |
7 """Contracts tests. These tests mainly check API sanity in terms of | |
8 returned types and APIs availability. | |
9 Some of these are duplicates of tests test_system.py and test_process.py | |
10 """ | |
11 | |
12 import errno | |
13 import multiprocessing | |
14 import os | |
15 import signal | |
16 import stat | |
17 import sys | |
18 import time | |
19 import traceback | |
20 | |
21 from psutil import AIX | |
22 from psutil import BSD | |
23 from psutil import FREEBSD | |
24 from psutil import LINUX | |
25 from psutil import MACOS | |
26 from psutil import NETBSD | |
27 from psutil import OPENBSD | |
28 from psutil import OSX | |
29 from psutil import POSIX | |
30 from psutil import SUNOS | |
31 from psutil import WINDOWS | |
32 from psutil._compat import FileNotFoundError | |
33 from psutil._compat import long | |
34 from psutil._compat import range | |
35 from psutil.tests import create_sockets | |
36 from psutil.tests import enum | |
37 from psutil.tests import GITHUB_WHEELS | |
38 from psutil.tests import HAS_CPU_FREQ | |
39 from psutil.tests import HAS_NET_IO_COUNTERS | |
40 from psutil.tests import HAS_SENSORS_FANS | |
41 from psutil.tests import HAS_SENSORS_TEMPERATURES | |
42 from psutil.tests import is_namedtuple | |
43 from psutil.tests import process_namespace | |
44 from psutil.tests import PsutilTestCase | |
45 from psutil.tests import PYPY | |
46 from psutil.tests import serialrun | |
47 from psutil.tests import SKIP_SYSCONS | |
48 from psutil.tests import unittest | |
49 from psutil.tests import VALID_PROC_STATUSES | |
50 import psutil | |
51 | |
52 | |
53 # =================================================================== | |
54 # --- APIs availability | |
55 # =================================================================== | |
56 | |
57 # Make sure code reflects what doc promises in terms of APIs | |
58 # availability. | |
59 | |
60 class TestAvailConstantsAPIs(PsutilTestCase): | |
61 | |
62 def test_PROCFS_PATH(self): | |
63 self.assertEqual(hasattr(psutil, "PROCFS_PATH"), | |
64 LINUX or SUNOS or AIX) | |
65 | |
66 def test_win_priority(self): | |
67 ae = self.assertEqual | |
68 ae(hasattr(psutil, "ABOVE_NORMAL_PRIORITY_CLASS"), WINDOWS) | |
69 ae(hasattr(psutil, "BELOW_NORMAL_PRIORITY_CLASS"), WINDOWS) | |
70 ae(hasattr(psutil, "HIGH_PRIORITY_CLASS"), WINDOWS) | |
71 ae(hasattr(psutil, "IDLE_PRIORITY_CLASS"), WINDOWS) | |
72 ae(hasattr(psutil, "NORMAL_PRIORITY_CLASS"), WINDOWS) | |
73 ae(hasattr(psutil, "REALTIME_PRIORITY_CLASS"), WINDOWS) | |
74 | |
75 def test_linux_ioprio_linux(self): | |
76 ae = self.assertEqual | |
77 ae(hasattr(psutil, "IOPRIO_CLASS_NONE"), LINUX) | |
78 ae(hasattr(psutil, "IOPRIO_CLASS_RT"), LINUX) | |
79 ae(hasattr(psutil, "IOPRIO_CLASS_BE"), LINUX) | |
80 ae(hasattr(psutil, "IOPRIO_CLASS_IDLE"), LINUX) | |
81 | |
82 def test_linux_ioprio_windows(self): | |
83 ae = self.assertEqual | |
84 ae(hasattr(psutil, "IOPRIO_HIGH"), WINDOWS) | |
85 ae(hasattr(psutil, "IOPRIO_NORMAL"), WINDOWS) | |
86 ae(hasattr(psutil, "IOPRIO_LOW"), WINDOWS) | |
87 ae(hasattr(psutil, "IOPRIO_VERYLOW"), WINDOWS) | |
88 | |
89 @unittest.skipIf(GITHUB_WHEELS, "not exposed via GITHUB_WHEELS") | |
90 def test_linux_rlimit(self): | |
91 ae = self.assertEqual | |
92 ae(hasattr(psutil, "RLIM_INFINITY"), LINUX) | |
93 ae(hasattr(psutil, "RLIMIT_AS"), LINUX) | |
94 ae(hasattr(psutil, "RLIMIT_CORE"), LINUX) | |
95 ae(hasattr(psutil, "RLIMIT_CPU"), LINUX) | |
96 ae(hasattr(psutil, "RLIMIT_DATA"), LINUX) | |
97 ae(hasattr(psutil, "RLIMIT_FSIZE"), LINUX) | |
98 ae(hasattr(psutil, "RLIMIT_LOCKS"), LINUX) | |
99 ae(hasattr(psutil, "RLIMIT_MEMLOCK"), LINUX) | |
100 ae(hasattr(psutil, "RLIMIT_NOFILE"), LINUX) | |
101 ae(hasattr(psutil, "RLIMIT_NPROC"), LINUX) | |
102 ae(hasattr(psutil, "RLIMIT_RSS"), LINUX) | |
103 ae(hasattr(psutil, "RLIMIT_STACK"), LINUX) | |
104 | |
105 ae(hasattr(psutil, "RLIMIT_MSGQUEUE"), LINUX) # requires Linux 2.6.8 | |
106 ae(hasattr(psutil, "RLIMIT_NICE"), LINUX) # requires Linux 2.6.12 | |
107 ae(hasattr(psutil, "RLIMIT_RTPRIO"), LINUX) # requires Linux 2.6.12 | |
108 ae(hasattr(psutil, "RLIMIT_RTTIME"), LINUX) # requires Linux 2.6.25 | |
109 ae(hasattr(psutil, "RLIMIT_SIGPENDING"), LINUX) # requires Linux 2.6.8 | |
110 | |
111 | |
112 class TestAvailSystemAPIs(PsutilTestCase): | |
113 | |
114 def test_win_service_iter(self): | |
115 self.assertEqual(hasattr(psutil, "win_service_iter"), WINDOWS) | |
116 | |
117 def test_win_service_get(self): | |
118 self.assertEqual(hasattr(psutil, "win_service_get"), WINDOWS) | |
119 | |
120 def test_cpu_freq(self): | |
121 self.assertEqual(hasattr(psutil, "cpu_freq"), | |
122 LINUX or MACOS or WINDOWS or FREEBSD) | |
123 | |
124 def test_sensors_temperatures(self): | |
125 self.assertEqual( | |
126 hasattr(psutil, "sensors_temperatures"), LINUX or FREEBSD) | |
127 | |
128 def test_sensors_fans(self): | |
129 self.assertEqual(hasattr(psutil, "sensors_fans"), LINUX) | |
130 | |
131 def test_battery(self): | |
132 self.assertEqual(hasattr(psutil, "sensors_battery"), | |
133 LINUX or WINDOWS or FREEBSD or MACOS) | |
134 | |
135 | |
136 class TestAvailProcessAPIs(PsutilTestCase): | |
137 | |
138 def test_environ(self): | |
139 self.assertEqual(hasattr(psutil.Process, "environ"), | |
140 LINUX or MACOS or WINDOWS or AIX or SUNOS) | |
141 | |
142 def test_uids(self): | |
143 self.assertEqual(hasattr(psutil.Process, "uids"), POSIX) | |
144 | |
145 def test_gids(self): | |
146 self.assertEqual(hasattr(psutil.Process, "uids"), POSIX) | |
147 | |
148 def test_terminal(self): | |
149 self.assertEqual(hasattr(psutil.Process, "terminal"), POSIX) | |
150 | |
151 def test_ionice(self): | |
152 self.assertEqual(hasattr(psutil.Process, "ionice"), LINUX or WINDOWS) | |
153 | |
154 @unittest.skipIf(GITHUB_WHEELS, "not exposed via GITHUB_WHEELS") | |
155 def test_rlimit(self): | |
156 # requires Linux 2.6.36 | |
157 self.assertEqual(hasattr(psutil.Process, "rlimit"), LINUX) | |
158 | |
159 def test_io_counters(self): | |
160 hasit = hasattr(psutil.Process, "io_counters") | |
161 self.assertEqual(hasit, False if MACOS or SUNOS else True) | |
162 | |
163 def test_num_fds(self): | |
164 self.assertEqual(hasattr(psutil.Process, "num_fds"), POSIX) | |
165 | |
166 def test_num_handles(self): | |
167 self.assertEqual(hasattr(psutil.Process, "num_handles"), WINDOWS) | |
168 | |
169 def test_cpu_affinity(self): | |
170 self.assertEqual(hasattr(psutil.Process, "cpu_affinity"), | |
171 LINUX or WINDOWS or FREEBSD) | |
172 | |
173 def test_cpu_num(self): | |
174 self.assertEqual(hasattr(psutil.Process, "cpu_num"), | |
175 LINUX or FREEBSD or SUNOS) | |
176 | |
177 def test_memory_maps(self): | |
178 hasit = hasattr(psutil.Process, "memory_maps") | |
179 self.assertEqual( | |
180 hasit, False if OPENBSD or NETBSD or AIX or MACOS else True) | |
181 | |
182 | |
183 # =================================================================== | |
184 # --- API types | |
185 # =================================================================== | |
186 | |
187 | |
188 class TestSystemAPITypes(PsutilTestCase): | |
189 """Check the return types of system related APIs. | |
190 Mainly we want to test we never return unicode on Python 2, see: | |
191 https://github.com/giampaolo/psutil/issues/1039 | |
192 """ | |
193 | |
194 @classmethod | |
195 def setUpClass(cls): | |
196 cls.proc = psutil.Process() | |
197 | |
198 def assert_ntuple_of_nums(self, nt, type_=float, gezero=True): | |
199 assert is_namedtuple(nt) | |
200 for n in nt: | |
201 self.assertIsInstance(n, type_) | |
202 if gezero: | |
203 self.assertGreaterEqual(n, 0) | |
204 | |
205 def test_cpu_times(self): | |
206 self.assert_ntuple_of_nums(psutil.cpu_times()) | |
207 for nt in psutil.cpu_times(percpu=True): | |
208 self.assert_ntuple_of_nums(nt) | |
209 | |
210 def test_cpu_percent(self): | |
211 self.assertIsInstance(psutil.cpu_percent(interval=None), float) | |
212 self.assertIsInstance(psutil.cpu_percent(interval=0.00001), float) | |
213 | |
214 def test_cpu_times_percent(self): | |
215 self.assert_ntuple_of_nums(psutil.cpu_times_percent(interval=None)) | |
216 self.assert_ntuple_of_nums(psutil.cpu_times_percent(interval=0.0001)) | |
217 | |
218 def test_cpu_count(self): | |
219 self.assertIsInstance(psutil.cpu_count(), int) | |
220 | |
221 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
222 def test_cpu_freq(self): | |
223 if psutil.cpu_freq() is None: | |
224 raise self.skipTest("cpu_freq() returns None") | |
225 self.assert_ntuple_of_nums(psutil.cpu_freq(), type_=(float, int, long)) | |
226 | |
227 def test_disk_io_counters(self): | |
228 # Duplicate of test_system.py. Keep it anyway. | |
229 for k, v in psutil.disk_io_counters(perdisk=True).items(): | |
230 self.assertIsInstance(k, str) | |
231 self.assert_ntuple_of_nums(v, type_=(int, long)) | |
232 | |
233 def test_disk_partitions(self): | |
234 # Duplicate of test_system.py. Keep it anyway. | |
235 for disk in psutil.disk_partitions(): | |
236 self.assertIsInstance(disk.device, str) | |
237 self.assertIsInstance(disk.mountpoint, str) | |
238 self.assertIsInstance(disk.fstype, str) | |
239 self.assertIsInstance(disk.opts, str) | |
240 | |
241 @unittest.skipIf(SKIP_SYSCONS, "requires root") | |
242 def test_net_connections(self): | |
243 with create_sockets(): | |
244 ret = psutil.net_connections('all') | |
245 self.assertEqual(len(ret), len(set(ret))) | |
246 for conn in ret: | |
247 assert is_namedtuple(conn) | |
248 | |
249 def test_net_if_addrs(self): | |
250 # Duplicate of test_system.py. Keep it anyway. | |
251 for ifname, addrs in psutil.net_if_addrs().items(): | |
252 self.assertIsInstance(ifname, str) | |
253 for addr in addrs: | |
254 if enum is not None and not PYPY: | |
255 self.assertIsInstance(addr.family, enum.IntEnum) | |
256 else: | |
257 self.assertIsInstance(addr.family, int) | |
258 self.assertIsInstance(addr.address, str) | |
259 self.assertIsInstance(addr.netmask, (str, type(None))) | |
260 self.assertIsInstance(addr.broadcast, (str, type(None))) | |
261 | |
262 def test_net_if_stats(self): | |
263 # Duplicate of test_system.py. Keep it anyway. | |
264 for ifname, info in psutil.net_if_stats().items(): | |
265 self.assertIsInstance(ifname, str) | |
266 self.assertIsInstance(info.isup, bool) | |
267 if enum is not None: | |
268 self.assertIsInstance(info.duplex, enum.IntEnum) | |
269 else: | |
270 self.assertIsInstance(info.duplex, int) | |
271 self.assertIsInstance(info.speed, int) | |
272 self.assertIsInstance(info.mtu, int) | |
273 | |
274 @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported') | |
275 def test_net_io_counters(self): | |
276 # Duplicate of test_system.py. Keep it anyway. | |
277 for ifname, _ in psutil.net_io_counters(pernic=True).items(): | |
278 self.assertIsInstance(ifname, str) | |
279 | |
280 @unittest.skipIf(not HAS_SENSORS_FANS, "not supported") | |
281 def test_sensors_fans(self): | |
282 # Duplicate of test_system.py. Keep it anyway. | |
283 for name, units in psutil.sensors_fans().items(): | |
284 self.assertIsInstance(name, str) | |
285 for unit in units: | |
286 self.assertIsInstance(unit.label, str) | |
287 self.assertIsInstance(unit.current, (float, int, type(None))) | |
288 | |
289 @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported") | |
290 def test_sensors_temperatures(self): | |
291 # Duplicate of test_system.py. Keep it anyway. | |
292 for name, units in psutil.sensors_temperatures().items(): | |
293 self.assertIsInstance(name, str) | |
294 for unit in units: | |
295 self.assertIsInstance(unit.label, str) | |
296 self.assertIsInstance(unit.current, (float, int, type(None))) | |
297 self.assertIsInstance(unit.high, (float, int, type(None))) | |
298 self.assertIsInstance(unit.critical, (float, int, type(None))) | |
299 | |
300 def test_boot_time(self): | |
301 # Duplicate of test_system.py. Keep it anyway. | |
302 self.assertIsInstance(psutil.boot_time(), float) | |
303 | |
304 def test_users(self): | |
305 # Duplicate of test_system.py. Keep it anyway. | |
306 for user in psutil.users(): | |
307 self.assertIsInstance(user.name, str) | |
308 self.assertIsInstance(user.terminal, (str, type(None))) | |
309 self.assertIsInstance(user.host, (str, type(None))) | |
310 self.assertIsInstance(user.pid, (int, type(None))) | |
311 | |
312 | |
313 class TestProcessWaitType(PsutilTestCase): | |
314 | |
315 @unittest.skipIf(not POSIX, "not POSIX") | |
316 def test_negative_signal(self): | |
317 p = psutil.Process(self.spawn_testproc().pid) | |
318 p.terminate() | |
319 code = p.wait() | |
320 self.assertEqual(code, -signal.SIGTERM) | |
321 if enum is not None: | |
322 self.assertIsInstance(code, enum.IntEnum) | |
323 else: | |
324 self.assertIsInstance(code, int) | |
325 | |
326 | |
327 # =================================================================== | |
328 # --- Featch all processes test | |
329 # =================================================================== | |
330 | |
331 | |
332 def proc_info(pid): | |
333 tcase = PsutilTestCase() | |
334 | |
335 def check_exception(exc, proc, name, ppid): | |
336 tcase.assertEqual(exc.pid, pid) | |
337 tcase.assertEqual(exc.name, name) | |
338 if isinstance(exc, psutil.ZombieProcess): | |
339 if exc.ppid is not None: | |
340 tcase.assertGreaterEqual(exc.ppid, 0) | |
341 tcase.assertEqual(exc.ppid, ppid) | |
342 elif isinstance(exc, psutil.NoSuchProcess): | |
343 tcase.assertProcessGone(proc) | |
344 str(exc) | |
345 assert exc.msg | |
346 | |
347 def do_wait(): | |
348 if pid != 0: | |
349 try: | |
350 proc.wait(0) | |
351 except psutil.Error as exc: | |
352 check_exception(exc, proc, name, ppid) | |
353 | |
354 try: | |
355 proc = psutil.Process(pid) | |
356 d = proc.as_dict(['ppid', 'name']) | |
357 except psutil.NoSuchProcess: | |
358 return {} | |
359 | |
360 name, ppid = d['name'], d['ppid'] | |
361 info = {'pid': proc.pid} | |
362 ns = process_namespace(proc) | |
363 with proc.oneshot(): | |
364 for fun, fun_name in ns.iter(ns.getters, clear_cache=False): | |
365 try: | |
366 info[fun_name] = fun() | |
367 except psutil.Error as exc: | |
368 check_exception(exc, proc, name, ppid) | |
369 continue | |
370 do_wait() | |
371 return info | |
372 | |
373 | |
374 @serialrun | |
375 class TestFetchAllProcesses(PsutilTestCase): | |
376 """Test which iterates over all running processes and performs | |
377 some sanity checks against Process API's returned values. | |
378 Uses a process pool to get info about all processes. | |
379 """ | |
380 | |
381 def setUp(self): | |
382 self.pool = multiprocessing.Pool() | |
383 | |
384 def tearDown(self): | |
385 self.pool.terminate() | |
386 self.pool.join() | |
387 | |
388 def iter_proc_info(self): | |
389 # Fixes "can't pickle <function proc_info>: it's not the | |
390 # same object as test_contracts.proc_info". | |
391 from psutil.tests.test_contracts import proc_info | |
392 return self.pool.imap_unordered(proc_info, psutil.pids()) | |
393 | |
394 def test_all(self): | |
395 failures = [] | |
396 for info in self.iter_proc_info(): | |
397 for name, value in info.items(): | |
398 meth = getattr(self, name) | |
399 try: | |
400 meth(value, info) | |
401 except AssertionError: | |
402 s = '\n' + '=' * 70 + '\n' | |
403 s += "FAIL: test_%s pid=%s, ret=%s\n" % ( | |
404 name, info['pid'], repr(value)) | |
405 s += '-' * 70 | |
406 s += "\n%s" % traceback.format_exc() | |
407 s = "\n".join((" " * 4) + i for i in s.splitlines()) | |
408 s += '\n' | |
409 failures.append(s) | |
410 else: | |
411 if value not in (0, 0.0, [], None, '', {}): | |
412 assert value, value | |
413 if failures: | |
414 raise self.fail(''.join(failures)) | |
415 | |
416 def cmdline(self, ret, info): | |
417 self.assertIsInstance(ret, list) | |
418 for part in ret: | |
419 self.assertIsInstance(part, str) | |
420 | |
421 def exe(self, ret, info): | |
422 self.assertIsInstance(ret, (str, type(None))) | |
423 if not ret: | |
424 self.assertEqual(ret, '') | |
425 else: | |
426 if WINDOWS and not ret.endswith('.exe'): | |
427 return # May be "Registry", "MemCompression", ... | |
428 assert os.path.isabs(ret), ret | |
429 # Note: os.stat() may return False even if the file is there | |
430 # hence we skip the test, see: | |
431 # http://stackoverflow.com/questions/3112546/os-path-exists-lies | |
432 if POSIX and os.path.isfile(ret): | |
433 if hasattr(os, 'access') and hasattr(os, "X_OK"): | |
434 # XXX may fail on MACOS | |
435 assert os.access(ret, os.X_OK) | |
436 | |
437 def pid(self, ret, info): | |
438 self.assertIsInstance(ret, int) | |
439 self.assertGreaterEqual(ret, 0) | |
440 | |
441 def ppid(self, ret, info): | |
442 self.assertIsInstance(ret, (int, long)) | |
443 self.assertGreaterEqual(ret, 0) | |
444 | |
445 def name(self, ret, info): | |
446 self.assertIsInstance(ret, str) | |
447 # on AIX, "<exiting>" processes don't have names | |
448 if not AIX: | |
449 assert ret | |
450 | |
451 def create_time(self, ret, info): | |
452 self.assertIsInstance(ret, float) | |
453 try: | |
454 self.assertGreaterEqual(ret, 0) | |
455 except AssertionError: | |
456 # XXX | |
457 if OPENBSD and info['status'] == psutil.STATUS_ZOMBIE: | |
458 pass | |
459 else: | |
460 raise | |
461 # this can't be taken for granted on all platforms | |
462 # self.assertGreaterEqual(ret, psutil.boot_time()) | |
463 # make sure returned value can be pretty printed | |
464 # with strftime | |
465 time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret)) | |
466 | |
467 def uids(self, ret, info): | |
468 assert is_namedtuple(ret) | |
469 for uid in ret: | |
470 self.assertIsInstance(uid, int) | |
471 self.assertGreaterEqual(uid, 0) | |
472 | |
473 def gids(self, ret, info): | |
474 assert is_namedtuple(ret) | |
475 # note: testing all gids as above seems not to be reliable for | |
476 # gid == 30 (nodoby); not sure why. | |
477 for gid in ret: | |
478 self.assertIsInstance(gid, int) | |
479 if not MACOS and not NETBSD: | |
480 self.assertGreaterEqual(gid, 0) | |
481 | |
482 def username(self, ret, info): | |
483 self.assertIsInstance(ret, str) | |
484 assert ret | |
485 | |
486 def status(self, ret, info): | |
487 self.assertIsInstance(ret, str) | |
488 assert ret | |
489 self.assertNotEqual(ret, '?') # XXX | |
490 self.assertIn(ret, VALID_PROC_STATUSES) | |
491 | |
492 def io_counters(self, ret, info): | |
493 assert is_namedtuple(ret) | |
494 for field in ret: | |
495 self.assertIsInstance(field, (int, long)) | |
496 if field != -1: | |
497 self.assertGreaterEqual(field, 0) | |
498 | |
499 def ionice(self, ret, info): | |
500 if LINUX: | |
501 self.assertIsInstance(ret.ioclass, int) | |
502 self.assertIsInstance(ret.value, int) | |
503 self.assertGreaterEqual(ret.ioclass, 0) | |
504 self.assertGreaterEqual(ret.value, 0) | |
505 else: # Windows, Cygwin | |
506 choices = [ | |
507 psutil.IOPRIO_VERYLOW, | |
508 psutil.IOPRIO_LOW, | |
509 psutil.IOPRIO_NORMAL, | |
510 psutil.IOPRIO_HIGH] | |
511 self.assertIsInstance(ret, int) | |
512 self.assertGreaterEqual(ret, 0) | |
513 self.assertIn(ret, choices) | |
514 | |
515 def num_threads(self, ret, info): | |
516 self.assertIsInstance(ret, int) | |
517 self.assertGreaterEqual(ret, 1) | |
518 | |
519 def threads(self, ret, info): | |
520 self.assertIsInstance(ret, list) | |
521 for t in ret: | |
522 assert is_namedtuple(t) | |
523 self.assertGreaterEqual(t.id, 0) | |
524 self.assertGreaterEqual(t.user_time, 0) | |
525 self.assertGreaterEqual(t.system_time, 0) | |
526 for field in t: | |
527 self.assertIsInstance(field, (int, float)) | |
528 | |
529 def cpu_times(self, ret, info): | |
530 assert is_namedtuple(ret) | |
531 for n in ret: | |
532 self.assertIsInstance(n, float) | |
533 self.assertGreaterEqual(n, 0) | |
534 # TODO: check ntuple fields | |
535 | |
536 def cpu_percent(self, ret, info): | |
537 self.assertIsInstance(ret, float) | |
538 assert 0.0 <= ret <= 100.0, ret | |
539 | |
540 def cpu_num(self, ret, info): | |
541 self.assertIsInstance(ret, int) | |
542 if FREEBSD and ret == -1: | |
543 return | |
544 self.assertGreaterEqual(ret, 0) | |
545 if psutil.cpu_count() == 1: | |
546 self.assertEqual(ret, 0) | |
547 self.assertIn(ret, list(range(psutil.cpu_count()))) | |
548 | |
549 def memory_info(self, ret, info): | |
550 assert is_namedtuple(ret) | |
551 for value in ret: | |
552 self.assertIsInstance(value, (int, long)) | |
553 self.assertGreaterEqual(value, 0) | |
554 if WINDOWS: | |
555 self.assertGreaterEqual(ret.peak_wset, ret.wset) | |
556 self.assertGreaterEqual(ret.peak_paged_pool, ret.paged_pool) | |
557 self.assertGreaterEqual(ret.peak_nonpaged_pool, ret.nonpaged_pool) | |
558 self.assertGreaterEqual(ret.peak_pagefile, ret.pagefile) | |
559 | |
560 def memory_full_info(self, ret, info): | |
561 assert is_namedtuple(ret) | |
562 total = psutil.virtual_memory().total | |
563 for name in ret._fields: | |
564 value = getattr(ret, name) | |
565 self.assertIsInstance(value, (int, long)) | |
566 self.assertGreaterEqual(value, 0, msg=(name, value)) | |
567 if LINUX or OSX and name in ('vms', 'data'): | |
568 # On Linux there are processes (e.g. 'goa-daemon') whose | |
569 # VMS is incredibly high for some reason. | |
570 continue | |
571 self.assertLessEqual(value, total, msg=(name, value, total)) | |
572 | |
573 if LINUX: | |
574 self.assertGreaterEqual(ret.pss, ret.uss) | |
575 | |
576 def open_files(self, ret, info): | |
577 self.assertIsInstance(ret, list) | |
578 for f in ret: | |
579 self.assertIsInstance(f.fd, int) | |
580 self.assertIsInstance(f.path, str) | |
581 if WINDOWS: | |
582 self.assertEqual(f.fd, -1) | |
583 elif LINUX: | |
584 self.assertIsInstance(f.position, int) | |
585 self.assertIsInstance(f.mode, str) | |
586 self.assertIsInstance(f.flags, int) | |
587 self.assertGreaterEqual(f.position, 0) | |
588 self.assertIn(f.mode, ('r', 'w', 'a', 'r+', 'a+')) | |
589 self.assertGreater(f.flags, 0) | |
590 elif BSD and not f.path: | |
591 # XXX see: https://github.com/giampaolo/psutil/issues/595 | |
592 continue | |
593 assert os.path.isabs(f.path), f | |
594 try: | |
595 st = os.stat(f.path) | |
596 except FileNotFoundError: | |
597 pass | |
598 else: | |
599 assert stat.S_ISREG(st.st_mode), f | |
600 | |
601 def num_fds(self, ret, info): | |
602 self.assertIsInstance(ret, int) | |
603 self.assertGreaterEqual(ret, 0) | |
604 | |
605 def connections(self, ret, info): | |
606 with create_sockets(): | |
607 self.assertEqual(len(ret), len(set(ret))) | |
608 for conn in ret: | |
609 assert is_namedtuple(conn) | |
610 | |
611 def cwd(self, ret, info): | |
612 if ret: # 'ret' can be None or empty | |
613 self.assertIsInstance(ret, str) | |
614 assert os.path.isabs(ret), ret | |
615 try: | |
616 st = os.stat(ret) | |
617 except OSError as err: | |
618 if WINDOWS and err.errno in \ | |
619 psutil._psplatform.ACCESS_DENIED_SET: | |
620 pass | |
621 # directory has been removed in mean time | |
622 elif err.errno != errno.ENOENT: | |
623 raise | |
624 else: | |
625 assert stat.S_ISDIR(st.st_mode) | |
626 | |
627 def memory_percent(self, ret, info): | |
628 self.assertIsInstance(ret, float) | |
629 assert 0 <= ret <= 100, ret | |
630 | |
631 def is_running(self, ret, info): | |
632 self.assertIsInstance(ret, bool) | |
633 | |
634 def cpu_affinity(self, ret, info): | |
635 self.assertIsInstance(ret, list) | |
636 assert ret != [], ret | |
637 cpus = list(range(psutil.cpu_count())) | |
638 for n in ret: | |
639 self.assertIsInstance(n, int) | |
640 self.assertIn(n, cpus) | |
641 | |
642 def terminal(self, ret, info): | |
643 self.assertIsInstance(ret, (str, type(None))) | |
644 if ret is not None: | |
645 assert os.path.isabs(ret), ret | |
646 assert os.path.exists(ret), ret | |
647 | |
648 def memory_maps(self, ret, info): | |
649 for nt in ret: | |
650 self.assertIsInstance(nt.addr, str) | |
651 self.assertIsInstance(nt.perms, str) | |
652 self.assertIsInstance(nt.path, str) | |
653 for fname in nt._fields: | |
654 value = getattr(nt, fname) | |
655 if fname == 'path': | |
656 if not value.startswith('['): | |
657 assert os.path.isabs(nt.path), nt.path | |
658 # commented as on Linux we might get | |
659 # '/foo/bar (deleted)' | |
660 # assert os.path.exists(nt.path), nt.path | |
661 elif fname == 'addr': | |
662 assert value, repr(value) | |
663 elif fname == 'perms': | |
664 if not WINDOWS: | |
665 assert value, repr(value) | |
666 else: | |
667 self.assertIsInstance(value, (int, long)) | |
668 self.assertGreaterEqual(value, 0) | |
669 | |
670 def num_handles(self, ret, info): | |
671 self.assertIsInstance(ret, int) | |
672 self.assertGreaterEqual(ret, 0) | |
673 | |
674 def nice(self, ret, info): | |
675 self.assertIsInstance(ret, int) | |
676 if POSIX: | |
677 assert -20 <= ret <= 20, ret | |
678 else: | |
679 priorities = [getattr(psutil, x) for x in dir(psutil) | |
680 if x.endswith('_PRIORITY_CLASS')] | |
681 self.assertIn(ret, priorities) | |
682 if sys.version_info > (3, 4): | |
683 self.assertIsInstance(ret, enum.IntEnum) | |
684 else: | |
685 self.assertIsInstance(ret, int) | |
686 | |
687 def num_ctx_switches(self, ret, info): | |
688 assert is_namedtuple(ret) | |
689 for value in ret: | |
690 self.assertIsInstance(value, (int, long)) | |
691 self.assertGreaterEqual(value, 0) | |
692 | |
693 def rlimit(self, ret, info): | |
694 self.assertIsInstance(ret, tuple) | |
695 self.assertEqual(len(ret), 2) | |
696 self.assertGreaterEqual(ret[0], -1) | |
697 self.assertGreaterEqual(ret[1], -1) | |
698 | |
699 def environ(self, ret, info): | |
700 self.assertIsInstance(ret, dict) | |
701 for k, v in ret.items(): | |
702 self.assertIsInstance(k, str) | |
703 self.assertIsInstance(v, str) | |
704 | |
705 | |
706 if __name__ == '__main__': | |
707 from psutil.tests.runner import run_from_name | |
708 run_from_name(__file__) |