comparison env/lib/python3.7/site-packages/psutil/tests/test_misc.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
5 # Use of this source code is governed by a BSD-style license that can be
6 # found in the LICENSE file.
7
8 """
9 Miscellaneous tests.
10 """
11
12 import ast
13 import collections
14 import contextlib
15 import errno
16 import json
17 import os
18 import pickle
19 import socket
20 import stat
21
22 from psutil import FREEBSD
23 from psutil import LINUX
24 from psutil import NETBSD
25 from psutil import POSIX
26 from psutil import WINDOWS
27 from psutil._common import memoize
28 from psutil._common import memoize_when_activated
29 from psutil._common import supports_ipv6
30 from psutil._common import wrap_numbers
31 from psutil._common import open_text
32 from psutil._common import open_binary
33 from psutil._compat import PY3
34 from psutil.tests import APPVEYOR
35 from psutil.tests import bind_socket
36 from psutil.tests import bind_unix_socket
37 from psutil.tests import call_until
38 from psutil.tests import chdir
39 from psutil.tests import CI_TESTING
40 from psutil.tests import create_proc_children_pair
41 from psutil.tests import create_sockets
42 from psutil.tests import create_zombie_proc
43 from psutil.tests import DEVNULL
44 from psutil.tests import get_free_port
45 from psutil.tests import get_test_subprocess
46 from psutil.tests import HAS_BATTERY
47 from psutil.tests import HAS_CONNECTIONS_UNIX
48 from psutil.tests import HAS_MEMORY_MAPS
49 from psutil.tests import HAS_NET_IO_COUNTERS
50 from psutil.tests import HAS_SENSORS_BATTERY
51 from psutil.tests import HAS_SENSORS_FANS
52 from psutil.tests import HAS_SENSORS_TEMPERATURES
53 from psutil.tests import import_module_by_path
54 from psutil.tests import is_namedtuple
55 from psutil.tests import mock
56 from psutil.tests import PYTHON_EXE
57 from psutil.tests import reap_children
58 from psutil.tests import reload_module
59 from psutil.tests import retry
60 from psutil.tests import ROOT_DIR
61 from psutil.tests import safe_mkdir
62 from psutil.tests import safe_rmpath
63 from psutil.tests import SCRIPTS_DIR
64 from psutil.tests import sh
65 from psutil.tests import tcp_socketpair
66 from psutil.tests import TESTFN
67 from psutil.tests import TOX
68 from psutil.tests import TRAVIS
69 from psutil.tests import unittest
70 from psutil.tests import unix_socket_path
71 from psutil.tests import unix_socketpair
72 from psutil.tests import wait_for_file
73 from psutil.tests import wait_for_pid
74 import psutil
75 import psutil.tests
76
77
78 # ===================================================================
79 # --- Misc / generic tests.
80 # ===================================================================
81
82
class TestMisc(unittest.TestCase):
    """Generic tests for psutil objects and psutil._common helpers."""

    def test_process__repr__(self, func=repr):
        """Check how a Process is rendered by `func` (repr by default).

        Also covers the cases where Process.name() raises ZombieProcess,
        NoSuchProcess or AccessDenied.
        """
        p = psutil.Process()
        r = func(p)
        self.assertIn("psutil.Process", r)
        self.assertIn("pid=%s" % p.pid, r)
        self.assertIn("name=", r)
        self.assertIn(p.name(), r)

        # (exception raised by name(), marker expected in the output)
        cases = (
            (psutil.ZombieProcess(os.getpid()), "zombie"),
            (psutil.NoSuchProcess(os.getpid()), "terminated"),
            (psutil.AccessDenied(os.getpid()), None),
        )
        for exc, marker in cases:
            with mock.patch.object(psutil.Process, "name",
                                   side_effect=exc):
                p = psutil.Process()
                r = func(p)
                self.assertIn("pid=%s" % p.pid, r)
                if marker is not None:
                    self.assertIn(marker, r)
                self.assertNotIn("name=", r)

    def test_process__str__(self):
        """Run the repr checks against str(Process)."""
        self.test_process__repr__(func=str)

    def test_no_such_process__repr__(self, func=repr):
        """Check repr() of NoSuchProcess.

        `func` is unused; it is kept so the signature does not change.
        """
        self.assertEqual(
            repr(psutil.NoSuchProcess(321)),
            "psutil.NoSuchProcess process no longer exists (pid=321)")
        self.assertEqual(
            repr(psutil.NoSuchProcess(321, name='foo')),
            "psutil.NoSuchProcess process no longer exists (pid=321, "
            "name='foo')")
        self.assertEqual(
            repr(psutil.NoSuchProcess(321, msg='foo')),
            "psutil.NoSuchProcess foo")

    def test_zombie_process__repr__(self, func=repr):
        """Check repr() of ZombieProcess.

        `func` is unused; it is kept so the signature does not change.
        """
        self.assertEqual(
            repr(psutil.ZombieProcess(321)),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321)")
        self.assertEqual(
            repr(psutil.ZombieProcess(321, name='foo')),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321, name='foo')")
        self.assertEqual(
            repr(psutil.ZombieProcess(321, name='foo', ppid=1)),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321, name='foo', ppid=1)")
        self.assertEqual(
            repr(psutil.ZombieProcess(321, msg='foo')),
            "psutil.ZombieProcess foo")

    def test_access_denied__repr__(self, func=repr):
        """Check repr() of AccessDenied.

        `func` is unused; it is kept so the signature does not change.
        """
        self.assertEqual(
            repr(psutil.AccessDenied(321)),
            "psutil.AccessDenied (pid=321)")
        self.assertEqual(
            repr(psutil.AccessDenied(321, name='foo')),
            "psutil.AccessDenied (pid=321, name='foo')")
        self.assertEqual(
            repr(psutil.AccessDenied(321, msg='foo')),
            "psutil.AccessDenied foo")

    def test_timeout_expired__repr__(self, func=repr):
        """Check repr() of TimeoutExpired.

        `func` is unused; it is kept so the signature does not change.
        """
        self.assertEqual(
            repr(psutil.TimeoutExpired(321)),
            "psutil.TimeoutExpired timeout after 321 seconds")
        self.assertEqual(
            repr(psutil.TimeoutExpired(321, pid=111)),
            "psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
        self.assertEqual(
            repr(psutil.TimeoutExpired(321, pid=111, name='foo')),
            "psutil.TimeoutExpired timeout after 321 seconds "
            "(pid=111, name='foo')")

    def test_process__eq__(self):
        """Two Process objects for the same process compare equal."""
        p1 = psutil.Process()
        p2 = psutil.Process()
        self.assertEqual(p1, p2)
        # Tamper with the identity of p2 so it no longer matches p1.
        p2._ident = (0, 0)
        self.assertNotEqual(p1, p2)
        self.assertNotEqual(p1, 'foo')

    def test_process__hash__(self):
        """Two Process objects for the same process collapse in a set."""
        s = {psutil.Process(), psutil.Process()}
        self.assertEqual(len(s), 1)

    def test__all__(self):
        """Check psutil.__all__ against the module namespace."""
        ignored = ('callable', 'error', 'namedtuple', 'tests',
                   'long', 'test', 'NUM_CPUS', 'BOOT_TIME',
                   'TOTAL_PHYMEM', 'PermissionError',
                   'ProcessLookupError')
        dir_psutil = dir(psutil)
        for name in dir_psutil:
            if name in ignored or name.startswith('_'):
                continue
            try:
                __import__(name)
            except ImportError:
                # Only names which cannot be imported as a top-level
                # module are checked against __all__.
                if name in psutil.__all__:
                    continue
                fun = getattr(psutil, name)
                if fun is None:
                    continue
                if (fun.__doc__ is not None and
                        'deprecated' not in fun.__doc__.lower()):
                    self.fail('%r not in psutil.__all__' % name)

        # Import 'star' will break if __all__ is inconsistent, see:
        # https://github.com/giampaolo/psutil/issues/656
        # Can't do `from psutil import *` as it won't work on python 3
        # so we simply iterate over __all__.
        for name in psutil.__all__:
            self.assertIn(name, dir_psutil)

    def test_version(self):
        """version_info joined with dots must match __version__."""
        self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
                         psutil.__version__)

    def test_process_as_dict_no_new_names(self):
        """Attributes set on the instance must not show up in as_dict()."""
        # See https://github.com/giampaolo/psutil/issues/813
        p = psutil.Process()
        p.foo = '1'
        self.assertNotIn('foo', p.as_dict())

    def test_memoize(self):
        """Check the @memoize cache, cache_clear() and docstring."""
        @memoize
        def foo(*args, **kwargs):
            "foo docstring"
            calls.append(None)
            return (args, kwargs)

        calls = []
        # no args
        for _ in range(2):
            ret = foo()
            expected = ((), {})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 1)
        # with args
        for _ in range(2):
            ret = foo(1)
            expected = ((1, ), {})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 2)
        # with args + kwargs
        for _ in range(2):
            ret = foo(1, bar=2)
            expected = ((1, ), {'bar': 2})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 3)
        # clear cache
        foo.cache_clear()
        ret = foo()
        expected = ((), {})
        self.assertEqual(ret, expected)
        self.assertEqual(len(calls), 4)
        # docstring
        self.assertEqual(foo.__doc__, "foo docstring")

    def test_memoize_when_activated(self):
        """The cache is used only between cache_activate/deactivate."""
        class Foo:

            @memoize_when_activated
            def foo(self):
                calls.append(None)

        f = Foo()
        calls = []
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 2)

        # activate
        calls = []
        f.foo.cache_activate(f)
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 1)

        # deactivate
        calls = []
        f.foo.cache_deactivate(f)
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 2)

    def test_parse_environ_block(self):
        """Check parse_environ_block() on well-formed and odd input."""
        from psutil._common import parse_environ_block

        def k(s):
            # Expected keys are upper-cased on Windows.
            return s.upper() if WINDOWS else s

        self.assertEqual(parse_environ_block("a=1\0"),
                         {k("a"): "1"})
        self.assertEqual(parse_environ_block("a=1\0b=2\0\0"),
                         {k("a"): "1", k("b"): "2"})
        self.assertEqual(parse_environ_block("a=1\0b=\0\0"),
                         {k("a"): "1", k("b"): ""})
        # ignore everything after \0\0
        self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"),
                         {k("a"): "1", k("b"): "2"})
        # ignore everything that is not an assignment
        self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"})
        self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"})
        # do not fail if the block is incomplete
        self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"})

    def test_supports_ipv6(self):
        """Check supports_ipv6() when the socket machinery fails."""
        self.addCleanup(supports_ipv6.cache_clear)
        if supports_ipv6():
            with mock.patch('psutil._common.socket') as s:
                s.has_ipv6 = False
                supports_ipv6.cache_clear()
                assert not supports_ipv6()

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket',
                            side_effect=socket.error) as s:
                assert not supports_ipv6()
                assert s.called

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket',
                            side_effect=socket.gaierror) as s:
                assert not supports_ipv6()
                supports_ipv6.cache_clear()
                assert s.called

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket.bind',
                            side_effect=socket.gaierror) as s:
                assert not supports_ipv6()
                supports_ipv6.cache_clear()
                assert s.called
        else:
            with self.assertRaises(Exception):
                sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
                # The socket may be created successfully and fail only
                # on bind(); close it either way so it is not leaked.
                try:
                    sock.bind(("::1", 0))
                finally:
                    sock.close()

    def test_isfile_strict(self):
        """Check isfile_strict() results and which OSErrors it re-raises."""
        from psutil._common import isfile_strict
        this_file = os.path.abspath(__file__)
        assert isfile_strict(this_file)
        assert not isfile_strict(os.path.dirname(this_file))
        # EPERM and EACCES are propagated to the caller...
        for code in (errno.EPERM, errno.EACCES):
            with mock.patch('psutil._common.os.stat',
                            side_effect=OSError(code, "foo")):
                self.assertRaises(OSError, isfile_strict, this_file)
        # ...while EINVAL results in False.
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.EINVAL, "foo")):
            assert not isfile_strict(this_file)
        with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
            assert not isfile_strict(this_file)

    def test_serialization(self):
        """Return values must survive a json and a pickle round trip."""
        def check(ret):
            json.loads(json.dumps(ret))
            a = pickle.dumps(ret)
            b = pickle.loads(a)
            self.assertEqual(ret, b)

        check(psutil.Process().as_dict())
        check(psutil.virtual_memory())
        check(psutil.swap_memory())
        check(psutil.cpu_times())
        check(psutil.cpu_times_percent(interval=0))
        check(psutil.net_io_counters())
        # Skipped on Linux when /proc/diskstats is missing, and on
        # APPVEYOR.
        has_diskstats = not LINUX or os.path.exists('/proc/diskstats')
        if has_diskstats and not APPVEYOR:
            check(psutil.disk_io_counters())
        check(psutil.disk_partitions())
        check(psutil.disk_usage(os.getcwd()))
        check(psutil.users())

    def test_setup_script(self):
        """Import setup.py and check the version it reports."""
        setup_py = os.path.join(ROOT_DIR, 'setup.py')
        if TRAVIS and not os.path.exists(setup_py):
            # skipTest() raises, so nothing below runs.
            self.skipTest("can't find setup.py")
        module = import_module_by_path(setup_py)
        self.assertRaises(SystemExit, module.setup)
        self.assertEqual(module.get_version(), psutil.__version__)

    def test_ad_on_process_creation(self):
        """Process() tolerates AccessDenied/ZombieProcess on creation."""
        # We are supposed to be able to instantiate Process also in case
        # of zombie processes or access denied.
        for exc in (psutil.AccessDenied, psutil.ZombieProcess(1)):
            with mock.patch.object(psutil.Process, 'create_time',
                                   side_effect=exc) as meth:
                psutil.Process()
                assert meth.called
        # Any other exception is propagated.
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=ValueError) as meth:
            with self.assertRaises(ValueError):
                psutil.Process()
            assert meth.called

    def test_sanity_version_check(self):
        """Reloading psutil with a mismatching cext version must fail."""
        # see: https://github.com/giampaolo/psutil/issues/564
        with mock.patch(
                "psutil._psplatform.cext.version", return_value="0.0.0"):
            with self.assertRaises(ImportError) as cm:
                reload_module(psutil)
            self.assertIn("version conflict", str(cm.exception).lower())
404
405
406 # ===================================================================
407 # --- Tests for wrap_numbers() function.
408 # ===================================================================
409
410
# Three-field record used as fake per-disk counters by the
# wrap_numbers() tests.
nt = collections.namedtuple('foo', ['a', 'b', 'c'])
412
413
class TestWrapNumbers(unittest.TestCase):
    """Tests for wrap_numbers() and its cache."""

    def setUp(self):
        # Every test starts (and ends) with an empty cache.
        wrap_numbers.cache_clear()

    tearDown = setUp

    def test_first_call(self):
        """The first call returns its input unchanged."""
        stats = {'disk1': nt(5, 5, 5)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_input_hasnt_changed(self):
        """Passing the same values twice returns them unchanged."""
        stats = {'disk1': nt(5, 5, 5)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_increase_but_no_wrap(self):
        """Values which never decrease are returned unchanged."""
        stats = {'disk1': nt(5, 5, 5)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)
        stats = {'disk1': nt(10, 15, 20)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)
        stats = {'disk1': nt(20, 25, 30)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)
        stats = {'disk1': nt(20, 25, 30)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_wrap(self):
        """A value lower than the previous one is treated as a wrap."""
        # let's say 100 is the threshold
        stats = {'disk1': nt(100, 100, 100)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)
        # first wrap restarts from 10
        stats = {'disk1': nt(100, 100, 10)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'),
                         {'disk1': nt(100, 100, 110)})
        # then it remains the same
        stats = {'disk1': nt(100, 100, 10)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'),
                         {'disk1': nt(100, 100, 110)})
        # then it goes up
        stats = {'disk1': nt(100, 100, 90)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'),
                         {'disk1': nt(100, 100, 190)})
        # then it wraps again
        stats = {'disk1': nt(100, 100, 20)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'),
                         {'disk1': nt(100, 100, 210)})
        # and remains the same
        stats = {'disk1': nt(100, 100, 20)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'),
                         {'disk1': nt(100, 100, 210)})
        # now wrap another num
        stats = {'disk1': nt(50, 100, 20)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'),
                         {'disk1': nt(150, 100, 210)})
        # and again
        stats = {'disk1': nt(40, 100, 20)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'),
                         {'disk1': nt(190, 100, 210)})
        # keep it the same
        stats = {'disk1': nt(40, 100, 20)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'),
                         {'disk1': nt(190, 100, 210)})

    def test_changing_keys(self):
        """Keys may appear and disappear between calls."""
        # Emulate a case where the second call to disk_io()
        # (or whatever) provides a new disk, then the new disk
        # disappears on the third call.
        stats = {'disk1': nt(5, 5, 5)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)
        stats = {'disk1': nt(5, 5, 5),
                 'disk2': nt(7, 7, 7)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)
        stats = {'disk1': nt(8, 8, 8)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_changing_keys_w_wrap(self):
        """A key which disappears loses its accumulated wrap."""
        stats = {'disk1': nt(50, 50, 50),
                 'disk2': nt(100, 100, 100)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)
        # disk 2 wraps
        stats = {'disk1': nt(50, 50, 50),
                 'disk2': nt(100, 100, 10)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'),
                         {'disk1': nt(50, 50, 50),
                          'disk2': nt(100, 100, 110)})
        # disk 2 disappears
        stats = {'disk1': nt(50, 50, 50)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

        # then it appears again; the old wrap is supposed to be
        # gone.
        stats = {'disk1': nt(50, 50, 50),
                 'disk2': nt(100, 100, 100)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)
        # remains the same
        stats = {'disk1': nt(50, 50, 50),
                 'disk2': nt(100, 100, 100)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)
        # and then wraps again
        stats = {'disk1': nt(50, 50, 50),
                 'disk2': nt(100, 100, 10)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'),
                         {'disk1': nt(50, 50, 50),
                          'disk2': nt(100, 100, 110)})

    def test_real_data(self):
        """Same checks with plain tuples of nine counters per disk."""
        d = {'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
             'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
             'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
             'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
        self.assertEqual(wrap_numbers(d, 'disk_io'), d)
        self.assertEqual(wrap_numbers(d, 'disk_io'), d)
        # decrease this   ↓
        d = {'nvme0n1': (100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
             'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
             'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
             'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
        out = wrap_numbers(d, 'disk_io')
        self.assertEqual(out['nvme0n1'][0], 400)

    # --- cache tests

    def test_cache_first_call(self):
        """Check cache_info() after a single call."""
        stats = {'disk1': nt(5, 5, 5)}
        wrap_numbers(stats, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': stats})
        self.assertEqual(cache[1], {'disk_io': {}})
        self.assertEqual(cache[2], {'disk_io': {}})

    def test_cache_call_twice(self):
        """Check cache_info() after two calls with growing values."""
        stats = {'disk1': nt(5, 5, 5)}
        wrap_numbers(stats, 'disk_io')
        stats = {'disk1': nt(10, 10, 10)}
        wrap_numbers(stats, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': stats})
        self.assertEqual(
            cache[1],
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
        self.assertEqual(cache[2], {'disk_io': {}})

    def test_cache_wrap(self):
        """Check cache_info() across two consecutive wraps."""
        # let's say 100 is the threshold
        stats = {'disk1': nt(100, 100, 100)}
        wrap_numbers(stats, 'disk_io')

        # first wrap restarts from 10
        stats = {'disk1': nt(100, 100, 10)}
        wrap_numbers(stats, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': stats})
        self.assertEqual(
            cache[1],
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 100}})
        self.assertEqual(cache[2], {'disk_io': {'disk1': {('disk1', 2)}}})

        def check_after_first_wrap():
            # cache[1] and cache[2] as they are after the first wrap.
            info = wrap_numbers.cache_info()
            self.assertEqual(
                info[1],
                {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0,
                             ('disk1', 2): 100}})
            self.assertEqual(info[2],
                             {'disk_io': {'disk1': {('disk1', 2)}}})

        # then it remains the same
        stats = {'disk1': nt(100, 100, 10)}
        wrap_numbers(stats, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': stats})
        check_after_first_wrap()

        # then it goes up
        stats = {'disk1': nt(100, 100, 90)}
        wrap_numbers(stats, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': stats})
        check_after_first_wrap()

        # then it wraps again
        stats = {'disk1': nt(100, 100, 20)}
        wrap_numbers(stats, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': stats})
        self.assertEqual(
            cache[1],
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 190}})
        self.assertEqual(cache[2], {'disk_io': {'disk1': {('disk1', 2)}}})

    def test_cache_changing_keys(self):
        """Check cache_info() after a new key shows up."""
        stats = {'disk1': nt(5, 5, 5)}
        wrap_numbers(stats, 'disk_io')
        stats = {'disk1': nt(5, 5, 5),
                 'disk2': nt(7, 7, 7)}
        wrap_numbers(stats, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': stats})
        self.assertEqual(
            cache[1],
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
        self.assertEqual(cache[2], {'disk_io': {}})

    def test_cache_clear(self):
        """cache_clear(name) empties the cache and tolerates unknown names."""
        stats = {'disk1': nt(5, 5, 5)}
        wrap_numbers(stats, 'disk_io')
        wrap_numbers(stats, 'disk_io')
        wrap_numbers.cache_clear('disk_io')
        self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
        # Clearing an already cleared or never used name must not raise.
        wrap_numbers.cache_clear('disk_io')
        wrap_numbers.cache_clear('?!?')

    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
    def test_cache_clear_public_apis(self):
        """cache_clear() of the public IO counters APIs empties the cache."""
        if not psutil.disk_io_counters() or not psutil.net_io_counters():
            self.skipTest("no disks or NICs available")
        psutil.disk_io_counters()
        psutil.net_io_counters()
        for cache in wrap_numbers.cache_info():
            self.assertIn('psutil.disk_io_counters', cache)
            self.assertIn('psutil.net_io_counters', cache)

        psutil.disk_io_counters.cache_clear()
        for cache in wrap_numbers.cache_info():
            self.assertIn('psutil.net_io_counters', cache)
            self.assertNotIn('psutil.disk_io_counters', cache)

        psutil.net_io_counters.cache_clear()
        self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
646
647
648 # ===================================================================
649 # --- Example script tests
650 # ===================================================================
651
652
@unittest.skipIf(TOX, "can't test on TOX")
# See: https://travis-ci.org/giampaolo/psutil/jobs/295224806
@unittest.skipIf(TRAVIS and not os.path.exists(SCRIPTS_DIR),
                 "can't locate scripts directory")
class TestScripts(unittest.TestCase):
    """Tests for scripts in the "scripts" directory."""

    @staticmethod
    def assert_stdout(exe, *args, **kwargs):
        """Run script `exe` with `args` and return its stripped output.

        Keyword arguments are forwarded to sh().  If sh() raises a
        RuntimeError mentioning 'AccessDenied' the error text is
        returned instead; any other RuntimeError is re-raised.
        """
        script = os.path.join(SCRIPTS_DIR, exe)
        cmd = [PYTHON_EXE, script] + list(args)
        try:
            out = sh(cmd, **kwargs).strip()
        except RuntimeError as err:
            if 'AccessDenied' not in str(err):
                raise
            return str(err)
        assert out, out
        return out

    @staticmethod
    def assert_syntax(exe, args=None):
        """Parse script `exe` with ast.parse() without running it.

        `args` is accepted but not used.
        """
        path = os.path.join(SCRIPTS_DIR, exe)
        # The encoding is passed explicitly on Python 3 only.
        open_kwargs = {'encoding': 'utf8'} if PY3 else {}
        with open(path, 'rt', **open_kwargs) as f:
            src = f.read()
        ast.parse(src)

    def test_coverage(self):
        """Every script must have a matching test_<name> method here."""
        # make sure all example scripts have a test method defined
        defined = dir(self)
        for name in os.listdir(SCRIPTS_DIR):
            if not name.endswith('.py'):
                continue
            if 'test_' + os.path.splitext(name)[0] not in defined:
                self.fail('no test defined for %r script'
                          % os.path.join(SCRIPTS_DIR, name))

    @unittest.skipIf(not POSIX, "POSIX only")
    def test_executable(self):
        """Every script must have the owner-executable bit set."""
        for name in os.listdir(SCRIPTS_DIR):
            if not name.endswith('.py'):
                continue
            path = os.path.join(SCRIPTS_DIR, name)
            mode = os.stat(path).st_mode
            if not mode & stat.S_IXUSR:
                self.fail('%r is not executable' % path)

    # --- scripts which are run and expected to print something

    def test_disk_usage(self):
        self.assert_stdout('disk_usage.py')

    def test_free(self):
        self.assert_stdout('free.py')

    def test_meminfo(self):
        self.assert_stdout('meminfo.py')

    def test_procinfo(self):
        self.assert_stdout('procinfo.py', str(os.getpid()))

    @unittest.skipIf(CI_TESTING and not psutil.users(), "no users")
    def test_who(self):
        self.assert_stdout('who.py')

    def test_ps(self):
        self.assert_stdout('ps.py')

    def test_pstree(self):
        self.assert_stdout('pstree.py')

    def test_netstat(self):
        self.assert_stdout('netstat.py')

    # permission denied on travis
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    def test_ifconfig(self):
        self.assert_stdout('ifconfig.py')

    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
    def test_pmap(self):
        self.assert_stdout('pmap.py', str(os.getpid()))

    def test_procsmem(self):
        fields = psutil.Process().memory_full_info()._fields
        if 'uss' not in fields:
            # skipTest() raises, so nothing below runs.
            self.skipTest("not supported")
        self.assert_stdout('procsmem.py', stderr=DEVNULL)

    # --- scripts which are only checked for valid syntax

    def test_killall(self):
        self.assert_syntax('killall.py')

    def test_nettop(self):
        self.assert_syntax('nettop.py')

    def test_top(self):
        self.assert_syntax('top.py')

    def test_iotop(self):
        self.assert_syntax('iotop.py')

    def test_pidof(self):
        # The output must include the PID of this very process.
        output = self.assert_stdout('pidof.py', psutil.Process().name())
        self.assertIn(str(os.getpid()), output)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_winservices(self):
        self.assert_stdout('winservices.py')

    def test_cpu_distribution(self):
        self.assert_syntax('cpu_distribution.py')

    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    def test_temperatures(self):
        if not psutil.sensors_temperatures():
            self.skipTest("no temperatures")
        self.assert_stdout('temperatures.py')

    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    def test_fans(self):
        if not psutil.sensors_fans():
            self.skipTest("no fans")
        self.assert_stdout('fans.py')

    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_battery(self):
        self.assert_stdout('battery.py')

    def test_sensors(self):
        self.assert_stdout('sensors.py')
788
789
790 # ===================================================================
791 # --- Unit tests for test utilities.
792 # ===================================================================
793
794
class TestRetryDecorator(unittest.TestCase):
    """Tests for the retry() decorator; time.sleep is always mocked."""

    @mock.patch('time.sleep')
    def test_retry_success(self, sleep):
        # Fail 3 times out of 5; make sure the decorated fun returns.

        @retry(retries=5, interval=1, logfun=None)
        def foo():
            # Each call consumes one pending failure, if any is left.
            if pending:
                pending.pop()
                1 / 0
            return 1

        pending = list(range(3))
        self.assertEqual(foo(), 1)
        self.assertEqual(sleep.call_count, 3)

    @mock.patch('time.sleep')
    def test_retry_failure(self, sleep):
        # Six failures are queued but only 5 retries are allowed; the
        # function is supposed to raise the exception.

        @retry(retries=5, interval=1, logfun=None)
        def foo():
            if pending:
                pending.pop()
                1 / 0
            return 1

        pending = list(range(6))
        self.assertRaises(ZeroDivisionError, foo)
        self.assertEqual(sleep.call_count, 5)

    @mock.patch('time.sleep')
    def test_exception_arg(self, sleep):
        # An exception other than the one passed as `exception` is
        # raised right away, with no sleep in between.

        @retry(exception=ValueError, interval=1)
        def foo():
            raise TypeError

        self.assertRaises(TypeError, foo)
        self.assertEqual(sleep.call_count, 0)

    @mock.patch('time.sleep')
    def test_no_interval_arg(self, sleep):
        # if interval is not specified sleep is not supposed to be called

        @retry(retries=5, interval=None, logfun=None)
        def foo():
            1 / 0

        self.assertRaises(ZeroDivisionError, foo)
        self.assertEqual(sleep.call_count, 0)

    @mock.patch('time.sleep')
    def test_retries_arg(self, sleep):
        # A function which always fails sleeps once per retry.

        @retry(retries=5, interval=1, logfun=None)
        def foo():
            1 / 0

        self.assertRaises(ZeroDivisionError, foo)
        self.assertEqual(sleep.call_count, 5)

    @mock.patch('time.sleep')
    def test_retries_and_timeout_args(self, sleep):
        # Passing both `retries` and `timeout` raises ValueError.
        self.assertRaises(ValueError, retry, retries=5, timeout=1)
860
861
class TestSyncTestUtils(unittest.TestCase):
    """Tests for wait_for_pid(), wait_for_file() and call_until()."""

    def tearDown(self):
        safe_rmpath(TESTFN)

    def test_wait_for_pid(self):
        """wait_for_pid() returns for our PID, raises for a bogus one."""
        wait_for_pid(os.getpid())
        bogus_pid = max(psutil.pids()) + 99999
        with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
            self.assertRaises(psutil.NoSuchProcess, wait_for_pid, bogus_pid)

    def test_wait_for_file(self):
        """By default the file is removed once it has been waited for."""
        with open(TESTFN, 'w') as fobj:
            fobj.write('foo')
        wait_for_file(TESTFN)
        assert not os.path.exists(TESTFN)

    def test_wait_for_file_empty(self):
        """An empty file is accepted when empty=True."""
        with open(TESTFN, 'w'):
            pass
        wait_for_file(TESTFN, empty=True)
        assert not os.path.exists(TESTFN)

    def test_wait_for_file_no_file(self):
        """A missing file results in IOError."""
        with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
            self.assertRaises(IOError, wait_for_file, TESTFN)

    def test_wait_for_file_no_delete(self):
        """With delete=False the file is left in place."""
        with open(TESTFN, 'w') as fobj:
            fobj.write('foo')
        wait_for_file(TESTFN, delete=False)
        assert os.path.exists(TESTFN)

    def test_call_until(self):
        """call_until() returns the value produced by the callable."""
        result = call_until(lambda: 1, "ret == 1")
        self.assertEqual(result, 1)
898
899
class TestFSTestUtils(unittest.TestCase):
    """Tests for the file system helpers."""

    def setUp(self):
        # Make sure TESTFN does not exist before and after each test.
        safe_rmpath(TESTFN)

    tearDown = setUp

    def test_open_text(self):
        """open_text() opens the file in 'rt' mode."""
        with open_text(__file__) as fobj:
            self.assertEqual(fobj.mode, 'rt')

    def test_open_binary(self):
        """open_binary() opens the file in 'rb' mode."""
        with open_binary(__file__) as fobj:
            self.assertEqual(fobj.mode, 'rb')

    def test_safe_mkdir(self):
        """safe_mkdir() can be called again on an existing directory."""
        for _ in range(2):
            safe_mkdir(TESTFN)
            assert os.path.isdir(TESTFN)

    def test_safe_rmpath(self):
        """safe_rmpath() removes files and dirs, ignores missing paths."""
        # test file is removed
        open(TESTFN, 'w').close()
        safe_rmpath(TESTFN)
        assert not os.path.exists(TESTFN)
        # test no exception if path does not exist
        safe_rmpath(TESTFN)
        # test dir is removed
        os.mkdir(TESTFN)
        safe_rmpath(TESTFN)
        assert not os.path.exists(TESTFN)
        # test other exceptions are raised
        with mock.patch('psutil.tests.os.stat',
                        side_effect=OSError(errno.EINVAL, "")) as stat_mock:
            with self.assertRaises(OSError):
                safe_rmpath(TESTFN)
            assert stat_mock.called

    def test_chdir(self):
        """chdir() changes the cwd and restores it on exit."""
        start_dir = os.getcwd()
        os.mkdir(TESTFN)
        with chdir(TESTFN):
            self.assertEqual(os.getcwd(), os.path.join(start_dir, TESTFN))
        self.assertEqual(os.getcwd(), start_dir)
945
946
class TestProcessUtils(unittest.TestCase):
    """Tests for the helpers which spawn and reap test processes."""

    def test_reap_children(self):
        """reap_children() terminates the subprocess and resets state."""
        subp = get_test_subprocess()
        proc = psutil.Process(subp.pid)
        assert proc.is_running()
        reap_children()
        assert not proc.is_running()
        assert not psutil.tests._pids_started
        assert not psutil.tests._subprocesses_started

    def test_create_proc_children_pair(self):
        """The helper returns a child and a grandchild of this process."""
        child, grandchild = create_proc_children_pair()
        self.assertNotEqual(child.pid, grandchild.pid)
        assert child.is_running()
        assert grandchild.is_running()
        descendants = psutil.Process().children(recursive=True)
        self.assertEqual(len(descendants), 2)
        self.assertIn(child, descendants)
        self.assertIn(grandchild, descendants)
        self.assertEqual(child.ppid(), os.getpid())
        self.assertEqual(grandchild.ppid(), child.pid)

        # make sure both of them are cleaned up
        reap_children()
        assert not child.is_running()
        assert not grandchild.is_running()
        assert not psutil.tests._pids_started
        assert not psutil.tests._subprocesses_started

    @unittest.skipIf(not POSIX, "POSIX only")
    def test_create_zombie_proc(self):
        """The PID returned by the helper is in zombie status."""
        zpid = create_zombie_proc()
        self.addCleanup(reap_children, recursive=True)
        zombie = psutil.Process(zpid)
        self.assertEqual(zombie.status(), psutil.STATUS_ZOMBIE)
983
984
class TestNetUtils(unittest.TestCase):
    """Tests for the socket helpers."""

    def test_bind_socket(self):
        """bind_socket() binds to the requested port."""
        port = get_free_port()
        with contextlib.closing(bind_socket(addr=('', port))) as s:
            self.assertEqual(s.getsockname()[1], port)

    @unittest.skipIf(not POSIX, "POSIX only")
    def test_bind_unix_socket(self):
        """bind_unix_socket() creates a bound UNIX socket of the given type."""
        with unix_socket_path() as name:
            sock = bind_unix_socket(name)
            with contextlib.closing(sock):
                self.assertEqual(sock.family, socket.AF_UNIX)
                self.assertEqual(sock.type, socket.SOCK_STREAM)
                self.assertEqual(sock.getsockname(), name)
                assert os.path.exists(name)
                assert stat.S_ISSOCK(os.stat(name).st_mode)
        # UDP
        with unix_socket_path() as name:
            sock = bind_unix_socket(name, type=socket.SOCK_DGRAM)
            with contextlib.closing(sock):
                self.assertEqual(sock.type, socket.SOCK_DGRAM)

    def test_tcp_socketpair(self):
        """tcp_socketpair() returns a connected (server, client) pair."""
        addr = ("127.0.0.1", get_free_port())
        server, client = tcp_socketpair(socket.AF_INET, addr=addr)
        with contextlib.closing(server):
            with contextlib.closing(client):
                # Ensure they are connected and the positions are
                # correct.
                self.assertEqual(server.getsockname(), addr)
                self.assertEqual(client.getpeername(), addr)
                self.assertNotEqual(client.getsockname(), addr)

    @unittest.skipIf(not POSIX, "POSIX only")
    @unittest.skipIf(NETBSD or FREEBSD,
                     "/var/run/log UNIX socket opened by default")
    def test_unix_socketpair(self):
        """unix_socketpair() adds two fds / UNIX connections to us."""
        p = psutil.Process()
        num_fds = p.num_fds()
        assert not p.connections(kind='unix')
        with unix_socket_path() as name:
            server, client = unix_socketpair(name)
            try:
                assert os.path.exists(name)
                assert stat.S_ISSOCK(os.stat(name).st_mode)
                self.assertEqual(p.num_fds() - num_fds, 2)
                self.assertEqual(len(p.connections(kind='unix')), 2)
                self.assertEqual(server.getsockname(), name)
                self.assertEqual(client.getpeername(), name)
            finally:
                client.close()
                server.close()

    def test_create_sockets(self):
        """create_sockets() yields sockets of several families and types."""
        with create_sockets() as socks:
            fams = collections.defaultdict(int)
            types = collections.defaultdict(int)
            for s in socks:
                fams[s.family] += 1
                # work around http://bugs.python.org/issue30204
                types[s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)] += 1
            self.assertGreaterEqual(fams[socket.AF_INET], 2)
            if supports_ipv6():
                self.assertGreaterEqual(fams[socket.AF_INET6], 2)
            if POSIX and HAS_CONNECTIONS_UNIX:
                self.assertGreaterEqual(fams[socket.AF_UNIX], 2)
            self.assertGreaterEqual(types[socket.SOCK_STREAM], 2)
            self.assertGreaterEqual(types[socket.SOCK_DGRAM], 2)

    # These two tests used to be named without the "test" prefix, so
    # unittest never collected them.  The old names are kept as aliases
    # for anything still referring to them; methods still resolve
    # bind_socket() to the module-level helper, not to this attribute.
    bind_socket = test_bind_socket
    tcp_tcp_socketpair = test_tcp_socketpair
1054
1055
class TestOtherUtils(unittest.TestCase):
    """Tests for the remaining test helpers."""

    def test_is_namedtuple(self):
        """is_namedtuple() tells a namedtuple instance from a tuple."""
        record_type = collections.namedtuple('foo', 'a b c')
        assert is_namedtuple(record_type(1, 2, 3))
        assert not is_namedtuple(tuple())
1061
1062
if __name__ == '__main__':
    # Run this file through psutil's own test runner when it is
    # executed as a script.
    from psutil.tests import runner
    runner.run(__file__)