comparison env/lib/python3.7/site-packages/psutil/tests/test_process.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
6
7 """Tests for psutil.Process class."""
8
9 import collections
10 import errno
11 import getpass
12 import itertools
13 import os
14 import signal
15 import socket
16 import subprocess
17 import sys
18 import tempfile
19 import textwrap
20 import time
21 import types
22
23 import psutil
24
25 from psutil import AIX
26 from psutil import BSD
27 from psutil import LINUX
28 from psutil import MACOS
29 from psutil import NETBSD
30 from psutil import OPENBSD
31 from psutil import OSX
32 from psutil import POSIX
33 from psutil import SUNOS
34 from psutil import WINDOWS
35 from psutil._common import open_text
36 from psutil._compat import long
37 from psutil._compat import PY3
38 from psutil.tests import APPVEYOR
39 from psutil.tests import call_until
40 from psutil.tests import CIRRUS
41 from psutil.tests import copyload_shared_lib
42 from psutil.tests import create_exe
43 from psutil.tests import create_proc_children_pair
44 from psutil.tests import create_zombie_proc
45 from psutil.tests import enum
46 from psutil.tests import get_test_subprocess
47 from psutil.tests import HAS_CPU_AFFINITY
48 from psutil.tests import HAS_ENVIRON
49 from psutil.tests import HAS_IONICE
50 from psutil.tests import HAS_MEMORY_MAPS
51 from psutil.tests import HAS_PROC_CPU_NUM
52 from psutil.tests import HAS_PROC_IO_COUNTERS
53 from psutil.tests import HAS_RLIMIT
54 from psutil.tests import HAS_THREADS
55 from psutil.tests import mock
56 from psutil.tests import PYPY
57 from psutil.tests import PYTHON_EXE
58 from psutil.tests import reap_children
59 from psutil.tests import retry_on_failure
60 from psutil.tests import safe_rmpath
61 from psutil.tests import sh
62 from psutil.tests import skip_on_access_denied
63 from psutil.tests import skip_on_not_implemented
64 from psutil.tests import TESTFILE_PREFIX
65 from psutil.tests import TESTFN
66 from psutil.tests import ThreadTask
67 from psutil.tests import TRAVIS
68 from psutil.tests import unittest
69 from psutil.tests import wait_for_pid
70
71
72 # ===================================================================
73 # --- psutil.Process class tests
74 # ===================================================================
75
76 class TestProcess(unittest.TestCase):
77 """Tests for psutil.Process class."""
78
79 def setUp(self):
80 safe_rmpath(TESTFN)
81
82 def tearDown(self):
83 reap_children()
84
85 def test_pid(self):
86 p = psutil.Process()
87 self.assertEqual(p.pid, os.getpid())
88 sproc = get_test_subprocess()
89 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
90 with self.assertRaises(AttributeError):
91 p.pid = 33
92
93 def test_kill(self):
94 sproc = get_test_subprocess()
95 test_pid = sproc.pid
96 p = psutil.Process(test_pid)
97 p.kill()
98 sig = p.wait()
99 self.assertFalse(psutil.pid_exists(test_pid))
100 if POSIX:
101 self.assertEqual(sig, -signal.SIGKILL)
102
103 def test_terminate(self):
104 sproc = get_test_subprocess()
105 test_pid = sproc.pid
106 p = psutil.Process(test_pid)
107 p.terminate()
108 sig = p.wait()
109 self.assertFalse(psutil.pid_exists(test_pid))
110 if POSIX:
111 self.assertEqual(sig, -signal.SIGTERM)
112
113 def test_send_signal(self):
114 sig = signal.SIGKILL if POSIX else signal.SIGTERM
115 sproc = get_test_subprocess()
116 p = psutil.Process(sproc.pid)
117 p.send_signal(sig)
118 exit_sig = p.wait()
119 self.assertFalse(psutil.pid_exists(p.pid))
120 if POSIX:
121 self.assertEqual(exit_sig, -sig)
122 #
123 sproc = get_test_subprocess()
124 p = psutil.Process(sproc.pid)
125 p.send_signal(sig)
126 with mock.patch('psutil.os.kill',
127 side_effect=OSError(errno.ESRCH, "")):
128 with self.assertRaises(psutil.NoSuchProcess):
129 p.send_signal(sig)
130 #
131 sproc = get_test_subprocess()
132 p = psutil.Process(sproc.pid)
133 p.send_signal(sig)
134 with mock.patch('psutil.os.kill',
135 side_effect=OSError(errno.EPERM, "")):
136 with self.assertRaises(psutil.AccessDenied):
137 psutil.Process().send_signal(sig)
138 # Sending a signal to process with PID 0 is not allowed as
139 # it would affect every process in the process group of
140 # the calling process (os.getpid()) instead of PID 0").
141 if 0 in psutil.pids():
142 p = psutil.Process(0)
143 self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
144
145 def test_wait(self):
146 # check exit code signal
147 sproc = get_test_subprocess()
148 p = psutil.Process(sproc.pid)
149 p.kill()
150 code = p.wait()
151 if POSIX:
152 self.assertEqual(code, -signal.SIGKILL)
153 else:
154 self.assertEqual(code, signal.SIGTERM)
155 self.assertFalse(p.is_running())
156
157 sproc = get_test_subprocess()
158 p = psutil.Process(sproc.pid)
159 p.terminate()
160 code = p.wait()
161 if POSIX:
162 self.assertEqual(code, -signal.SIGTERM)
163 else:
164 self.assertEqual(code, signal.SIGTERM)
165 self.assertFalse(p.is_running())
166
167 # check sys.exit() code
168 code = "import time, sys; time.sleep(0.01); sys.exit(5);"
169 sproc = get_test_subprocess([PYTHON_EXE, "-c", code])
170 p = psutil.Process(sproc.pid)
171 self.assertEqual(p.wait(), 5)
172 self.assertFalse(p.is_running())
173
174 # Test wait() issued twice.
175 # It is not supposed to raise NSP when the process is gone.
176 # On UNIX this should return None, on Windows it should keep
177 # returning the exit code.
178 sproc = get_test_subprocess([PYTHON_EXE, "-c", code])
179 p = psutil.Process(sproc.pid)
180 self.assertEqual(p.wait(), 5)
181 self.assertIn(p.wait(), (5, None))
182
183 # test timeout
184 sproc = get_test_subprocess()
185 p = psutil.Process(sproc.pid)
186 p.name()
187 self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
188
189 # timeout < 0 not allowed
190 self.assertRaises(ValueError, p.wait, -1)
191
192 def test_wait_non_children(self):
193 # Test wait() against a process which is not our direct
194 # child.
195 p1, p2 = create_proc_children_pair()
196 self.assertRaises(psutil.TimeoutExpired, p1.wait, 0.01)
197 self.assertRaises(psutil.TimeoutExpired, p2.wait, 0.01)
198 # We also terminate the direct child otherwise the
199 # grandchild will hang until the parent is gone.
200 p1.terminate()
201 p2.terminate()
202 ret1 = p1.wait()
203 ret2 = p2.wait()
204 if POSIX:
205 self.assertEqual(ret1, -signal.SIGTERM)
206 # For processes which are not our children we're supposed
207 # to get None.
208 self.assertEqual(ret2, None)
209 else:
210 self.assertEqual(ret1, signal.SIGTERM)
211 self.assertEqual(ret1, signal.SIGTERM)
212
213 def test_wait_timeout_0(self):
214 sproc = get_test_subprocess()
215 p = psutil.Process(sproc.pid)
216 self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
217 p.kill()
218 stop_at = time.time() + 2
219 while True:
220 try:
221 code = p.wait(0)
222 except psutil.TimeoutExpired:
223 if time.time() >= stop_at:
224 raise
225 else:
226 break
227 if POSIX:
228 self.assertEqual(code, -signal.SIGKILL)
229 else:
230 self.assertEqual(code, signal.SIGTERM)
231 self.assertFalse(p.is_running())
232
233 def test_cpu_percent(self):
234 p = psutil.Process()
235 p.cpu_percent(interval=0.001)
236 p.cpu_percent(interval=0.001)
237 for x in range(100):
238 percent = p.cpu_percent(interval=None)
239 self.assertIsInstance(percent, float)
240 self.assertGreaterEqual(percent, 0.0)
241 with self.assertRaises(ValueError):
242 p.cpu_percent(interval=-1)
243
244 def test_cpu_percent_numcpus_none(self):
245 # See: https://github.com/giampaolo/psutil/issues/1087
246 with mock.patch('psutil.cpu_count', return_value=None) as m:
247 psutil.Process().cpu_percent()
248 assert m.called
249
250 def test_cpu_times(self):
251 times = psutil.Process().cpu_times()
252 assert (times.user > 0.0) or (times.system > 0.0), times
253 assert (times.children_user >= 0.0), times
254 assert (times.children_system >= 0.0), times
255 if LINUX:
256 assert times.iowait >= 0.0, times
257 # make sure returned values can be pretty printed with strftime
258 for name in times._fields:
259 time.strftime("%H:%M:%S", time.localtime(getattr(times, name)))
260
261 def test_cpu_times_2(self):
262 user_time, kernel_time = psutil.Process().cpu_times()[:2]
263 utime, ktime = os.times()[:2]
264
265 # Use os.times()[:2] as base values to compare our results
266 # using a tolerance of +/- 0.1 seconds.
267 # It will fail if the difference between the values is > 0.1s.
268 if (max([user_time, utime]) - min([user_time, utime])) > 0.1:
269 self.fail("expected: %s, found: %s" % (utime, user_time))
270
271 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1:
272 self.fail("expected: %s, found: %s" % (ktime, kernel_time))
273
274 @unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
275 def test_cpu_num(self):
276 p = psutil.Process()
277 num = p.cpu_num()
278 self.assertGreaterEqual(num, 0)
279 if psutil.cpu_count() == 1:
280 self.assertEqual(num, 0)
281 self.assertIn(p.cpu_num(), range(psutil.cpu_count()))
282
283 def test_create_time(self):
284 sproc = get_test_subprocess()
285 now = time.time()
286 p = psutil.Process(sproc.pid)
287 create_time = p.create_time()
288
289 # Use time.time() as base value to compare our result using a
290 # tolerance of +/- 1 second.
291 # It will fail if the difference between the values is > 2s.
292 difference = abs(create_time - now)
293 if difference > 2:
294 self.fail("expected: %s, found: %s, difference: %s"
295 % (now, create_time, difference))
296
297 # make sure returned value can be pretty printed with strftime
298 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time()))
299
300 @unittest.skipIf(not POSIX, 'POSIX only')
301 @unittest.skipIf(TRAVIS or CIRRUS, 'not reliable on TRAVIS/CIRRUS')
302 def test_terminal(self):
303 terminal = psutil.Process().terminal()
304 if sys.stdin.isatty() or sys.stdout.isatty():
305 tty = os.path.realpath(sh('tty'))
306 self.assertEqual(terminal, tty)
307 else:
308 self.assertIsNone(terminal)
309
310 @unittest.skipIf(not HAS_PROC_IO_COUNTERS, 'not supported')
311 @skip_on_not_implemented(only_if=LINUX)
312 def test_io_counters(self):
313 p = psutil.Process()
314 # test reads
315 io1 = p.io_counters()
316 with open(PYTHON_EXE, 'rb') as f:
317 f.read()
318 io2 = p.io_counters()
319 if not BSD and not AIX:
320 self.assertGreater(io2.read_count, io1.read_count)
321 self.assertEqual(io2.write_count, io1.write_count)
322 if LINUX:
323 self.assertGreater(io2.read_chars, io1.read_chars)
324 self.assertEqual(io2.write_chars, io1.write_chars)
325 else:
326 self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
327 self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)
328
329 # test writes
330 io1 = p.io_counters()
331 with tempfile.TemporaryFile(prefix=TESTFILE_PREFIX) as f:
332 if PY3:
333 f.write(bytes("x" * 1000000, 'ascii'))
334 else:
335 f.write("x" * 1000000)
336 io2 = p.io_counters()
337 self.assertGreaterEqual(io2.write_count, io1.write_count)
338 self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)
339 self.assertGreaterEqual(io2.read_count, io1.read_count)
340 self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
341 if LINUX:
342 self.assertGreater(io2.write_chars, io1.write_chars)
343 self.assertGreaterEqual(io2.read_chars, io1.read_chars)
344
345 # sanity check
346 for i in range(len(io2)):
347 if BSD and i >= 2:
348 # On BSD read_bytes and write_bytes are always set to -1.
349 continue
350 self.assertGreaterEqual(io2[i], 0)
351 self.assertGreaterEqual(io2[i], 0)
352
353 @unittest.skipIf(not HAS_IONICE, "not supported")
354 @unittest.skipIf(not LINUX, "linux only")
355 def test_ionice_linux(self):
356 p = psutil.Process()
357 self.assertEqual(p.ionice()[0], psutil.IOPRIO_CLASS_NONE)
358 self.assertEqual(psutil.IOPRIO_CLASS_NONE, 0)
359 self.assertEqual(psutil.IOPRIO_CLASS_RT, 1) # high
360 self.assertEqual(psutil.IOPRIO_CLASS_BE, 2) # normal
361 self.assertEqual(psutil.IOPRIO_CLASS_IDLE, 3) # low
362 try:
363 # low
364 p.ionice(psutil.IOPRIO_CLASS_IDLE)
365 self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_IDLE, 0))
366 with self.assertRaises(ValueError): # accepts no value
367 p.ionice(psutil.IOPRIO_CLASS_IDLE, value=7)
368 # normal
369 p.ionice(psutil.IOPRIO_CLASS_BE)
370 self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_BE, 0))
371 p.ionice(psutil.IOPRIO_CLASS_BE, value=7)
372 self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_BE, 7))
373 with self.assertRaises(ValueError):
374 p.ionice(psutil.IOPRIO_CLASS_BE, value=8)
375 # high
376 if os.getuid() == 0: # root
377 p.ionice(psutil.IOPRIO_CLASS_RT)
378 self.assertEqual(tuple(p.ionice()),
379 (psutil.IOPRIO_CLASS_RT, 0))
380 p.ionice(psutil.IOPRIO_CLASS_RT, value=7)
381 self.assertEqual(tuple(p.ionice()),
382 (psutil.IOPRIO_CLASS_RT, 7))
383 with self.assertRaises(ValueError):
384 p.ionice(psutil.IOPRIO_CLASS_IDLE, value=8)
385 # errs
386 self.assertRaisesRegex(
387 ValueError, "ioclass accepts no value",
388 p.ionice, psutil.IOPRIO_CLASS_NONE, 1)
389 self.assertRaisesRegex(
390 ValueError, "ioclass accepts no value",
391 p.ionice, psutil.IOPRIO_CLASS_IDLE, 1)
392 self.assertRaisesRegex(
393 ValueError, "'ioclass' argument must be specified",
394 p.ionice, value=1)
395 finally:
396 p.ionice(psutil.IOPRIO_CLASS_BE)
397
398 @unittest.skipIf(not HAS_IONICE, "not supported")
399 @unittest.skipIf(not WINDOWS, 'not supported on this win version')
400 def test_ionice_win(self):
401 p = psutil.Process()
402 self.assertEqual(p.ionice(), psutil.IOPRIO_NORMAL)
403 try:
404 # base
405 p.ionice(psutil.IOPRIO_VERYLOW)
406 self.assertEqual(p.ionice(), psutil.IOPRIO_VERYLOW)
407 p.ionice(psutil.IOPRIO_LOW)
408 self.assertEqual(p.ionice(), psutil.IOPRIO_LOW)
409 try:
410 p.ionice(psutil.IOPRIO_HIGH)
411 except psutil.AccessDenied:
412 pass
413 else:
414 self.assertEqual(p.ionice(), psutil.IOPRIO_HIGH)
415 # errs
416 self.assertRaisesRegex(
417 TypeError, "value argument not accepted on Windows",
418 p.ionice, psutil.IOPRIO_NORMAL, value=1)
419 self.assertRaisesRegex(
420 ValueError, "is not a valid priority",
421 p.ionice, psutil.IOPRIO_HIGH + 1)
422 finally:
423 p.ionice(psutil.IOPRIO_NORMAL)
424 self.assertEqual(p.ionice(), psutil.IOPRIO_NORMAL)
425
426 @unittest.skipIf(not HAS_RLIMIT, "not supported")
427 def test_rlimit_get(self):
428 import resource
429 p = psutil.Process(os.getpid())
430 names = [x for x in dir(psutil) if x.startswith('RLIMIT')]
431 assert names, names
432 for name in names:
433 value = getattr(psutil, name)
434 self.assertGreaterEqual(value, 0)
435 if name in dir(resource):
436 self.assertEqual(value, getattr(resource, name))
437 # XXX - On PyPy RLIMIT_INFINITY returned by
438 # resource.getrlimit() is reported as a very big long
439 # number instead of -1. It looks like a bug with PyPy.
440 if PYPY:
441 continue
442 self.assertEqual(p.rlimit(value), resource.getrlimit(value))
443 else:
444 ret = p.rlimit(value)
445 self.assertEqual(len(ret), 2)
446 self.assertGreaterEqual(ret[0], -1)
447 self.assertGreaterEqual(ret[1], -1)
448
449 @unittest.skipIf(not HAS_RLIMIT, "not supported")
450 def test_rlimit_set(self):
451 sproc = get_test_subprocess()
452 p = psutil.Process(sproc.pid)
453 p.rlimit(psutil.RLIMIT_NOFILE, (5, 5))
454 self.assertEqual(p.rlimit(psutil.RLIMIT_NOFILE), (5, 5))
455 # If pid is 0 prlimit() applies to the calling process and
456 # we don't want that.
457 with self.assertRaises(ValueError):
458 psutil._psplatform.Process(0).rlimit(0)
459 with self.assertRaises(ValueError):
460 p.rlimit(psutil.RLIMIT_NOFILE, (5, 5, 5))
461
462 @unittest.skipIf(not HAS_RLIMIT, "not supported")
463 def test_rlimit(self):
464 p = psutil.Process()
465 soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
466 try:
467 p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
468 with open(TESTFN, "wb") as f:
469 f.write(b"X" * 1024)
470 # write() or flush() doesn't always cause the exception
471 # but close() will.
472 with self.assertRaises(IOError) as exc:
473 with open(TESTFN, "wb") as f:
474 f.write(b"X" * 1025)
475 self.assertEqual(exc.exception.errno if PY3 else exc.exception[0],
476 errno.EFBIG)
477 finally:
478 p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
479 self.assertEqual(p.rlimit(psutil.RLIMIT_FSIZE), (soft, hard))
480
481 @unittest.skipIf(not HAS_RLIMIT, "not supported")
482 def test_rlimit_infinity(self):
483 # First set a limit, then re-set it by specifying INFINITY
484 # and assume we overridden the previous limit.
485 p = psutil.Process()
486 soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
487 try:
488 p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
489 p.rlimit(psutil.RLIMIT_FSIZE, (psutil.RLIM_INFINITY, hard))
490 with open(TESTFN, "wb") as f:
491 f.write(b"X" * 2048)
492 finally:
493 p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
494 self.assertEqual(p.rlimit(psutil.RLIMIT_FSIZE), (soft, hard))
495
496 @unittest.skipIf(not HAS_RLIMIT, "not supported")
497 def test_rlimit_infinity_value(self):
498 # RLIMIT_FSIZE should be RLIM_INFINITY, which will be a really
499 # big number on a platform with large file support. On these
500 # platforms we need to test that the get/setrlimit functions
501 # properly convert the number to a C long long and that the
502 # conversion doesn't raise an error.
503 p = psutil.Process()
504 soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
505 self.assertEqual(psutil.RLIM_INFINITY, hard)
506 p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
507
508 def test_num_threads(self):
509 # on certain platforms such as Linux we might test for exact
510 # thread number, since we always have with 1 thread per process,
511 # but this does not apply across all platforms (MACOS, Windows)
512 p = psutil.Process()
513 if OPENBSD:
514 try:
515 step1 = p.num_threads()
516 except psutil.AccessDenied:
517 raise unittest.SkipTest("on OpenBSD this requires root access")
518 else:
519 step1 = p.num_threads()
520
521 with ThreadTask():
522 step2 = p.num_threads()
523 self.assertEqual(step2, step1 + 1)
524
525 @unittest.skipIf(not WINDOWS, 'WINDOWS only')
526 def test_num_handles(self):
527 # a better test is done later into test/_windows.py
528 p = psutil.Process()
529 self.assertGreater(p.num_handles(), 0)
530
531 @unittest.skipIf(not HAS_THREADS, 'not supported')
532 def test_threads(self):
533 p = psutil.Process()
534 if OPENBSD:
535 try:
536 step1 = p.threads()
537 except psutil.AccessDenied:
538 raise unittest.SkipTest("on OpenBSD this requires root access")
539 else:
540 step1 = p.threads()
541
542 with ThreadTask():
543 step2 = p.threads()
544 self.assertEqual(len(step2), len(step1) + 1)
545 athread = step2[0]
546 # test named tuple
547 self.assertEqual(athread.id, athread[0])
548 self.assertEqual(athread.user_time, athread[1])
549 self.assertEqual(athread.system_time, athread[2])
550
551 @retry_on_failure()
552 @skip_on_access_denied(only_if=MACOS)
553 @unittest.skipIf(not HAS_THREADS, 'not supported')
554 def test_threads_2(self):
555 sproc = get_test_subprocess()
556 p = psutil.Process(sproc.pid)
557 if OPENBSD:
558 try:
559 p.threads()
560 except psutil.AccessDenied:
561 raise unittest.SkipTest(
562 "on OpenBSD this requires root access")
563 self.assertAlmostEqual(
564 p.cpu_times().user,
565 sum([x.user_time for x in p.threads()]), delta=0.1)
566 self.assertAlmostEqual(
567 p.cpu_times().system,
568 sum([x.system_time for x in p.threads()]), delta=0.1)
569
570 def test_memory_info(self):
571 p = psutil.Process()
572
573 # step 1 - get a base value to compare our results
574 rss1, vms1 = p.memory_info()[:2]
575 percent1 = p.memory_percent()
576 self.assertGreater(rss1, 0)
577 self.assertGreater(vms1, 0)
578
579 # step 2 - allocate some memory
580 memarr = [None] * 1500000
581
582 rss2, vms2 = p.memory_info()[:2]
583 percent2 = p.memory_percent()
584
585 # step 3 - make sure that the memory usage bumped up
586 self.assertGreater(rss2, rss1)
587 self.assertGreaterEqual(vms2, vms1) # vms might be equal
588 self.assertGreater(percent2, percent1)
589 del memarr
590
591 if WINDOWS:
592 mem = p.memory_info()
593 self.assertEqual(mem.rss, mem.wset)
594 self.assertEqual(mem.vms, mem.pagefile)
595
596 mem = p.memory_info()
597 for name in mem._fields:
598 self.assertGreaterEqual(getattr(mem, name), 0)
599
600 def test_memory_full_info(self):
601 total = psutil.virtual_memory().total
602 mem = psutil.Process().memory_full_info()
603 for name in mem._fields:
604 value = getattr(mem, name)
605 self.assertGreaterEqual(value, 0, msg=(name, value))
606 if name == 'vms' and OSX or LINUX:
607 continue
608 self.assertLessEqual(value, total, msg=(name, value, total))
609 if LINUX or WINDOWS or MACOS:
610 self.assertGreaterEqual(mem.uss, 0)
611 if LINUX:
612 self.assertGreaterEqual(mem.pss, 0)
613 self.assertGreaterEqual(mem.swap, 0)
614
615 @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
616 def test_memory_maps(self):
617 p = psutil.Process()
618 maps = p.memory_maps()
619 paths = [x for x in maps]
620 self.assertEqual(len(paths), len(set(paths)))
621 ext_maps = p.memory_maps(grouped=False)
622
623 for nt in maps:
624 if not nt.path.startswith('['):
625 assert os.path.isabs(nt.path), nt.path
626 if POSIX:
627 try:
628 assert os.path.exists(nt.path) or \
629 os.path.islink(nt.path), nt.path
630 except AssertionError:
631 if not LINUX:
632 raise
633 else:
634 # https://github.com/giampaolo/psutil/issues/759
635 with open_text('/proc/self/smaps') as f:
636 data = f.read()
637 if "%s (deleted)" % nt.path not in data:
638 raise
639 else:
640 # XXX - On Windows we have this strange behavior with
641 # 64 bit dlls: they are visible via explorer but cannot
642 # be accessed via os.stat() (wtf?).
643 if '64' not in os.path.basename(nt.path):
644 assert os.path.exists(nt.path), nt.path
645 for nt in ext_maps:
646 for fname in nt._fields:
647 value = getattr(nt, fname)
648 if fname == 'path':
649 continue
650 elif fname in ('addr', 'perms'):
651 assert value, value
652 else:
653 self.assertIsInstance(value, (int, long))
654 assert value >= 0, value
655
656 @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
657 def test_memory_maps_lists_lib(self):
658 # Make sure a newly loaded shared lib is listed.
659 with copyload_shared_lib() as path:
660 def normpath(p):
661 return os.path.realpath(os.path.normcase(p))
662 libpaths = [normpath(x.path)
663 for x in psutil.Process().memory_maps()]
664 self.assertIn(normpath(path), libpaths)
665
666 def test_memory_percent(self):
667 p = psutil.Process()
668 p.memory_percent()
669 self.assertRaises(ValueError, p.memory_percent, memtype="?!?")
670 if LINUX or MACOS or WINDOWS:
671 p.memory_percent(memtype='uss')
672
673 def test_is_running(self):
674 sproc = get_test_subprocess()
675 p = psutil.Process(sproc.pid)
676 assert p.is_running()
677 assert p.is_running()
678 p.kill()
679 p.wait()
680 assert not p.is_running()
681 assert not p.is_running()
682
683 def test_exe(self):
684 sproc = get_test_subprocess()
685 exe = psutil.Process(sproc.pid).exe()
686 try:
687 self.assertEqual(exe, PYTHON_EXE)
688 except AssertionError:
689 if WINDOWS and len(exe) == len(PYTHON_EXE):
690 # on Windows we don't care about case sensitivity
691 normcase = os.path.normcase
692 self.assertEqual(normcase(exe), normcase(PYTHON_EXE))
693 else:
694 # certain platforms such as BSD are more accurate returning:
695 # "/usr/local/bin/python2.7"
696 # ...instead of:
697 # "/usr/local/bin/python"
698 # We do not want to consider this difference in accuracy
699 # an error.
700 ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
701 try:
702 self.assertEqual(exe.replace(ver, ''),
703 PYTHON_EXE.replace(ver, ''))
704 except AssertionError:
705 # Tipically MACOS. Really not sure what to do here.
706 pass
707
708 out = sh([exe, "-c", "import os; print('hey')"])
709 self.assertEqual(out, 'hey')
710
711 def test_cmdline(self):
712 cmdline = [PYTHON_EXE, "-c", "import time; time.sleep(60)"]
713 sproc = get_test_subprocess(cmdline)
714 try:
715 self.assertEqual(' '.join(psutil.Process(sproc.pid).cmdline()),
716 ' '.join(cmdline))
717 except AssertionError:
718 # XXX - most of the times the underlying sysctl() call on Net
719 # and Open BSD returns a truncated string.
720 # Also /proc/pid/cmdline behaves the same so it looks
721 # like this is a kernel bug.
722 # XXX - AIX truncates long arguments in /proc/pid/cmdline
723 if NETBSD or OPENBSD or AIX:
724 self.assertEqual(
725 psutil.Process(sproc.pid).cmdline()[0], PYTHON_EXE)
726 else:
727 raise
728
729 @unittest.skipIf(PYPY, "broken on PYPY")
730 def test_long_cmdline(self):
731 create_exe(TESTFN)
732 self.addCleanup(safe_rmpath, TESTFN)
733 cmdline = [TESTFN] + (["0123456789"] * 20)
734 sproc = get_test_subprocess(cmdline)
735 p = psutil.Process(sproc.pid)
736 self.assertEqual(p.cmdline(), cmdline)
737
738 def test_name(self):
739 sproc = get_test_subprocess(PYTHON_EXE)
740 name = psutil.Process(sproc.pid).name().lower()
741 pyexe = os.path.basename(os.path.realpath(sys.executable)).lower()
742 assert pyexe.startswith(name), (pyexe, name)
743
744 @unittest.skipIf(PYPY, "unreliable on PYPY")
745 def test_long_name(self):
746 long_name = TESTFN + ("0123456789" * 2)
747 create_exe(long_name)
748 self.addCleanup(safe_rmpath, long_name)
749 sproc = get_test_subprocess(long_name)
750 p = psutil.Process(sproc.pid)
751 self.assertEqual(p.name(), os.path.basename(long_name))
752
753 # XXX
754 @unittest.skipIf(SUNOS, "broken on SUNOS")
755 @unittest.skipIf(AIX, "broken on AIX")
756 @unittest.skipIf(PYPY, "broken on PYPY")
757 def test_prog_w_funky_name(self):
758 # Test that name(), exe() and cmdline() correctly handle programs
759 # with funky chars such as spaces and ")", see:
760 # https://github.com/giampaolo/psutil/issues/628
761
762 def rm():
763 # Try to limit occasional failures on Appveyor:
764 # https://ci.appveyor.com/project/giampaolo/psutil/build/1350/
765 # job/lbo3bkju55le850n
766 try:
767 safe_rmpath(funky_path)
768 except OSError:
769 pass
770
771 funky_path = TESTFN + 'foo bar )'
772 create_exe(funky_path)
773 self.addCleanup(rm)
774 cmdline = [funky_path, "-c",
775 "import time; [time.sleep(0.01) for x in range(3000)];"
776 "arg1", "arg2", "", "arg3", ""]
777 sproc = get_test_subprocess(cmdline)
778 p = psutil.Process(sproc.pid)
779 # ...in order to try to prevent occasional failures on travis
780 if TRAVIS:
781 wait_for_pid(p.pid)
782 self.assertEqual(p.cmdline(), cmdline)
783 self.assertEqual(p.name(), os.path.basename(funky_path))
784 self.assertEqual(os.path.normcase(p.exe()),
785 os.path.normcase(funky_path))
786
787 @unittest.skipIf(not POSIX, 'POSIX only')
788 def test_uids(self):
789 p = psutil.Process()
790 real, effective, saved = p.uids()
791 # os.getuid() refers to "real" uid
792 self.assertEqual(real, os.getuid())
793 # os.geteuid() refers to "effective" uid
794 self.assertEqual(effective, os.geteuid())
795 # No such thing as os.getsuid() ("saved" uid), but starting
796 # from python 2.7 we have os.getresuid() which returns all
797 # of them.
798 if hasattr(os, "getresuid"):
799 self.assertEqual(os.getresuid(), p.uids())
800
801 @unittest.skipIf(not POSIX, 'POSIX only')
802 def test_gids(self):
803 p = psutil.Process()
804 real, effective, saved = p.gids()
805 # os.getuid() refers to "real" uid
806 self.assertEqual(real, os.getgid())
807 # os.geteuid() refers to "effective" uid
808 self.assertEqual(effective, os.getegid())
809 # No such thing as os.getsgid() ("saved" gid), but starting
810 # from python 2.7 we have os.getresgid() which returns all
811 # of them.
812 if hasattr(os, "getresuid"):
813 self.assertEqual(os.getresgid(), p.gids())
814
815 def test_nice(self):
816 p = psutil.Process()
817 self.assertRaises(TypeError, p.nice, "str")
818 if WINDOWS:
819 try:
820 init = p.nice()
821 if sys.version_info > (3, 4):
822 self.assertIsInstance(init, enum.IntEnum)
823 else:
824 self.assertIsInstance(init, int)
825 self.assertEqual(init, psutil.NORMAL_PRIORITY_CLASS)
826 p.nice(psutil.HIGH_PRIORITY_CLASS)
827 self.assertEqual(p.nice(), psutil.HIGH_PRIORITY_CLASS)
828 p.nice(psutil.NORMAL_PRIORITY_CLASS)
829 self.assertEqual(p.nice(), psutil.NORMAL_PRIORITY_CLASS)
830 finally:
831 p.nice(psutil.NORMAL_PRIORITY_CLASS)
832 else:
833 first_nice = p.nice()
834 try:
835 if hasattr(os, "getpriority"):
836 self.assertEqual(
837 os.getpriority(os.PRIO_PROCESS, os.getpid()), p.nice())
838 p.nice(1)
839 self.assertEqual(p.nice(), 1)
840 if hasattr(os, "getpriority"):
841 self.assertEqual(
842 os.getpriority(os.PRIO_PROCESS, os.getpid()), p.nice())
843 # XXX - going back to previous nice value raises
844 # AccessDenied on MACOS
845 if not MACOS:
846 p.nice(0)
847 self.assertEqual(p.nice(), 0)
848 except psutil.AccessDenied:
849 pass
850 finally:
851 try:
852 p.nice(first_nice)
853 except psutil.AccessDenied:
854 pass
855
856 def test_status(self):
857 p = psutil.Process()
858 self.assertEqual(p.status(), psutil.STATUS_RUNNING)
859
860 def test_username(self):
861 sproc = get_test_subprocess()
862 p = psutil.Process(sproc.pid)
863 username = p.username()
864 if WINDOWS:
865 domain, username = username.split('\\')
866 self.assertEqual(username, getpass.getuser())
867 if 'USERDOMAIN' in os.environ:
868 self.assertEqual(domain, os.environ['USERDOMAIN'])
869 else:
870 self.assertEqual(username, getpass.getuser())
871
872 def test_cwd(self):
873 sproc = get_test_subprocess()
874 p = psutil.Process(sproc.pid)
875 self.assertEqual(p.cwd(), os.getcwd())
876
877 def test_cwd_2(self):
878 cmd = [PYTHON_EXE, "-c",
879 "import os, time; os.chdir('..'); time.sleep(60)"]
880 sproc = get_test_subprocess(cmd)
881 p = psutil.Process(sproc.pid)
882 call_until(p.cwd, "ret == os.path.dirname(os.getcwd())")
883
884 @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
885 def test_cpu_affinity(self):
886 p = psutil.Process()
887 initial = p.cpu_affinity()
888 assert initial, initial
889 self.addCleanup(p.cpu_affinity, initial)
890
891 if hasattr(os, "sched_getaffinity"):
892 self.assertEqual(initial, list(os.sched_getaffinity(p.pid)))
893 self.assertEqual(len(initial), len(set(initial)))
894
895 all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
896 # Work around travis failure:
897 # https://travis-ci.org/giampaolo/psutil/builds/284173194
898 for n in all_cpus if not TRAVIS else initial:
899 p.cpu_affinity([n])
900 self.assertEqual(p.cpu_affinity(), [n])
901 if hasattr(os, "sched_getaffinity"):
902 self.assertEqual(p.cpu_affinity(),
903 list(os.sched_getaffinity(p.pid)))
904 # also test num_cpu()
905 if hasattr(p, "num_cpu"):
906 self.assertEqual(p.cpu_affinity()[0], p.num_cpu())
907
908 # [] is an alias for "all eligible CPUs"; on Linux this may
909 # not be equal to all available CPUs, see:
910 # https://github.com/giampaolo/psutil/issues/956
911 p.cpu_affinity([])
912 if LINUX:
913 self.assertEqual(p.cpu_affinity(), p._proc._get_eligible_cpus())
914 else:
915 self.assertEqual(p.cpu_affinity(), all_cpus)
916 if hasattr(os, "sched_getaffinity"):
917 self.assertEqual(p.cpu_affinity(),
918 list(os.sched_getaffinity(p.pid)))
919 #
920 self.assertRaises(TypeError, p.cpu_affinity, 1)
921 p.cpu_affinity(initial)
922 # it should work with all iterables, not only lists
923 if not TRAVIS:
924 p.cpu_affinity(set(all_cpus))
925 p.cpu_affinity(tuple(all_cpus))
926
927 @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
928 def test_cpu_affinity_errs(self):
929 sproc = get_test_subprocess()
930 p = psutil.Process(sproc.pid)
931 invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
932 self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu)
933 self.assertRaises(ValueError, p.cpu_affinity, range(10000, 11000))
934 self.assertRaises(TypeError, p.cpu_affinity, [0, "1"])
935 self.assertRaises(ValueError, p.cpu_affinity, [0, -1])
936
937 @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
938 def test_cpu_affinity_all_combinations(self):
939 p = psutil.Process()
940 initial = p.cpu_affinity()
941 assert initial, initial
942 self.addCleanup(p.cpu_affinity, initial)
943
944 # All possible CPU set combinations.
945 if len(initial) > 12:
946 initial = initial[:12] # ...otherwise it will take forever
947 combos = []
948 for l in range(0, len(initial) + 1):
949 for subset in itertools.combinations(initial, l):
950 if subset:
951 combos.append(list(subset))
952
953 for combo in combos:
954 p.cpu_affinity(combo)
955 self.assertEqual(p.cpu_affinity(), combo)
956
957 # TODO: #595
958 @unittest.skipIf(BSD, "broken on BSD")
959 # can't find any process file on Appveyor
960 @unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
961 def test_open_files(self):
962 # current process
963 p = psutil.Process()
964 files = p.open_files()
965 self.assertFalse(TESTFN in files)
966 with open(TESTFN, 'wb') as f:
967 f.write(b'x' * 1024)
968 f.flush()
969 # give the kernel some time to see the new file
970 files = call_until(p.open_files, "len(ret) != %i" % len(files))
971 filenames = [os.path.normcase(x.path) for x in files]
972 self.assertIn(os.path.normcase(TESTFN), filenames)
973 if LINUX:
974 for file in files:
975 if file.path == TESTFN:
976 self.assertEqual(file.position, 1024)
977 for file in files:
978 assert os.path.isfile(file.path), file
979
980 # another process
981 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % TESTFN
982 sproc = get_test_subprocess([PYTHON_EXE, "-c", cmdline])
983 p = psutil.Process(sproc.pid)
984
985 for x in range(100):
986 filenames = [os.path.normcase(x.path) for x in p.open_files()]
987 if TESTFN in filenames:
988 break
989 time.sleep(.01)
990 else:
991 self.assertIn(os.path.normcase(TESTFN), filenames)
992 for file in filenames:
993 assert os.path.isfile(file), file
994
995 # TODO: #595
996 @unittest.skipIf(BSD, "broken on BSD")
997 # can't find any process file on Appveyor
998 @unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
999 def test_open_files_2(self):
1000 # test fd and path fields
1001 normcase = os.path.normcase
1002 with open(TESTFN, 'w') as fileobj:
1003 p = psutil.Process()
1004 for file in p.open_files():
1005 if normcase(file.path) == normcase(fileobj.name) or \
1006 file.fd == fileobj.fileno():
1007 break
1008 else:
1009 self.fail("no file found; files=%s" % repr(p.open_files()))
1010 self.assertEqual(normcase(file.path), normcase(fileobj.name))
1011 if WINDOWS:
1012 self.assertEqual(file.fd, -1)
1013 else:
1014 self.assertEqual(file.fd, fileobj.fileno())
1015 # test positions
1016 ntuple = p.open_files()[0]
1017 self.assertEqual(ntuple[0], ntuple.path)
1018 self.assertEqual(ntuple[1], ntuple.fd)
1019 # test file is gone
1020 self.assertNotIn(fileobj.name, p.open_files())
1021
1022 @unittest.skipIf(not POSIX, 'POSIX only')
1023 def test_num_fds(self):
1024 p = psutil.Process()
1025 start = p.num_fds()
1026 file = open(TESTFN, 'w')
1027 self.addCleanup(file.close)
1028 self.assertEqual(p.num_fds(), start + 1)
1029 sock = socket.socket()
1030 self.addCleanup(sock.close)
1031 self.assertEqual(p.num_fds(), start + 2)
1032 file.close()
1033 sock.close()
1034 self.assertEqual(p.num_fds(), start)
1035
1036 @skip_on_not_implemented(only_if=LINUX)
1037 @unittest.skipIf(OPENBSD or NETBSD, "not reliable on OPENBSD & NETBSD")
1038 def test_num_ctx_switches(self):
1039 p = psutil.Process()
1040 before = sum(p.num_ctx_switches())
1041 for x in range(500000):
1042 after = sum(p.num_ctx_switches())
1043 if after > before:
1044 return
1045 self.fail("num ctx switches still the same after 50.000 iterations")
1046
1047 def test_ppid(self):
1048 if hasattr(os, 'getppid'):
1049 self.assertEqual(psutil.Process().ppid(), os.getppid())
1050 this_parent = os.getpid()
1051 sproc = get_test_subprocess()
1052 p = psutil.Process(sproc.pid)
1053 self.assertEqual(p.ppid(), this_parent)
1054 # no other process is supposed to have us as parent
1055 reap_children(recursive=True)
1056 if APPVEYOR:
1057 # Occasional failures, see:
1058 # https://ci.appveyor.com/project/giampaolo/psutil/build/
1059 # job/0hs623nenj7w4m33
1060 return
1061 for p in psutil.process_iter():
1062 if p.pid == sproc.pid:
1063 continue
1064 # XXX: sometimes this fails on Windows; not sure why.
1065 self.assertNotEqual(p.ppid(), this_parent, msg=p)
1066
1067 def test_parent(self):
1068 this_parent = os.getpid()
1069 sproc = get_test_subprocess()
1070 p = psutil.Process(sproc.pid)
1071 self.assertEqual(p.parent().pid, this_parent)
1072
1073 lowest_pid = psutil.pids()[0]
1074 self.assertIsNone(psutil.Process(lowest_pid).parent())
1075
1076 def test_parent_multi(self):
1077 p1, p2 = create_proc_children_pair()
1078 self.assertEqual(p2.parent(), p1)
1079 self.assertEqual(p1.parent(), psutil.Process())
1080
1081 def test_parent_disappeared(self):
1082 # Emulate a case where the parent process disappeared.
1083 sproc = get_test_subprocess()
1084 p = psutil.Process(sproc.pid)
1085 with mock.patch("psutil.Process",
1086 side_effect=psutil.NoSuchProcess(0, 'foo')):
1087 self.assertIsNone(p.parent())
1088
1089 @retry_on_failure()
1090 def test_parents(self):
1091 assert psutil.Process().parents()
1092 p1, p2 = create_proc_children_pair()
1093 self.assertEqual(p1.parents()[0], psutil.Process())
1094 self.assertEqual(p2.parents()[0], p1)
1095 self.assertEqual(p2.parents()[1], psutil.Process())
1096
1097 def test_children(self):
1098 reap_children(recursive=True)
1099 p = psutil.Process()
1100 self.assertEqual(p.children(), [])
1101 self.assertEqual(p.children(recursive=True), [])
1102 # On Windows we set the flag to 0 in order to cancel out the
1103 # CREATE_NO_WINDOW flag (enabled by default) which creates
1104 # an extra "conhost.exe" child.
1105 sproc = get_test_subprocess(creationflags=0)
1106 children1 = p.children()
1107 children2 = p.children(recursive=True)
1108 for children in (children1, children2):
1109 self.assertEqual(len(children), 1)
1110 self.assertEqual(children[0].pid, sproc.pid)
1111 self.assertEqual(children[0].ppid(), os.getpid())
1112
1113 def test_children_recursive(self):
1114 # Test children() against two sub processes, p1 and p2, where
1115 # p1 (our child) spawned p2 (our grandchild).
1116 p1, p2 = create_proc_children_pair()
1117 p = psutil.Process()
1118 self.assertEqual(p.children(), [p1])
1119 self.assertEqual(p.children(recursive=True), [p1, p2])
1120 # If the intermediate process is gone there's no way for
1121 # children() to recursively find it.
1122 p1.terminate()
1123 p1.wait()
1124 self.assertEqual(p.children(recursive=True), [])
1125
1126 def test_children_duplicates(self):
1127 # find the process which has the highest number of children
1128 table = collections.defaultdict(int)
1129 for p in psutil.process_iter():
1130 try:
1131 table[p.ppid()] += 1
1132 except psutil.Error:
1133 pass
1134 # this is the one, now let's make sure there are no duplicates
1135 pid = sorted(table.items(), key=lambda x: x[1])[-1][0]
1136 p = psutil.Process(pid)
1137 try:
1138 c = p.children(recursive=True)
1139 except psutil.AccessDenied: # windows
1140 pass
1141 else:
1142 self.assertEqual(len(c), len(set(c)))
1143
1144 def test_parents_and_children(self):
1145 p1, p2 = create_proc_children_pair()
1146 me = psutil.Process()
1147 # forward
1148 children = me.children(recursive=True)
1149 self.assertEqual(len(children), 2)
1150 self.assertEqual(children[0], p1)
1151 self.assertEqual(children[1], p2)
1152 # backward
1153 parents = p2.parents()
1154 self.assertEqual(parents[0], p1)
1155 self.assertEqual(parents[1], me)
1156
1157 def test_suspend_resume(self):
1158 sproc = get_test_subprocess()
1159 p = psutil.Process(sproc.pid)
1160 p.suspend()
1161 for x in range(100):
1162 if p.status() == psutil.STATUS_STOPPED:
1163 break
1164 time.sleep(0.01)
1165 p.resume()
1166 self.assertNotEqual(p.status(), psutil.STATUS_STOPPED)
1167
1168 def test_invalid_pid(self):
1169 self.assertRaises(TypeError, psutil.Process, "1")
1170 self.assertRaises(ValueError, psutil.Process, -1)
1171
1172 def test_as_dict(self):
1173 p = psutil.Process()
1174 d = p.as_dict(attrs=['exe', 'name'])
1175 self.assertEqual(sorted(d.keys()), ['exe', 'name'])
1176
1177 p = psutil.Process(min(psutil.pids()))
1178 d = p.as_dict(attrs=['connections'], ad_value='foo')
1179 if not isinstance(d['connections'], list):
1180 self.assertEqual(d['connections'], 'foo')
1181
1182 # Test ad_value is set on AccessDenied.
1183 with mock.patch('psutil.Process.nice', create=True,
1184 side_effect=psutil.AccessDenied):
1185 self.assertEqual(
1186 p.as_dict(attrs=["nice"], ad_value=1), {"nice": 1})
1187
1188 # Test that NoSuchProcess bubbles up.
1189 with mock.patch('psutil.Process.nice', create=True,
1190 side_effect=psutil.NoSuchProcess(p.pid, "name")):
1191 self.assertRaises(
1192 psutil.NoSuchProcess, p.as_dict, attrs=["nice"])
1193
1194 # Test that ZombieProcess is swallowed.
1195 with mock.patch('psutil.Process.nice', create=True,
1196 side_effect=psutil.ZombieProcess(p.pid, "name")):
1197 self.assertEqual(
1198 p.as_dict(attrs=["nice"], ad_value="foo"), {"nice": "foo"})
1199
1200 # By default APIs raising NotImplementedError are
1201 # supposed to be skipped.
1202 with mock.patch('psutil.Process.nice', create=True,
1203 side_effect=NotImplementedError):
1204 d = p.as_dict()
1205 self.assertNotIn('nice', list(d.keys()))
1206 # ...unless the user explicitly asked for some attr.
1207 with self.assertRaises(NotImplementedError):
1208 p.as_dict(attrs=["nice"])
1209
1210 # errors
1211 with self.assertRaises(TypeError):
1212 p.as_dict('name')
1213 with self.assertRaises(ValueError):
1214 p.as_dict(['foo'])
1215 with self.assertRaises(ValueError):
1216 p.as_dict(['foo', 'bar'])
1217
1218 def test_oneshot(self):
1219 with mock.patch("psutil._psplatform.Process.cpu_times") as m:
1220 p = psutil.Process()
1221 with p.oneshot():
1222 p.cpu_times()
1223 p.cpu_times()
1224 self.assertEqual(m.call_count, 1)
1225
1226 with mock.patch("psutil._psplatform.Process.cpu_times") as m:
1227 p.cpu_times()
1228 p.cpu_times()
1229 self.assertEqual(m.call_count, 2)
1230
1231 def test_oneshot_twice(self):
1232 # Test the case where the ctx manager is __enter__ed twice.
1233 # The second __enter__ is supposed to resut in a NOOP.
1234 with mock.patch("psutil._psplatform.Process.cpu_times") as m1:
1235 with mock.patch("psutil._psplatform.Process.oneshot_enter") as m2:
1236 p = psutil.Process()
1237 with p.oneshot():
1238 p.cpu_times()
1239 p.cpu_times()
1240 with p.oneshot():
1241 p.cpu_times()
1242 p.cpu_times()
1243 self.assertEqual(m1.call_count, 1)
1244 self.assertEqual(m2.call_count, 1)
1245
1246 with mock.patch("psutil._psplatform.Process.cpu_times") as m:
1247 p.cpu_times()
1248 p.cpu_times()
1249 self.assertEqual(m.call_count, 2)
1250
1251 def test_oneshot_cache(self):
1252 # Make sure oneshot() cache is nonglobal. Instead it's
1253 # supposed to be bound to the Process instance, see:
1254 # https://github.com/giampaolo/psutil/issues/1373
1255 p1, p2 = create_proc_children_pair()
1256 p1_ppid = p1.ppid()
1257 p2_ppid = p2.ppid()
1258 self.assertNotEqual(p1_ppid, p2_ppid)
1259 with p1.oneshot():
1260 self.assertEqual(p1.ppid(), p1_ppid)
1261 self.assertEqual(p2.ppid(), p2_ppid)
1262 with p2.oneshot():
1263 self.assertEqual(p1.ppid(), p1_ppid)
1264 self.assertEqual(p2.ppid(), p2_ppid)
1265
1266 def test_halfway_terminated_process(self):
1267 # Test that NoSuchProcess exception gets raised in case the
1268 # process dies after we create the Process object.
1269 # Example:
1270 # >>> proc = Process(1234)
1271 # >>> time.sleep(2) # time-consuming task, process dies in meantime
1272 # >>> proc.name()
1273 # Refers to Issue #15
1274 sproc = get_test_subprocess()
1275 p = psutil.Process(sproc.pid)
1276 p.terminate()
1277 p.wait()
1278 if WINDOWS:
1279 call_until(psutil.pids, "%s not in ret" % p.pid)
1280 assert not p.is_running()
1281
1282 if WINDOWS:
1283 with self.assertRaises(psutil.NoSuchProcess):
1284 p.send_signal(signal.CTRL_C_EVENT)
1285 with self.assertRaises(psutil.NoSuchProcess):
1286 p.send_signal(signal.CTRL_BREAK_EVENT)
1287
1288 excluded_names = ['pid', 'is_running', 'wait', 'create_time',
1289 'oneshot', 'memory_info_ex']
1290 if LINUX and not HAS_RLIMIT:
1291 excluded_names.append('rlimit')
1292 for name in dir(p):
1293 if (name.startswith('_') or
1294 name in excluded_names):
1295 continue
1296 try:
1297 meth = getattr(p, name)
1298 # get/set methods
1299 if name == 'nice':
1300 if POSIX:
1301 ret = meth(1)
1302 else:
1303 ret = meth(psutil.NORMAL_PRIORITY_CLASS)
1304 elif name == 'ionice':
1305 ret = meth()
1306 ret = meth(2)
1307 elif name == 'rlimit':
1308 ret = meth(psutil.RLIMIT_NOFILE)
1309 ret = meth(psutil.RLIMIT_NOFILE, (5, 5))
1310 elif name == 'cpu_affinity':
1311 ret = meth()
1312 ret = meth([0])
1313 elif name == 'send_signal':
1314 ret = meth(signal.SIGTERM)
1315 else:
1316 ret = meth()
1317 except psutil.ZombieProcess:
1318 self.fail("ZombieProcess for %r was not supposed to happen" %
1319 name)
1320 except psutil.NoSuchProcess:
1321 pass
1322 except psutil.AccessDenied:
1323 if OPENBSD and name in ('threads', 'num_threads'):
1324 pass
1325 else:
1326 raise
1327 except NotImplementedError:
1328 pass
1329 else:
1330 # NtQuerySystemInformation succeeds if process is gone.
1331 if WINDOWS and name in ('exe', 'name'):
1332 normcase = os.path.normcase
1333 if name == 'exe':
1334 self.assertEqual(normcase(ret), normcase(PYTHON_EXE))
1335 else:
1336 self.assertEqual(
1337 normcase(ret),
1338 normcase(os.path.basename(PYTHON_EXE)))
1339 continue
1340 self.fail(
1341 "NoSuchProcess exception not raised for %r, retval=%s" % (
1342 name, ret))
1343
1344 @unittest.skipIf(not POSIX, 'POSIX only')
1345 def test_zombie_process(self):
1346 def succeed_or_zombie_p_exc(fun, *args, **kwargs):
1347 try:
1348 return fun(*args, **kwargs)
1349 except (psutil.ZombieProcess, psutil.AccessDenied):
1350 pass
1351
1352 zpid = create_zombie_proc()
1353 self.addCleanup(reap_children, recursive=True)
1354 # A zombie process should always be instantiable
1355 zproc = psutil.Process(zpid)
1356 # ...and at least its status always be querable
1357 self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE)
1358 # ...and it should be considered 'running'
1359 self.assertTrue(zproc.is_running())
1360 # ...and as_dict() shouldn't crash
1361 zproc.as_dict()
1362 # if cmdline succeeds it should be an empty list
1363 ret = succeed_or_zombie_p_exc(zproc.suspend)
1364 if ret is not None:
1365 self.assertEqual(ret, [])
1366
1367 if hasattr(zproc, "rlimit"):
1368 succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE)
1369 succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE,
1370 (5, 5))
1371 # set methods
1372 succeed_or_zombie_p_exc(zproc.parent)
1373 if hasattr(zproc, 'cpu_affinity'):
1374 try:
1375 succeed_or_zombie_p_exc(zproc.cpu_affinity, [0])
1376 except ValueError as err:
1377 if TRAVIS and LINUX and "not eligible" in str(err):
1378 # https://travis-ci.org/giampaolo/psutil/jobs/279890461
1379 pass
1380 else:
1381 raise
1382
1383 succeed_or_zombie_p_exc(zproc.nice, 0)
1384 if hasattr(zproc, 'ionice'):
1385 if LINUX:
1386 succeed_or_zombie_p_exc(zproc.ionice, 2, 0)
1387 else:
1388 succeed_or_zombie_p_exc(zproc.ionice, 0) # Windows
1389 if hasattr(zproc, 'rlimit'):
1390 succeed_or_zombie_p_exc(zproc.rlimit,
1391 psutil.RLIMIT_NOFILE, (5, 5))
1392 succeed_or_zombie_p_exc(zproc.suspend)
1393 succeed_or_zombie_p_exc(zproc.resume)
1394 succeed_or_zombie_p_exc(zproc.terminate)
1395 succeed_or_zombie_p_exc(zproc.kill)
1396
1397 # ...its parent should 'see' it
1398 # edit: not true on BSD and MACOS
1399 # descendants = [x.pid for x in psutil.Process().children(
1400 # recursive=True)]
1401 # self.assertIn(zpid, descendants)
1402 # XXX should we also assume ppid be usable? Note: this
1403 # would be an important use case as the only way to get
1404 # rid of a zombie is to kill its parent.
1405 # self.assertEqual(zpid.ppid(), os.getpid())
1406 # ...and all other APIs should be able to deal with it
1407 self.assertTrue(psutil.pid_exists(zpid))
1408 if not TRAVIS and MACOS:
1409 # For some reason this started failing all of the sudden.
1410 # Maybe they upgraded MACOS version?
1411 # https://travis-ci.org/giampaolo/psutil/jobs/310896404
1412 self.assertIn(zpid, psutil.pids())
1413 self.assertIn(zpid, [x.pid for x in psutil.process_iter()])
1414 psutil._pmap = {}
1415 self.assertIn(zpid, [x.pid for x in psutil.process_iter()])
1416
1417 @unittest.skipIf(not POSIX, 'POSIX only')
1418 def test_zombie_process_is_running_w_exc(self):
1419 # Emulate a case where internally is_running() raises
1420 # ZombieProcess.
1421 p = psutil.Process()
1422 with mock.patch("psutil.Process",
1423 side_effect=psutil.ZombieProcess(0)) as m:
1424 assert p.is_running()
1425 assert m.called
1426
1427 @unittest.skipIf(not POSIX, 'POSIX only')
1428 def test_zombie_process_status_w_exc(self):
1429 # Emulate a case where internally status() raises
1430 # ZombieProcess.
1431 p = psutil.Process()
1432 with mock.patch("psutil._psplatform.Process.status",
1433 side_effect=psutil.ZombieProcess(0)) as m:
1434 self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
1435 assert m.called
1436
1437 def test_pid_0(self):
1438 # Process(0) is supposed to work on all platforms except Linux
1439 if 0 not in psutil.pids():
1440 self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
1441 return
1442
1443 # test all methods
1444 p = psutil.Process(0)
1445 for name in psutil._as_dict_attrnames:
1446 if name == 'pid':
1447 continue
1448 meth = getattr(p, name)
1449 try:
1450 ret = meth()
1451 except psutil.AccessDenied:
1452 pass
1453 else:
1454 if name in ("uids", "gids"):
1455 self.assertEqual(ret.real, 0)
1456 elif name == "username":
1457 if POSIX:
1458 self.assertEqual(p.username(), 'root')
1459 elif WINDOWS:
1460 self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
1461 elif name == "name":
1462 assert name, name
1463
1464 if hasattr(p, 'rlimit'):
1465 try:
1466 p.rlimit(psutil.RLIMIT_FSIZE)
1467 except psutil.AccessDenied:
1468 pass
1469
1470 p.as_dict()
1471
1472 if not OPENBSD:
1473 self.assertIn(0, psutil.pids())
1474 self.assertTrue(psutil.pid_exists(0))
1475
1476 @unittest.skipIf(not HAS_ENVIRON, "not supported")
1477 def test_environ(self):
1478 def clean_dict(d):
1479 # Most of these are problematic on Travis.
1480 d.pop("PSUTIL_TESTING", None)
1481 d.pop("PLAT", None)
1482 d.pop("HOME", None)
1483 if MACOS:
1484 d.pop("__CF_USER_TEXT_ENCODING", None)
1485 d.pop("VERSIONER_PYTHON_PREFER_32_BIT", None)
1486 d.pop("VERSIONER_PYTHON_VERSION", None)
1487 return dict(
1488 [(k.replace("\r", "").replace("\n", ""),
1489 v.replace("\r", "").replace("\n", ""))
1490 for k, v in d.items()])
1491
1492 self.maxDiff = None
1493 p = psutil.Process()
1494 d1 = clean_dict(p.environ())
1495 d2 = clean_dict(os.environ.copy())
1496 self.assertEqual(d1, d2)
1497
1498 @unittest.skipIf(not HAS_ENVIRON, "not supported")
1499 @unittest.skipIf(not POSIX, "POSIX only")
1500 def test_weird_environ(self):
1501 # environment variables can contain values without an equals sign
1502 code = textwrap.dedent("""
1503 #include <unistd.h>
1504 #include <fcntl.h>
1505 char * const argv[] = {"cat", 0};
1506 char * const envp[] = {"A=1", "X", "C=3", 0};
1507 int main(void) {
1508 /* Close stderr on exec so parent can wait for the execve to
1509 * finish. */
1510 if (fcntl(2, F_SETFD, FD_CLOEXEC) != 0)
1511 return 0;
1512 return execve("/bin/cat", argv, envp);
1513 }
1514 """)
1515 path = TESTFN
1516 create_exe(path, c_code=code)
1517 self.addCleanup(safe_rmpath, path)
1518 sproc = get_test_subprocess([path],
1519 stdin=subprocess.PIPE,
1520 stderr=subprocess.PIPE)
1521 p = psutil.Process(sproc.pid)
1522 wait_for_pid(p.pid)
1523 self.assertTrue(p.is_running())
1524 # Wait for process to exec or exit.
1525 self.assertEqual(sproc.stderr.read(), b"")
1526 self.assertEqual(p.environ(), {"A": "1", "C": "3"})
1527 sproc.communicate()
1528 self.assertEqual(sproc.returncode, 0)
1529
1530
1531 # ===================================================================
1532 # --- Limited user tests
1533 # ===================================================================
1534
1535
1536 if POSIX and os.getuid() == 0:
1537 class LimitedUserTestCase(TestProcess):
1538 """Repeat the previous tests by using a limited user.
1539 Executed only on UNIX and only if the user who run the test script
1540 is root.
1541 """
1542 # the uid/gid the test suite runs under
1543 if hasattr(os, 'getuid'):
1544 PROCESS_UID = os.getuid()
1545 PROCESS_GID = os.getgid()
1546
1547 def __init__(self, *args, **kwargs):
1548 TestProcess.__init__(self, *args, **kwargs)
1549 # re-define all existent test methods in order to
1550 # ignore AccessDenied exceptions
1551 for attr in [x for x in dir(self) if x.startswith('test')]:
1552 meth = getattr(self, attr)
1553
1554 def test_(self):
1555 try:
1556 meth()
1557 except psutil.AccessDenied:
1558 pass
1559 setattr(self, attr, types.MethodType(test_, self))
1560
1561 def setUp(self):
1562 safe_rmpath(TESTFN)
1563 TestProcess.setUp(self)
1564 os.setegid(1000)
1565 os.seteuid(1000)
1566
1567 def tearDown(self):
1568 os.setegid(self.PROCESS_UID)
1569 os.seteuid(self.PROCESS_GID)
1570 TestProcess.tearDown(self)
1571
1572 def test_nice(self):
1573 try:
1574 psutil.Process().nice(-1)
1575 except psutil.AccessDenied:
1576 pass
1577 else:
1578 self.fail("exception not raised")
1579
1580 def test_zombie_process(self):
1581 # causes problems if test test suite is run as root
1582 pass
1583
1584
1585 # ===================================================================
1586 # --- psutil.Popen tests
1587 # ===================================================================
1588
1589
1590 class TestPopen(unittest.TestCase):
1591 """Tests for psutil.Popen class."""
1592
1593 def tearDown(self):
1594 reap_children()
1595
1596 def test_misc(self):
1597 # XXX this test causes a ResourceWarning on Python 3 because
1598 # psutil.__subproc instance doesn't get propertly freed.
1599 # Not sure what to do though.
1600 cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
1601 with psutil.Popen(cmd, stdout=subprocess.PIPE,
1602 stderr=subprocess.PIPE) as proc:
1603 proc.name()
1604 proc.cpu_times()
1605 proc.stdin
1606 self.assertTrue(dir(proc))
1607 self.assertRaises(AttributeError, getattr, proc, 'foo')
1608 proc.terminate()
1609
1610 def test_ctx_manager(self):
1611 with psutil.Popen([PYTHON_EXE, "-V"],
1612 stdout=subprocess.PIPE,
1613 stderr=subprocess.PIPE,
1614 stdin=subprocess.PIPE) as proc:
1615 proc.communicate()
1616 assert proc.stdout.closed
1617 assert proc.stderr.closed
1618 assert proc.stdin.closed
1619 self.assertEqual(proc.returncode, 0)
1620
1621 def test_kill_terminate(self):
1622 # subprocess.Popen()'s terminate(), kill() and send_signal() do
1623 # not raise exception after the process is gone. psutil.Popen
1624 # diverges from that.
1625 cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
1626 with psutil.Popen(cmd, stdout=subprocess.PIPE,
1627 stderr=subprocess.PIPE) as proc:
1628 proc.terminate()
1629 proc.wait()
1630 self.assertRaises(psutil.NoSuchProcess, proc.terminate)
1631 self.assertRaises(psutil.NoSuchProcess, proc.kill)
1632 self.assertRaises(psutil.NoSuchProcess, proc.send_signal,
1633 signal.SIGTERM)
1634 if WINDOWS and sys.version_info >= (2, 7):
1635 self.assertRaises(psutil.NoSuchProcess, proc.send_signal,
1636 signal.CTRL_C_EVENT)
1637 self.assertRaises(psutil.NoSuchProcess, proc.send_signal,
1638 signal.CTRL_BREAK_EVENT)
1639
1640
1641 if __name__ == '__main__':
1642 from psutil.tests.runner import run
1643 run(__file__)