Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_misc.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 #!/usr/bin/env python3 | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
5 # Use of this source code is governed by a BSD-style license that can be | |
6 # found in the LICENSE file. | |
7 | |
8 """ | |
9 Miscellaneous tests. | |
10 """ | |
11 | |
12 import ast | |
13 import collections | |
14 import errno | |
15 import json | |
16 import os | |
17 import pickle | |
18 import socket | |
19 import stat | |
20 | |
21 from psutil import LINUX | |
22 from psutil import POSIX | |
23 from psutil import WINDOWS | |
24 from psutil._common import memoize | |
25 from psutil._common import memoize_when_activated | |
26 from psutil._common import supports_ipv6 | |
27 from psutil._common import wrap_numbers | |
28 from psutil._compat import PY3 | |
29 from psutil.tests import APPVEYOR | |
30 from psutil.tests import CI_TESTING | |
31 from psutil.tests import DEVNULL | |
32 from psutil.tests import HAS_BATTERY | |
33 from psutil.tests import HAS_MEMORY_MAPS | |
34 from psutil.tests import HAS_NET_IO_COUNTERS | |
35 from psutil.tests import HAS_SENSORS_BATTERY | |
36 from psutil.tests import HAS_SENSORS_FANS | |
37 from psutil.tests import HAS_SENSORS_TEMPERATURES | |
38 from psutil.tests import import_module_by_path | |
39 from psutil.tests import mock | |
40 from psutil.tests import PsutilTestCase | |
41 from psutil.tests import PYTHON_EXE | |
42 from psutil.tests import reload_module | |
43 from psutil.tests import ROOT_DIR | |
44 from psutil.tests import SCRIPTS_DIR | |
45 from psutil.tests import sh | |
46 from psutil.tests import TRAVIS | |
47 from psutil.tests import unittest | |
48 import psutil | |
49 import psutil.tests | |
50 | |
51 | |
52 # =================================================================== | |
53 # --- Misc / generic tests. | |
54 # =================================================================== | |
55 | |
56 | |
class TestMisc(PsutilTestCase):
    """Miscellaneous checks on the public psutil API and on helpers
    living in psutil._common.
    """

    def test_process__repr__(self, func=repr):
        # `func` is the stringifier under test; test_process__str__
        # re-runs this method with func=str.
        p = psutil.Process(self.spawn_testproc().pid)
        r = func(p)
        self.assertIn("psutil.Process", r)
        self.assertIn("pid=%s" % p.pid, r)
        self.assertIn("name='%s'" % p.name(), r)
        self.assertIn("status=", r)
        self.assertNotIn("exitcode=", r)
        p.terminate()
        p.wait()
        r = func(p)
        self.assertIn("status='terminated'", r)
        self.assertIn("exitcode=", r)

        # name() raising must not break the representation; the name is
        # simply left out.
        with mock.patch.object(psutil.Process, "name",
                               side_effect=psutil.ZombieProcess(os.getpid())):
            p = psutil.Process()
            r = func(p)
            self.assertIn("pid=%s" % p.pid, r)
            self.assertIn("status='zombie'", r)
            self.assertNotIn("name=", r)
        with mock.patch.object(psutil.Process, "name",
                               side_effect=psutil.NoSuchProcess(os.getpid())):
            p = psutil.Process()
            r = func(p)
            self.assertIn("pid=%s" % p.pid, r)
            self.assertIn("terminated", r)
            self.assertNotIn("name=", r)
        with mock.patch.object(psutil.Process, "name",
                               side_effect=psutil.AccessDenied(os.getpid())):
            p = psutil.Process()
            r = func(p)
            self.assertIn("pid=%s" % p.pid, r)
            self.assertNotIn("name=", r)

    def test_process__str__(self):
        self.test_process__repr__(func=str)

    def test_no_such_process__repr__(self, func=repr):
        # `func` is honoured (it used to be accepted but ignored); the
        # default keeps checking repr().
        self.assertEqual(
            func(psutil.NoSuchProcess(321)),
            "psutil.NoSuchProcess process no longer exists (pid=321)")
        self.assertEqual(
            func(psutil.NoSuchProcess(321, name='foo')),
            "psutil.NoSuchProcess process no longer exists (pid=321, "
            "name='foo')")
        self.assertEqual(
            func(psutil.NoSuchProcess(321, msg='foo')),
            "psutil.NoSuchProcess foo")

    def test_zombie_process__repr__(self, func=repr):
        # `func` is honoured; the default keeps checking repr().
        self.assertEqual(
            func(psutil.ZombieProcess(321)),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321)")
        self.assertEqual(
            func(psutil.ZombieProcess(321, name='foo')),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321, name='foo')")
        self.assertEqual(
            func(psutil.ZombieProcess(321, name='foo', ppid=1)),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321, name='foo', ppid=1)")
        self.assertEqual(
            func(psutil.ZombieProcess(321, msg='foo')),
            "psutil.ZombieProcess foo")

    def test_access_denied__repr__(self, func=repr):
        # `func` is honoured; the default keeps checking repr().
        self.assertEqual(
            func(psutil.AccessDenied(321)),
            "psutil.AccessDenied (pid=321)")
        self.assertEqual(
            func(psutil.AccessDenied(321, name='foo')),
            "psutil.AccessDenied (pid=321, name='foo')")
        self.assertEqual(
            func(psutil.AccessDenied(321, msg='foo')),
            "psutil.AccessDenied foo")

    def test_timeout_expired__repr__(self, func=repr):
        # `func` is honoured; the default keeps checking repr().
        self.assertEqual(
            func(psutil.TimeoutExpired(321)),
            "psutil.TimeoutExpired timeout after 321 seconds")
        self.assertEqual(
            func(psutil.TimeoutExpired(321, pid=111)),
            "psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
        self.assertEqual(
            func(psutil.TimeoutExpired(321, pid=111, name='foo')),
            "psutil.TimeoutExpired timeout after 321 seconds "
            "(pid=111, name='foo')")

    def test_process__eq__(self):
        p1 = psutil.Process()
        p2 = psutil.Process()
        self.assertEqual(p1, p2)
        # Changing the identity tuple must make the two objects differ.
        p2._ident = (0, 0)
        self.assertNotEqual(p1, p2)
        self.assertNotEqual(p1, 'foo')

    def test_process__hash__(self):
        # Two Process objects for the same process collapse in a set.
        s = set([psutil.Process(), psutil.Process()])
        self.assertEqual(len(s), 1)

    def test__all__(self):
        dir_psutil = dir(psutil)
        skipped = ('long', 'tests', 'test', 'PermissionError',
                   'ProcessLookupError')
        for name in dir_psutil:
            if name in skipped or name.startswith('_'):
                continue
            try:
                __import__(name)
            except ImportError:
                # Not an importable module name: it has to be listed in
                # __all__ unless it is None, undocumented or deprecated.
                if name in psutil.__all__:
                    continue
                fun = getattr(psutil, name)
                if fun is None:
                    continue
                doc = fun.__doc__
                if doc is not None and 'deprecated' not in doc.lower():
                    self.fail('%r not in psutil.__all__' % name)

        # Import 'star' will break if __all__ is inconsistent, see:
        # https://github.com/giampaolo/psutil/issues/656
        # Can't do `from psutil import *` as it won't work on python 3
        # so we simply iterate over __all__.
        for name in psutil.__all__:
            self.assertIn(name, dir_psutil)

    def test_version(self):
        self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
                         psutil.__version__)

    def test_process_as_dict_no_new_names(self):
        # See https://github.com/giampaolo/psutil/issues/813
        p = psutil.Process()
        p.foo = '1'
        self.assertNotIn('foo', p.as_dict())

    def test_memoize(self):
        @memoize
        def foo(*args, **kwargs):
            "foo docstring"
            calls.append(None)
            return (args, kwargs)

        # `calls` grows by one each time the undecorated body really runs.
        calls = []
        # no args
        for _ in range(2):
            ret = foo()
            expected = ((), {})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 1)
        # with args
        for _ in range(2):
            ret = foo(1)
            expected = ((1, ), {})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 2)
        # with args + kwargs
        for _ in range(2):
            ret = foo(1, bar=2)
            expected = ((1, ), {'bar': 2})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 3)
        # clear cache
        foo.cache_clear()
        ret = foo()
        expected = ((), {})
        self.assertEqual(ret, expected)
        self.assertEqual(len(calls), 4)
        # docstring
        self.assertEqual(foo.__doc__, "foo docstring")

    def test_memoize_when_activated(self):
        class Foo:

            @memoize_when_activated
            def foo(self):
                calls.append(None)

        f = Foo()
        # not activated yet: every call runs the body
        calls = []
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 2)

        # activate
        calls = []
        f.foo.cache_activate(f)
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 1)

        # deactivate
        calls = []
        f.foo.cache_deactivate(f)
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 2)

    def test_parse_environ_block(self):
        from psutil._common import parse_environ_block

        def k(s):
            # Expected keys are upper-cased on Windows only.
            return s.upper() if WINDOWS else s

        self.assertEqual(parse_environ_block("a=1\0"),
                         {k("a"): "1"})
        self.assertEqual(parse_environ_block("a=1\0b=2\0\0"),
                         {k("a"): "1", k("b"): "2"})
        self.assertEqual(parse_environ_block("a=1\0b=\0\0"),
                         {k("a"): "1", k("b"): ""})
        # ignore everything after \0\0
        self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"),
                         {k("a"): "1", k("b"): "2"})
        # ignore everything that is not an assignment
        self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"})
        self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"})
        # do not fail if the block is incomplete
        self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"})

    def test_supports_ipv6(self):
        # supports_ipv6() is cached: reset it between scenarios and once
        # more when the test ends.
        self.addCleanup(supports_ipv6.cache_clear)
        if supports_ipv6():
            with mock.patch('psutil._common.socket') as s:
                s.has_ipv6 = False
                supports_ipv6.cache_clear()
                assert not supports_ipv6()

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket',
                            side_effect=socket.error) as s:
                assert not supports_ipv6()
                assert s.called

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket',
                            side_effect=socket.gaierror) as s:
                assert not supports_ipv6()
                supports_ipv6.cache_clear()
                assert s.called

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket.bind',
                            side_effect=socket.gaierror) as s:
                assert not supports_ipv6()
                supports_ipv6.cache_clear()
                assert s.called
        else:
            # Reported as unsupported: creating/binding an IPv6 socket
            # is expected to fail.
            with self.assertRaises(Exception):
                sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
                try:
                    sock.bind(("::1", 0))
                finally:
                    sock.close()

    def test_isfile_strict(self):
        from psutil._common import isfile_strict
        this_file = os.path.abspath(__file__)
        assert isfile_strict(this_file)
        assert not isfile_strict(os.path.dirname(this_file))
        # EPERM / EACCES are propagated, ENOENT is turned into False.
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.EPERM, "foo")):
            self.assertRaises(OSError, isfile_strict, this_file)
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.EACCES, "foo")):
            self.assertRaises(OSError, isfile_strict, this_file)
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.ENOENT, "foo")):
            assert not isfile_strict(this_file)
        with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
            assert not isfile_strict(this_file)

    def test_serialization(self):
        def check(ret):
            # The value must be JSON-serializable and survive a pickle
            # round trip unchanged.
            json.loads(json.dumps(ret))
            restored = pickle.loads(pickle.dumps(ret))
            self.assertEqual(ret, restored)

        check(psutil.Process().as_dict())
        check(psutil.virtual_memory())
        check(psutil.swap_memory())
        check(psutil.cpu_times())
        check(psutil.cpu_times_percent(interval=0))
        check(psutil.net_io_counters())
        # Disk counters are not checked on Linux without /proc/diskstats
        # nor on AppVeyor.
        no_diskstats = LINUX and not os.path.exists('/proc/diskstats')
        if not no_diskstats and not APPVEYOR:
            check(psutil.disk_io_counters())
        check(psutil.disk_partitions())
        check(psutil.disk_usage(os.getcwd()))
        check(psutil.users())

    def test_setup_script(self):
        setup_py = os.path.join(ROOT_DIR, 'setup.py')
        if CI_TESTING and not os.path.exists(setup_py):
            return self.skipTest("can't find setup.py")
        module = import_module_by_path(setup_py)
        self.assertRaises(SystemExit, module.setup)
        self.assertEqual(module.get_version(), psutil.__version__)

    def test_ad_on_process_creation(self):
        # We are supposed to be able to instantiate Process also in case
        # of zombie processes or access denied.
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=psutil.AccessDenied) as meth:
            psutil.Process()
            assert meth.called
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=psutil.ZombieProcess(1)) as meth:
            psutil.Process()
            assert meth.called
        # Any other exception is expected to propagate.
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=ValueError) as meth:
            with self.assertRaises(ValueError):
                psutil.Process()
            assert meth.called

    def test_sanity_version_check(self):
        # see: https://github.com/giampaolo/psutil/issues/564
        with mock.patch(
                "psutil._psplatform.cext.version", return_value="0.0.0"):
            with self.assertRaises(ImportError) as cm:
                reload_module(psutil)
            self.assertIn("version conflict", str(cm.exception).lower())
386 | |
387 | |
388 # =================================================================== | |
389 # --- Tests for wrap_numbers() function. | |
390 # =================================================================== | |
391 | |
392 | |
# Three-field record used as the per-key value in the wrap_numbers() tests.
nt = collections.namedtuple('foo', ['a', 'b', 'c'])
394 | |
395 | |
class TestWrapNumbers(PsutilTestCase):
    """Exercise wrap_numbers() results and the state exposed by
    wrap_numbers.cache_info().
    """

    def setUp(self):
        # Start every test from an empty cache.
        wrap_numbers.cache_clear()

    def tearDown(self):
        # Leave an empty cache behind as well.
        wrap_numbers.cache_clear()

    def test_first_call(self):
        stats = {'disk1': nt(5, 5, 5)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_input_hasnt_changed(self):
        stats = {'disk1': nt(5, 5, 5)}
        for _ in range(2):
            self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_increase_but_no_wrap(self):
        # Values only grow (or stay put): output equals input each time.
        readings = (
            nt(5, 5, 5),
            nt(10, 15, 20),
            nt(20, 25, 30),
            nt(20, 25, 30),
        )
        for reading in readings:
            stats = {'disk1': reading}
            self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_wrap(self):
        # (raw reading, expected result) pairs, fed in order.
        steps = (
            # let's say 100 is the threshold
            (nt(100, 100, 100), nt(100, 100, 100)),
            # first wrap restarts from 10
            (nt(100, 100, 10), nt(100, 100, 110)),
            # then it remains the same
            (nt(100, 100, 10), nt(100, 100, 110)),
            # then it goes up
            (nt(100, 100, 90), nt(100, 100, 190)),
            # then it wraps again
            (nt(100, 100, 20), nt(100, 100, 210)),
            # and remains the same
            (nt(100, 100, 20), nt(100, 100, 210)),
            # now wrap another num
            (nt(50, 100, 20), nt(150, 100, 210)),
            # and again
            (nt(40, 100, 20), nt(190, 100, 210)),
            # keep it the same
            (nt(40, 100, 20), nt(190, 100, 210)),
        )
        for raw, expected in steps:
            self.assertEqual(wrap_numbers({'disk1': raw}, 'disk_io'),
                             {'disk1': expected})

    def test_changing_keys(self):
        # Emulate a case where the second call to disk_io()
        # (or whatever) provides a new disk, then the new disk
        # disappears on the third call.
        snapshots = (
            {'disk1': nt(5, 5, 5)},
            {'disk1': nt(5, 5, 5), 'disk2': nt(7, 7, 7)},
            {'disk1': nt(8, 8, 8)},
        )
        for stats in snapshots:
            self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

    def test_changing_keys_w_wrap(self):
        stats = {'disk1': nt(50, 50, 50),
                 'disk2': nt(100, 100, 100)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)
        # disk 2 wraps
        stats = {'disk1': nt(50, 50, 50),
                 'disk2': nt(100, 100, 10)}
        wrapped = {'disk1': nt(50, 50, 50),
                   'disk2': nt(100, 100, 110)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), wrapped)
        # disk 2 disappears
        stats = {'disk1': nt(50, 50, 50)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)

        # then it appears again; the old wrap is supposed to be
        # gone.
        stats = {'disk1': nt(50, 50, 50),
                 'disk2': nt(100, 100, 100)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)
        # remains the same
        stats = {'disk1': nt(50, 50, 50),
                 'disk2': nt(100, 100, 100)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), stats)
        # and then wraps again
        stats = {'disk1': nt(50, 50, 50),
                 'disk2': nt(100, 100, 10)}
        wrapped = {'disk1': nt(50, 50, 50),
                   'disk2': nt(100, 100, 110)}
        self.assertEqual(wrap_numbers(stats, 'disk_io'), wrapped)

    def test_real_data(self):
        before = {
            'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
            'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
            'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
            'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348),
        }
        for _ in range(2):
            self.assertEqual(wrap_numbers(before, 'disk_io'), before)
        # Same data again, with the first 'nvme0n1' field decreased
        # from 300 to 100.
        after = dict(before)
        after['nvme0n1'] = (100, ) + before['nvme0n1'][1:]
        out = wrap_numbers(after, 'disk_io')
        self.assertEqual(out['nvme0n1'][0], 400)

    # --- cache tests

    def test_cache_first_call(self):
        stats = {'disk1': nt(5, 5, 5)}
        wrap_numbers(stats, 'disk_io')
        cache, reminders, reminder_keys = wrap_numbers.cache_info()
        self.assertEqual(cache, {'disk_io': stats})
        self.assertEqual(reminders, {'disk_io': {}})
        self.assertEqual(reminder_keys, {'disk_io': {}})

    def test_cache_call_twice(self):
        wrap_numbers({'disk1': nt(5, 5, 5)}, 'disk_io')
        stats = {'disk1': nt(10, 10, 10)}
        wrap_numbers(stats, 'disk_io')
        cache, reminders, reminder_keys = wrap_numbers.cache_info()
        self.assertEqual(cache, {'disk_io': stats})
        self.assertEqual(
            reminders,
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
        self.assertEqual(reminder_keys, {'disk_io': {}})

    def test_cache_wrap(self):
        def feed(reading):
            # Push one reading for 'disk1' and check it became the
            # first element of cache_info().
            stats = {'disk1': reading}
            wrap_numbers(stats, 'disk_io')
            self.assertEqual(wrap_numbers.cache_info()[0],
                             {'disk_io': stats})

        def check_reminders(third_field):
            # Check the second and third elements of cache_info().
            info = wrap_numbers.cache_info()
            self.assertEqual(
                info[1],
                {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0,
                             ('disk1', 2): third_field}})
            self.assertEqual(info[2],
                             {'disk_io': {'disk1': set([('disk1', 2)])}})

        # let's say 100 is the threshold
        wrap_numbers({'disk1': nt(100, 100, 100)}, 'disk_io')

        # first wrap restarts from 10
        feed(nt(100, 100, 10))
        check_reminders(100)

        # then it remains the same
        feed(nt(100, 100, 10))
        check_reminders(100)

        # then it goes up
        feed(nt(100, 100, 90))
        check_reminders(100)

        # then it wraps again
        feed(nt(100, 100, 20))
        check_reminders(190)

    def test_cache_changing_keys(self):
        wrap_numbers({'disk1': nt(5, 5, 5)}, 'disk_io')
        stats = {'disk1': nt(5, 5, 5),
                 'disk2': nt(7, 7, 7)}
        wrap_numbers(stats, 'disk_io')
        cache, reminders, reminder_keys = wrap_numbers.cache_info()
        self.assertEqual(cache, {'disk_io': stats})
        self.assertEqual(
            reminders,
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
        self.assertEqual(reminder_keys, {'disk_io': {}})

    def test_cache_clear(self):
        stats = {'disk1': nt(5, 5, 5)}
        for _ in range(2):
            wrap_numbers(stats, 'disk_io')
        wrap_numbers.cache_clear('disk_io')
        self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
        # Clearing an already-cleared or unknown name must not raise.
        wrap_numbers.cache_clear('disk_io')
        wrap_numbers.cache_clear('?!?')

    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
    def test_cache_clear_public_apis(self):
        if not psutil.disk_io_counters() or not psutil.net_io_counters():
            return self.skipTest("no disks or NICs available")
        psutil.disk_io_counters()
        psutil.net_io_counters()
        # Both public functions have an entry in every cache element.
        for entry in wrap_numbers.cache_info():
            self.assertIn('psutil.disk_io_counters', entry)
            self.assertIn('psutil.net_io_counters', entry)

        # Clearing one function's cache leaves the other one in place.
        psutil.disk_io_counters.cache_clear()
        for entry in wrap_numbers.cache_info():
            self.assertIn('psutil.net_io_counters', entry)
            self.assertNotIn('psutil.disk_io_counters', entry)

        psutil.net_io_counters.cache_clear()
        self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
628 | |
629 | |
630 # =================================================================== | |
631 # --- Example script tests | |
632 # =================================================================== | |
633 | |
634 | |
@unittest.skipIf(not os.path.exists(SCRIPTS_DIR),
                 "can't locate scripts directory")
class TestScripts(PsutilTestCase):
    """Tests for scripts in the "scripts" directory."""

    @staticmethod
    def assert_stdout(exe, *args, **kwargs):
        # Run scripts/<exe> with PYTHON_EXE, passing `args` on the
        # command line and `kwargs` to sh(). Return the stripped output,
        # which must be non-empty. A RuntimeError mentioning
        # 'AccessDenied' is tolerated and its text returned instead.
        cmd = [PYTHON_EXE, os.path.join(SCRIPTS_DIR, exe)] + list(args)
        try:
            out = sh(cmd, **kwargs).strip()
        except RuntimeError as err:
            if 'AccessDenied' in str(err):
                return str(err)
            raise
        assert out, out
        return out

    @staticmethod
    def assert_syntax(exe, args=None):
        # Only parse scripts/<exe>, without executing it. `args` is
        # accepted for signature compatibility but not used.
        exe = os.path.join(SCRIPTS_DIR, exe)
        # Python 2's builtin open() takes no `encoding` argument.
        open_kwargs = {'encoding': 'utf8'} if PY3 else {}
        with open(exe, 'rt', **open_kwargs) as f:
            src = f.read()
        ast.parse(src)

    def test_coverage(self):
        # make sure all example scripts have a test method defined
        meths = dir(self)
        for name in os.listdir(SCRIPTS_DIR):
            if not name.endswith('.py'):
                continue
            if 'test_' + os.path.splitext(name)[0] not in meths:
                self.fail('no test defined for %r script'
                          % os.path.join(SCRIPTS_DIR, name))

    @unittest.skipIf(not POSIX, "POSIX only")
    def test_executable(self):
        # Every script must carry the owner-executable bit.
        for name in os.listdir(SCRIPTS_DIR):
            if not name.endswith('.py'):
                continue
            path = os.path.join(SCRIPTS_DIR, name)
            if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]:
                self.fail('%r is not executable' % path)

    def test_disk_usage(self):
        self.assert_stdout('disk_usage.py')

    def test_free(self):
        self.assert_stdout('free.py')

    def test_meminfo(self):
        self.assert_stdout('meminfo.py')

    def test_procinfo(self):
        self.assert_stdout('procinfo.py', str(os.getpid()))

    @unittest.skipIf(CI_TESTING and not psutil.users(), "no users")
    def test_who(self):
        self.assert_stdout('who.py')

    def test_ps(self):
        self.assert_stdout('ps.py')

    def test_pstree(self):
        self.assert_stdout('pstree.py')

    def test_netstat(self):
        self.assert_stdout('netstat.py')

    # permission denied on travis
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    def test_ifconfig(self):
        self.assert_stdout('ifconfig.py')

    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
    def test_pmap(self):
        self.assert_stdout('pmap.py', str(os.getpid()))

    def test_procsmem(self):
        if 'uss' not in psutil.Process().memory_full_info()._fields:
            # skipTest() raises SkipTest by itself; no `raise` needed.
            self.skipTest("not supported")
        self.assert_stdout('procsmem.py', stderr=DEVNULL)

    def test_killall(self):
        self.assert_syntax('killall.py')

    def test_nettop(self):
        self.assert_syntax('nettop.py')

    def test_top(self):
        self.assert_syntax('top.py')

    def test_iotop(self):
        self.assert_syntax('iotop.py')

    def test_pidof(self):
        # Our own PID must be listed for our own process name.
        output = self.assert_stdout('pidof.py', psutil.Process().name())
        self.assertIn(str(os.getpid()), output)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_winservices(self):
        self.assert_stdout('winservices.py')

    def test_cpu_distribution(self):
        self.assert_syntax('cpu_distribution.py')

    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    def test_temperatures(self):
        if not psutil.sensors_temperatures():
            self.skipTest("no temperatures")
        self.assert_stdout('temperatures.py')

    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    def test_fans(self):
        if not psutil.sensors_fans():
            self.skipTest("no fans")
        self.assert_stdout('fans.py')

    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_battery(self):
        self.assert_stdout('battery.py')

    def test_sensors(self):
        self.assert_stdout('sensors.py')
768 | |
769 | |
if __name__ == '__main__':
    # Executed directly: hand this file over to psutil's test runner.
    from psutil.tests import runner
    runner.run_from_name(__file__)