comparison planemo/lib/python3.7/site-packages/psutil/tests/test_windows.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 #!/usr/bin/env python3
2 # -*- coding: UTF-8 -*
3
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
5 # Use of this source code is governed by a BSD-style license that can be
6 # found in the LICENSE file.
7
8 """Windows specific tests."""
9
10 import datetime
11 import errno
12 import glob
13 import os
14 import platform
15 import re
16 import signal
17 import subprocess
18 import sys
19 import time
20 import warnings
21
22 import psutil
23 from psutil import WINDOWS
24 from psutil._compat import FileNotFoundError
25 from psutil._compat import super
26 from psutil.tests import APPVEYOR
27 from psutil.tests import GITHUB_WHEELS
28 from psutil.tests import HAS_BATTERY
29 from psutil.tests import IS_64BIT
30 from psutil.tests import mock
31 from psutil.tests import PsutilTestCase
32 from psutil.tests import PY3
33 from psutil.tests import PYPY
34 from psutil.tests import retry_on_failure
35 from psutil.tests import sh
36 from psutil.tests import spawn_testproc
37 from psutil.tests import terminate
38 from psutil.tests import TOLERANCE_DISK_USAGE
39 from psutil.tests import unittest
40
41
# pywin32 and wmi are only importable on Windows and are not available on
# PyPy; any warning emitted while importing them is silenced.
if WINDOWS and not PYPY:
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        import win32api  # requires "pip install pywin32"
        import win32con
        import win32process
        import wmi  # requires "pip install wmi" / "make setup-dev-env"


# Shortcut to the platform C extension module used by some tests below.
cext = psutil._psplatform.cext
52
53
def wrap_exceptions(fun):
    """Decorator translating OSError into psutil exceptions.

    The returned wrapper calls ``fun(self, *args, **kwargs)`` and:

    - raises ``psutil.AccessDenied`` if an OSError is raised whose errno
      is in ``psutil._pswindows.ACCESS_DENIED_SET``;
    - raises ``psutil.NoSuchProcess`` if the errno is ``errno.ESRCH``;
    - re-raises any other OSError unchanged.

    Exceptions which are not OSError propagate untouched.
    """
    import functools

    # functools.wraps preserves the wrapped callable's __name__ / __doc__.
    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            return fun(self, *args, **kwargs)
        except OSError as err:
            # Imported here so that merely applying the decorator does
            # not require psutil._pswindows.
            from psutil._pswindows import ACCESS_DENIED_SET
            if err.errno in ACCESS_DENIED_SET:
                raise psutil.AccessDenied(None, None)
            if err.errno == errno.ESRCH:
                raise psutil.NoSuchProcess(None, None)
            raise
    return wrapper
66
67
@unittest.skipIf(not WINDOWS, "WINDOWS only")
@unittest.skipIf(PYPY, "pywin32 not available on PYPY")
# https://github.com/giampaolo/psutil/pull/1762#issuecomment-632892692
@unittest.skipIf(GITHUB_WHEELS and not (PY3 and IS_64BIT),
                 "pywin32 broken on GITHUB + PY2")
class WindowsTestCase(PsutilTestCase):
    """Base class for the test cases of this module relying on pywin32.

    It only carries the skip conditions above; it defines no tests.
    """
75
76
77 # ===================================================================
78 # System APIs
79 # ===================================================================
80
81
class TestCpuAPIs(WindowsTestCase):
    """Cross-check psutil CPU figures against env vars, pywin32 and WMI."""

    @unittest.skipIf('NUMBER_OF_PROCESSORS' not in os.environ,
                     'NUMBER_OF_PROCESSORS env var is not available')
    def test_cpu_count_vs_NUMBER_OF_PROCESSORS(self):
        # Will likely fail on many-cores systems:
        # https://stackoverflow.com/questions/31209256
        expected = int(os.environ['NUMBER_OF_PROCESSORS'])
        self.assertEqual(expected, psutil.cpu_count())

    def test_cpu_count_vs_GetSystemInfo(self):
        # Will likely fail on many-cores systems:
        # https://stackoverflow.com/questions/31209256
        self.assertEqual(win32api.GetSystemInfo()[5], psutil.cpu_count())

    def test_cpu_count_logical_vs_wmi(self):
        # Only the first processor reported by WMI is looked at.
        cpu = wmi.WMI().Win32_Processor()[0]
        logical = psutil.cpu_count()
        self.assertEqual(logical, cpu.NumberOfLogicalProcessors)

    def test_cpu_count_phys_vs_wmi(self):
        cpu = wmi.WMI().Win32_Processor()[0]
        physical = psutil.cpu_count(logical=False)
        self.assertEqual(physical, cpu.NumberOfCores)

    def test_cpu_count_vs_cpu_times(self):
        # There must be one per-CPU times entry for each logical CPU.
        logical = psutil.cpu_count()
        self.assertEqual(logical, len(psutil.cpu_times(percpu=True)))

    def test_cpu_freq(self):
        cpu = wmi.WMI().Win32_Processor()[0]
        self.assertEqual(cpu.CurrentClockSpeed, psutil.cpu_freq().current)
        self.assertEqual(cpu.MaxClockSpeed, psutil.cpu_freq().max)
118
119
class TestSystemAPIs(WindowsTestCase):
    """Compare psutil system-wide APIs with ipconfig, WMI and pywin32."""

    def test_nic_names(self):
        # Every NIC reported by psutil must show up in "ipconfig /all",
        # except for pseudo interfaces.
        out = sh('ipconfig /all')
        nics = psutil.net_io_counters(pernic=True).keys()
        for nic in nics:
            if "pseudo-interface" in nic.replace(' ', '-').lower():
                continue
            if nic not in out:
                self.fail(
                    "%r nic wasn't found in 'ipconfig /all' output" % nic)

    def test_total_phymem(self):
        w = wmi.WMI().Win32_ComputerSystem()[0]
        self.assertEqual(int(w.TotalPhysicalMemory),
                         psutil.virtual_memory().total)

    # @unittest.skipIf(wmi is None, "wmi module is not installed")
    # def test__UPTIME(self):
    #     # _UPTIME constant is not public but it is used internally
    #     # as value to return for pid 0 creation time.
    #     # WMI behaves the same.
    #     w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    #     p = psutil.Process(0)
    #     wmic_create = str(w.CreationDate.split('.')[0])
    #     psutil_create = time.strftime("%Y%m%d%H%M%S",
    #                                   time.localtime(p.create_time()))

    # Note: this test is not very reliable
    @unittest.skipIf(APPVEYOR, "test not relieable on appveyor")
    @retry_on_failure()
    def test_pids(self):
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        w = wmi.WMI().Win32_Process()
        wmi_pids = set([x.ProcessId for x in w])
        psutil_pids = set(psutil.pids())
        self.assertEqual(wmi_pids, psutil_pids)

    @retry_on_failure()
    def test_disks(self):
        # Each partition reported by psutil must have a WMI counterpart
        # with the same total size and (roughly) the same free space.
        ps_parts = psutil.disk_partitions(all=True)
        wmi_parts = wmi.WMI().Win32_LogicalDisk()
        for ps_part in ps_parts:
            for wmi_part in wmi_parts:
                if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
                    if not ps_part.mountpoint:
                        # this is usually a CD-ROM with no disk inserted
                        break
                    if 'cdrom' in ps_part.opts:
                        break
                    if ps_part.mountpoint.startswith('A:'):
                        break  # floppy
                    try:
                        usage = psutil.disk_usage(ps_part.mountpoint)
                    except FileNotFoundError:
                        # usually this is the floppy
                        break
                    self.assertEqual(usage.total, int(wmi_part.Size))
                    wmi_free = int(wmi_part.FreeSpace)
                    # 10 MB tolerance. An exact equality check on the
                    # free space used to precede this one, which made
                    # the tolerance unreachable.
                    if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
                        self.fail("psutil=%s, wmi=%s" % (
                            usage.free, wmi_free))
                    break
            else:
                # no WMI logical disk matched this psutil partition
                self.fail("can't find partition %s" % repr(ps_part))

    @retry_on_failure()
    def test_disk_usage(self):
        for disk in psutil.disk_partitions():
            if 'cdrom' in disk.opts:
                continue
            sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint)
            psutil_value = psutil.disk_usage(disk.mountpoint)
            self.assertAlmostEqual(sys_value[0], psutil_value.free,
                                   delta=TOLERANCE_DISK_USAGE)
            self.assertAlmostEqual(sys_value[1], psutil_value.total,
                                   delta=TOLERANCE_DISK_USAGE)
            self.assertEqual(psutil_value.used,
                             psutil_value.total - psutil_value.free)

    def test_disk_partitions(self):
        # Drives whose name starts with "A:" are ignored on both sides.
        sys_value = [
            x + '\\' for x in win32api.GetLogicalDriveStrings().split("\\\x00")
            if x and not x.startswith('A:')]
        psutil_value = [x.mountpoint for x in psutil.disk_partitions(all=True)
                        if not x.mountpoint.startswith('A:')]
        self.assertEqual(sys_value, psutil_value)

    def test_net_if_stats(self):
        # At least one NIC name must be known to both psutil and WMI.
        ps_names = set(cext.net_if_stats())
        wmi_adapters = wmi.WMI().Win32_NetworkAdapter()
        wmi_names = set()
        for wmi_adapter in wmi_adapters:
            wmi_names.add(wmi_adapter.Name)
            wmi_names.add(wmi_adapter.NetConnectionID)
        self.assertTrue(ps_names & wmi_names,
                        "no common entries in %s, %s" % (ps_names, wmi_names))

    def test_boot_time(self):
        wmi_os = wmi.WMI().Win32_OperatingSystem()
        # keep only the part preceding the first dot
        wmi_btime_str = wmi_os[0].LastBootUpTime.split('.')[0]
        wmi_btime_dt = datetime.datetime.strptime(
            wmi_btime_str, "%Y%m%d%H%M%S")
        psutil_dt = datetime.datetime.fromtimestamp(psutil.boot_time())
        diff = abs((wmi_btime_dt - psutil_dt).total_seconds())
        self.assertLessEqual(diff, 3)

    def test_boot_time_fluctuation(self):
        # https://github.com/giampaolo/psutil/issues/1007
        # Values differing by 1 from the first one are expected to be
        # ignored; a very different value is expected to be returned.
        with mock.patch('psutil._pswindows.cext.boot_time', return_value=5):
            self.assertEqual(psutil.boot_time(), 5)
        with mock.patch('psutil._pswindows.cext.boot_time', return_value=4):
            self.assertEqual(psutil.boot_time(), 5)
        with mock.patch('psutil._pswindows.cext.boot_time', return_value=6):
            self.assertEqual(psutil.boot_time(), 5)
        with mock.patch('psutil._pswindows.cext.boot_time', return_value=333):
            self.assertEqual(psutil.boot_time(), 333)
240
241
242 # ===================================================================
243 # sensors_battery()
244 # ===================================================================
245
246
class TestSensorsBattery(WindowsTestCase):
    """Check psutil.sensors_battery() against pywin32, WMI and mocked
    return values of the underlying C extension function.
    """

    # C extension function replaced by the test_emulate_* tests.
    _CEXT_BATTERY = "psutil._pswindows.cext.sensors_battery"

    def test_has_battery(self):
        present = win32api.GetPwrCapabilities()['SystemBatteriesPresent']
        check = self.assertIsNotNone if present else self.assertIsNone
        check(psutil.sensors_battery())

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_percent(self):
        from_wmi = wmi.WMI().query('select * from Win32_Battery')[0]
        from_psutil = psutil.sensors_battery()
        self.assertAlmostEqual(
            from_psutil.percent, from_wmi.EstimatedChargeRemaining,
            delta=1)

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_power_plugged(self):
        from_wmi = wmi.WMI().query('select * from Win32_Battery')[0]
        from_psutil = psutil.sensors_battery()
        # Status codes:
        # https://msdn.microsoft.com/en-us/library/aa394074(v=vs.85).aspx
        self.assertEqual(from_psutil.power_plugged,
                         from_wmi.BatteryStatus == 2)

    def test_emulate_no_battery(self):
        fake = (0, 128, 0, 0)
        with mock.patch(self._CEXT_BATTERY, return_value=fake) as m:
            self.assertIsNone(psutil.sensors_battery())
            assert m.called

    def test_emulate_power_connected(self):
        fake = (1, 0, 0, 0)
        with mock.patch(self._CEXT_BATTERY, return_value=fake) as m:
            self.assertEqual(psutil.sensors_battery().secsleft,
                             psutil.POWER_TIME_UNLIMITED)
            assert m.called

    def test_emulate_power_charging(self):
        fake = (0, 8, 0, 0)
        with mock.patch(self._CEXT_BATTERY, return_value=fake) as m:
            self.assertEqual(psutil.sensors_battery().secsleft,
                             psutil.POWER_TIME_UNLIMITED)
            assert m.called

    def test_emulate_secs_left_unknown(self):
        fake = (0, 0, 0, -1)
        with mock.patch(self._CEXT_BATTERY, return_value=fake) as m:
            self.assertEqual(psutil.sensors_battery().secsleft,
                             psutil.POWER_TIME_UNKNOWN)
            assert m.called
300
301
302 # ===================================================================
303 # Process APIs
304 # ===================================================================
305
306
class TestProcess(WindowsTestCase):
    """Compare psutil.Process results with pywin32 / ctypes, using both
    the current process and a test process spawned once for the class.
    """

    @classmethod
    def setUpClass(cls):
        cls.pid = spawn_testproc().pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    def test_issue_24(self):
        p = psutil.Process(0)
        self.assertRaises(psutil.AccessDenied, p.kill)

    def test_special_pid(self):
        p = psutil.Process(4)
        self.assertEqual(p.name(), 'System')
        # use __str__ to access all common Process properties to check
        # that nothing strange happens
        str(p)
        p.username()
        self.assertGreaterEqual(p.create_time(), 0.0)
        try:
            rss = p.memory_info().rss
        except psutil.AccessDenied:
            # expected on Windows Vista and Windows 7
            if platform.uname()[1] not in ('vista', 'win-7', 'win7'):
                raise
        else:
            self.assertGreater(rss, 0)

    def test_send_signal(self):
        p = psutil.Process(self.pid)
        self.assertRaises(ValueError, p.send_signal, signal.SIGINT)

    def test_num_handles_increment(self):
        p = psutil.Process(os.getpid())
        before = p.num_handles()
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, os.getpid())
        # Close the handle even if the assertion fails, so that a
        # failure does not leak it.
        try:
            after = p.num_handles()
            self.assertEqual(after, before + 1)
        finally:
            win32api.CloseHandle(handle)
        self.assertEqual(p.num_handles(), before)

    @unittest.skipIf(not sys.version_info >= (2, 7),
                     "CTRL_* signals not supported")
    def test_ctrl_signals(self):
        p = psutil.Process(self.spawn_testproc().pid)
        p.send_signal(signal.CTRL_C_EVENT)
        p.send_signal(signal.CTRL_BREAK_EVENT)
        p.kill()
        p.wait()
        # once the process is gone both signals must raise NoSuchProcess
        self.assertRaises(psutil.NoSuchProcess,
                          p.send_signal, signal.CTRL_C_EVENT)
        self.assertRaises(psutil.NoSuchProcess,
                          p.send_signal, signal.CTRL_BREAK_EVENT)

    def test_username(self):
        self.assertEqual(psutil.Process().username(),
                         win32api.GetUserNameEx(win32con.NameSamCompatible))

    def test_cmdline(self):
        # runs of spaces are collapsed before comparing
        sys_value = re.sub(' +', ' ', win32api.GetCommandLine()).strip()
        psutil_value = ' '.join(psutil.Process().cmdline())
        self.assertEqual(sys_value, psutil_value)

    # XXX - occasional failures

    # def test_cpu_times(self):
    #     handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
    #                                   win32con.FALSE, os.getpid())
    #     self.addCleanup(win32api.CloseHandle, handle)
    #     sys_value = win32process.GetProcessTimes(handle)
    #     psutil_value = psutil.Process().cpu_times()
    #     self.assertAlmostEqual(
    #         psutil_value.user, sys_value['UserTime'] / 10000000.0,
    #         delta=0.2)
    #     self.assertAlmostEqual(
    #         psutil_value.user, sys_value['KernelTime'] / 10000000.0,
    #         delta=0.2)

    def test_nice(self):
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, os.getpid())
        self.addCleanup(win32api.CloseHandle, handle)
        sys_value = win32process.GetPriorityClass(handle)
        psutil_value = psutil.Process().nice()
        self.assertEqual(psutil_value, sys_value)

    def test_memory_info(self):
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, self.pid)
        self.addCleanup(win32api.CloseHandle, handle)
        sys_value = win32process.GetProcessMemoryInfo(handle)
        psutil_value = psutil.Process(self.pid).memory_info()
        self.assertEqual(
            sys_value['PeakWorkingSetSize'], psutil_value.peak_wset)
        self.assertEqual(
            sys_value['WorkingSetSize'], psutil_value.wset)
        self.assertEqual(
            sys_value['QuotaPeakPagedPoolUsage'], psutil_value.peak_paged_pool)
        self.assertEqual(
            sys_value['QuotaPagedPoolUsage'], psutil_value.paged_pool)
        self.assertEqual(
            sys_value['QuotaPeakNonPagedPoolUsage'],
            psutil_value.peak_nonpaged_pool)
        self.assertEqual(
            sys_value['QuotaNonPagedPoolUsage'], psutil_value.nonpaged_pool)
        self.assertEqual(
            sys_value['PagefileUsage'], psutil_value.pagefile)
        self.assertEqual(
            sys_value['PeakPagefileUsage'], psutil_value.peak_pagefile)

        # rss and vms must mirror wset and pagefile respectively
        self.assertEqual(psutil_value.rss, psutil_value.wset)
        self.assertEqual(psutil_value.vms, psutil_value.pagefile)

    def test_wait(self):
        # NOTE: this terminates the process spawned in setUpClass.
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, self.pid)
        self.addCleanup(win32api.CloseHandle, handle)
        p = psutil.Process(self.pid)
        p.terminate()
        psutil_value = p.wait()
        sys_value = win32process.GetExitCodeProcess(handle)
        self.assertEqual(psutil_value, sys_value)

    def test_cpu_affinity(self):
        def from_bitmask(x):
            # indexes of the bits set in x, looking at the lowest 64 bits
            return [i for i in range(64) if (1 << i) & x]

        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, self.pid)
        self.addCleanup(win32api.CloseHandle, handle)
        sys_value = from_bitmask(
            win32process.GetProcessAffinityMask(handle)[0])
        psutil_value = psutil.Process(self.pid).cpu_affinity()
        self.assertEqual(psutil_value, sys_value)

    def test_io_counters(self):
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, os.getpid())
        self.addCleanup(win32api.CloseHandle, handle)
        sys_value = win32process.GetProcessIoCounters(handle)
        psutil_value = psutil.Process().io_counters()
        self.assertEqual(
            psutil_value.read_count, sys_value['ReadOperationCount'])
        self.assertEqual(
            psutil_value.write_count, sys_value['WriteOperationCount'])
        self.assertEqual(
            psutil_value.read_bytes, sys_value['ReadTransferCount'])
        self.assertEqual(
            psutil_value.write_bytes, sys_value['WriteTransferCount'])
        self.assertEqual(
            psutil_value.other_count, sys_value['OtherOperationCount'])
        self.assertEqual(
            psutil_value.other_bytes, sys_value['OtherTransferCount'])

    def test_num_handles(self):
        import ctypes
        import ctypes.wintypes
        PROCESS_QUERY_INFORMATION = 0x400
        handle = ctypes.windll.kernel32.OpenProcess(
            PROCESS_QUERY_INFORMATION, 0, self.pid)
        self.addCleanup(ctypes.windll.kernel32.CloseHandle, handle)

        hndcnt = ctypes.wintypes.DWORD()
        ctypes.windll.kernel32.GetProcessHandleCount(
            handle, ctypes.byref(hndcnt))
        sys_value = hndcnt.value
        psutil_value = psutil.Process(self.pid).num_handles()
        self.assertEqual(psutil_value, sys_value)

    def test_error_partial_copy(self):
        # https://github.com/giampaolo/psutil/issues/875
        exc = WindowsError()
        exc.winerror = 299
        with mock.patch("psutil._psplatform.cext.proc_cwd", side_effect=exc):
            # time.sleep is mocked so that the test does not really sleep
            # and the number of calls can be counted
            with mock.patch("time.sleep") as m:
                p = psutil.Process()
                self.assertRaises(psutil.AccessDenied, p.cwd)
        self.assertGreaterEqual(m.call_count, 5)

    def test_exe(self):
        # NtQuerySystemInformation succeeds if process is gone. Make sure
        # it raises NSP for a non existent pid.
        pid = psutil.pids()[-1] + 99999
        proc = psutil._psplatform.Process(pid)
        self.assertRaises(psutil.NoSuchProcess, proc.exe)
496
497
class TestProcessWMI(WindowsTestCase):
    """Compare Process API results with WMI."""

    @classmethod
    def setUpClass(cls):
        cls.pid = spawn_testproc().pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    def _lookup(self):
        # Return the (WMI, psutil) views of the process spawned for
        # this class; WMI is queried first.
        from_wmi = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        return from_wmi, psutil.Process(self.pid)

    def test_name(self):
        from_wmi, proc = self._lookup()
        self.assertEqual(proc.name(), from_wmi.Caption)

    # This fail on github because using virtualenv for test environment
    @unittest.skipIf(GITHUB_WHEELS, "unreliable path on GITHUB_WHEELS")
    def test_exe(self):
        from_wmi, proc = self._lookup()
        # Note: wmi reports the exe as a lower case string.
        # Being Windows paths case-insensitive we ignore that.
        self.assertEqual(proc.exe().lower(), from_wmi.ExecutablePath.lower())

    def test_cmdline(self):
        from_wmi, proc = self._lookup()
        # double quotes are dropped from the WMI command line
        self.assertEqual(' '.join(proc.cmdline()),
                         from_wmi.CommandLine.replace('"', ''))

    def test_username(self):
        from_wmi, proc = self._lookup()
        domain, _, user = from_wmi.GetOwner()
        expected = "%s\\%s" % (domain, user)
        self.assertEqual(proc.username(), expected)

    @retry_on_failure()
    def test_memory_rss(self):
        from_wmi, proc = self._lookup()
        rss = proc.memory_info().rss
        self.assertEqual(rss, int(from_wmi.WorkingSetSize))

    @retry_on_failure()
    def test_memory_vms(self):
        from_wmi, proc = self._lookup()
        vms = proc.memory_info().vms
        # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
        # ...claims that PageFileUsage is represented in Kilo
        # bytes but funnily enough on certain platforms bytes are
        # returned instead.
        wmi_usage = int(from_wmi.PageFileUsage)
        if vms not in (wmi_usage, wmi_usage * 1024):
            self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))

    def test_create_time(self):
        from_wmi, proc = self._lookup()
        # keep only the part preceding the first dot
        wmic_create = str(from_wmi.CreationDate.split('.')[0])
        created = time.localtime(proc.create_time())
        psutil_create = time.strftime("%Y%m%d%H%M%S", created)
        self.assertEqual(wmic_create, psutil_create)
563
564
565 # ---
566
567
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestDualProcessImplementation(PsutilTestCase):
    """
    Certain APIs on Windows have 2 internal implementations, one
    based on documented Windows APIs, another one based
    NtQuerySystemInformation() which gets called as fallback in
    case the first fails because of limited permission error.
    Here we test that the two methods return the exact same value,
    see:
    https://github.com/giampaolo/psutil/issues/304
    """

    @classmethod
    def setUpClass(cls):
        cls.pid = spawn_testproc().pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    @staticmethod
    def _raise_eperm(target):
        # Patch `target` so that calling it raises OSError(EPERM).
        return mock.patch(target, side_effect=OSError(errno.EPERM, "msg"))

    def test_memory_info(self):
        mem_1 = psutil.Process(self.pid).memory_info()
        with self._raise_eperm(
                "psutil._psplatform.cext.proc_memory_info") as fun:
            mem_2 = psutil.Process(self.pid).memory_info()
            self.assertEqual(len(mem_1), len(mem_2))
            for first, second in zip(mem_1, mem_2):
                self.assertGreaterEqual(first, 0)
                self.assertGreaterEqual(second, 0)
                self.assertAlmostEqual(first, second, delta=512)
            assert fun.called

    def test_create_time(self):
        ctime = psutil.Process(self.pid).create_time()
        with self._raise_eperm("psutil._psplatform.cext.proc_times") as fun:
            self.assertEqual(psutil.Process(self.pid).create_time(), ctime)
            assert fun.called

    def test_cpu_times(self):
        cpu_times_1 = psutil.Process(self.pid).cpu_times()
        with self._raise_eperm("psutil._psplatform.cext.proc_times") as fun:
            cpu_times_2 = psutil.Process(self.pid).cpu_times()
            assert fun.called
            self.assertAlmostEqual(
                cpu_times_1.user, cpu_times_2.user, delta=0.01)
            self.assertAlmostEqual(
                cpu_times_1.system, cpu_times_2.system, delta=0.01)

    def test_io_counters(self):
        io_counters_1 = psutil.Process(self.pid).io_counters()
        with self._raise_eperm(
                "psutil._psplatform.cext.proc_io_counters") as fun:
            io_counters_2 = psutil.Process(self.pid).io_counters()
            for idx, first in enumerate(io_counters_1):
                self.assertAlmostEqual(first, io_counters_2[idx], delta=5)
            assert fun.called

    def test_num_handles(self):
        num_handles = psutil.Process(self.pid).num_handles()
        with self._raise_eperm(
                "psutil._psplatform.cext.proc_num_handles") as fun:
            self.assertEqual(psutil.Process(self.pid).num_handles(),
                             num_handles)
            assert fun.called

    def test_cmdline(self):
        from psutil._pswindows import convert_oserror
        tolerated = (psutil.AccessDenied, psutil.NoSuchProcess)
        for pid in psutil.pids():
            try:
                with_peb = cext.proc_cmdline(pid, use_peb=True)
                without_peb = cext.proc_cmdline(pid, use_peb=False)
            except OSError as err:
                # Only errors converting to AccessDenied / NoSuchProcess
                # are tolerated; anything else is re-raised as is.
                converted = convert_oserror(err)
                if not isinstance(converted, tolerated):
                    raise
            else:
                self.assertEqual(with_peb, without_peb)
649
650
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class RemoteProcessTestCase(PsutilTestCase):
    """Certain functions require calling ReadProcessMemory.
    This trivially works when called on the current process.
    Check that this works on other processes, especially when they
    have a different bitness.
    """

    @staticmethod
    def find_other_interpreter():
        """Return the path of a Python interpreter whose bitness is the
        opposite of the running one, or None if none is found.

        Only interpreters matching C:\\Python*\\python.exe are tried.
        """
        # find a python interpreter that is of the opposite bitness from us
        code = "import sys; sys.stdout.write(str(sys.maxsize > 2**32))"

        # XXX: a different and probably more stable approach might be to access
        # the registry but accessing 64 bit paths from a 32 bit process
        for filename in glob.glob(r"C:\Python*\python.exe"):
            # universal_newlines=True makes communicate() return text
            # also on Python 3; comparing bytes against str never
            # matches there, so no interpreter would ever be found.
            proc = subprocess.Popen(args=[filename, "-c", code],
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT,
                                    universal_newlines=True)
            output, _ = proc.communicate()
            if output.strip() == str(not IS_64BIT):
                return filename
        return None

    # arguments passed to both interpreters: block reading stdin
    test_args = ["-c", "import sys; sys.stdin.read()"]

    def setUp(self):
        super().setUp()

        other_python = self.find_other_interpreter()
        if other_python is None:
            raise unittest.SkipTest(
                "could not find interpreter with opposite bitness")
        if IS_64BIT:
            self.python64 = sys.executable
            self.python32 = other_python
        else:
            self.python64 = other_python
            self.python32 = sys.executable

        # marker variable looked up by the environ tests
        env = os.environ.copy()
        env["THINK_OF_A_NUMBER"] = str(os.getpid())
        self.proc32 = self.spawn_testproc(
            [self.python32] + self.test_args,
            env=env,
            stdin=subprocess.PIPE)
        self.proc64 = self.spawn_testproc(
            [self.python64] + self.test_args,
            env=env,
            stdin=subprocess.PIPE)

    def tearDown(self):
        super().tearDown()
        self.proc32.communicate()
        self.proc64.communicate()

    def test_cmdline_32(self):
        p = psutil.Process(self.proc32.pid)
        self.assertEqual(len(p.cmdline()), 3)
        self.assertEqual(p.cmdline()[1:], self.test_args)

    def test_cmdline_64(self):
        p = psutil.Process(self.proc64.pid)
        self.assertEqual(len(p.cmdline()), 3)
        self.assertEqual(p.cmdline()[1:], self.test_args)

    def test_cwd_32(self):
        p = psutil.Process(self.proc32.pid)
        self.assertEqual(p.cwd(), os.getcwd())

    def test_cwd_64(self):
        p = psutil.Process(self.proc64.pid)
        self.assertEqual(p.cwd(), os.getcwd())

    def test_environ_32(self):
        p = psutil.Process(self.proc32.pid)
        e = p.environ()
        self.assertIn("THINK_OF_A_NUMBER", e)
        # assertEqual replaces the deprecated assertEquals alias
        self.assertEqual(e["THINK_OF_A_NUMBER"], str(os.getpid()))

    def test_environ_64(self):
        # AccessDenied is tolerated here; no value is checked
        p = psutil.Process(self.proc64.pid)
        try:
            p.environ()
        except psutil.AccessDenied:
            pass
737
738
739 # ===================================================================
740 # Windows services
741 # ===================================================================
742
743
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestServices(PsutilTestCase):
    """Tests for psutil.win_service_iter() and psutil.win_service_get()."""

    def test_win_service_iter(self):
        # A first "valid_statuses" set used to be defined here and was
        # immediately overwritten by the one below; it has been removed
        # together with the duplicated assertions in the loop.
        valid_start_types = set([
            "automatic",
            "manual",
            "disabled",
        ])
        valid_statuses = set([
            "running",
            "paused",
            "start_pending",
            "pause_pending",
            "continue_pending",
            "stop_pending",
            "stopped"
        ])
        for serv in psutil.win_service_iter():
            data = serv.as_dict()
            self.assertIsInstance(data['name'], str)
            self.assertNotEqual(data['name'].strip(), "")
            self.assertIsInstance(data['display_name'], str)
            self.assertIsInstance(data['username'], str)
            self.assertIn(data['status'], valid_statuses)
            if data['pid'] is not None:
                # must not raise
                psutil.Process(data['pid'])
            self.assertIsInstance(data['binpath'], str)
            self.assertIsInstance(data['start_type'], str)
            self.assertIn(data['start_type'], valid_start_types)
            self.assertIsInstance(data['description'], str)
            pid = serv.pid()
            if pid is not None:
                p = psutil.Process(pid)
                self.assertTrue(p.is_running())
            # win_service_get
            s = psutil.win_service_get(serv.name())
            # test __eq__
            self.assertEqual(serv, s)

    def test_win_service_get(self):
        ERROR_SERVICE_DOES_NOT_EXIST = \
            psutil._psplatform.cext.ERROR_SERVICE_DOES_NOT_EXIST
        ERROR_ACCESS_DENIED = psutil._psplatform.cext.ERROR_ACCESS_DENIED

        # a non existent service name must raise NoSuchProcess
        name = next(psutil.win_service_iter()).name()
        with self.assertRaises(psutil.NoSuchProcess) as cm:
            psutil.win_service_get(name + '???')
        self.assertEqual(cm.exception.name, name + '???')

        # test NoSuchProcess
        service = psutil.win_service_get(name)
        # the error code is passed in a different position on PY3
        if PY3:
            args = (0, "msg", 0, ERROR_SERVICE_DOES_NOT_EXIST)
        else:
            args = (ERROR_SERVICE_DOES_NOT_EXIST, "msg")
        exc = WindowsError(*args)
        with mock.patch("psutil._psplatform.cext.winservice_query_status",
                        side_effect=exc):
            self.assertRaises(psutil.NoSuchProcess, service.status)
        with mock.patch("psutil._psplatform.cext.winservice_query_config",
                        side_effect=exc):
            self.assertRaises(psutil.NoSuchProcess, service.username)

        # test AccessDenied
        if PY3:
            args = (0, "msg", 0, ERROR_ACCESS_DENIED)
        else:
            args = (ERROR_ACCESS_DENIED, "msg")
        exc = WindowsError(*args)
        with mock.patch("psutil._psplatform.cext.winservice_query_status",
                        side_effect=exc):
            self.assertRaises(psutil.AccessDenied, service.status)
        with mock.patch("psutil._psplatform.cext.winservice_query_config",
                        side_effect=exc):
            self.assertRaises(psutil.AccessDenied, service.username)

        # test __str__ and __repr__
        self.assertIn(service.name(), str(service))
        self.assertIn(service.display_name(), str(service))
        self.assertIn(service.name(), repr(service))
        self.assertIn(service.display_name(), repr(service))
837
838
# When executed as a script, run the tests of this file through the
# psutil test runner.
if __name__ == '__main__':
    from psutil.tests.runner import run_from_name
    run_from_name(__file__)