Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/psutil/tests/test_misc.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26e78fe6e8c4 |
---|---|
1 #!/usr/bin/env python3 | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
5 # Use of this source code is governed by a BSD-style license that can be | |
6 # found in the LICENSE file. | |
7 | |
8 """ | |
9 Miscellaneous tests. | |
10 """ | |
11 | |
12 import ast | |
13 import collections | |
14 import contextlib | |
15 import errno | |
16 import json | |
17 import os | |
18 import pickle | |
19 import socket | |
20 import stat | |
21 | |
22 from psutil import FREEBSD | |
23 from psutil import LINUX | |
24 from psutil import NETBSD | |
25 from psutil import POSIX | |
26 from psutil import WINDOWS | |
27 from psutil._common import memoize | |
28 from psutil._common import memoize_when_activated | |
29 from psutil._common import supports_ipv6 | |
30 from psutil._common import wrap_numbers | |
31 from psutil._common import open_text | |
32 from psutil._common import open_binary | |
33 from psutil._compat import PY3 | |
34 from psutil.tests import APPVEYOR | |
35 from psutil.tests import bind_socket | |
36 from psutil.tests import bind_unix_socket | |
37 from psutil.tests import call_until | |
38 from psutil.tests import chdir | |
39 from psutil.tests import CI_TESTING | |
40 from psutil.tests import create_proc_children_pair | |
41 from psutil.tests import create_sockets | |
42 from psutil.tests import create_zombie_proc | |
43 from psutil.tests import DEVNULL | |
44 from psutil.tests import get_free_port | |
45 from psutil.tests import get_test_subprocess | |
46 from psutil.tests import HAS_BATTERY | |
47 from psutil.tests import HAS_CONNECTIONS_UNIX | |
48 from psutil.tests import HAS_MEMORY_MAPS | |
49 from psutil.tests import HAS_NET_IO_COUNTERS | |
50 from psutil.tests import HAS_SENSORS_BATTERY | |
51 from psutil.tests import HAS_SENSORS_FANS | |
52 from psutil.tests import HAS_SENSORS_TEMPERATURES | |
53 from psutil.tests import import_module_by_path | |
54 from psutil.tests import is_namedtuple | |
55 from psutil.tests import mock | |
56 from psutil.tests import PYTHON_EXE | |
57 from psutil.tests import reap_children | |
58 from psutil.tests import reload_module | |
59 from psutil.tests import retry | |
60 from psutil.tests import ROOT_DIR | |
61 from psutil.tests import safe_mkdir | |
62 from psutil.tests import safe_rmpath | |
63 from psutil.tests import SCRIPTS_DIR | |
64 from psutil.tests import sh | |
65 from psutil.tests import tcp_socketpair | |
66 from psutil.tests import TESTFN | |
67 from psutil.tests import TOX | |
68 from psutil.tests import TRAVIS | |
69 from psutil.tests import unittest | |
70 from psutil.tests import unix_socket_path | |
71 from psutil.tests import unix_socketpair | |
72 from psutil.tests import wait_for_file | |
73 from psutil.tests import wait_for_pid | |
74 import psutil | |
75 import psutil.tests | |
76 | |
77 | |
78 # =================================================================== | |
79 # --- Misc / generic tests. | |
80 # =================================================================== | |
81 | |
82 | |
class TestMisc(unittest.TestCase):
    """Miscellaneous checks on psutil objects, exceptions and helpers."""

    def test_process__repr__(self, func=repr):
        # `func` is the callable used to render the Process object (repr
        # by default) so that test_process__str__() can reuse this test.
        p = psutil.Process()
        r = func(p)
        self.assertIn("psutil.Process", r)
        self.assertIn("pid=%s" % p.pid, r)
        self.assertIn("name=", r)
        self.assertIn(p.name(), r)
        # When name() raises, the name must be left out of the output.
        with mock.patch.object(psutil.Process, "name",
                               side_effect=psutil.ZombieProcess(os.getpid())):
            p = psutil.Process()
            r = func(p)
            self.assertIn("pid=%s" % p.pid, r)
            self.assertIn("zombie", r)
            self.assertNotIn("name=", r)
        with mock.patch.object(psutil.Process, "name",
                               side_effect=psutil.NoSuchProcess(os.getpid())):
            p = psutil.Process()
            r = func(p)
            self.assertIn("pid=%s" % p.pid, r)
            self.assertIn("terminated", r)
            self.assertNotIn("name=", r)
        with mock.patch.object(psutil.Process, "name",
                               side_effect=psutil.AccessDenied(os.getpid())):
            p = psutil.Process()
            r = func(p)
            self.assertIn("pid=%s" % p.pid, r)
            self.assertNotIn("name=", r)

    def test_process__str__(self):
        self.test_process__repr__(func=str)

    def test_no_such_process__repr__(self, func=repr):
        # The exception is rendered through `func` (repr by default) so
        # that the parameter is honoured instead of silently ignored.
        self.assertEqual(
            func(psutil.NoSuchProcess(321)),
            "psutil.NoSuchProcess process no longer exists (pid=321)")
        self.assertEqual(
            func(psutil.NoSuchProcess(321, name='foo')),
            "psutil.NoSuchProcess process no longer exists (pid=321, "
            "name='foo')")
        self.assertEqual(
            func(psutil.NoSuchProcess(321, msg='foo')),
            "psutil.NoSuchProcess foo")

    def test_zombie_process__repr__(self, func=repr):
        # The exception is rendered through `func` (repr by default).
        self.assertEqual(
            func(psutil.ZombieProcess(321)),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321)")
        self.assertEqual(
            func(psutil.ZombieProcess(321, name='foo')),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321, name='foo')")
        self.assertEqual(
            func(psutil.ZombieProcess(321, name='foo', ppid=1)),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321, name='foo', ppid=1)")
        self.assertEqual(
            func(psutil.ZombieProcess(321, msg='foo')),
            "psutil.ZombieProcess foo")

    def test_access_denied__repr__(self, func=repr):
        # The exception is rendered through `func` (repr by default).
        self.assertEqual(
            func(psutil.AccessDenied(321)),
            "psutil.AccessDenied (pid=321)")
        self.assertEqual(
            func(psutil.AccessDenied(321, name='foo')),
            "psutil.AccessDenied (pid=321, name='foo')")
        self.assertEqual(
            func(psutil.AccessDenied(321, msg='foo')),
            "psutil.AccessDenied foo")

    def test_timeout_expired__repr__(self, func=repr):
        # The exception is rendered through `func` (repr by default).
        self.assertEqual(
            func(psutil.TimeoutExpired(321)),
            "psutil.TimeoutExpired timeout after 321 seconds")
        self.assertEqual(
            func(psutil.TimeoutExpired(321, pid=111)),
            "psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
        self.assertEqual(
            func(psutil.TimeoutExpired(321, pid=111, name='foo')),
            "psutil.TimeoutExpired timeout after 321 seconds "
            "(pid=111, name='foo')")

    def test_process__eq__(self):
        p1 = psutil.Process()
        p2 = psutil.Process()
        self.assertEqual(p1, p2)
        # Overwriting the private _ident tuple must break equality.
        p2._ident = (0, 0)
        self.assertNotEqual(p1, p2)
        self.assertNotEqual(p1, 'foo')

    def test_process__hash__(self):
        # Two Process objects for the current process collapse into one
        # set member.
        s = {psutil.Process(), psutil.Process()}
        self.assertEqual(len(s), 1)

    def test__all__(self):
        dir_psutil = dir(psutil)
        for name in dir_psutil:
            if name in ('callable', 'error', 'namedtuple', 'tests',
                        'long', 'test', 'NUM_CPUS', 'BOOT_TIME',
                        'TOTAL_PHYMEM', 'PermissionError',
                        'ProcessLookupError'):
                continue
            if not name.startswith('_'):
                try:
                    # Names which can be imported as a module on their
                    # own are not checked against __all__.
                    __import__(name)
                except ImportError:
                    if name not in psutil.__all__:
                        fun = getattr(psutil, name)
                        if fun is None:
                            continue
                        if (fun.__doc__ is not None and
                                'deprecated' not in fun.__doc__.lower()):
                            self.fail('%r not in psutil.__all__' % name)

        # Import 'star' will break if __all__ is inconsistent, see:
        # https://github.com/giampaolo/psutil/issues/656
        # Can't do `from psutil import *` as it won't work on python 3
        # so we simply iterate over __all__.
        for name in psutil.__all__:
            self.assertIn(name, dir_psutil)

    def test_version(self):
        self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
                         psutil.__version__)

    def test_process_as_dict_no_new_names(self):
        # See https://github.com/giampaolo/psutil/issues/813
        p = psutil.Process()
        p.foo = '1'
        self.assertNotIn('foo', p.as_dict())

    def test_memoize(self):
        @memoize
        def foo(*args, **kwargs):
            "foo docstring"
            calls.append(None)
            return (args, kwargs)

        # `calls` grows by one item every time the undecorated body runs.
        calls = []
        # no args
        for x in range(2):
            ret = foo()
            expected = ((), {})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 1)
        # with args
        for x in range(2):
            ret = foo(1)
            expected = ((1, ), {})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 2)
        # with args + kwargs
        for x in range(2):
            ret = foo(1, bar=2)
            expected = ((1, ), {'bar': 2})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 3)
        # clear cache
        foo.cache_clear()
        ret = foo()
        expected = ((), {})
        self.assertEqual(ret, expected)
        self.assertEqual(len(calls), 4)
        # docstring
        self.assertEqual(foo.__doc__, "foo docstring")

    def test_memoize_when_activated(self):
        class Foo:

            @memoize_when_activated
            def foo(self):
                calls.append(None)

        # not activated: every call runs the body
        f = Foo()
        calls = []
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 2)

        # activate
        calls = []
        f.foo.cache_activate(f)
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 1)

        # deactivate
        calls = []
        f.foo.cache_deactivate(f)
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 2)

    def test_parse_environ_block(self):
        from psutil._common import parse_environ_block

        def k(s):
            # Expected keys are upper-cased on Windows only.
            return s.upper() if WINDOWS else s

        self.assertEqual(parse_environ_block("a=1\0"),
                         {k("a"): "1"})
        self.assertEqual(parse_environ_block("a=1\0b=2\0\0"),
                         {k("a"): "1", k("b"): "2"})
        self.assertEqual(parse_environ_block("a=1\0b=\0\0"),
                         {k("a"): "1", k("b"): ""})
        # ignore everything after \0\0
        self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"),
                         {k("a"): "1", k("b"): "2"})
        # ignore everything that is not an assignment
        self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"})
        self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"})
        # do not fail if the block is incomplete
        self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"})

    def test_supports_ipv6(self):
        # supports_ipv6() is memoized, hence the cache is cleared before
        # every scenario and once more when the test is over.
        self.addCleanup(supports_ipv6.cache_clear)
        if supports_ipv6():
            with mock.patch('psutil._common.socket') as s:
                s.has_ipv6 = False
                supports_ipv6.cache_clear()
                assert not supports_ipv6()

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket',
                            side_effect=socket.error) as s:
                assert not supports_ipv6()
                assert s.called

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket',
                            side_effect=socket.gaierror) as s:
                assert not supports_ipv6()
                supports_ipv6.cache_clear()
                assert s.called

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket.bind',
                            side_effect=socket.gaierror) as s:
                assert not supports_ipv6()
                supports_ipv6.cache_clear()
                assert s.called
        else:
            with self.assertRaises(Exception):
                sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
                # If the socket could be created, close it whatever
                # bind() does so that the descriptor is not leaked.
                try:
                    sock.bind(("::1", 0))
                finally:
                    sock.close()

    def test_isfile_strict(self):
        from psutil._common import isfile_strict
        this_file = os.path.abspath(__file__)
        assert isfile_strict(this_file)
        assert not isfile_strict(os.path.dirname(this_file))
        # EPERM and EACCES are expected to propagate...
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.EPERM, "foo")):
            self.assertRaises(OSError, isfile_strict, this_file)
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.EACCES, "foo")):
            self.assertRaises(OSError, isfile_strict, this_file)
        # ...whereas EINVAL is expected to produce a false result.
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.EINVAL, "foo")):
            assert not isfile_strict(this_file)
        with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
            assert not isfile_strict(this_file)

    def test_serialization(self):
        def check(ret):
            # The value must survive a JSON round trip without errors
            # and a pickle round trip unchanged.
            json.loads(json.dumps(ret))
            a = pickle.dumps(ret)
            b = pickle.loads(a)
            self.assertEqual(ret, b)

        check(psutil.Process().as_dict())
        check(psutil.virtual_memory())
        check(psutil.swap_memory())
        check(psutil.cpu_times())
        check(psutil.cpu_times_percent(interval=0))
        check(psutil.net_io_counters())
        # disk_io_counters() is not checked on Linux when /proc/diskstats
        # does not exist, nor on APPVEYOR.
        diskstats_missing = LINUX and not os.path.exists('/proc/diskstats')
        if not diskstats_missing and not APPVEYOR:
            check(psutil.disk_io_counters())
        check(psutil.disk_partitions())
        check(psutil.disk_usage(os.getcwd()))
        check(psutil.users())

    def test_setup_script(self):
        setup_py = os.path.join(ROOT_DIR, 'setup.py')
        if TRAVIS and not os.path.exists(setup_py):
            return self.skipTest("can't find setup.py")
        module = import_module_by_path(setup_py)
        self.assertRaises(SystemExit, module.setup)
        self.assertEqual(module.get_version(), psutil.__version__)

    def test_ad_on_process_creation(self):
        # We are supposed to be able to instantiate Process also in case
        # of zombie processes or access denied.
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=psutil.AccessDenied) as meth:
            psutil.Process()
            assert meth.called
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=psutil.ZombieProcess(1)) as meth:
            psutil.Process()
            assert meth.called
        # Any other exception is expected to propagate.
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=ValueError) as meth:
            with self.assertRaises(ValueError):
                psutil.Process()
            assert meth.called

    def test_sanity_version_check(self):
        # see: https://github.com/giampaolo/psutil/issues/564
        with mock.patch(
                "psutil._psplatform.cext.version", return_value="0.0.0"):
            with self.assertRaises(ImportError) as cm:
                reload_module(psutil)
            self.assertIn("version conflict", str(cm.exception).lower())
404 | |
405 | |
406 # =================================================================== | |
407 # --- Tests for wrap_numbers() function. | |
408 # =================================================================== | |
409 | |
410 | |
# Three-field record ('a', 'b', 'c') used as sample counters in the
# wrap_numbers() tests.
nt = collections.namedtuple('foo', ['a', 'b', 'c'])
412 | |
413 | |
class TestWrapNumbers(unittest.TestCase):
    """Tests for wrap_numbers() and for the cache it exposes."""

    def setUp(self):
        wrap_numbers.cache_clear()

    def tearDown(self):
        wrap_numbers.cache_clear()

    def _check_steps(self, steps):
        # Feed every (given, expected) pair to wrap_numbers(), in order,
        # under the 'disk_io' name and compare what comes back.
        for given, expected in steps:
            self.assertEqual(wrap_numbers(given, 'disk_io'), expected)

    def test_first_call(self):
        stats = {'disk1': nt(5, 5, 5)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_input_hasnt_changed(self):
        stats = {'disk1': nt(5, 5, 5)}
        self._check_steps([(stats, stats), (stats, stats)])

    def test_increase_but_no_wrap(self):
        samples = [
            nt(5, 5, 5),
            nt(10, 15, 20),
            nt(20, 25, 30),
            nt(20, 25, 30),
        ]
        for sample in samples:
            stats = {'disk1': sample}
            self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_wrap(self):
        steps = [
            # let's say 100 is the threshold
            (nt(100, 100, 100), nt(100, 100, 100)),
            # first wrap restarts from 10
            (nt(100, 100, 10), nt(100, 100, 110)),
            # then it remains the same
            (nt(100, 100, 10), nt(100, 100, 110)),
            # then it goes up
            (nt(100, 100, 90), nt(100, 100, 190)),
            # then it wraps again
            (nt(100, 100, 20), nt(100, 100, 210)),
            # and remains the same
            (nt(100, 100, 20), nt(100, 100, 210)),
            # now wrap another num
            (nt(50, 100, 20), nt(150, 100, 210)),
            # and again
            (nt(40, 100, 20), nt(190, 100, 210)),
            # keep it the same
            (nt(40, 100, 20), nt(190, 100, 210)),
        ]
        for given, expected in steps:
            self.assertEqual(wrap_numbers({'disk1': given}, 'disk_io'),
                             {'disk1': expected})

    def test_changing_keys(self):
        # Emulate a case where the second call to disk_io()
        # (or whatever) provides a new disk, then the new disk
        # disappears on the third call.
        one_disk = {'disk1': nt(5, 5, 5)}
        two_disks = {'disk1': nt(5, 5, 5),
                     'disk2': nt(7, 7, 7)}
        one_disk_again = {'disk1': nt(8, 8, 8)}
        self._check_steps([
            (one_disk, one_disk),
            (two_disks, two_disks),
            (one_disk_again, one_disk_again),
        ])

    def test_changing_keys_w_wrap(self):
        def both(disk2):
            # disk1 never changes in this test; only disk2 varies.
            return {'disk1': nt(50, 50, 50), 'disk2': disk2}

        disk1_only = {'disk1': nt(50, 50, 50)}
        self._check_steps([
            (both(nt(100, 100, 100)), both(nt(100, 100, 100))),
            # disk 2 wraps
            (both(nt(100, 100, 10)), both(nt(100, 100, 110))),
            # disk 2 disappears
            (disk1_only, disk1_only),
            # then it appears again; the old wrap is supposed to be
            # gone.
            (both(nt(100, 100, 100)), both(nt(100, 100, 100))),
            # remains the same
            (both(nt(100, 100, 100)), both(nt(100, 100, 100))),
            # and then wraps again
            (both(nt(100, 100, 10)), both(nt(100, 100, 110))),
        ])

    def test_real_data(self):
        first = {
            'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
            'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
            'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
            'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348),
        }
        self._check_steps([(first, first), (first, first)])
        # Same data, except that the first 'nvme0n1' value goes down
        # from 300 to 100.
        second = dict(
            first,
            nvme0n1=(100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048))
        out = wrap_numbers(second, 'disk_io')
        self.assertEqual(out['nvme0n1'][0], 400)

    # --- cache tests

    def test_cache_first_call(self):
        stats = {'disk1': nt(5, 5, 5)}
        wrap_numbers(stats, 'disk_io')
        info = wrap_numbers.cache_info()
        self.assertEqual(info[0], {'disk_io': stats})
        self.assertEqual(info[1], {'disk_io': {}})
        self.assertEqual(info[2], {'disk_io': {}})

    def test_cache_call_twice(self):
        wrap_numbers({'disk1': nt(5, 5, 5)}, 'disk_io')
        stats = {'disk1': nt(10, 10, 10)}
        wrap_numbers(stats, 'disk_io')
        info = wrap_numbers.cache_info()
        self.assertEqual(info[0], {'disk_io': stats})
        self.assertEqual(
            info[1],
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
        self.assertEqual(info[2], {'disk_io': {}})

    def test_cache_wrap(self):
        def check(sample, third):
            # Submit `sample` for disk1, then verify the three cache
            # structures; `third` is the value expected under the
            # ('disk1', 2) key of the second one.
            stats = {'disk1': sample}
            wrap_numbers(stats, 'disk_io')
            info = wrap_numbers.cache_info()
            self.assertEqual(info[0], {'disk_io': stats})
            self.assertEqual(
                info[1],
                {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0,
                             ('disk1', 2): third}})
            self.assertEqual(info[2],
                             {'disk_io': {'disk1': {('disk1', 2)}}})

        # let's say 100 is the threshold
        wrap_numbers({'disk1': nt(100, 100, 100)}, 'disk_io')
        # first wrap restarts from 10
        check(nt(100, 100, 10), 100)
        # then it remains the same
        check(nt(100, 100, 10), 100)
        # then it goes up
        check(nt(100, 100, 90), 100)
        # then it wraps again
        check(nt(100, 100, 20), 190)

    def test_cache_changing_keys(self):
        wrap_numbers({'disk1': nt(5, 5, 5)}, 'disk_io')
        stats = {'disk1': nt(5, 5, 5),
                 'disk2': nt(7, 7, 7)}
        wrap_numbers(stats, 'disk_io')
        info = wrap_numbers.cache_info()
        self.assertEqual(info[0], {'disk_io': stats})
        self.assertEqual(
            info[1],
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
        self.assertEqual(info[2], {'disk_io': {}})

    def test_cache_clear(self):
        stats = {'disk1': nt(5, 5, 5)}
        wrap_numbers(stats, 'disk_io')
        wrap_numbers(stats, 'disk_io')
        wrap_numbers.cache_clear('disk_io')
        self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
        # Clearing a name which is already gone, or which was never
        # used, is simply called here with no assertion attached.
        wrap_numbers.cache_clear('disk_io')
        wrap_numbers.cache_clear('?!?')

    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
    def test_cache_clear_public_apis(self):
        if not psutil.disk_io_counters() or not psutil.net_io_counters():
            return self.skipTest("no disks or NICs available")
        disk_key = 'psutil.disk_io_counters'
        net_key = 'psutil.net_io_counters'

        psutil.disk_io_counters()
        psutil.net_io_counters()
        for entry in wrap_numbers.cache_info():
            self.assertIn(disk_key, entry)
            self.assertIn(net_key, entry)

        psutil.disk_io_counters.cache_clear()
        for entry in wrap_numbers.cache_info():
            self.assertIn(net_key, entry)
            self.assertNotIn(disk_key, entry)

        psutil.net_io_counters.cache_clear()
        self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
646 | |
647 | |
648 # =================================================================== | |
649 # --- Example script tests | |
650 # =================================================================== | |
651 | |
652 | |
@unittest.skipIf(TOX, "can't test on TOX")
# See: https://travis-ci.org/giampaolo/psutil/jobs/295224806
@unittest.skipIf(TRAVIS and not os.path.exists(SCRIPTS_DIR),
                 "can't locate scripts directory")
class TestScripts(unittest.TestCase):
    """Tests for scripts in the "scripts" directory."""

    @staticmethod
    def assert_stdout(exe, *args, **kwargs):
        # Run the script `exe` (looked up in SCRIPTS_DIR) with
        # PYTHON_EXE, passing `args` on the command line and `kwargs`
        # to sh(). Return the stripped output, which must not be empty.
        cmd = [PYTHON_EXE, os.path.join(SCRIPTS_DIR, exe)]
        cmd.extend(args)
        try:
            out = sh(cmd, **kwargs).strip()
        except RuntimeError as err:
            text = str(err)
            if 'AccessDenied' not in text:
                raise
            # An AccessDenied failure is tolerated: hand back the error
            # text in place of the output.
            return text
        assert out, out
        return out

    @staticmethod
    def assert_syntax(exe, args=None):
        # Only parse the script's source with ast; nothing is executed.
        path = os.path.join(SCRIPTS_DIR, exe)
        open_kwargs = {'encoding': 'utf8'} if PY3 else {}
        with open(path, 'rt', **open_kwargs) as fobj:
            source = fobj.read()
        ast.parse(source)

    def test_coverage(self):
        # make sure all example scripts have a test method defined
        available = dir(self)
        for fname in os.listdir(SCRIPTS_DIR):
            if not fname.endswith('.py'):
                continue
            wanted = 'test_' + os.path.splitext(fname)[0]
            if wanted not in available:
                self.fail('no test defined for %r script'
                          % os.path.join(SCRIPTS_DIR, fname))

    @unittest.skipIf(not POSIX, "POSIX only")
    def test_executable(self):
        for fname in os.listdir(SCRIPTS_DIR):
            if not fname.endswith('.py'):
                continue
            path = os.path.join(SCRIPTS_DIR, fname)
            mode = os.stat(path)[stat.ST_MODE]
            # The owner-execute bit has to be set on every script.
            if not stat.S_IXUSR & mode:
                self.fail('%r is not executable' % path)

    def test_disk_usage(self):
        self.assert_stdout('disk_usage.py')

    def test_free(self):
        self.assert_stdout('free.py')

    def test_meminfo(self):
        self.assert_stdout('meminfo.py')

    def test_procinfo(self):
        own_pid = str(os.getpid())
        self.assert_stdout('procinfo.py', own_pid)

    @unittest.skipIf(CI_TESTING and not psutil.users(), "no users")
    def test_who(self):
        self.assert_stdout('who.py')

    def test_ps(self):
        self.assert_stdout('ps.py')

    def test_pstree(self):
        self.assert_stdout('pstree.py')

    def test_netstat(self):
        self.assert_stdout('netstat.py')

    # permission denied on travis
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    def test_ifconfig(self):
        self.assert_stdout('ifconfig.py')

    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
    def test_pmap(self):
        own_pid = str(os.getpid())
        self.assert_stdout('pmap.py', own_pid)

    def test_procsmem(self):
        fields = psutil.Process().memory_full_info()._fields
        if 'uss' not in fields:
            # skipTest() raises by itself, which ends the test here.
            self.skipTest("not supported")
        self.assert_stdout('procsmem.py', stderr=DEVNULL)

    def test_killall(self):
        self.assert_syntax('killall.py')

    def test_nettop(self):
        self.assert_syntax('nettop.py')

    def test_top(self):
        self.assert_syntax('top.py')

    def test_iotop(self):
        self.assert_syntax('iotop.py')

    def test_pidof(self):
        # The script is given this process' name, so its output has to
        # mention this process' PID.
        own_name = psutil.Process().name()
        output = self.assert_stdout('pidof.py', own_name)
        self.assertIn(str(os.getpid()), output)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_winservices(self):
        self.assert_stdout('winservices.py')

    def test_cpu_distribution(self):
        self.assert_syntax('cpu_distribution.py')

    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    def test_temperatures(self):
        if not psutil.sensors_temperatures():
            self.skipTest("no temperatures")
        self.assert_stdout('temperatures.py')

    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    def test_fans(self):
        if not psutil.sensors_fans():
            self.skipTest("no fans")
        self.assert_stdout('fans.py')

    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_battery(self):
        self.assert_stdout('battery.py')

    def test_sensors(self):
        self.assert_stdout('sensors.py')
788 | |
789 | |
790 # =================================================================== | |
791 # --- Unit tests for test utilities. | |
792 # =================================================================== | |
793 | |
794 | |
class TestRetryDecorator(unittest.TestCase):
    """Tests for the retry() decorator, with time.sleep() mocked out."""

    @mock.patch('time.sleep')
    def test_retry_success(self, sleep):
        # Fail 3 times out of 5; make sure the decorated fun returns.

        @retry(retries=5, interval=1, logfun=None)
        def flaky():
            # Every call consumes one pending failure and raises
            # ZeroDivisionError until none is left.
            if failures:
                failures.pop()
                1 / 0
            return 1

        failures = list(range(3))
        self.assertEqual(flaky(), 1)
        self.assertEqual(sleep.call_count, 3)

    @mock.patch('time.sleep')
    def test_retry_failure(self, sleep):
        # Fail 6 times out of 5; the function is supposed to raise exc.

        @retry(retries=5, interval=1, logfun=None)
        def flaky():
            if failures:
                failures.pop()
                1 / 0
            return 1

        failures = list(range(6))
        with self.assertRaises(ZeroDivisionError):
            flaky()
        self.assertEqual(sleep.call_count, 5)

    @mock.patch('time.sleep')
    def test_exception_arg(self, sleep):
        # retry() is told about ValueError only; a TypeError goes
        # straight through with no sleep in between.
        @retry(exception=ValueError, interval=1)
        def wrong_error():
            raise TypeError

        with self.assertRaises(TypeError):
            wrong_error()
        self.assertEqual(sleep.call_count, 0)

    @mock.patch('time.sleep')
    def test_no_interval_arg(self, sleep):
        # if interval is not specified sleep is not supposed to be called

        @retry(retries=5, interval=None, logfun=None)
        def always_fails():
            1 / 0

        with self.assertRaises(ZeroDivisionError):
            always_fails()
        self.assertEqual(sleep.call_count, 0)

    @mock.patch('time.sleep')
    def test_retries_arg(self, sleep):

        @retry(retries=5, interval=1, logfun=None)
        def always_fails():
            1 / 0

        with self.assertRaises(ZeroDivisionError):
            always_fails()
        self.assertEqual(sleep.call_count, 5)

    @mock.patch('time.sleep')
    def test_retries_and_timeout_args(self, sleep):
        # Passing both `retries` and `timeout` must be refused.
        with self.assertRaises(ValueError):
            retry(retries=5, timeout=1)
860 | |
861 | |
class TestSyncTestUtils(unittest.TestCase):
    """Tests for the wait_for_pid(), wait_for_file() and call_until()
    test helpers.
    """

    def tearDown(self):
        safe_rmpath(TESTFN)

    def test_wait_for_pid(self):
        wait_for_pid(os.getpid())
        # Pick a PID well above the highest one currently listed.
        missing_pid = max(psutil.pids()) + 99999
        retry_iter = mock.patch(
            'psutil.tests.retry.__iter__', return_value=iter([0]))
        with retry_iter:
            with self.assertRaises(psutil.NoSuchProcess):
                wait_for_pid(missing_pid)

    def test_wait_for_file(self):
        with open(TESTFN, 'w') as fobj:
            fobj.write('foo')
        wait_for_file(TESTFN)
        # The file is expected to be gone after a default call.
        assert not os.path.exists(TESTFN)

    def test_wait_for_file_empty(self):
        # Create an empty file.
        with open(TESTFN, 'w'):
            pass
        wait_for_file(TESTFN, empty=True)
        assert not os.path.exists(TESTFN)

    def test_wait_for_file_no_file(self):
        retry_iter = mock.patch(
            'psutil.tests.retry.__iter__', return_value=iter([0]))
        with retry_iter:
            with self.assertRaises(IOError):
                wait_for_file(TESTFN)

    def test_wait_for_file_no_delete(self):
        with open(TESTFN, 'w') as fobj:
            fobj.write('foo')
        wait_for_file(TESTFN, delete=False)
        # With delete=False the file is expected to be left in place.
        assert os.path.exists(TESTFN)

    def test_call_until(self):
        self.assertEqual(call_until(lambda: 1, "ret == 1"), 1)
898 | |
899 | |
class TestFSTestUtils(unittest.TestCase):
    """Tests for the filesystem-related helpers (open_text, open_binary,
    safe_mkdir, safe_rmpath, chdir).
    """

    def setUp(self):
        # Start every test without a leftover TESTFN path.
        safe_rmpath(TESTFN)

    def tearDown(self):
        # Same cleanup as setUp.
        safe_rmpath(TESTFN)

    def test_open_text(self):
        with open_text(__file__) as fobj:
            self.assertEqual(fobj.mode, 'rt')

    def test_open_binary(self):
        with open_binary(__file__) as fobj:
            self.assertEqual(fobj.mode, 'rb')

    def test_safe_mkdir(self):
        # The second round checks that an already existing directory
        # is tolerated.
        for _ in range(2):
            safe_mkdir(TESTFN)
            assert os.path.isdir(TESTFN)

    def test_safe_rmpath(self):
        # test file is removed
        with open(TESTFN, 'w'):
            pass
        safe_rmpath(TESTFN)
        assert not os.path.exists(TESTFN)
        # test no exception if path does not exist
        safe_rmpath(TESTFN)
        # test dir is removed
        os.mkdir(TESTFN)
        safe_rmpath(TESTFN)
        assert not os.path.exists(TESTFN)
        # test other exceptions are raised
        einval = OSError(errno.EINVAL, "")
        with mock.patch('psutil.tests.os.stat', side_effect=einval) as m:
            with self.assertRaises(OSError):
                safe_rmpath(TESTFN)
            assert m.called

    def test_chdir(self):
        start_dir = os.getcwd()
        os.mkdir(TESTFN)
        expected_dir = os.path.join(start_dir, TESTFN)
        with chdir(TESTFN):
            self.assertEqual(os.getcwd(), expected_dir)
        # The original working directory is expected to be restored
        # on exit.
        self.assertEqual(os.getcwd(), start_dir)
945 | |
946 | |
class TestProcessUtils(unittest.TestCase):
    """Tests for the subprocess-spawning test helpers.

    Every test registers ``reap_children`` as a cleanup *before*
    spawning anything, so that child processes do not outlive the test
    when one of the assertions fails half way through.
    """

    def test_reap_children(self):
        # Safety net: runs even if an assertion below fails.
        self.addCleanup(reap_children, recursive=True)
        subp = get_test_subprocess()
        p = psutil.Process(subp.pid)
        assert p.is_running()
        reap_children()
        assert not p.is_running()
        assert not psutil.tests._pids_started
        assert not psutil.tests._subprocesses_started

    def test_create_proc_children_pair(self):
        # Safety net: runs even if an assertion below fails.
        self.addCleanup(reap_children, recursive=True)
        p1, p2 = create_proc_children_pair()
        self.assertNotEqual(p1.pid, p2.pid)
        assert p1.is_running()
        assert p2.is_running()
        children = psutil.Process().children(recursive=True)
        self.assertEqual(len(children), 2)
        self.assertIn(p1, children)
        self.assertIn(p2, children)
        # p1 is our direct child, p2 is the child of p1.
        self.assertEqual(p1.ppid(), os.getpid())
        self.assertEqual(p2.ppid(), p1.pid)

        # make sure both of them are cleaned up
        reap_children()
        assert not p1.is_running()
        assert not p2.is_running()
        assert not psutil.tests._pids_started
        assert not psutil.tests._subprocesses_started

    @unittest.skipIf(not POSIX, "POSIX only")
    def test_create_zombie_proc(self):
        # Registered first so that a failure inside create_zombie_proc()
        # itself still triggers the cleanup.
        self.addCleanup(reap_children, recursive=True)
        zpid = create_zombie_proc()
        p = psutil.Process(zpid)
        self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
983 | |
984 | |
class TestNetUtils(unittest.TestCase):
    """Tests for the socket-related test helpers.

    NOTE: every test method name must start with ``test``, otherwise
    the default unittest loader does not collect it and the checks
    silently never run.
    """

    def test_bind_socket(self):
        # Inside the method body `bind_socket` resolves to the
        # module-level helper, not to a method of this class.
        port = get_free_port()
        with contextlib.closing(bind_socket(addr=('', port))) as s:
            self.assertEqual(s.getsockname()[1], port)

    @unittest.skipIf(not POSIX, "POSIX only")
    def test_bind_unix_socket(self):
        with unix_socket_path() as name:
            sock = bind_unix_socket(name)
            with contextlib.closing(sock):
                self.assertEqual(sock.family, socket.AF_UNIX)
                self.assertEqual(sock.type, socket.SOCK_STREAM)
                self.assertEqual(sock.getsockname(), name)
                assert os.path.exists(name)
                assert stat.S_ISSOCK(os.stat(name).st_mode)
        # UDP
        with unix_socket_path() as name:
            sock = bind_unix_socket(name, type=socket.SOCK_DGRAM)
            with contextlib.closing(sock):
                self.assertEqual(sock.type, socket.SOCK_DGRAM)

    def test_tcp_socketpair(self):
        addr = ("127.0.0.1", get_free_port())
        server, client = tcp_socketpair(socket.AF_INET, addr=addr)
        with contextlib.closing(server):
            with contextlib.closing(client):
                # Ensure they are connected and the positions are
                # correct.
                self.assertEqual(server.getsockname(), addr)
                self.assertEqual(client.getpeername(), addr)
                self.assertNotEqual(client.getsockname(), addr)

    @unittest.skipIf(not POSIX, "POSIX only")
    @unittest.skipIf(NETBSD or FREEBSD,
                     "/var/run/log UNIX socket opened by default")
    def test_unix_socketpair(self):
        p = psutil.Process()
        num_fds = p.num_fds()
        assert not p.connections(kind='unix')
        with unix_socket_path() as name:
            server, client = unix_socketpair(name)
            try:
                assert os.path.exists(name)
                assert stat.S_ISSOCK(os.stat(name).st_mode)
                # Two new descriptors / connections: one per end of
                # the pair.
                self.assertEqual(p.num_fds() - num_fds, 2)
                self.assertEqual(len(p.connections(kind='unix')), 2)
                self.assertEqual(server.getsockname(), name)
                self.assertEqual(client.getpeername(), name)
            finally:
                client.close()
                server.close()

    def test_create_sockets(self):
        with create_sockets() as socks:
            # Count the sockets per address family and per type.
            fams = collections.defaultdict(int)
            types = collections.defaultdict(int)
            for s in socks:
                fams[s.family] += 1
                # work around http://bugs.python.org/issue30204
                types[s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)] += 1
            self.assertGreaterEqual(fams[socket.AF_INET], 2)
            if supports_ipv6():
                self.assertGreaterEqual(fams[socket.AF_INET6], 2)
            if POSIX and HAS_CONNECTIONS_UNIX:
                self.assertGreaterEqual(fams[socket.AF_UNIX], 2)
            self.assertGreaterEqual(types[socket.SOCK_STREAM], 2)
            self.assertGreaterEqual(types[socket.SOCK_DGRAM], 2)
1054 | |
1055 | |
class TestOtherUtils(unittest.TestCase):
    """Tests for helpers which do not fit any other category."""

    def test_is_namedtuple(self):
        # A namedtuple instance is recognized, a plain tuple is not.
        nt_class = collections.namedtuple('foo', 'a b c')
        nt_instance = nt_class(1, 2, 3)
        assert is_namedtuple(nt_instance)
        assert not is_namedtuple(())
1061 | |
1062 | |
if __name__ == '__main__':
    # Run this test module through psutil's own test runner when the
    # file is executed directly.
    from psutil.tests.runner import run as run_tests
    run_tests(__file__)