comparison planemo/lib/python3.7/site-packages/psutil/tests/test_process.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
6
7 """Tests for psutil.Process class."""
8
9 import collections
10 import errno
11 import getpass
12 import itertools
13 import os
14 import signal
15 import socket
16 import stat
17 import subprocess
18 import sys
19 import textwrap
20 import time
21 import types
22
23 import psutil
24
25 from psutil import AIX
26 from psutil import BSD
27 from psutil import LINUX
28 from psutil import MACOS
29 from psutil import NETBSD
30 from psutil import OPENBSD
31 from psutil import OSX
32 from psutil import POSIX
33 from psutil import SUNOS
34 from psutil import WINDOWS
35 from psutil._common import open_text
36 from psutil._compat import FileNotFoundError
37 from psutil._compat import long
38 from psutil._compat import PY3
39 from psutil._compat import super
40 from psutil.tests import APPVEYOR
41 from psutil.tests import call_until
42 from psutil.tests import CI_TESTING
43 from psutil.tests import CIRRUS
44 from psutil.tests import copyload_shared_lib
45 from psutil.tests import create_exe
46 from psutil.tests import GITHUB_WHEELS
47 from psutil.tests import GLOBAL_TIMEOUT
48 from psutil.tests import HAS_CPU_AFFINITY
49 from psutil.tests import HAS_ENVIRON
50 from psutil.tests import HAS_IONICE
51 from psutil.tests import HAS_MEMORY_MAPS
52 from psutil.tests import HAS_PROC_CPU_NUM
53 from psutil.tests import HAS_PROC_IO_COUNTERS
54 from psutil.tests import HAS_RLIMIT
55 from psutil.tests import HAS_THREADS
56 from psutil.tests import mock
57 from psutil.tests import process_namespace
58 from psutil.tests import PsutilTestCase
59 from psutil.tests import PYPY
60 from psutil.tests import PYTHON_EXE
61 from psutil.tests import reap_children
62 from psutil.tests import retry_on_failure
63 from psutil.tests import sh
64 from psutil.tests import skip_on_access_denied
65 from psutil.tests import skip_on_not_implemented
66 from psutil.tests import ThreadTask
67 from psutil.tests import TRAVIS
68 from psutil.tests import unittest
69 from psutil.tests import wait_for_pid
70
71
72 # ===================================================================
73 # --- psutil.Process class tests
74 # ===================================================================
75
76
77 class TestProcess(PsutilTestCase):
78 """Tests for psutil.Process class."""
79
def spawn_psproc(self, *args, **kwargs):
    """Spawn a test subprocess and return it as a psutil.Process.

    All positional and keyword arguments are forwarded untouched to
    self.spawn_testproc().
    """
    subp = self.spawn_testproc(*args, **kwargs)
    pid = subp.pid
    return psutil.Process(pid)
83
84 # ---
85
def test_pid(self):
    # The pid must match our own and be exposed as a read-only attribute.
    proc = psutil.Process()
    self.assertEqual(proc.pid, os.getpid())
    self.assertRaises(AttributeError, setattr, proc, 'pid', 33)
91
def test_kill(self):
    # kill() must end the child; wait() reports SIGTERM on Windows and
    # the negated SIGKILL number elsewhere.
    proc = self.spawn_psproc()
    proc.kill()
    exit_code = proc.wait()
    expected = signal.SIGTERM if WINDOWS else -signal.SIGKILL
    self.assertEqual(exit_code, expected)
    self.assertProcessGone(proc)
101
def test_terminate(self):
    # terminate() must end the child; wait() reports SIGTERM on Windows
    # and the negated SIGTERM number elsewhere.
    proc = self.spawn_psproc()
    proc.terminate()
    exit_code = proc.wait()
    expected = signal.SIGTERM if WINDOWS else -signal.SIGTERM
    self.assertEqual(exit_code, expected)
    self.assertProcessGone(proc)
111
def test_send_signal(self):
    # Deliver a fatal signal and check the exit code wait() reports.
    if POSIX:
        signum = signal.SIGKILL
    else:
        signum = signal.SIGTERM
    proc = self.spawn_psproc()
    proc.send_signal(signum)
    exit_code = proc.wait()
    expected = signum if WINDOWS else -signum
    self.assertEqual(exit_code, expected)
    self.assertProcessGone(proc)
122
@unittest.skipIf(not POSIX, "not POSIX")
def test_send_signal_mocked(self):
    # os.kill() failures must be translated into psutil exceptions:
    # ESRCH -> NoSuchProcess, EPERM -> AccessDenied.
    translations = (
        (errno.ESRCH, psutil.NoSuchProcess),
        (errno.EPERM, psutil.AccessDenied),
    )
    for err, expected_exc in translations:
        proc = self.spawn_psproc()
        with mock.patch('psutil.os.kill', side_effect=OSError(err, "")):
            with self.assertRaises(expected_exc):
                proc.send_signal(signal.SIGTERM)
135
def test_wait_exited(self):
    # Test waitpid() + WIFEXITED -> WEXITSTATUS: each child runs a tiny
    # Python snippet and wait() must report its exit status.
    scenarios = (
        # normal return, same as exit(0)
        ("pass", {}, 0),
        # exit(1), implicit in case of error
        ("1 / 0", {"stderr": subprocess.PIPE}, 1),
        # via sys.exit()
        ("import sys; sys.exit(5);", {}, 5),
        # via os._exit()
        ("import os; os._exit(5);", {}, 5),
    )
    for snippet, spawn_kwargs, expected in scenarios:
        proc = self.spawn_psproc([PYTHON_EXE, "-c", snippet], **spawn_kwargs)
        exit_code = proc.wait()
        self.assertEqual(exit_code, expected)
        self.assertProcessGone(proc)
162
def test_wait_stopped(self):
    # A stopped and then resumed process must still be reported as
    # alive by wait(); only the final termination yields an exit code.
    proc = self.spawn_psproc()

    def assert_not_exited():
        with self.assertRaises(psutil.TimeoutExpired):
            proc.wait(timeout=0.001)

    if POSIX:
        # Test waitpid() + WIFSTOPPED and WIFCONTINUED.
        # Note: if a process is stopped it ignores SIGTERM.
        proc.send_signal(signal.SIGSTOP)
        assert_not_exited()
        proc.send_signal(signal.SIGCONT)
        assert_not_exited()
        proc.send_signal(signal.SIGTERM)
        expected = -signal.SIGTERM
    else:
        proc.suspend()
        assert_not_exited()
        proc.resume()
        assert_not_exited()
        proc.terminate()
        expected = signal.SIGTERM
    # wait() is called twice on purpose: the second call must return
    # the same exit code.
    self.assertEqual(proc.wait(), expected)
    self.assertEqual(proc.wait(), expected)
183
def test_wait_non_children(self):
    # Test wait() against a process which is not our direct child.
    child, grandchild = self.spawn_children_pair()
    for proc in (child, grandchild):
        with self.assertRaises(psutil.TimeoutExpired):
            proc.wait(0.01)
    # We also terminate the direct child otherwise the grandchild will
    # hang until the parent is gone.
    child.terminate()
    grandchild.terminate()
    child_ret = child.wait()
    grandchild_ret = grandchild.wait()
    if not POSIX:
        # NOTE(review): child_ret is asserted twice and grandchild_ret is
        # never checked on this branch; this looks like a copy/paste slip
        # but the expected grandchild value is not evident here - confirm.
        self.assertEqual(child_ret, signal.SIGTERM)
        self.assertEqual(child_ret, signal.SIGTERM)
    else:
        self.assertEqual(child_ret, -signal.SIGTERM)
        # For processes which are not our children we're supposed
        # to get None.
        self.assertEqual(grandchild_ret, None)
204
def test_wait_timeout(self):
    # A live process makes wait() time out; a negative timeout is
    # rejected with ValueError.
    proc = self.spawn_psproc()
    proc.name()
    for timeout in (0.01, 0):
        with self.assertRaises(psutil.TimeoutExpired):
            proc.wait(timeout)
    with self.assertRaises(ValueError):
        proc.wait(-1)
211
def test_wait_timeout_nonblocking(self):
    # Poll wait(0) in a busy loop after kill() until the exit code
    # becomes available or GLOBAL_TIMEOUT seconds have elapsed.
    p = self.spawn_psproc()
    self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
    p.kill()
    stop_at = time.time() + GLOBAL_TIMEOUT
    while time.time() < stop_at:
        try:
            code = p.wait(0)
            break
        except psutil.TimeoutExpired:
            pass
    else:
        # fail() raises by itself; it returns nothing to re-raise.
        self.fail('timeout')
    if POSIX:
        self.assertEqual(code, -signal.SIGKILL)
    else:
        self.assertEqual(code, signal.SIGTERM)
    self.assertProcessGone(p)
230
def test_cpu_percent(self):
    # cpu_percent() must always return a non-negative float and reject
    # a negative interval.
    proc = psutil.Process()
    # Two blocking calls first, then the non-blocking samples below.
    proc.cpu_percent(interval=0.001)
    proc.cpu_percent(interval=0.001)
    for _ in range(100):
        sample = proc.cpu_percent(interval=None)
        self.assertIsInstance(sample, float)
        self.assertGreaterEqual(sample, 0.0)
    self.assertRaises(ValueError, proc.cpu_percent, interval=-1)
241
def test_cpu_percent_numcpus_none(self):
    # cpu_percent() must cope with psutil.cpu_count() returning None.
    # See: https://github.com/giampaolo/psutil/issues/1087
    patcher = mock.patch('psutil.cpu_count', return_value=None)
    with patcher as fake_cpu_count:
        psutil.Process().cpu_percent()
        assert fake_cpu_count.called
247
def test_cpu_times(self):
    # Sanity-check the fields returned by cpu_times() for this process.
    cpu = psutil.Process().cpu_times()
    assert (cpu.user > 0.0) or (cpu.system > 0.0), cpu
    assert (cpu.children_user >= 0.0), cpu
    assert (cpu.children_system >= 0.0), cpu
    if LINUX:
        assert cpu.iowait >= 0.0, cpu
    # make sure returned values can be pretty printed with strftime
    for field in cpu._fields:
        seconds = getattr(cpu, field)
        time.strftime("%H:%M:%S", time.localtime(seconds))
258
def test_cpu_times_2(self):
    # Use os.times()[:2] as base values to compare our results using a
    # tolerance of +/- 0.1 seconds: the test fails if either value
    # differs from its reference by more than 0.1s.
    found_user, found_kernel = psutil.Process().cpu_times()[:2]
    expected_user, expected_kernel = os.times()[:2]

    if abs(found_user - expected_user) > 0.1:
        self.fail("expected: %s, found: %s" % (expected_user, found_user))

    if abs(found_kernel - expected_kernel) > 0.1:
        self.fail("expected: %s, found: %s" % (expected_kernel, found_kernel))
271
@unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
def test_cpu_num(self):
    # cpu_num() must be a valid index into the available CPUs.
    proc = psutil.Process()
    current = proc.cpu_num()
    self.assertGreaterEqual(current, 0)
    single_cpu = psutil.cpu_count() == 1
    if single_cpu:
        self.assertEqual(current, 0)
    valid_indexes = range(psutil.cpu_count())
    self.assertIn(proc.cpu_num(), valid_indexes)
280
def test_create_time(self):
    proc = self.spawn_psproc()
    reference = time.time()
    created = proc.create_time()

    # Use time.time() as base value to compare our result; the test
    # fails if the two values differ by more than 2 seconds.
    difference = abs(created - reference)
    if difference > 2:
        details = (reference, created, difference)
        self.fail("expected: %s, found: %s, difference: %s" % details)

    # make sure returned value can be pretty printed with strftime
    as_local = time.localtime(proc.create_time())
    time.strftime("%Y %m %d %H:%M:%S", as_local)
296
@unittest.skipIf(not POSIX, 'POSIX only')
@unittest.skipIf(TRAVIS or CIRRUS, 'not reliable on TRAVIS/CIRRUS')
def test_terminal(self):
    # terminal() must match the output of the `tty` command when stdout
    # is attached to a terminal, and be None otherwise.
    reported = psutil.Process().terminal()
    if not sys.stdout.isatty():
        self.assertIsNone(reported)
    else:
        expected = os.path.realpath(sh('tty'))
        self.assertEqual(reported, expected)
306
@unittest.skipIf(not HAS_PROC_IO_COUNTERS, 'not supported')
@skip_on_not_implemented(only_if=LINUX)
def test_io_counters(self):
    # Do some file reads and writes and check that the process I/O
    # counters move in the expected direction.
    p = psutil.Process()
    # test reads
    io1 = p.io_counters()
    with open(PYTHON_EXE, 'rb') as f:
        f.read()
    io2 = p.io_counters()
    if not BSD and not AIX:
        self.assertGreater(io2.read_count, io1.read_count)
        self.assertEqual(io2.write_count, io1.write_count)
        if LINUX:
            self.assertGreater(io2.read_chars, io1.read_chars)
            self.assertEqual(io2.write_chars, io1.write_chars)
    else:
        self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
        self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)

    # test writes
    io1 = p.io_counters()
    with open(self.get_testfn(), 'wb') as f:
        # A bytes literal is valid on both Python 2 and 3, so no PY3
        # branching is needed to build the payload.
        f.write(b"x" * 1000000)
    io2 = p.io_counters()
    self.assertGreaterEqual(io2.write_count, io1.write_count)
    self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)
    self.assertGreaterEqual(io2.read_count, io1.read_count)
    self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
    if LINUX:
        self.assertGreater(io2.write_chars, io1.write_chars)
        self.assertGreaterEqual(io2.read_chars, io1.read_chars)

    # sanity check: every counter must be non-negative
    for index, counter in enumerate(io2):
        if BSD and index >= 2:
            # On BSD read_bytes and write_bytes are always set to -1.
            continue
        self.assertGreaterEqual(counter, 0)
349
@unittest.skipIf(not HAS_IONICE, "not supported")
@unittest.skipIf(not LINUX, "linux only")
def test_ionice_linux(self):
    # Walk through the Linux I/O priority classes, check the accepted
    # values and error cases, then restore the initial setting.
    proc = psutil.Process()
    if not CI_TESTING:
        self.assertEqual(proc.ionice()[0], psutil.IOPRIO_CLASS_NONE)
    self.assertEqual(psutil.IOPRIO_CLASS_NONE, 0)
    self.assertEqual(psutil.IOPRIO_CLASS_RT, 1)  # high
    self.assertEqual(psutil.IOPRIO_CLASS_BE, 2)  # normal
    self.assertEqual(psutil.IOPRIO_CLASS_IDLE, 3)  # low
    original = proc.ionice()
    try:
        # low
        proc.ionice(psutil.IOPRIO_CLASS_IDLE)
        self.assertEqual(
            tuple(proc.ionice()), (psutil.IOPRIO_CLASS_IDLE, 0))
        # accepts no value
        self.assertRaises(
            ValueError, proc.ionice, psutil.IOPRIO_CLASS_IDLE, value=7)
        # normal
        proc.ionice(psutil.IOPRIO_CLASS_BE)
        self.assertEqual(
            tuple(proc.ionice()), (psutil.IOPRIO_CLASS_BE, 0))
        proc.ionice(psutil.IOPRIO_CLASS_BE, value=7)
        self.assertEqual(
            tuple(proc.ionice()), (psutil.IOPRIO_CLASS_BE, 7))
        self.assertRaises(
            ValueError, proc.ionice, psutil.IOPRIO_CLASS_BE, value=8)
        # The real-time class may be denied; that is tolerated.
        try:
            proc.ionice(psutil.IOPRIO_CLASS_RT, value=7)
        except psutil.AccessDenied:
            pass
        # errs
        with self.assertRaisesRegex(ValueError, "ioclass accepts no value"):
            proc.ionice(psutil.IOPRIO_CLASS_NONE, 1)
        with self.assertRaisesRegex(ValueError, "ioclass accepts no value"):
            proc.ionice(psutil.IOPRIO_CLASS_IDLE, 1)
        with self.assertRaisesRegex(
                ValueError, "'ioclass' argument must be specified"):
            proc.ionice(value=1)
    finally:
        ioclass, value = original
        if ioclass == psutil.IOPRIO_CLASS_NONE:
            value = 0
        proc.ionice(ioclass, value)
393
@unittest.skipIf(not HAS_IONICE, "not supported")
@unittest.skipIf(not WINDOWS, 'not supported on this win version')
def test_ionice_win(self):
    # Walk through the Windows I/O priorities, check the error cases,
    # then restore the initial setting.
    proc = psutil.Process()
    if not CI_TESTING:
        self.assertEqual(proc.ionice(), psutil.IOPRIO_NORMAL)
    original = proc.ionice()
    try:
        # base
        for prio in (psutil.IOPRIO_VERYLOW, psutil.IOPRIO_LOW):
            proc.ionice(prio)
            self.assertEqual(proc.ionice(), prio)
        # Raising the priority may be denied; only verify on success.
        try:
            proc.ionice(psutil.IOPRIO_HIGH)
        except psutil.AccessDenied:
            pass
        else:
            self.assertEqual(proc.ionice(), psutil.IOPRIO_HIGH)
        # errs
        with self.assertRaisesRegex(
                TypeError, "value argument not accepted on Windows"):
            proc.ionice(psutil.IOPRIO_NORMAL, value=1)
        with self.assertRaisesRegex(ValueError, "is not a valid priority"):
            proc.ionice(psutil.IOPRIO_HIGH + 1)
    finally:
        proc.ionice(original)
422
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_get(self):
    # Every psutil.RLIMIT* constant must be usable with rlimit() and,
    # when the stdlib "resource" module has it too, agree with it.
    import resource
    proc = psutil.Process(os.getpid())
    rlimit_names = [attr for attr in dir(psutil) if attr.startswith('RLIMIT')]
    assert rlimit_names, rlimit_names
    for name in rlimit_names:
        value = getattr(psutil, name)
        self.assertGreaterEqual(value, 0)
        if name not in dir(resource):
            limits = proc.rlimit(value)
            self.assertEqual(len(limits), 2)
            self.assertGreaterEqual(limits[0], -1)
            self.assertGreaterEqual(limits[1], -1)
            continue
        self.assertEqual(value, getattr(resource, name))
        # XXX - On PyPy RLIMIT_INFINITY returned by
        # resource.getrlimit() is reported as a very big long
        # number instead of -1. It looks like a bug with PyPy.
        if not PYPY:
            self.assertEqual(proc.rlimit(value), resource.getrlimit(value))
445
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_set(self):
    # Set a limit on a child process, read it back and check that
    # invalid calls are rejected.
    proc = self.spawn_psproc()
    new_limits = (5, 5)
    proc.rlimit(psutil.RLIMIT_NOFILE, new_limits)
    self.assertEqual(proc.rlimit(psutil.RLIMIT_NOFILE), new_limits)
    # If pid is 0 prlimit() applies to the calling process and
    # we don't want that.
    self.assertRaises(ValueError, psutil._psplatform.Process(0).rlimit, 0)
    # A limits tuple with three items is rejected.
    self.assertRaises(
        ValueError, proc.rlimit, psutil.RLIMIT_NOFILE, (5, 5, 5))
457
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit(self):
    # Lower RLIMIT_FSIZE to 1024 bytes: writing exactly 1024 bytes is
    # fine, writing 1025 must fail with EFBIG.
    proc = psutil.Process()
    path = self.get_testfn()
    soft, hard = proc.rlimit(psutil.RLIMIT_FSIZE)
    try:
        proc.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
        with open(path, "wb") as fobj:
            fobj.write(b"X" * 1024)
        # write() or flush() doesn't always cause the exception
        # but close() will.
        with self.assertRaises(IOError) as ctx:
            with open(path, "wb") as fobj:
                fobj.write(b"X" * 1025)
        if PY3:
            err = ctx.exception.errno
        else:
            err = ctx.exception[0]
        self.assertEqual(err, errno.EFBIG)
    finally:
        # Always restore the original limits and verify they stuck.
        proc.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
        self.assertEqual(proc.rlimit(psutil.RLIMIT_FSIZE), (soft, hard))
477
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_infinity(self):
    # First set a limit, then re-set it by specifying INFINITY
    # and assume we overridden the previous limit.
    proc = psutil.Process()
    original = proc.rlimit(psutil.RLIMIT_FSIZE)
    soft, hard = original
    try:
        for new_soft in (1024, psutil.RLIM_INFINITY):
            proc.rlimit(psutil.RLIMIT_FSIZE, (new_soft, hard))
        # Writing past the earlier 1024 bytes limit must now succeed.
        with open(self.get_testfn(), "wb") as fobj:
            fobj.write(b"X" * 2048)
    finally:
        proc.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
        self.assertEqual(proc.rlimit(psutil.RLIMIT_FSIZE), (soft, hard))
492
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_infinity_value(self):
    # RLIMIT_FSIZE should be RLIM_INFINITY, which will be a really
    # big number on a platform with large file support. On these
    # platforms we need to test that the get/setrlimit functions
    # properly convert the number to a C long long and that the
    # conversion doesn't raise an error.
    proc = psutil.Process()
    limits = proc.rlimit(psutil.RLIMIT_FSIZE)
    soft, hard = limits
    self.assertEqual(psutil.RLIM_INFINITY, hard)
    # Setting the very same values back must not raise.
    proc.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
504
def test_num_threads(self):
    # on certain platforms such as Linux we might test for exact
    # thread number, since we always have with 1 thread per process,
    # but this does not apply across all platforms (MACOS, Windows)
    proc = psutil.Process()
    try:
        before = proc.num_threads()
    except psutil.AccessDenied:
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")

    # One extra thread must show up while ThreadTask is running.
    with ThreadTask():
        during = proc.num_threads()
        self.assertEqual(during, before + 1)
521
@unittest.skipIf(not WINDOWS, 'WINDOWS only')
def test_num_handles(self):
    # a better test is done later into test/_windows.py
    handles = psutil.Process().num_handles()
    self.assertGreater(handles, 0)
527
@unittest.skipIf(not HAS_THREADS, 'not supported')
def test_threads(self):
    # threads() must list one more entry while an extra thread runs,
    # and its items must behave like named tuples.
    proc = psutil.Process()
    try:
        before = proc.threads()
    except psutil.AccessDenied:
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")

    with ThreadTask():
        during = proc.threads()
        self.assertEqual(len(during), len(before) + 1)
        first = during[0]
        # test named tuple
        for index, field in enumerate(('id', 'user_time', 'system_time')):
            self.assertEqual(getattr(first, field), first[index])
547
@retry_on_failure()
@skip_on_access_denied(only_if=MACOS)
@unittest.skipIf(not HAS_THREADS, 'not supported')
def test_threads_2(self):
    # The per-thread CPU times summed together must be close (0.1s)
    # to the process-wide CPU times.
    proc = self.spawn_psproc()
    if OPENBSD:
        try:
            proc.threads()
        except psutil.AccessDenied:
            raise unittest.SkipTest(
                "on OpenBSD this requires root access")
    proc_user = proc.cpu_times().user
    threads_user = sum(t.user_time for t in proc.threads())
    self.assertAlmostEqual(proc_user, threads_user, delta=0.1)
    proc_system = proc.cpu_times().system
    threads_system = sum(t.system_time for t in proc.threads())
    self.assertAlmostEqual(proc_system, threads_system, delta=0.1)
565
@retry_on_failure()
def test_memory_info(self):
    proc = psutil.Process()

    # step 1 - get a base value to compare our results
    rss_before, vms_before = proc.memory_info()[:2]
    percent_before = proc.memory_percent()
    self.assertGreater(rss_before, 0)
    self.assertGreater(vms_before, 0)

    # step 2 - allocate some memory; the list must stay referenced
    # until the measurements below have been taken
    ballast = [None] * 1500000

    rss_after, vms_after = proc.memory_info()[:2]
    percent_after = proc.memory_percent()

    # step 3 - make sure that the memory usage bumped up
    self.assertGreater(rss_after, rss_before)
    self.assertGreaterEqual(vms_after, vms_before)  # vms might be equal
    self.assertGreater(percent_after, percent_before)
    del ballast

    if WINDOWS:
        win_mem = proc.memory_info()
        self.assertEqual(win_mem.rss, win_mem.wset)
        self.assertEqual(win_mem.vms, win_mem.pagefile)

    # every field must be non-negative
    snapshot = proc.memory_info()
    for field in snapshot._fields:
        self.assertGreaterEqual(getattr(snapshot, field), 0)
596
def test_memory_full_info(self):
    # Every memory_full_info() field must be non-negative and, where
    # checked, not bigger than the total physical memory.
    proc = psutil.Process()
    total = psutil.virtual_memory().total
    info = proc.memory_full_info()
    for field in info._fields:
        amount = getattr(info, field)
        self.assertGreaterEqual(amount, 0, msg=(field, amount))
        # Parentheses only spell out the operator precedence: on LINUX
        # the upper bound check is skipped for every field, on OSX only
        # for 'vms'.
        skip_upper_bound = (field == 'vms' and OSX) or LINUX
        if not skip_upper_bound:
            self.assertLessEqual(amount, total, msg=(field, amount, total))
    if LINUX or WINDOWS or MACOS:
        self.assertGreaterEqual(info.uss, 0)
    if LINUX:
        self.assertGreaterEqual(info.pss, 0)
        self.assertGreaterEqual(info.swap, 0)
612
@unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
def test_memory_maps(self):
    # Check the grouped maps for duplicates and for paths which exist
    # on disk, then sanity-check every field of the ungrouped maps.
    proc = psutil.Process()
    grouped = proc.memory_maps()
    entries = list(grouped)
    self.assertEqual(len(entries), len(set(entries)))
    ungrouped = proc.memory_maps(grouped=False)

    for region in grouped:
        # Entries whose path starts with '[' are not checked on disk.
        if region.path.startswith('['):
            continue
        assert os.path.isabs(region.path), region.path
        if POSIX:
            try:
                assert os.path.exists(region.path) or \
                    os.path.islink(region.path), region.path
            except AssertionError:
                if not LINUX:
                    raise
                # https://github.com/giampaolo/psutil/issues/759
                with open_text('/proc/self/smaps') as f:
                    smaps = f.read()
                if "%s (deleted)" % region.path not in smaps:
                    raise
        # XXX - On Windows we have this strange behavior with
        # 64 bit dlls: they are visible via explorer but cannot
        # be accessed via os.stat() (wtf?).
        elif '64' not in os.path.basename(region.path):
            try:
                st = os.stat(region.path)
            except FileNotFoundError:
                pass
            else:
                assert stat.S_ISREG(st.st_mode), region.path

    for region in ungrouped:
        for field in region._fields:
            if field == 'path':
                continue
            value = getattr(region, field)
            if field in ('addr', 'perms'):
                assert value, value
            else:
                self.assertIsInstance(value, (int, long))
                assert value >= 0, value
658
@unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
def test_memory_maps_lists_lib(self):
    # Make sure a newly loaded shared lib is listed.
    proc = psutil.Process()

    def canonical(path):
        # Case-normalize first, then resolve symlinks.
        return os.path.realpath(os.path.normcase(path))

    with copyload_shared_lib() as libpath:
        mapped = [canonical(region.path) for region in proc.memory_maps()]
        self.assertIn(canonical(libpath), mapped)
669
def test_memory_percent(self):
    # memory_percent() works with its default memtype, rejects an
    # unknown one and accepts 'uss' on the platforms listed below.
    proc = psutil.Process()
    proc.memory_percent()
    with self.assertRaises(ValueError):
        proc.memory_percent(memtype="?!?")
    if LINUX or MACOS or WINDOWS:
        proc.memory_percent(memtype='uss')
676
def test_is_running(self):
    # is_running() is queried twice before and twice after the kill:
    # repeated calls must keep giving the same answer.
    proc = self.spawn_psproc()
    for _ in range(2):
        assert proc.is_running()
    proc.kill()
    proc.wait()
    for _ in range(2):
        assert not proc.is_running()
685
def test_exe(self):
    # exe() must point to the Python interpreter, allowing for the
    # platform specific differences handled below.
    proc = self.spawn_psproc()
    exe = proc.exe()
    try:
        self.assertEqual(exe, PYTHON_EXE)
    except AssertionError:
        if WINDOWS and len(exe) == len(PYTHON_EXE):
            # on Windows we don't care about case sensitivity
            self.assertEqual(
                os.path.normcase(exe), os.path.normcase(PYTHON_EXE))
        else:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error.
            version = "%s.%s" % sys.version_info[:2]
            try:
                self.assertEqual(exe.replace(version, ''),
                                 PYTHON_EXE.replace(version, ''))
            except AssertionError:
                # Tipically MACOS. Really not sure what to do here.
                pass

    # Whatever exe() returned must be runnable as a Python interpreter.
    output = sh([exe, "-c", "import os; print('hey')"])
    self.assertEqual(output, 'hey')
713
def test_cmdline(self):
    # cmdline() must return the very same arguments we spawned with.
    expected = [PYTHON_EXE, "-c", "import time; time.sleep(60)"]
    proc = self.spawn_psproc(expected)
    try:
        self.assertEqual(' '.join(proc.cmdline()), ' '.join(expected))
    except AssertionError:
        # XXX - most of the times the underlying sysctl() call on Net
        # and Open BSD returns a truncated string.
        # Also /proc/pid/cmdline behaves the same so it looks
        # like this is a kernel bug.
        # XXX - AIX truncates long arguments in /proc/pid/cmdline
        if not (NETBSD or OPENBSD or AIX):
            raise
        self.assertEqual(proc.cmdline()[0], PYTHON_EXE)
729
@unittest.skipIf(PYPY, "broken on PYPY")
def test_long_cmdline(self):
    # A command line made of many arguments must be returned in full.
    exe_path = self.get_testfn()
    create_exe(exe_path)
    expected = [exe_path]
    expected.extend(["0123456789"] * 20)
    proc = self.spawn_psproc(expected)
    self.assertEqual(proc.cmdline(), expected)
737
def test_name(self):
    # name() must be a (case-insensitive) prefix of the basename of the
    # resolved interpreter path.
    proc = self.spawn_psproc(PYTHON_EXE)
    reported = proc.name().lower()
    interpreter = os.path.realpath(sys.executable)
    pyexe = os.path.basename(interpreter).lower()
    assert pyexe.startswith(reported), (pyexe, reported)
743
@unittest.skipIf(PYPY, "unreliable on PYPY")
def test_long_name(self):
    # name() must return the complete basename of a long executable
    # name.
    exe_path = self.get_testfn(suffix="0123456789" * 2)
    create_exe(exe_path)
    proc = self.spawn_psproc(exe_path)
    expected = os.path.basename(exe_path)
    self.assertEqual(proc.name(), expected)
750
751 # XXX
@unittest.skipIf(SUNOS, "broken on SUNOS")
@unittest.skipIf(AIX, "broken on AIX")
@unittest.skipIf(PYPY, "broken on PYPY")
def test_prog_w_funky_name(self):
    # Test that name(), exe() and cmdline() correctly handle programs
    # with funky chars such as spaces and ")", see:
    # https://github.com/giampaolo/psutil/issues/628
    funky_path = self.get_testfn(suffix='foo bar )')
    create_exe(funky_path)
    # "arg1" is a separate argument: without the comma after the code
    # string, implicit literal concatenation glued it onto the -c code
    # and it was never passed on the command line.
    cmdline = [funky_path, "-c",
               "import time; [time.sleep(0.01) for x in range(3000)];",
               "arg1", "arg2", "", "arg3", ""]
    p = self.spawn_psproc(cmdline)
    # ...in order to try to prevent occasional failures on travis
    if TRAVIS:
        wait_for_pid(p.pid)
    self.assertEqual(p.cmdline(), cmdline)
    self.assertEqual(p.name(), os.path.basename(funky_path))
    self.assertEqual(os.path.normcase(p.exe()),
                     os.path.normcase(funky_path))
772
@unittest.skipIf(not POSIX, 'POSIX only')
def test_uids(self):
    # Compare uids() against what the os module reports.
    proc = psutil.Process()
    ids = proc.uids()
    real, effective, _ = ids
    # os.getuid() refers to "real" uid
    self.assertEqual(real, os.getuid())
    # os.geteuid() refers to "effective" uid
    self.assertEqual(effective, os.geteuid())
    # No such thing as os.getsuid() ("saved" uid), but starting
    # from python 2.7 we have os.getresuid() which returns all
    # of them.
    if hasattr(os, "getresuid"):
        self.assertEqual(os.getresuid(), proc.uids())
786
@unittest.skipIf(not POSIX, 'POSIX only')
def test_gids(self):
    # Compare gids() against what the os module reports.
    p = psutil.Process()
    real, effective, saved = p.gids()
    # os.getgid() refers to "real" gid
    self.assertEqual(real, os.getgid())
    # os.getegid() refers to "effective" gid
    self.assertEqual(effective, os.getegid())
    # No such thing as os.getsgid() ("saved" gid), but starting
    # from python 2.7 we have os.getresgid() which returns all
    # of them. Guard on the function actually being called.
    if hasattr(os, "getresgid"):
        self.assertEqual(os.getresgid(), p.gids())
800
def test_nice(self):
    # Change the process priority, read it back and always try to
    # restore the initial value at the end.
    proc = psutil.Process()
    self.assertRaises(TypeError, proc.nice, "str")
    original = proc.nice()

    def check_against_os():
        # Compare with os.getpriority() where the stdlib provides it.
        if hasattr(os, "getpriority"):
            self.assertEqual(
                os.getpriority(os.PRIO_PROCESS, os.getpid()),
                proc.nice())

    try:
        if WINDOWS:
            priorities = [psutil.NORMAL_PRIORITY_CLASS,
                          psutil.IDLE_PRIORITY_CLASS,
                          psutil.BELOW_NORMAL_PRIORITY_CLASS,
                          psutil.REALTIME_PRIORITY_CLASS,
                          psutil.HIGH_PRIORITY_CLASS,
                          psutil.ABOVE_NORMAL_PRIORITY_CLASS]
            for prio in priorities:
                with self.subTest(prio=prio):
                    # A denied priority change is tolerated; only
                    # verify the value when the call succeeded.
                    try:
                        proc.nice(prio)
                    except psutil.AccessDenied:
                        pass
                    else:
                        self.assertEqual(proc.nice(), prio)
        else:
            try:
                check_against_os()
                proc.nice(1)
                self.assertEqual(proc.nice(), 1)
                check_against_os()
                # XXX - going back to previous nice value raises
                # AccessDenied on MACOS
                if not MACOS:
                    proc.nice(0)
                    self.assertEqual(proc.nice(), 0)
            except psutil.AccessDenied:
                pass
    finally:
        try:
            proc.nice(original)
        except psutil.AccessDenied:
            pass
844
def test_status(self):
    # The process running this test must be reported as running.
    current = psutil.Process().status()
    self.assertEqual(current, psutil.STATUS_RUNNING)
848
def test_username(self):
    # username() must match getpass.getuser(); on Windows the value is
    # first split into its domain and user parts.
    proc = self.spawn_psproc()
    user = proc.username()
    domain = None
    if WINDOWS:
        domain, user = user.split('\\')
    self.assertEqual(user, getpass.getuser())
    if WINDOWS and 'USERDOMAIN' in os.environ:
        self.assertEqual(domain, os.environ['USERDOMAIN'])
859
def test_cwd(self):
    # The spawned child's cwd is compared with our own working
    # directory.
    child = self.spawn_psproc()
    self.assertEqual(child.cwd(), os.getcwd())
863
def test_cwd_2(self):
    # The child changes to the parent directory; poll cwd() until the
    # change is reported.
    snippet = "import os, time; os.chdir('..'); time.sleep(60)"
    child = self.spawn_psproc([PYTHON_EXE, "-c", snippet])
    call_until(child.cwd, "ret == os.path.dirname(os.getcwd())")
869
@unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
def test_cpu_affinity(self):
    # Pin the process to each CPU in turn, then reset the affinity,
    # cross-checking with os.sched_getaffinity() where available.
    proc = psutil.Process()
    initial = proc.cpu_affinity()
    assert initial, initial
    self.addCleanup(proc.cpu_affinity, initial)
    has_sched = hasattr(os, "sched_getaffinity")

    if has_sched:
        self.assertEqual(initial, list(os.sched_getaffinity(proc.pid)))
    self.assertEqual(len(initial), len(set(initial)))

    all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
    # Work around travis failure:
    # https://travis-ci.org/giampaolo/psutil/builds/284173194
    candidates = initial if TRAVIS else all_cpus
    for cpu in candidates:
        proc.cpu_affinity([cpu])
        self.assertEqual(proc.cpu_affinity(), [cpu])
        if has_sched:
            self.assertEqual(proc.cpu_affinity(),
                             list(os.sched_getaffinity(proc.pid)))
        # also test num_cpu()
        # NOTE(review): other tests in this file call cpu_num(), not
        # num_cpu(), so this check looks like dead code - confirm.
        if hasattr(proc, "num_cpu"):
            self.assertEqual(proc.cpu_affinity()[0], proc.num_cpu())

    # [] is an alias for "all eligible CPUs"; on Linux this may
    # not be equal to all available CPUs, see:
    # https://github.com/giampaolo/psutil/issues/956
    proc.cpu_affinity([])
    if LINUX:
        self.assertEqual(proc.cpu_affinity(),
                         proc._proc._get_eligible_cpus())
    else:
        self.assertEqual(proc.cpu_affinity(), all_cpus)
    if has_sched:
        self.assertEqual(proc.cpu_affinity(),
                         list(os.sched_getaffinity(proc.pid)))
    #
    self.assertRaises(TypeError, proc.cpu_affinity, 1)
    proc.cpu_affinity(initial)
    # it should work with all iterables, not only lists
    if not TRAVIS:
        for container in (set, tuple):
            proc.cpu_affinity(container(all_cpus))
912
@unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
def test_cpu_affinity_errs(self):
    # Invalid CPU lists must be rejected with the right exception.
    proc = self.spawn_psproc()
    beyond_last_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
    bad_inputs = (
        (ValueError, beyond_last_cpu),
        (ValueError, range(10000, 11000)),
        (TypeError, [0, "1"]),
        (ValueError, [0, -1]),
    )
    for expected_exc, cpus in bad_inputs:
        with self.assertRaises(expected_exc):
            proc.cpu_affinity(cpus)
921
@unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
def test_cpu_affinity_all_combinations(self):
    # Set the affinity to every non-empty subset of the initial CPUs
    # and read it back each time.
    proc = psutil.Process()
    initial = proc.cpu_affinity()
    assert initial, initial
    self.addCleanup(proc.cpu_affinity, initial)

    # All possible CPU set combinations.
    if len(initial) > 12:
        initial = initial[:12]  # ...otherwise it will take forever
    subsets = [
        list(chosen)
        for size in range(0, len(initial) + 1)
        for chosen in itertools.combinations(initial, size)
        if chosen
    ]

    for cpus in subsets:
        proc.cpu_affinity(cpus)
        self.assertEqual(sorted(proc.cpu_affinity()), sorted(cpus))
941
942 # TODO: #595
@unittest.skipIf(BSD, "broken on BSD")
# can't find any process file on Appveyor
@unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
def test_open_files(self):
    # A file opened by this process, and then by a child process, must
    # show up in open_files().
    p = psutil.Process()
    normcase = os.path.normcase
    testfn = self.get_testfn()
    wanted = normcase(testfn)
    files = p.open_files()
    # Compare paths with paths: open_files() returns named tuples, so
    # checking the bare string against them could never fail.
    self.assertNotIn(wanted, [normcase(nt.path) for nt in files])
    with open(testfn, 'wb') as f:
        f.write(b'x' * 1024)
        f.flush()
        # give the kernel some time to see the new file
        files = call_until(p.open_files, "len(ret) != %i" % len(files))
        filenames = [normcase(nt.path) for nt in files]
        self.assertIn(wanted, filenames)
        if LINUX:
            for nt in files:
                if nt.path == testfn:
                    self.assertEqual(nt.position, 1024)
    for nt in files:
        assert os.path.isfile(nt.path), nt

    # another process
    cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % testfn
    p = self.spawn_psproc([PYTHON_EXE, "-c", cmdline])

    for _ in range(100):
        filenames = [normcase(nt.path) for nt in p.open_files()]
        # Both sides are case-normalized, otherwise the loop could never
        # exit early where normcase() alters the path.
        if wanted in filenames:
            break
        time.sleep(.01)
    else:
        self.assertIn(wanted, filenames)
    for path in filenames:
        assert os.path.isfile(path), path
978
# TODO: #595
@unittest.skipIf(BSD, "broken on BSD")
# can't find any process file on Appveyor
@unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
def test_open_files_2(self):
    # test fd and path fields
    p = psutil.Process()
    normcase = os.path.normcase
    testfn = self.get_testfn()
    with open(testfn, 'w') as fileobj:
        # Look for the entry matching the file just opened, either by
        # path or by file descriptor.
        for file in p.open_files():
            if normcase(file.path) == normcase(fileobj.name) or \
                    file.fd == fileobj.fileno():
                break
        else:
            self.fail("no file found; files=%s" % repr(p.open_files()))
        self.assertEqual(normcase(file.path), normcase(fileobj.name))
        if WINDOWS:
            self.assertEqual(file.fd, -1)
        else:
            self.assertEqual(file.fd, fileobj.fileno())
        # test positions
        ntuple = p.open_files()[0]
        self.assertEqual(ntuple[0], ntuple.path)
        self.assertEqual(ntuple[1], ntuple.fd)
    # test file is gone; compare paths, since a string is never equal
    # to one of the entries returned by open_files().
    still_open = [normcase(x.path) for x in p.open_files()]
    self.assertNotIn(normcase(fileobj.name), still_open)
1006
@unittest.skipIf(not POSIX, 'POSIX only')
def test_num_fds(self):
    # num_fds() has to follow descriptors being opened and closed.
    proc = psutil.Process()
    path = self.get_testfn()
    baseline = proc.num_fds()

    fobj = open(path, 'w')
    self.addCleanup(fobj.close)
    self.assertEqual(proc.num_fds(), baseline + 1)

    sock = socket.socket()
    self.addCleanup(sock.close)
    self.assertEqual(proc.num_fds(), baseline + 2)

    # Close both explicitly; the count must drop back to the baseline.
    fobj.close()
    sock.close()
    self.assertEqual(proc.num_fds(), baseline)
1021
@skip_on_not_implemented(only_if=LINUX)
@unittest.skipIf(OPENBSD or NETBSD, "not reliable on OPENBSD & NETBSD")
def test_num_ctx_switches(self):
    # The summed context switch counters are expected to grow while
    # this process keeps running.
    p = psutil.Process()
    before = sum(p.num_ctx_switches())
    iterations = 500000
    for _ in range(iterations):
        after = sum(p.num_ctx_switches())
        if after > before:
            return
    # The message reports the actual loop bound.
    self.fail("num ctx switches still the same after %s iterations"
              % iterations)
1032
def test_ppid(self):
    # ppid() must agree with os.getppid() for the current process and
    # with our own PID for a process we spawned.
    p = psutil.Process()
    if hasattr(os, 'getppid'):
        self.assertEqual(p.ppid(), os.getppid())
    p = self.spawn_psproc()
    self.assertEqual(p.ppid(), os.getpid())
    # NOTE: occasional failures were reported on Appveyor, see:
    # https://ci.appveyor.com/project/giampaolo/psutil/build/
    #     job/0hs623nenj7w4m33
    # The trailing "if APPVEYOR: return" was dropped: it sat after the
    # last assertion and had no effect.
1044
def test_parent(self):
    # A spawned process must report us as its parent.
    spawned = self.spawn_psproc()
    self.assertEqual(spawned.parent().pid, os.getpid())

    # The first PID listed by psutil.pids() is expected to have no
    # parent at all.
    first_pid = psutil.pids()[0]
    first_proc = psutil.Process(first_pid)
    self.assertIsNone(first_proc.parent())
1051
def test_parent_multi(self):
    # parent() must walk one level up at each step of the chain
    # grandchild -> child -> current process.
    me = psutil.Process()
    child, grandchild = self.spawn_children_pair()
    for proc, expected_parent in ((grandchild, child), (child, me)):
        self.assertEqual(proc.parent(), expected_parent)
1057
def test_parent_disappeared(self):
    # Emulate a case where the parent process disappeared: every
    # attempt to instantiate psutil.Process raises NoSuchProcess.
    proc = self.spawn_psproc()
    gone = psutil.NoSuchProcess(0, 'foo')
    with mock.patch("psutil.Process", side_effect=gone):
        self.assertIsNone(proc.parent())
1064
@retry_on_failure()
def test_parents(self):
    # parents() lists ancestors starting from the closest one.
    me = psutil.Process()
    assert me.parents()
    child, grandchild = self.spawn_children_pair()
    expectations = (
        (child, 0, me),
        (grandchild, 0, child),
        (grandchild, 1, me),
    )
    for proc, depth, ancestor in expectations:
        self.assertEqual(proc.parents()[depth], ancestor)
1073
def test_children(self):
    # With no subprocess around, children() is empty; after spawning
    # one, both the plain and the recursive listing contain just it.
    me = psutil.Process()
    self.assertEqual(me.children(), [])
    self.assertEqual(me.children(recursive=True), [])
    # On Windows we set the flag to 0 in order to cancel out the
    # CREATE_NO_WINDOW flag (enabled by default) which creates
    # an extra "conhost.exe" child.
    spawned = self.spawn_psproc(creationflags=0)
    direct = me.children()
    nested = me.children(recursive=True)
    for found in (direct, nested):
        self.assertEqual(len(found), 1)
        only = found[0]
        self.assertEqual(only.pid, spawned.pid)
        self.assertEqual(only.ppid(), me.pid)
1088
def test_children_recursive(self):
    # Test children() against two sub processes, where the first one
    # (our child) spawned the second one (our grandchild).
    me = psutil.Process()
    child, grandchild = self.spawn_children_pair()

    direct = me.children()
    self.assertEqual(direct, [child])
    everyone = me.children(recursive=True)
    self.assertEqual(everyone, [child, grandchild])

    # If the intermediate process is gone there's no way for
    # children() to recursively find it.
    child.terminate()
    child.wait()
    leftover = me.children(recursive=True)
    self.assertEqual(leftover, [])
1101
def test_children_duplicates(self):
    # Count how many processes report each PID as their parent.
    counts = collections.defaultdict(int)
    for proc in psutil.process_iter():
        try:
            counts[proc.ppid()] += 1
        except psutil.Error:
            pass
    # Pick the PID with the highest number of children...
    ranked = sorted(counts.items(), key=lambda item: item[1])
    busiest_pid = ranked[-1][0]
    if LINUX and busiest_pid == 0:
        raise self.skipTest("PID 0")
    busiest = psutil.Process(busiest_pid)
    # ...and make sure its recursive listing holds no duplicates.
    try:
        descendants = busiest.children(recursive=True)
    except psutil.AccessDenied:  # windows
        return
    self.assertEqual(len(descendants), len(set(descendants)))
1121
def test_parents_and_children(self):
    # children(recursive=True) and parents() must describe the same
    # process chain, seen from its two ends.
    me = psutil.Process()
    child, grandchild = self.spawn_children_pair()
    # forward
    descendants = me.children(recursive=True)
    self.assertEqual(len(descendants), 2)
    for idx, expected in enumerate((child, grandchild)):
        self.assertEqual(descendants[idx], expected)
    # backward
    ancestors = grandchild.parents()
    for idx, expected in enumerate((child, me)):
        self.assertEqual(ancestors[idx], expected)
1134
def test_suspend_resume(self):
    # After suspend() + resume() the process must not be reported as
    # stopped.
    proc = self.spawn_psproc()
    proc.suspend()
    # Poll the status up to 100 times, 10 ms apart, until the process
    # shows up as stopped.
    attempts = 0
    while attempts < 100 and proc.status() != psutil.STATUS_STOPPED:
        time.sleep(0.01)
        attempts += 1
    proc.resume()
    self.assertNotEqual(proc.status(), psutil.STATUS_STOPPED)
1144
def test_invalid_pid(self):
    # A string PID is a type error, a negative PID a value error.
    for exc_type, bad_pid in ((TypeError, "1"), (ValueError, -1)):
        self.assertRaises(exc_type, psutil.Process, bad_pid)
1148
def test_as_dict(self):
    # Exercise Process.as_dict(): attribute selection, the ad_value
    # fallback, exception handling and argument validation.
    def nice_raising(exc):
        # Patch Process.nice so that calling it raises `exc`.
        return mock.patch('psutil.Process.nice', create=True,
                          side_effect=exc)

    proc = psutil.Process()
    info = proc.as_dict(attrs=['exe', 'name'])
    self.assertEqual(sorted(info.keys()), ['exe', 'name'])

    proc = psutil.Process(min(psutil.pids()))
    info = proc.as_dict(attrs=['connections'], ad_value='foo')
    conns = info['connections']
    if not isinstance(conns, list):
        self.assertEqual(conns, 'foo')

    # Test ad_value is set on AccessDenied.
    with nice_raising(psutil.AccessDenied):
        result = proc.as_dict(attrs=["nice"], ad_value=1)
        self.assertEqual(result, {"nice": 1})

    # Test that NoSuchProcess bubbles up.
    with nice_raising(psutil.NoSuchProcess(proc.pid, "name")):
        self.assertRaises(
            psutil.NoSuchProcess, proc.as_dict, attrs=["nice"])

    # Test that ZombieProcess is swallowed.
    with nice_raising(psutil.ZombieProcess(proc.pid, "name")):
        result = proc.as_dict(attrs=["nice"], ad_value="foo")
        self.assertEqual(result, {"nice": "foo"})

    # By default APIs raising NotImplementedError are
    # supposed to be skipped.
    with nice_raising(NotImplementedError):
        info = proc.as_dict()
        self.assertNotIn('nice', list(info.keys()))
        # ...unless the user explicitly asked for some attr.
        self.assertRaises(
            NotImplementedError, proc.as_dict, attrs=["nice"])

    # errors
    invalid_attrs = (
        (TypeError, 'name'),
        (ValueError, ['foo']),
        (ValueError, ['foo', 'bar']),
    )
    for exc_type, attrs in invalid_attrs:
        self.assertRaises(exc_type, proc.as_dict, attrs)
1194
def test_oneshot(self):
    # Inside oneshot() two cpu_times() calls reach the platform
    # implementation once; outside of it they reach it twice.
    proc = psutil.Process()
    target = "psutil._psplatform.Process.cpu_times"

    with mock.patch(target) as patched:
        with proc.oneshot():
            proc.cpu_times()
            proc.cpu_times()
        self.assertEqual(patched.call_count, 1)

    with mock.patch(target) as patched:
        proc.cpu_times()
        proc.cpu_times()
        self.assertEqual(patched.call_count, 2)
1207
def test_oneshot_twice(self):
    # Test the case where the ctx manager is __enter__ed twice.
    # The second __enter__ is supposed to result in a NOOP.
    proc = psutil.Process()
    cpu_times_target = "psutil._psplatform.Process.cpu_times"
    enter_target = "psutil._psplatform.Process.oneshot_enter"

    with mock.patch(cpu_times_target) as cpu_times_mock:
        with mock.patch(enter_target) as enter_mock:
            with proc.oneshot():
                proc.cpu_times()
                proc.cpu_times()
                with proc.oneshot():
                    proc.cpu_times()
                    proc.cpu_times()
            self.assertEqual(cpu_times_mock.call_count, 1)
            self.assertEqual(enter_mock.call_count, 1)

    # Outside of oneshot() every call goes through.
    with mock.patch(cpu_times_target) as cpu_times_mock:
        proc.cpu_times()
        proc.cpu_times()
        self.assertEqual(cpu_times_mock.call_count, 2)
1227
def test_oneshot_cache(self):
    # Make sure oneshot() cache is nonglobal. Instead it's
    # supposed to be bound to the Process instance, see:
    # https://github.com/giampaolo/psutil/issues/1373
    first, second = self.spawn_children_pair()
    first_ppid = first.ppid()
    second_ppid = second.ppid()
    self.assertNotEqual(first_ppid, second_ppid)
    # Whichever instance holds the cache, each process must keep
    # reporting its own parent PID.
    for cached in (first, second):
        with cached.oneshot():
            self.assertEqual(first.ppid(), first_ppid)
            self.assertEqual(second.ppid(), second_ppid)
1242
def test_halfway_terminated_process(self):
    # Test that NoSuchProcess exception gets raised in case the
    # process dies after we create the Process object.
    # Example:
    # >>> proc = Process(1234)
    # >>> time.sleep(2)  # time-consuming task, process dies in meantime
    # >>> proc.name()
    # Refers to Issue #15
    def assert_raises_nsp(fun, fun_name):
        # Call `fun` and require it to raise NoSuchProcess; `fun_name`
        # is only used to whitelist the platform exceptions below.
        try:
            ret = fun()
        # This clause has to come before the NoSuchProcess one;
        # presumably ZombieProcess would otherwise be caught by it
        # (TODO confirm the exception hierarchy).
        except psutil.ZombieProcess:  # differentiate from NSP
            raise
        except psutil.NoSuchProcess:
            pass
        except psutil.AccessDenied:
            # Tolerated only for these two APIs on OpenBSD.
            if OPENBSD and fun_name in ('threads', 'num_threads'):
                return
            raise
        else:
            # NtQuerySystemInformation succeeds even if process is gone.
            if WINDOWS and fun_name in ('exe', 'name'):
                return
            raise self.fail("%r didn't raise NSP and returned %r "
                            "instead" % (fun, ret))

    # Spawn a process, kill it and wait for it to be gone.
    p = self.spawn_psproc()
    p.terminate()
    p.wait()
    if WINDOWS:  # XXX
        # Wait until the PID is no longer listed.
        call_until(psutil.pids, "%s not in ret" % p.pid)
    self.assertProcessGone(p)

    # Every API in the namespace is checked against the dead process.
    ns = process_namespace(p)
    for fun, name in ns.iter(ns.all):
        assert_raises_nsp(fun, name)

    # NtQuerySystemInformation succeeds even if process is gone.
    if WINDOWS and not GITHUB_WHEELS:
        normcase = os.path.normcase
        self.assertEqual(normcase(p.exe()), normcase(PYTHON_EXE))
1284
@unittest.skipIf(not POSIX, 'POSIX only')
def test_zombie_process(self):
    # All Process APIs must be able to cope with a zombie process.
    def succeed_or_zombie_p_exc(fun):
        # Either the call works or it raises one of the two
        # exceptions tolerated for a zombie.
        try:
            return fun()
        except (psutil.ZombieProcess, psutil.AccessDenied):
            pass

    _, zombie = self.spawn_zombie()
    # A zombie process should always be instantiable
    zproc = psutil.Process(zombie.pid)
    # ...and at least its status always be queryable
    self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE)
    # ...and it should be considered 'running'
    assert zproc.is_running()
    # ...and as_dict() shouldn't crash
    zproc.as_dict()
    # Deliberately not checked:
    # - that the zombie shows up among our recursive children (not
    #   true on BSD and MACOS);
    # - that ppid() is usable. XXX this would be an important use
    #   case, as the only way to get rid of a zombie is to kill its
    #   parent.
    # ...and all other APIs should be able to deal with it
    ns = process_namespace(zproc)
    for fun, _name in ns.iter(ns.all):
        succeed_or_zombie_p_exc(fun)

    assert psutil.pid_exists(zproc.pid)
    # Skip the listing checks only for the Travis + MACOS combination.
    # The previous "not TRAVIS and MACOS" parsed as
    # "(not TRAVIS) and MACOS" and so ran them on MACOS only.
    if not (TRAVIS and MACOS):
        # For some reason this started failing all of the sudden.
        # Maybe they upgraded MACOS version?
        # https://travis-ci.org/giampaolo/psutil/jobs/310896404
        self.assertIn(zproc.pid, psutil.pids())
        self.assertIn(zproc.pid, [x.pid for x in psutil.process_iter()])
        # Reset psutil's module-level _pmap and list the processes
        # again.
        psutil._pmap = {}
        self.assertIn(zproc.pid, [x.pid for x in psutil.process_iter()])
1325
@unittest.skipIf(not POSIX, 'POSIX only')
def test_zombie_process_is_running_w_exc(self):
    # Emulate a case where internally is_running() raises
    # ZombieProcess.
    proc = psutil.Process()
    zombie_exc = psutil.ZombieProcess(0)
    with mock.patch("psutil.Process", side_effect=zombie_exc) as patched:
        assert proc.is_running()
        assert patched.called
1335
@unittest.skipIf(not POSIX, 'POSIX only')
def test_zombie_process_status_w_exc(self):
    # Emulate a case where internally status() raises
    # ZombieProcess.
    proc = psutil.Process()
    zombie_exc = psutil.ZombieProcess(0)
    target = "psutil._psplatform.Process.status"
    with mock.patch(target, side_effect=zombie_exc) as patched:
        self.assertEqual(proc.status(), psutil.STATUS_ZOMBIE)
        assert patched.called
1345
def test_pid_0(self):
    # Process(0) is supposed to work on all platforms except Linux
    if 0 not in psutil.pids():
        self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
        # These 2 are a contradiction, but "ps" says PID 1's parent
        # is PID 0.
        assert not psutil.pid_exists(0)
        self.assertEqual(psutil.Process(1).ppid(), 0)
        return

    p = psutil.Process(0)
    # Waiting for or signalling PID 0 must be refused.
    exc = psutil.AccessDenied if WINDOWS else ValueError
    self.assertRaises(exc, p.wait)
    self.assertRaises(exc, p.terminate)
    self.assertRaises(exc, p.suspend)
    self.assertRaises(exc, p.resume)
    self.assertRaises(exc, p.kill)
    self.assertRaises(exc, p.send_signal, signal.SIGTERM)

    # test all methods
    ns = process_namespace(p)
    for fun, name in ns.iter(ns.getters + ns.setters):
        try:
            ret = fun()
        except psutil.AccessDenied:
            pass
        else:
            if name in ("uids", "gids"):
                self.assertEqual(ret.real, 0)
            elif name == "username":
                user = 'NT AUTHORITY\\SYSTEM' if WINDOWS else 'root'
                self.assertEqual(p.username(), user)
            elif name == "name":
                # Check the value returned by name(), not the label
                # of the API under test (which is always truthy).
                assert ret, ret

    if not OPENBSD:
        self.assertIn(0, psutil.pids())
        assert psutil.pid_exists(0)
1384
@unittest.skipIf(not HAS_ENVIRON, "not supported")
def test_environ(self):
    # Process.environ() of the current process is compared against
    # os.environ, after removing unstable variables.
    def strip_newlines(text):
        return text.replace("\r", "").replace("\n", "")

    def clean_dict(env):
        # Drops the unstable keys from `env` in place and returns a
        # new dict with CR / LF removed from keys and values.
        # Most of these are problematic on Travis.
        unstable = ["PSUTIL_TESTING", "PLAT", "HOME"]
        if MACOS:
            unstable += [
                "__CF_USER_TEXT_ENCODING",
                "VERSIONER_PYTHON_PREFER_32_BIT",
                "VERSIONER_PYTHON_VERSION",
            ]
        for key in unstable:
            env.pop(key, None)
        return dict(
            (strip_newlines(key), strip_newlines(value))
            for key, value in env.items())

    self.maxDiff = None
    proc = psutil.Process()
    from_psutil = clean_dict(proc.environ())
    from_os = clean_dict(os.environ.copy())
    # NOTE(review): as written the comparison only runs when
    # GITHUB_WHEELS is set and OSX is not - confirm this is intended
    # rather than "not (OSX and GITHUB_WHEELS)".
    if GITHUB_WHEELS and not OSX:
        self.assertEqual(from_psutil, from_os)
1407
@unittest.skipIf(not HAS_ENVIRON, "not supported")
@unittest.skipIf(not POSIX, "POSIX only")
def test_weird_environ(self):
    # environment variables can contain values without an equals sign
    # The C program below replaces itself with /bin/cat, passing an
    # environment whose second entry ("X") has no "=".
    code = textwrap.dedent("""
    #include <unistd.h>
    #include <fcntl.h>
    char * const argv[] = {"cat", 0};
    char * const envp[] = {"A=1", "X", "C=3", 0};
    int main(void) {
        /* Close stderr on exec so parent can wait for the execve to
         * finish. */
        if (fcntl(2, F_SETFD, FD_CLOEXEC) != 0)
            return 0;
        return execve("/bin/cat", argv, envp);
    }
    """)
    # Build the executable and run it with piped stdin / stderr.
    path = self.get_testfn()
    create_exe(path, c_code=code)
    sproc = self.spawn_testproc(
        [path], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
    p = psutil.Process(sproc.pid)
    wait_for_pid(p.pid)
    assert p.is_running()
    # Wait for process to exec or exit.
    self.assertEqual(sproc.stderr.read(), b"")
    # The malformed "X" entry is expected to be left out.
    self.assertEqual(p.environ(), {"A": "1", "C": "3"})
    # communicate() lets the process finish; it must exit cleanly.
    sproc.communicate()
    self.assertEqual(sproc.returncode, 0)
1437
1438
1439 # ===================================================================
1440 # --- Limited user tests
1441 # ===================================================================
1442
1443
if POSIX and os.getuid() == 0:

    class LimitedUserTestCase(TestProcess):
        """Repeat the previous tests by using a limited user.
        Executed only on UNIX and only if the user who runs the test
        script is root.
        """
        # The uid/gid the test suite runs under; tearDown() switches
        # the effective IDs back to these. os.getuid() was already
        # called by the enclosing "if", so no hasattr() guard is needed.
        PROCESS_UID = os.getuid()
        PROCESS_GID = os.getgid()

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # re-define all existent test methods in order to
            # ignore AccessDenied exceptions
            for attr in [x for x in dir(self) if x.startswith('test')]:
                wrapper = self._ignore_access_denied(getattr(self, attr))
                setattr(self, attr, types.MethodType(wrapper, self))

        @staticmethod
        def _ignore_access_denied(meth):
            """Return a function running `meth` and swallowing
            psutil.AccessDenied.

            Going through a factory gives every wrapper its own `meth`;
            a closure defined directly in the loop would make all the
            wrappers call the last method of the loop.
            """
            def test_(self):
                try:
                    meth()
                except psutil.AccessDenied:
                    pass
            return test_

        def setUp(self):
            super().setUp()
            # Drop the group first, while still privileged enough to
            # do so, then the user.
            os.setegid(1000)
            os.seteuid(1000)

        def tearDown(self):
            # Get the original user back first, then the group; each
            # call receives the matching kind of ID.
            os.seteuid(self.PROCESS_UID)
            os.setegid(self.PROCESS_GID)
            super().tearDown()

        def test_nice(self):
            # A limited user must not be able to raise its priority.
            try:
                psutil.Process().nice(-1)
            except psutil.AccessDenied:
                pass
            else:
                self.fail("exception not raised")

        @unittest.skipIf(1, "causes problem as root")
        def test_zombie_process(self):
            pass
1491
1492
1493 # ===================================================================
1494 # --- psutil.Popen tests
1495 # ===================================================================
1496
1497
class TestPopen(PsutilTestCase):
    """Tests for psutil.Popen class."""

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def test_misc(self):
        # XXX this test causes a ResourceWarning on Python 3 because
        # psutil.__subproc instance doesn't get properly freed.
        # Not sure what to do though.
        sleeper = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
        pipes = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        with psutil.Popen(sleeper, **pipes) as proc:
            # Both psutil.Process and subprocess.Popen attributes
            # have to be reachable.
            proc.name()
            proc.cpu_times()
            proc.stdin
            self.assertTrue(dir(proc))
            self.assertRaises(AttributeError, getattr, proc, 'foo')
            proc.terminate()
        expected = -signal.SIGTERM if POSIX else signal.SIGTERM
        self.assertEqual(proc.wait(5), expected)

    def test_ctx_manager(self):
        # Leaving the context manager must close all the pipes.
        version_cmd = [PYTHON_EXE, "-V"]
        with psutil.Popen(version_cmd,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE,
                          stdin=subprocess.PIPE) as proc:
            proc.communicate()
        for stream in (proc.stdout, proc.stderr, proc.stdin):
            assert stream.closed
        self.assertEqual(proc.returncode, 0)

    def test_kill_terminate(self):
        # subprocess.Popen()'s terminate(), kill() and send_signal() do
        # not raise exception after the process is gone. psutil.Popen
        # diverges from that.
        sleeper = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
        with psutil.Popen(sleeper, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
            proc.terminate()
            proc.wait()
            gone = psutil.NoSuchProcess
            self.assertRaises(gone, proc.terminate)
            self.assertRaises(gone, proc.kill)
            self.assertRaises(gone, proc.send_signal, signal.SIGTERM)
            if WINDOWS and sys.version_info >= (2, 7):
                self.assertRaises(gone, proc.send_signal,
                                  signal.CTRL_C_EVENT)
                self.assertRaises(gone, proc.send_signal,
                                  signal.CTRL_BREAK_EVENT)
1552
1553
if __name__ == '__main__':
    # Run this test module through psutil's own test runner.
    from psutil.tests import runner
    runner.run_from_name(__file__)