comparison env/lib/python3.7/site-packages/psutil/tests/test_contracts.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
6
7 """Contracts tests. These tests mainly check API sanity in terms of
8 returned types and APIs availability.
9 Some of these are duplicates of tests in test_system.py and test_process.py.
10 """
11
12 import errno
13 import os
14 import stat
15 import time
16 import traceback
17
18 from psutil import AIX
19 from psutil import BSD
20 from psutil import FREEBSD
21 from psutil import LINUX
22 from psutil import MACOS
23 from psutil import NETBSD
24 from psutil import OPENBSD
25 from psutil import OSX
26 from psutil import POSIX
27 from psutil import SUNOS
28 from psutil import WINDOWS
29 from psutil._compat import long
30 from psutil.tests import create_sockets
31 from psutil.tests import enum
32 from psutil.tests import get_kernel_version
33 from psutil.tests import HAS_CPU_FREQ
34 from psutil.tests import HAS_NET_IO_COUNTERS
35 from psutil.tests import HAS_RLIMIT
36 from psutil.tests import HAS_SENSORS_FANS
37 from psutil.tests import HAS_SENSORS_TEMPERATURES
38 from psutil.tests import is_namedtuple
39 from psutil.tests import safe_rmpath
40 from psutil.tests import SKIP_SYSCONS
41 from psutil.tests import TESTFN
42 from psutil.tests import unittest
43 from psutil.tests import VALID_PROC_STATUSES
44 from psutil.tests import warn
45 import psutil
46
47
48 # ===================================================================
49 # --- APIs availability
50 # ===================================================================
51
52 # Make sure code reflects what doc promises in terms of APIs
53 # availability.
54
class TestAvailConstantsAPIs(unittest.TestCase):
    """Verify that platform-specific module constants are exposed by
    psutil on exactly the platforms where they are expected.
    """

    def _assert_availability(self, names, expected):
        """Assert that every name in *names* is (or is not) an
        attribute of the psutil module, according to *expected*.
        """
        for const_name in names:
            self.assertEqual(hasattr(psutil, const_name), expected)

    def test_PROCFS_PATH(self):
        expected = LINUX or SUNOS or AIX
        self.assertEqual(hasattr(psutil, "PROCFS_PATH"), expected)

    def test_win_priority(self):
        priority_classes = (
            "ABOVE_NORMAL_PRIORITY_CLASS",
            "BELOW_NORMAL_PRIORITY_CLASS",
            "HIGH_PRIORITY_CLASS",
            "IDLE_PRIORITY_CLASS",
            "NORMAL_PRIORITY_CLASS",
            "REALTIME_PRIORITY_CLASS",
        )
        self._assert_availability(priority_classes, WINDOWS)

    def test_linux_ioprio_linux(self):
        io_classes = (
            "IOPRIO_CLASS_NONE",
            "IOPRIO_CLASS_RT",
            "IOPRIO_CLASS_BE",
            "IOPRIO_CLASS_IDLE",
        )
        self._assert_availability(io_classes, LINUX)

    def test_linux_ioprio_windows(self):
        io_priorities = (
            "IOPRIO_HIGH",
            "IOPRIO_NORMAL",
            "IOPRIO_LOW",
            "IOPRIO_VERYLOW",
        )
        self._assert_availability(io_priorities, WINDOWS)

    def test_linux_rlimit(self):
        # Process.rlimit() and the base RLIMIT_* set are expected on
        # Linux only, and only with a kernel >= 2.6.36.
        base_supported = LINUX and get_kernel_version() >= (2, 6, 36)
        self.assertEqual(hasattr(psutil.Process, "rlimit"), base_supported)
        base_limits = (
            "RLIM_INFINITY",
            "RLIMIT_AS",
            "RLIMIT_CORE",
            "RLIMIT_CPU",
            "RLIMIT_DATA",
            "RLIMIT_FSIZE",
            "RLIMIT_LOCKS",
            "RLIMIT_MEMLOCK",
            "RLIMIT_NOFILE",
            "RLIMIT_NPROC",
            "RLIMIT_RSS",
            "RLIMIT_STACK",
        )
        self._assert_availability(base_limits, base_supported)

        # These additional limits are expected only with a kernel >= 3.0.
        newer_supported = LINUX and get_kernel_version() >= (3, 0)
        newer_limits = (
            "RLIMIT_MSGQUEUE",
            "RLIMIT_NICE",
            "RLIMIT_RTPRIO",
            "RLIMIT_RTTIME",
            "RLIMIT_SIGPENDING",
        )
        self._assert_availability(newer_limits, newer_supported)
107
108
class TestAvailSystemAPIs(unittest.TestCase):
    """Verify that system-level psutil functions are defined on exactly
    the platforms where they are expected.
    """

    def test_win_service_iter(self):
        available = hasattr(psutil, "win_service_iter")
        self.assertEqual(available, WINDOWS)

    def test_win_service_get(self):
        available = hasattr(psutil, "win_service_get")
        self.assertEqual(available, WINDOWS)

    def test_cpu_freq(self):
        # On Linux the function is expected only if one of these sysfs
        # directories exists.
        cpufreq_dirs = (
            "/sys/devices/system/cpu/cpufreq",
            "/sys/devices/system/cpu/cpu0/cpufreq",
        )
        linux_has_cpufreq = LINUX and any(
            os.path.exists(path) for path in cpufreq_dirs)
        expected = linux_has_cpufreq or MACOS or WINDOWS or FREEBSD
        self.assertEqual(hasattr(psutil, "cpu_freq"), expected)

    def test_sensors_temperatures(self):
        available = hasattr(psutil, "sensors_temperatures")
        self.assertEqual(available, LINUX or FREEBSD)

    def test_sensors_fans(self):
        available = hasattr(psutil, "sensors_fans")
        self.assertEqual(available, LINUX)

    def test_battery(self):
        available = hasattr(psutil, "sensors_battery")
        self.assertEqual(available, LINUX or WINDOWS or FREEBSD or MACOS)
134
135
class TestAvailProcessAPIs(unittest.TestCase):
    """Verify that psutil.Process methods are defined on exactly the
    platforms where they are expected.
    """

    def test_environ(self):
        self.assertEqual(hasattr(psutil.Process, "environ"),
                         LINUX or MACOS or WINDOWS or AIX or SUNOS)

    def test_uids(self):
        self.assertEqual(hasattr(psutil.Process, "uids"), POSIX)

    def test_gids(self):
        # This previously probed "uids" (copy-paste slip), so the
        # availability of gids() was never actually verified.
        self.assertEqual(hasattr(psutil.Process, "gids"), POSIX)

    def test_terminal(self):
        self.assertEqual(hasattr(psutil.Process, "terminal"), POSIX)

    def test_ionice(self):
        self.assertEqual(hasattr(psutil.Process, "ionice"), LINUX or WINDOWS)

    def test_rlimit(self):
        self.assertEqual(hasattr(psutil.Process, "rlimit"), LINUX)

    def test_io_counters(self):
        # Expected everywhere except macOS and SunOS.
        hasit = hasattr(psutil.Process, "io_counters")
        self.assertEqual(hasit, not (MACOS or SUNOS))

    def test_num_fds(self):
        self.assertEqual(hasattr(psutil.Process, "num_fds"), POSIX)

    def test_num_handles(self):
        self.assertEqual(hasattr(psutil.Process, "num_handles"), WINDOWS)

    def test_cpu_affinity(self):
        self.assertEqual(hasattr(psutil.Process, "cpu_affinity"),
                         LINUX or WINDOWS or FREEBSD)

    def test_cpu_num(self):
        self.assertEqual(hasattr(psutil.Process, "cpu_num"),
                         LINUX or FREEBSD or SUNOS)

    def test_memory_maps(self):
        # Expected everywhere except OpenBSD, NetBSD, AIX and macOS.
        hasit = hasattr(psutil.Process, "memory_maps")
        self.assertEqual(hasit, not (OPENBSD or NETBSD or AIX or MACOS))
179
180
181 # ===================================================================
182 # --- System API types
183 # ===================================================================
184
185
class TestSystemAPITypes(unittest.TestCase):
    """Check the return types of system related APIs.
    Mainly we want to test we never return unicode on Python 2, see:
    https://github.com/giampaolo/psutil/issues/1039
    """

    @classmethod
    def setUpClass(cls):
        cls.proc = psutil.Process()

    def tearDown(self):
        safe_rmpath(TESTFN)

    def assert_ntuple_of_nums(self, nt, type_=float, gezero=True):
        """Assert that *nt* is a namedtuple whose fields are all
        instances of *type_* and, if *gezero* is true, all >= 0.
        """
        assert is_namedtuple(nt)
        for n in nt:
            self.assertIsInstance(n, type_)
            if gezero:
                self.assertGreaterEqual(n, 0)

    def test_cpu_times(self):
        self.assert_ntuple_of_nums(psutil.cpu_times())
        for nt in psutil.cpu_times(percpu=True):
            self.assert_ntuple_of_nums(nt)

    def test_cpu_percent(self):
        self.assertIsInstance(psutil.cpu_percent(interval=None), float)
        self.assertIsInstance(psutil.cpu_percent(interval=0.00001), float)

    def test_cpu_times_percent(self):
        self.assert_ntuple_of_nums(psutil.cpu_times_percent(interval=None))
        self.assert_ntuple_of_nums(psutil.cpu_times_percent(interval=0.0001))

    def test_cpu_count(self):
        self.assertIsInstance(psutil.cpu_count(), int)

    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
    def test_cpu_freq(self):
        freq = psutil.cpu_freq()
        if freq is None:
            # skipTest() raises on its own; no explicit "raise" needed.
            self.skipTest("cpu_freq() returns None")
        self.assert_ntuple_of_nums(freq, type_=(float, int, long))

    def test_disk_io_counters(self):
        # Duplicate of test_system.py. Keep it anyway.
        for k, v in psutil.disk_io_counters(perdisk=True).items():
            self.assertIsInstance(k, str)
            self.assert_ntuple_of_nums(v, type_=(int, long))

    def test_disk_partitions(self):
        # Duplicate of test_system.py. Keep it anyway.
        for disk in psutil.disk_partitions():
            self.assertIsInstance(disk.device, str)
            self.assertIsInstance(disk.mountpoint, str)
            self.assertIsInstance(disk.fstype, str)
            self.assertIsInstance(disk.opts, str)

    @unittest.skipIf(SKIP_SYSCONS, "requires root")
    def test_net_connections(self):
        with create_sockets():
            ret = psutil.net_connections('all')
            # No duplicate entries are expected.
            self.assertEqual(len(ret), len(set(ret)))
            for conn in ret:
                assert is_namedtuple(conn)

    def test_net_if_addrs(self):
        # Duplicate of test_system.py. Keep it anyway.
        # 'enum' is None when the enum module is unavailable, in which
        # case the family is expected to be a plain int.
        family_type = enum.IntEnum if enum is not None else int
        for ifname, addrs in psutil.net_if_addrs().items():
            self.assertIsInstance(ifname, str)
            for addr in addrs:
                assert isinstance(addr.family, family_type), addr
                self.assertIsInstance(addr.address, str)
                self.assertIsInstance(addr.netmask, (str, type(None)))
                self.assertIsInstance(addr.broadcast, (str, type(None)))

    def test_net_if_stats(self):
        # Duplicate of test_system.py. Keep it anyway.
        duplex_type = enum.IntEnum if enum is not None else int
        for ifname, info in psutil.net_if_stats().items():
            self.assertIsInstance(ifname, str)
            self.assertIsInstance(info.isup, bool)
            self.assertIsInstance(info.duplex, duplex_type)
            self.assertIsInstance(info.speed, int)
            self.assertIsInstance(info.mtu, int)

    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
    def test_net_io_counters(self):
        # Duplicate of test_system.py. Keep it anyway.
        # Only the interface names (dict keys) are checked here.
        for ifname in psutil.net_io_counters(pernic=True):
            self.assertIsInstance(ifname, str)

    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    def test_sensors_fans(self):
        # Duplicate of test_system.py. Keep it anyway.
        for name, units in psutil.sensors_fans().items():
            self.assertIsInstance(name, str)
            for unit in units:
                self.assertIsInstance(unit.label, str)
                self.assertIsInstance(unit.current, (float, int, type(None)))

    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    def test_sensors_temperatures(self):
        # Duplicate of test_system.py. Keep it anyway.
        reading_types = (float, int, type(None))
        for name, units in psutil.sensors_temperatures().items():
            self.assertIsInstance(name, str)
            for unit in units:
                self.assertIsInstance(unit.label, str)
                self.assertIsInstance(unit.current, reading_types)
                self.assertIsInstance(unit.high, reading_types)
                self.assertIsInstance(unit.critical, reading_types)

    def test_boot_time(self):
        # Duplicate of test_system.py. Keep it anyway.
        self.assertIsInstance(psutil.boot_time(), float)

    def test_users(self):
        # Duplicate of test_system.py. Keep it anyway.
        for user in psutil.users():
            self.assertIsInstance(user.name, str)
            self.assertIsInstance(user.terminal, (str, type(None)))
            self.assertIsInstance(user.host, (str, type(None)))
            self.assertIsInstance(user.pid, (int, type(None)))
312
313
314 # ===================================================================
315 # --- Fetch all processes test
316 # ===================================================================
317
318
class TestFetchAllProcesses(unittest.TestCase):
    """Test which iterates over all running processes and performs
    some sanity checks against Process API's returned values.

    For every public Process attribute name yielded by iter_procs(),
    test_fetch_all() fetches the value and hands it to the method of
    this class that has the same name, which validates it.
    """

    def get_attr_names(self):
        """Return the public psutil.Process attribute names to check,
        leaving out the names listed in the exclusion set below.
        """
        excluded_names = set([
            'send_signal', 'suspend', 'resume', 'terminate', 'kill', 'wait',
            'as_dict', 'parent', 'parents', 'children', 'memory_info_ex',
            'oneshot',
        ])
        if LINUX and not HAS_RLIMIT:
            excluded_names.add('rlimit')
        return [name for name in dir(psutil.Process)
                if not name.startswith("_") and name not in excluded_names]

    def iter_procs(self):
        """Yield a (process, attribute name) pair for every running
        process and every attribute returned by get_attr_names().
        """
        attrs = self.get_attr_names()
        for p in psutil.process_iter():
            with p.oneshot():
                for name in attrs:
                    yield (p, name)

    def call_meth(self, p, name):
        """Return the value of attribute *name* of process *p*, calling
        it first if it is callable.
        """
        attr = getattr(p, name, None)
        if attr is None or not callable(attr):
            return attr
        args = ()
        kwargs = {}
        if name == 'rlimit':
            # rlimit() needs a resource argument.
            args = (psutil.RLIMIT_NOFILE,)
        elif name == 'memory_maps':
            kwargs = {'grouped': False}
        return attr(*args, **kwargs)

    def test_fetch_all(self):
        valid_procs = 0
        default = object()  # sentinel: "no value was fetched"
        failures = []
        for p, name in self.iter_procs():
            ret = default
            try:
                ret = self.call_meth(p, name)
            except NotImplementedError:
                msg = "%r was skipped because not implemented" % (
                    self.__class__.__name__ + '.test_' + name)
                warn(msg)
            except (psutil.NoSuchProcess, psutil.AccessDenied) as err:
                self.assertEqual(err.pid, p.pid)
                if err.name:
                    # make sure exception's name attr is set
                    # with the actual process name
                    self.assertEqual(err.name, p.name())
                assert str(err)
                assert err.msg
            except Exception:
                # Build a unittest-like failure report. The closing
                # parenthesis is appended exactly once (the ret fragment
                # used to carry its own, producing "))").
                header = "FAIL: test_%s (proc=%s" % (name, p)
                if ret is not default:
                    header += ", ret=%s" % repr(ret)
                header += ')'
                s = '\n' + '=' * 70 + '\n' + header + '\n'
                s += '-' * 70
                s += "\n%s" % traceback.format_exc()
                s = "\n".join((" " * 4) + i for i in s.splitlines())
                s += '\n'
                failures.append(s)
                # Stop at the first unexpected exception.
                break
            else:
                valid_procs += 1
                if ret not in (0, 0.0, [], None, '', {}):
                    assert ret, ret
                # Dispatch to the validator named after the attribute.
                meth = getattr(self, name)
                meth(ret, p)

        if failures:
            self.fail(''.join(failures))

        # we should always have a non-empty list, not including PID 0 etc.
        # special cases.
        assert valid_procs

    # --- validators: one per Process attribute, called as
    # --- validator(returned value, process)

    def cmdline(self, ret, proc):
        self.assertIsInstance(ret, list)
        for part in ret:
            self.assertIsInstance(part, str)

    def exe(self, ret, proc):
        self.assertIsInstance(ret, (str, type(None)))
        if not ret:
            self.assertEqual(ret, '')
        else:
            if WINDOWS and not ret.endswith('.exe'):
                return  # May be "Registry", "MemCompression", ...
            assert os.path.isabs(ret), ret
            # Note: os.stat() may return False even if the file is there
            # hence we skip the test, see:
            # http://stackoverflow.com/questions/3112546/os-path-exists-lies
            if POSIX and os.path.isfile(ret):
                if hasattr(os, 'access') and hasattr(os, "X_OK"):
                    # XXX may fail on MACOS
                    assert os.access(ret, os.X_OK)

    def pid(self, ret, proc):
        self.assertIsInstance(ret, int)
        self.assertGreaterEqual(ret, 0)

    def ppid(self, ret, proc):
        self.assertIsInstance(ret, (int, long))
        self.assertGreaterEqual(ret, 0)

    def name(self, ret, proc):
        self.assertIsInstance(ret, str)
        # on AIX, "<exiting>" processes don't have names
        if not AIX:
            assert ret

    def create_time(self, ret, proc):
        self.assertIsInstance(ret, float)
        try:
            self.assertGreaterEqual(ret, 0)
        except AssertionError:
            # XXX
            if OPENBSD and proc.status() == psutil.STATUS_ZOMBIE:
                pass
            else:
                raise
        # this can't be taken for granted on all platforms
        # self.assertGreaterEqual(ret, psutil.boot_time())
        # make sure returned value can be pretty printed
        # with strftime
        time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret))

    def uids(self, ret, proc):
        assert is_namedtuple(ret)
        for uid in ret:
            self.assertIsInstance(uid, int)
            self.assertGreaterEqual(uid, 0)

    def gids(self, ret, proc):
        assert is_namedtuple(ret)
        # note: testing all gids as above seems not to be reliable for
        # gid == 30 (nobody); not sure why.
        for gid in ret:
            self.assertIsInstance(gid, int)
            if not MACOS and not NETBSD:
                self.assertGreaterEqual(gid, 0)

    def username(self, ret, proc):
        self.assertIsInstance(ret, str)
        assert ret

    def status(self, ret, proc):
        self.assertIsInstance(ret, str)
        assert ret
        self.assertNotEqual(ret, '?')  # XXX
        self.assertIn(ret, VALID_PROC_STATUSES)

    def io_counters(self, ret, proc):
        assert is_namedtuple(ret)
        for field in ret:
            self.assertIsInstance(field, (int, long))
            # -1 is tolerated; any other value must be >= 0.
            if field != -1:
                self.assertGreaterEqual(field, 0)

    def ionice(self, ret, proc):
        if LINUX:
            self.assertIsInstance(ret.ioclass, int)
            self.assertIsInstance(ret.value, int)
            self.assertGreaterEqual(ret.ioclass, 0)
            self.assertGreaterEqual(ret.value, 0)
        else:  # Windows, Cygwin
            choices = [
                psutil.IOPRIO_VERYLOW,
                psutil.IOPRIO_LOW,
                psutil.IOPRIO_NORMAL,
                psutil.IOPRIO_HIGH]
            self.assertIsInstance(ret, int)
            self.assertGreaterEqual(ret, 0)
            self.assertIn(ret, choices)

    def num_threads(self, ret, proc):
        self.assertIsInstance(ret, int)
        self.assertGreaterEqual(ret, 1)

    def threads(self, ret, proc):
        self.assertIsInstance(ret, list)
        for t in ret:
            assert is_namedtuple(t)
            self.assertGreaterEqual(t.id, 0)
            self.assertGreaterEqual(t.user_time, 0)
            self.assertGreaterEqual(t.system_time, 0)
            for field in t:
                self.assertIsInstance(field, (int, float))

    def cpu_times(self, ret, proc):
        assert is_namedtuple(ret)
        for n in ret:
            self.assertIsInstance(n, float)
            self.assertGreaterEqual(n, 0)
        # TODO: check ntuple fields

    def cpu_percent(self, ret, proc):
        self.assertIsInstance(ret, float)
        assert 0.0 <= ret <= 100.0, ret

    def cpu_num(self, ret, proc):
        self.assertIsInstance(ret, int)
        if FREEBSD and ret == -1:
            return
        self.assertGreaterEqual(ret, 0)
        if psutil.cpu_count() == 1:
            self.assertEqual(ret, 0)
        self.assertIn(ret, list(range(psutil.cpu_count())))

    def memory_info(self, ret, proc):
        assert is_namedtuple(ret)
        for value in ret:
            self.assertIsInstance(value, (int, long))
            self.assertGreaterEqual(value, 0)
        if WINDOWS:
            # Peak values can never be lower than the current ones.
            self.assertGreaterEqual(ret.peak_wset, ret.wset)
            self.assertGreaterEqual(ret.peak_paged_pool, ret.paged_pool)
            self.assertGreaterEqual(ret.peak_nonpaged_pool, ret.nonpaged_pool)
            self.assertGreaterEqual(ret.peak_pagefile, ret.pagefile)

    def memory_full_info(self, ret, proc):
        assert is_namedtuple(ret)
        total = psutil.virtual_memory().total
        for name in ret._fields:
            value = getattr(ret, name)
            self.assertIsInstance(value, (int, long))
            self.assertGreaterEqual(value, 0, msg=(name, value))
            # NOTE(review): "and" binds tighter than "or", so on Linux
            # the upper-bound check below is skipped for *every* field,
            # not only 'vms' and 'data'. The parentheses spell out the
            # existing behaviour; confirm whether
            # "(LINUX or OSX) and name in (...)" was the intent.
            if LINUX or (OSX and name in ('vms', 'data')):
                # On Linux there are processes (e.g. 'goa-daemon') whose
                # VMS is incredibly high for some reason.
                continue
            self.assertLessEqual(value, total, msg=(name, value, total))

        if LINUX:
            self.assertGreaterEqual(ret.pss, ret.uss)

    def open_files(self, ret, proc):
        self.assertIsInstance(ret, list)
        for f in ret:
            self.assertIsInstance(f.fd, int)
            self.assertIsInstance(f.path, str)
            if WINDOWS:
                self.assertEqual(f.fd, -1)
            elif LINUX:
                self.assertIsInstance(f.position, int)
                self.assertIsInstance(f.mode, str)
                self.assertIsInstance(f.flags, int)
                self.assertGreaterEqual(f.position, 0)
                self.assertIn(f.mode, ('r', 'w', 'a', 'r+', 'a+'))
                self.assertGreater(f.flags, 0)
            elif BSD and not f.path:
                # XXX see: https://github.com/giampaolo/psutil/issues/595
                continue
            assert os.path.isabs(f.path), f
            assert os.path.isfile(f.path), f

    def num_fds(self, ret, proc):
        self.assertIsInstance(ret, int)
        self.assertGreaterEqual(ret, 0)

    def connections(self, ret, proc):
        with create_sockets():
            # No duplicate entries are expected.
            self.assertEqual(len(ret), len(set(ret)))
            for conn in ret:
                assert is_namedtuple(conn)

    def cwd(self, ret, proc):
        if ret:     # 'ret' can be None or empty
            self.assertIsInstance(ret, str)
            assert os.path.isabs(ret), ret
            try:
                st = os.stat(ret)
            except OSError as err:
                if WINDOWS and err.errno in \
                        psutil._psplatform.ACCESS_DENIED_SET:
                    pass
                # directory has been removed in mean time
                elif err.errno != errno.ENOENT:
                    raise
            else:
                assert stat.S_ISDIR(st.st_mode)

    def memory_percent(self, ret, proc):
        self.assertIsInstance(ret, float)
        assert 0 <= ret <= 100, ret

    def is_running(self, ret, proc):
        self.assertIsInstance(ret, bool)

    def cpu_affinity(self, ret, proc):
        self.assertIsInstance(ret, list)
        assert ret != [], ret
        cpus = range(psutil.cpu_count())
        for n in ret:
            self.assertIsInstance(n, int)
            self.assertIn(n, cpus)

    def terminal(self, ret, proc):
        self.assertIsInstance(ret, (str, type(None)))
        if ret is not None:
            assert os.path.isabs(ret), ret
            assert os.path.exists(ret), ret

    def memory_maps(self, ret, proc):
        for nt in ret:
            self.assertIsInstance(nt.addr, str)
            self.assertIsInstance(nt.perms, str)
            self.assertIsInstance(nt.path, str)
            for fname in nt._fields:
                value = getattr(nt, fname)
                if fname == 'path':
                    # Paths starting with '[' are exempt from the
                    # absolute-path check.
                    if not value.startswith('['):
                        assert os.path.isabs(nt.path), nt.path
                        # commented as on Linux we might get
                        # '/foo/bar (deleted)'
                        # assert os.path.exists(nt.path), nt.path
                elif fname == 'addr':
                    assert value, repr(value)
                elif fname == 'perms':
                    if not WINDOWS:
                        assert value, repr(value)
                else:
                    # Every remaining field must be a non-negative integer.
                    self.assertIsInstance(value, (int, long))
                    self.assertGreaterEqual(value, 0)

    def num_handles(self, ret, proc):
        self.assertIsInstance(ret, int)
        self.assertGreaterEqual(ret, 0)

    def nice(self, ret, proc):
        self.assertIsInstance(ret, int)
        if POSIX:
            assert -20 <= ret <= 20, ret
        else:
            # Elsewhere the value must be one of the *_PRIORITY_CLASS
            # constants exposed by psutil.
            priorities = [getattr(psutil, x) for x in dir(psutil)
                          if x.endswith('_PRIORITY_CLASS')]
            self.assertIn(ret, priorities)

    def num_ctx_switches(self, ret, proc):
        assert is_namedtuple(ret)
        for value in ret:
            self.assertIsInstance(value, (int, long))
            self.assertGreaterEqual(value, 0)

    def rlimit(self, ret, proc):
        self.assertIsInstance(ret, tuple)
        self.assertEqual(len(ret), 2)
        self.assertGreaterEqual(ret[0], -1)
        self.assertGreaterEqual(ret[1], -1)

    def environ(self, ret, proc):
        self.assertIsInstance(ret, dict)
        for k, v in ret.items():
            self.assertIsInstance(k, str)
            self.assertIsInstance(v, str)
686
687
if __name__ == '__main__':
    # Executed as a script: run this file through psutil's test runner.
    from psutil.tests.runner import run as run_tests
    run_tests(__file__)