comparison planemo/lib/python3.7/site-packages/psutil/tests/test_misc.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
5 # Use of this source code is governed by a BSD-style license that can be
6 # found in the LICENSE file.
7
8 """
9 Miscellaneous tests.
10 """
11
12 import ast
13 import collections
14 import errno
15 import json
16 import os
17 import pickle
18 import socket
19 import stat
20
21 from psutil import LINUX
22 from psutil import POSIX
23 from psutil import WINDOWS
24 from psutil._common import memoize
25 from psutil._common import memoize_when_activated
26 from psutil._common import supports_ipv6
27 from psutil._common import wrap_numbers
28 from psutil._compat import PY3
29 from psutil.tests import APPVEYOR
30 from psutil.tests import CI_TESTING
31 from psutil.tests import DEVNULL
32 from psutil.tests import HAS_BATTERY
33 from psutil.tests import HAS_MEMORY_MAPS
34 from psutil.tests import HAS_NET_IO_COUNTERS
35 from psutil.tests import HAS_SENSORS_BATTERY
36 from psutil.tests import HAS_SENSORS_FANS
37 from psutil.tests import HAS_SENSORS_TEMPERATURES
38 from psutil.tests import import_module_by_path
39 from psutil.tests import mock
40 from psutil.tests import PsutilTestCase
41 from psutil.tests import PYTHON_EXE
42 from psutil.tests import reload_module
43 from psutil.tests import ROOT_DIR
44 from psutil.tests import SCRIPTS_DIR
45 from psutil.tests import sh
46 from psutil.tests import TRAVIS
47 from psutil.tests import unittest
48 import psutil
49 import psutil.tests
50
51
52 # ===================================================================
53 # --- Misc / generic tests.
54 # ===================================================================
55
56
57 class TestMisc(PsutilTestCase):
58
59 def test_process__repr__(self, func=repr):
60 p = psutil.Process(self.spawn_testproc().pid)
61 r = func(p)
62 self.assertIn("psutil.Process", r)
63 self.assertIn("pid=%s" % p.pid, r)
64 self.assertIn("name='%s'" % p.name(), r)
65 self.assertIn("status=", r)
66 self.assertNotIn("exitcode=", r)
67 p.terminate()
68 p.wait()
69 r = func(p)
70 self.assertIn("status='terminated'", r)
71 self.assertIn("exitcode=", r)
72
73 with mock.patch.object(psutil.Process, "name",
74 side_effect=psutil.ZombieProcess(os.getpid())):
75 p = psutil.Process()
76 r = func(p)
77 self.assertIn("pid=%s" % p.pid, r)
78 self.assertIn("status='zombie'", r)
79 self.assertNotIn("name=", r)
80 with mock.patch.object(psutil.Process, "name",
81 side_effect=psutil.NoSuchProcess(os.getpid())):
82 p = psutil.Process()
83 r = func(p)
84 self.assertIn("pid=%s" % p.pid, r)
85 self.assertIn("terminated", r)
86 self.assertNotIn("name=", r)
87 with mock.patch.object(psutil.Process, "name",
88 side_effect=psutil.AccessDenied(os.getpid())):
89 p = psutil.Process()
90 r = func(p)
91 self.assertIn("pid=%s" % p.pid, r)
92 self.assertNotIn("name=", r)
93
94 def test_process__str__(self):
95 self.test_process__repr__(func=str)
96
97 def test_no_such_process__repr__(self, func=repr):
98 self.assertEqual(
99 repr(psutil.NoSuchProcess(321)),
100 "psutil.NoSuchProcess process no longer exists (pid=321)")
101 self.assertEqual(
102 repr(psutil.NoSuchProcess(321, name='foo')),
103 "psutil.NoSuchProcess process no longer exists (pid=321, "
104 "name='foo')")
105 self.assertEqual(
106 repr(psutil.NoSuchProcess(321, msg='foo')),
107 "psutil.NoSuchProcess foo")
108
109 def test_zombie_process__repr__(self, func=repr):
110 self.assertEqual(
111 repr(psutil.ZombieProcess(321)),
112 "psutil.ZombieProcess process still exists but it's a zombie "
113 "(pid=321)")
114 self.assertEqual(
115 repr(psutil.ZombieProcess(321, name='foo')),
116 "psutil.ZombieProcess process still exists but it's a zombie "
117 "(pid=321, name='foo')")
118 self.assertEqual(
119 repr(psutil.ZombieProcess(321, name='foo', ppid=1)),
120 "psutil.ZombieProcess process still exists but it's a zombie "
121 "(pid=321, name='foo', ppid=1)")
122 self.assertEqual(
123 repr(psutil.ZombieProcess(321, msg='foo')),
124 "psutil.ZombieProcess foo")
125
126 def test_access_denied__repr__(self, func=repr):
127 self.assertEqual(
128 repr(psutil.AccessDenied(321)),
129 "psutil.AccessDenied (pid=321)")
130 self.assertEqual(
131 repr(psutil.AccessDenied(321, name='foo')),
132 "psutil.AccessDenied (pid=321, name='foo')")
133 self.assertEqual(
134 repr(psutil.AccessDenied(321, msg='foo')),
135 "psutil.AccessDenied foo")
136
137 def test_timeout_expired__repr__(self, func=repr):
138 self.assertEqual(
139 repr(psutil.TimeoutExpired(321)),
140 "psutil.TimeoutExpired timeout after 321 seconds")
141 self.assertEqual(
142 repr(psutil.TimeoutExpired(321, pid=111)),
143 "psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
144 self.assertEqual(
145 repr(psutil.TimeoutExpired(321, pid=111, name='foo')),
146 "psutil.TimeoutExpired timeout after 321 seconds "
147 "(pid=111, name='foo')")
148
149 def test_process__eq__(self):
150 p1 = psutil.Process()
151 p2 = psutil.Process()
152 self.assertEqual(p1, p2)
153 p2._ident = (0, 0)
154 self.assertNotEqual(p1, p2)
155 self.assertNotEqual(p1, 'foo')
156
157 def test_process__hash__(self):
158 s = set([psutil.Process(), psutil.Process()])
159 self.assertEqual(len(s), 1)
160
161 def test__all__(self):
162 dir_psutil = dir(psutil)
163 for name in dir_psutil:
164 if name in ('long', 'tests', 'test', 'PermissionError',
165 'ProcessLookupError'):
166 continue
167 if not name.startswith('_'):
168 try:
169 __import__(name)
170 except ImportError:
171 if name not in psutil.__all__:
172 fun = getattr(psutil, name)
173 if fun is None:
174 continue
175 if (fun.__doc__ is not None and
176 'deprecated' not in fun.__doc__.lower()):
177 self.fail('%r not in psutil.__all__' % name)
178
179 # Import 'star' will break if __all__ is inconsistent, see:
180 # https://github.com/giampaolo/psutil/issues/656
181 # Can't do `from psutil import *` as it won't work on python 3
182 # so we simply iterate over __all__.
183 for name in psutil.__all__:
184 self.assertIn(name, dir_psutil)
185
186 def test_version(self):
187 self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
188 psutil.__version__)
189
190 def test_process_as_dict_no_new_names(self):
191 # See https://github.com/giampaolo/psutil/issues/813
192 p = psutil.Process()
193 p.foo = '1'
194 self.assertNotIn('foo', p.as_dict())
195
196 def test_memoize(self):
197 @memoize
198 def foo(*args, **kwargs):
199 "foo docstring"
200 calls.append(None)
201 return (args, kwargs)
202
203 calls = []
204 # no args
205 for x in range(2):
206 ret = foo()
207 expected = ((), {})
208 self.assertEqual(ret, expected)
209 self.assertEqual(len(calls), 1)
210 # with args
211 for x in range(2):
212 ret = foo(1)
213 expected = ((1, ), {})
214 self.assertEqual(ret, expected)
215 self.assertEqual(len(calls), 2)
216 # with args + kwargs
217 for x in range(2):
218 ret = foo(1, bar=2)
219 expected = ((1, ), {'bar': 2})
220 self.assertEqual(ret, expected)
221 self.assertEqual(len(calls), 3)
222 # clear cache
223 foo.cache_clear()
224 ret = foo()
225 expected = ((), {})
226 self.assertEqual(ret, expected)
227 self.assertEqual(len(calls), 4)
228 # docstring
229 self.assertEqual(foo.__doc__, "foo docstring")
230
231 def test_memoize_when_activated(self):
232 class Foo:
233
234 @memoize_when_activated
235 def foo(self):
236 calls.append(None)
237
238 f = Foo()
239 calls = []
240 f.foo()
241 f.foo()
242 self.assertEqual(len(calls), 2)
243
244 # activate
245 calls = []
246 f.foo.cache_activate(f)
247 f.foo()
248 f.foo()
249 self.assertEqual(len(calls), 1)
250
251 # deactivate
252 calls = []
253 f.foo.cache_deactivate(f)
254 f.foo()
255 f.foo()
256 self.assertEqual(len(calls), 2)
257
258 def test_parse_environ_block(self):
259 from psutil._common import parse_environ_block
260
261 def k(s):
262 return s.upper() if WINDOWS else s
263
264 self.assertEqual(parse_environ_block("a=1\0"),
265 {k("a"): "1"})
266 self.assertEqual(parse_environ_block("a=1\0b=2\0\0"),
267 {k("a"): "1", k("b"): "2"})
268 self.assertEqual(parse_environ_block("a=1\0b=\0\0"),
269 {k("a"): "1", k("b"): ""})
270 # ignore everything after \0\0
271 self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"),
272 {k("a"): "1", k("b"): "2"})
273 # ignore everything that is not an assignment
274 self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"})
275 self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"})
276 # do not fail if the block is incomplete
277 self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"})
278
279 def test_supports_ipv6(self):
280 self.addCleanup(supports_ipv6.cache_clear)
281 if supports_ipv6():
282 with mock.patch('psutil._common.socket') as s:
283 s.has_ipv6 = False
284 supports_ipv6.cache_clear()
285 assert not supports_ipv6()
286
287 supports_ipv6.cache_clear()
288 with mock.patch('psutil._common.socket.socket',
289 side_effect=socket.error) as s:
290 assert not supports_ipv6()
291 assert s.called
292
293 supports_ipv6.cache_clear()
294 with mock.patch('psutil._common.socket.socket',
295 side_effect=socket.gaierror) as s:
296 assert not supports_ipv6()
297 supports_ipv6.cache_clear()
298 assert s.called
299
300 supports_ipv6.cache_clear()
301 with mock.patch('psutil._common.socket.socket.bind',
302 side_effect=socket.gaierror) as s:
303 assert not supports_ipv6()
304 supports_ipv6.cache_clear()
305 assert s.called
306 else:
307 with self.assertRaises(Exception):
308 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
309 try:
310 sock.bind(("::1", 0))
311 finally:
312 sock.close()
313
314 def test_isfile_strict(self):
315 from psutil._common import isfile_strict
316 this_file = os.path.abspath(__file__)
317 assert isfile_strict(this_file)
318 assert not isfile_strict(os.path.dirname(this_file))
319 with mock.patch('psutil._common.os.stat',
320 side_effect=OSError(errno.EPERM, "foo")):
321 self.assertRaises(OSError, isfile_strict, this_file)
322 with mock.patch('psutil._common.os.stat',
323 side_effect=OSError(errno.EACCES, "foo")):
324 self.assertRaises(OSError, isfile_strict, this_file)
325 with mock.patch('psutil._common.os.stat',
326 side_effect=OSError(errno.ENOENT, "foo")):
327 assert not isfile_strict(this_file)
328 with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
329 assert not isfile_strict(this_file)
330
331 def test_serialization(self):
332 def check(ret):
333 if json is not None:
334 json.loads(json.dumps(ret))
335 a = pickle.dumps(ret)
336 b = pickle.loads(a)
337 self.assertEqual(ret, b)
338
339 check(psutil.Process().as_dict())
340 check(psutil.virtual_memory())
341 check(psutil.swap_memory())
342 check(psutil.cpu_times())
343 check(psutil.cpu_times_percent(interval=0))
344 check(psutil.net_io_counters())
345 if LINUX and not os.path.exists('/proc/diskstats'):
346 pass
347 else:
348 if not APPVEYOR:
349 check(psutil.disk_io_counters())
350 check(psutil.disk_partitions())
351 check(psutil.disk_usage(os.getcwd()))
352 check(psutil.users())
353
354 def test_setup_script(self):
355 setup_py = os.path.join(ROOT_DIR, 'setup.py')
356 if CI_TESTING and not os.path.exists(setup_py):
357 return self.skipTest("can't find setup.py")
358 module = import_module_by_path(setup_py)
359 self.assertRaises(SystemExit, module.setup)
360 self.assertEqual(module.get_version(), psutil.__version__)
361
362 def test_ad_on_process_creation(self):
363 # We are supposed to be able to instantiate Process also in case
364 # of zombie processes or access denied.
365 with mock.patch.object(psutil.Process, 'create_time',
366 side_effect=psutil.AccessDenied) as meth:
367 psutil.Process()
368 assert meth.called
369 with mock.patch.object(psutil.Process, 'create_time',
370 side_effect=psutil.ZombieProcess(1)) as meth:
371 psutil.Process()
372 assert meth.called
373 with mock.patch.object(psutil.Process, 'create_time',
374 side_effect=ValueError) as meth:
375 with self.assertRaises(ValueError):
376 psutil.Process()
377 assert meth.called
378
379 def test_sanity_version_check(self):
380 # see: https://github.com/giampaolo/psutil/issues/564
381 with mock.patch(
382 "psutil._psplatform.cext.version", return_value="0.0.0"):
383 with self.assertRaises(ImportError) as cm:
384 reload_module(psutil)
385 self.assertIn("version conflict", str(cm.exception).lower())
386
387
388 # ===================================================================
389 # --- Tests for wrap_numbers() function.
390 # ===================================================================
391
392
393 nt = collections.namedtuple('foo', 'a b c')
394
395
396 class TestWrapNumbers(PsutilTestCase):
397
398 def setUp(self):
399 wrap_numbers.cache_clear()
400
401 tearDown = setUp
402
403 def test_first_call(self):
404 input = {'disk1': nt(5, 5, 5)}
405 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
406
407 def test_input_hasnt_changed(self):
408 input = {'disk1': nt(5, 5, 5)}
409 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
410 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
411
412 def test_increase_but_no_wrap(self):
413 input = {'disk1': nt(5, 5, 5)}
414 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
415 input = {'disk1': nt(10, 15, 20)}
416 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
417 input = {'disk1': nt(20, 25, 30)}
418 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
419 input = {'disk1': nt(20, 25, 30)}
420 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
421
422 def test_wrap(self):
423 # let's say 100 is the threshold
424 input = {'disk1': nt(100, 100, 100)}
425 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
426 # first wrap restarts from 10
427 input = {'disk1': nt(100, 100, 10)}
428 self.assertEqual(wrap_numbers(input, 'disk_io'),
429 {'disk1': nt(100, 100, 110)})
430 # then it remains the same
431 input = {'disk1': nt(100, 100, 10)}
432 self.assertEqual(wrap_numbers(input, 'disk_io'),
433 {'disk1': nt(100, 100, 110)})
434 # then it goes up
435 input = {'disk1': nt(100, 100, 90)}
436 self.assertEqual(wrap_numbers(input, 'disk_io'),
437 {'disk1': nt(100, 100, 190)})
438 # then it wraps again
439 input = {'disk1': nt(100, 100, 20)}
440 self.assertEqual(wrap_numbers(input, 'disk_io'),
441 {'disk1': nt(100, 100, 210)})
442 # and remains the same
443 input = {'disk1': nt(100, 100, 20)}
444 self.assertEqual(wrap_numbers(input, 'disk_io'),
445 {'disk1': nt(100, 100, 210)})
446 # now wrap another num
447 input = {'disk1': nt(50, 100, 20)}
448 self.assertEqual(wrap_numbers(input, 'disk_io'),
449 {'disk1': nt(150, 100, 210)})
450 # and again
451 input = {'disk1': nt(40, 100, 20)}
452 self.assertEqual(wrap_numbers(input, 'disk_io'),
453 {'disk1': nt(190, 100, 210)})
454 # keep it the same
455 input = {'disk1': nt(40, 100, 20)}
456 self.assertEqual(wrap_numbers(input, 'disk_io'),
457 {'disk1': nt(190, 100, 210)})
458
459 def test_changing_keys(self):
460 # Emulate a case where the second call to disk_io()
461 # (or whatever) provides a new disk, then the new disk
462 # disappears on the third call.
463 input = {'disk1': nt(5, 5, 5)}
464 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
465 input = {'disk1': nt(5, 5, 5),
466 'disk2': nt(7, 7, 7)}
467 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
468 input = {'disk1': nt(8, 8, 8)}
469 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
470
471 def test_changing_keys_w_wrap(self):
472 input = {'disk1': nt(50, 50, 50),
473 'disk2': nt(100, 100, 100)}
474 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
475 # disk 2 wraps
476 input = {'disk1': nt(50, 50, 50),
477 'disk2': nt(100, 100, 10)}
478 self.assertEqual(wrap_numbers(input, 'disk_io'),
479 {'disk1': nt(50, 50, 50),
480 'disk2': nt(100, 100, 110)})
481 # disk 2 disappears
482 input = {'disk1': nt(50, 50, 50)}
483 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
484
485 # then it appears again; the old wrap is supposed to be
486 # gone.
487 input = {'disk1': nt(50, 50, 50),
488 'disk2': nt(100, 100, 100)}
489 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
490 # remains the same
491 input = {'disk1': nt(50, 50, 50),
492 'disk2': nt(100, 100, 100)}
493 self.assertEqual(wrap_numbers(input, 'disk_io'), input)
494 # and then wraps again
495 input = {'disk1': nt(50, 50, 50),
496 'disk2': nt(100, 100, 10)}
497 self.assertEqual(wrap_numbers(input, 'disk_io'),
498 {'disk1': nt(50, 50, 50),
499 'disk2': nt(100, 100, 110)})
500
501 def test_real_data(self):
502 d = {'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
503 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
504 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
505 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
506 self.assertEqual(wrap_numbers(d, 'disk_io'), d)
507 self.assertEqual(wrap_numbers(d, 'disk_io'), d)
508 # decrease this ↓
509 d = {'nvme0n1': (100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
510 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
511 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
512 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
513 out = wrap_numbers(d, 'disk_io')
514 self.assertEqual(out['nvme0n1'][0], 400)
515
516 # --- cache tests
517
518 def test_cache_first_call(self):
519 input = {'disk1': nt(5, 5, 5)}
520 wrap_numbers(input, 'disk_io')
521 cache = wrap_numbers.cache_info()
522 self.assertEqual(cache[0], {'disk_io': input})
523 self.assertEqual(cache[1], {'disk_io': {}})
524 self.assertEqual(cache[2], {'disk_io': {}})
525
526 def test_cache_call_twice(self):
527 input = {'disk1': nt(5, 5, 5)}
528 wrap_numbers(input, 'disk_io')
529 input = {'disk1': nt(10, 10, 10)}
530 wrap_numbers(input, 'disk_io')
531 cache = wrap_numbers.cache_info()
532 self.assertEqual(cache[0], {'disk_io': input})
533 self.assertEqual(
534 cache[1],
535 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
536 self.assertEqual(cache[2], {'disk_io': {}})
537
538 def test_cache_wrap(self):
539 # let's say 100 is the threshold
540 input = {'disk1': nt(100, 100, 100)}
541 wrap_numbers(input, 'disk_io')
542
543 # first wrap restarts from 10
544 input = {'disk1': nt(100, 100, 10)}
545 wrap_numbers(input, 'disk_io')
546 cache = wrap_numbers.cache_info()
547 self.assertEqual(cache[0], {'disk_io': input})
548 self.assertEqual(
549 cache[1],
550 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 100}})
551 self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}})
552
553 def assert_():
554 cache = wrap_numbers.cache_info()
555 self.assertEqual(
556 cache[1],
557 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0,
558 ('disk1', 2): 100}})
559 self.assertEqual(cache[2],
560 {'disk_io': {'disk1': set([('disk1', 2)])}})
561
562 # then it remains the same
563 input = {'disk1': nt(100, 100, 10)}
564 wrap_numbers(input, 'disk_io')
565 cache = wrap_numbers.cache_info()
566 self.assertEqual(cache[0], {'disk_io': input})
567 assert_()
568
569 # then it goes up
570 input = {'disk1': nt(100, 100, 90)}
571 wrap_numbers(input, 'disk_io')
572 cache = wrap_numbers.cache_info()
573 self.assertEqual(cache[0], {'disk_io': input})
574 assert_()
575
576 # then it wraps again
577 input = {'disk1': nt(100, 100, 20)}
578 wrap_numbers(input, 'disk_io')
579 cache = wrap_numbers.cache_info()
580 self.assertEqual(cache[0], {'disk_io': input})
581 self.assertEqual(
582 cache[1],
583 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 190}})
584 self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}})
585
586 def test_cache_changing_keys(self):
587 input = {'disk1': nt(5, 5, 5)}
588 wrap_numbers(input, 'disk_io')
589 input = {'disk1': nt(5, 5, 5),
590 'disk2': nt(7, 7, 7)}
591 wrap_numbers(input, 'disk_io')
592 cache = wrap_numbers.cache_info()
593 self.assertEqual(cache[0], {'disk_io': input})
594 self.assertEqual(
595 cache[1],
596 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
597 self.assertEqual(cache[2], {'disk_io': {}})
598
599 def test_cache_clear(self):
600 input = {'disk1': nt(5, 5, 5)}
601 wrap_numbers(input, 'disk_io')
602 wrap_numbers(input, 'disk_io')
603 wrap_numbers.cache_clear('disk_io')
604 self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
605 wrap_numbers.cache_clear('disk_io')
606 wrap_numbers.cache_clear('?!?')
607
608 @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
609 def test_cache_clear_public_apis(self):
610 if not psutil.disk_io_counters() or not psutil.net_io_counters():
611 return self.skipTest("no disks or NICs available")
612 psutil.disk_io_counters()
613 psutil.net_io_counters()
614 caches = wrap_numbers.cache_info()
615 for cache in caches:
616 self.assertIn('psutil.disk_io_counters', cache)
617 self.assertIn('psutil.net_io_counters', cache)
618
619 psutil.disk_io_counters.cache_clear()
620 caches = wrap_numbers.cache_info()
621 for cache in caches:
622 self.assertIn('psutil.net_io_counters', cache)
623 self.assertNotIn('psutil.disk_io_counters', cache)
624
625 psutil.net_io_counters.cache_clear()
626 caches = wrap_numbers.cache_info()
627 self.assertEqual(caches, ({}, {}, {}))
628
629
630 # ===================================================================
631 # --- Example script tests
632 # ===================================================================
633
634
635 @unittest.skipIf(not os.path.exists(SCRIPTS_DIR),
636 "can't locate scripts directory")
637 class TestScripts(PsutilTestCase):
638 """Tests for scripts in the "scripts" directory."""
639
640 @staticmethod
641 def assert_stdout(exe, *args, **kwargs):
642 exe = '%s' % os.path.join(SCRIPTS_DIR, exe)
643 cmd = [PYTHON_EXE, exe]
644 for arg in args:
645 cmd.append(arg)
646 try:
647 out = sh(cmd, **kwargs).strip()
648 except RuntimeError as err:
649 if 'AccessDenied' in str(err):
650 return str(err)
651 else:
652 raise
653 assert out, out
654 return out
655
656 @staticmethod
657 def assert_syntax(exe, args=None):
658 exe = os.path.join(SCRIPTS_DIR, exe)
659 if PY3:
660 f = open(exe, 'rt', encoding='utf8')
661 else:
662 f = open(exe, 'rt')
663 with f:
664 src = f.read()
665 ast.parse(src)
666
667 def test_coverage(self):
668 # make sure all example scripts have a test method defined
669 meths = dir(self)
670 for name in os.listdir(SCRIPTS_DIR):
671 if name.endswith('.py'):
672 if 'test_' + os.path.splitext(name)[0] not in meths:
673 # self.assert_stdout(name)
674 self.fail('no test defined for %r script'
675 % os.path.join(SCRIPTS_DIR, name))
676
677 @unittest.skipIf(not POSIX, "POSIX only")
678 def test_executable(self):
679 for name in os.listdir(SCRIPTS_DIR):
680 if name.endswith('.py'):
681 path = os.path.join(SCRIPTS_DIR, name)
682 if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]:
683 self.fail('%r is not executable' % path)
684
685 def test_disk_usage(self):
686 self.assert_stdout('disk_usage.py')
687
688 def test_free(self):
689 self.assert_stdout('free.py')
690
691 def test_meminfo(self):
692 self.assert_stdout('meminfo.py')
693
694 def test_procinfo(self):
695 self.assert_stdout('procinfo.py', str(os.getpid()))
696
697 @unittest.skipIf(CI_TESTING and not psutil.users(), "no users")
698 def test_who(self):
699 self.assert_stdout('who.py')
700
701 def test_ps(self):
702 self.assert_stdout('ps.py')
703
704 def test_pstree(self):
705 self.assert_stdout('pstree.py')
706
707 def test_netstat(self):
708 self.assert_stdout('netstat.py')
709
710 # permission denied on travis
711 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
712 def test_ifconfig(self):
713 self.assert_stdout('ifconfig.py')
714
715 @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
716 def test_pmap(self):
717 self.assert_stdout('pmap.py', str(os.getpid()))
718
719 def test_procsmem(self):
720 if 'uss' not in psutil.Process().memory_full_info()._fields:
721 raise self.skipTest("not supported")
722 self.assert_stdout('procsmem.py', stderr=DEVNULL)
723
724 def test_killall(self):
725 self.assert_syntax('killall.py')
726
727 def test_nettop(self):
728 self.assert_syntax('nettop.py')
729
730 def test_top(self):
731 self.assert_syntax('top.py')
732
733 def test_iotop(self):
734 self.assert_syntax('iotop.py')
735
736 def test_pidof(self):
737 output = self.assert_stdout('pidof.py', psutil.Process().name())
738 self.assertIn(str(os.getpid()), output)
739
740 @unittest.skipIf(not WINDOWS, "WINDOWS only")
741 def test_winservices(self):
742 self.assert_stdout('winservices.py')
743
744 def test_cpu_distribution(self):
745 self.assert_syntax('cpu_distribution.py')
746
747 @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
748 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
749 def test_temperatures(self):
750 if not psutil.sensors_temperatures():
751 self.skipTest("no temperatures")
752 self.assert_stdout('temperatures.py')
753
754 @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
755 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
756 def test_fans(self):
757 if not psutil.sensors_fans():
758 self.skipTest("no fans")
759 self.assert_stdout('fans.py')
760
761 @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
762 @unittest.skipIf(not HAS_BATTERY, "no battery")
763 def test_battery(self):
764 self.assert_stdout('battery.py')
765
766 def test_sensors(self):
767 self.assert_stdout('sensors.py')
768
769
770 if __name__ == '__main__':
771 from psutil.tests.runner import run_from_name
772 run_from_name(__file__)