Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_windows.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 #!/usr/bin/env python3 | |
2 # -*- coding: UTF-8 -* | |
3 | |
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
5 # Use of this source code is governed by a BSD-style license that can be | |
6 # found in the LICENSE file. | |
7 | |
8 """Windows specific tests.""" | |
9 | |
10 import datetime | |
11 import errno | |
12 import glob | |
13 import os | |
14 import platform | |
15 import re | |
16 import signal | |
17 import subprocess | |
18 import sys | |
19 import time | |
20 import warnings | |
21 | |
22 import psutil | |
23 from psutil import WINDOWS | |
24 from psutil._compat import FileNotFoundError | |
25 from psutil._compat import super | |
26 from psutil.tests import APPVEYOR | |
27 from psutil.tests import GITHUB_WHEELS | |
28 from psutil.tests import HAS_BATTERY | |
29 from psutil.tests import IS_64BIT | |
30 from psutil.tests import mock | |
31 from psutil.tests import PsutilTestCase | |
32 from psutil.tests import PY3 | |
33 from psutil.tests import PYPY | |
34 from psutil.tests import retry_on_failure | |
35 from psutil.tests import sh | |
36 from psutil.tests import spawn_testproc | |
37 from psutil.tests import terminate | |
38 from psutil.tests import TOLERANCE_DISK_USAGE | |
39 from psutil.tests import unittest | |
40 | |
41 | |
# The pywin32 / wmi helpers are only imported on Windows under a non-PyPy
# interpreter; elsewhere the names stay undefined and the tests that need
# them are skipped through WindowsTestCase.
if WINDOWS and not PYPY:
    with warnings.catch_warnings():
        # Silence any warning raised while importing these modules.
        warnings.simplefilter("ignore")
        import win32api  # requires "pip install pywin32"
        import win32con
        import win32process
        import wmi  # requires "pip install wmi" / "make setup-dev-env"


# Shortcut to the platform-specific C extension used by several tests.
cext = psutil._psplatform.cext
52 | |
53 | |
def wrap_exceptions(fun):
    """Decorator translating raw OSError into psutil exceptions.

    The returned wrapper calls ``fun(self, *args, **kwargs)`` and:

    * raises ``psutil.AccessDenied`` when the OSError's errno is in
      ``psutil._pswindows.ACCESS_DENIED_SET``;
    * raises ``psutil.NoSuchProcess`` when the errno is ``errno.ESRCH``;
    * re-raises any other OSError unchanged.

    Exceptions that are not OSError propagate untouched. The wrapper keeps
    the wrapped function's metadata (``__name__``, ``__doc__``, ...).
    """
    # Local import so the block does not depend on a new file-level import.
    import functools

    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            return fun(self, *args, **kwargs)
        except OSError as err:
            # Imported lazily: psutil._pswindows only exists on Windows.
            from psutil._pswindows import ACCESS_DENIED_SET
            if err.errno in ACCESS_DENIED_SET:
                raise psutil.AccessDenied(None, None)
            if err.errno == errno.ESRCH:
                raise psutil.NoSuchProcess(None, None)
            raise
    return wrapper
66 | |
67 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
@unittest.skipIf(PYPY, "pywin32 not available on PYPY")
# https://github.com/giampaolo/psutil/pull/1762#issuecomment-632892692
@unittest.skipIf(GITHUB_WHEELS and not (PY3 and IS_64BIT),
                 "pywin32 broken on GITHUB + PY2")
class WindowsTestCase(PsutilTestCase):
    """Base class for the test cases in this module that use pywin32/wmi.

    The whole class (and every subclass) is skipped when not running on
    Windows, when running on PyPy, or when GITHUB_WHEELS is set and the
    interpreter is not a 64-bit Python 3.
    """
75 | |
76 | |
77 # =================================================================== | |
78 # System APIs | |
79 # =================================================================== | |
80 | |
81 | |
class TestCpuAPIs(WindowsTestCase):
    """Compare psutil CPU figures with env vars, win32api and WMI."""

    @staticmethod
    def _wmi_processor():
        # First Win32_Processor entry returned by a fresh WMI connection.
        return wmi.WMI().Win32_Processor()[0]

    @unittest.skipIf('NUMBER_OF_PROCESSORS' not in os.environ,
                     'NUMBER_OF_PROCESSORS env var is not available')
    def test_cpu_count_vs_NUMBER_OF_PROCESSORS(self):
        # Will likely fail on many-cores systems:
        # https://stackoverflow.com/questions/31209256
        expected = int(os.environ['NUMBER_OF_PROCESSORS'])
        self.assertEqual(expected, psutil.cpu_count())

    def test_cpu_count_vs_GetSystemInfo(self):
        # Will likely fail on many-cores systems:
        # https://stackoverflow.com/questions/31209256
        from_win32 = win32api.GetSystemInfo()[5]
        from_psutil = psutil.cpu_count()
        self.assertEqual(from_win32, from_psutil)

    def test_cpu_count_logical_vs_wmi(self):
        cpu = self._wmi_processor()
        self.assertEqual(psutil.cpu_count(), cpu.NumberOfLogicalProcessors)

    def test_cpu_count_phys_vs_wmi(self):
        cpu = self._wmi_processor()
        self.assertEqual(psutil.cpu_count(logical=False), cpu.NumberOfCores)

    def test_cpu_count_vs_cpu_times(self):
        per_cpu = psutil.cpu_times(percpu=True)
        self.assertEqual(psutil.cpu_count(), len(per_cpu))

    def test_cpu_freq(self):
        cpu = self._wmi_processor()
        self.assertEqual(cpu.CurrentClockSpeed, psutil.cpu_freq().current)
        self.assertEqual(cpu.MaxClockSpeed, psutil.cpu_freq().max)
118 | |
119 | |
class TestSystemAPIs(WindowsTestCase):
    """Compare psutil system-wide APIs with ipconfig, WMI and win32api."""

    def test_nic_names(self):
        # Every NIC reported by psutil must be mentioned by ipconfig,
        # except the "pseudo-interface" entries which are ignored.
        ipconfig_out = sh('ipconfig /all')
        for nic in psutil.net_io_counters(pernic=True).keys():
            if "pseudo-interface" in nic.replace(' ', '-').lower():
                continue
            if nic not in ipconfig_out:
                self.fail(
                    "%r nic wasn't found in 'ipconfig /all' output" % nic)

    def test_total_phymem(self):
        system = wmi.WMI().Win32_ComputerSystem()[0]
        self.assertEqual(int(system.TotalPhysicalMemory),
                         psutil.virtual_memory().total)

    # @unittest.skipIf(wmi is None, "wmi module is not installed")
    # def test__UPTIME(self):
    #     # _UPTIME constant is not public but it is used internally
    #     # as value to return for pid 0 creation time.
    #     # WMI behaves the same.
    #     w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    #     p = psutil.Process(0)
    #     wmic_create = str(w.CreationDate.split('.')[0])
    #     psutil_create = time.strftime("%Y%m%d%H%M%S",
    #                                   time.localtime(p.create_time()))

    # Note: this test is not very reliable
    @unittest.skipIf(APPVEYOR, "test not relieable on appveyor")
    @retry_on_failure()
    def test_pids(self):
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        wmi_pids = set(x.ProcessId for x in wmi.WMI().Win32_Process())
        psutil_pids = set(psutil.pids())
        self.assertEqual(wmi_pids, psutil_pids)

    @retry_on_failure()
    def test_disks(self):
        """Match every psutil partition with a WMI logical disk.

        For usable partitions the total size must be identical, while the
        free space only has to agree within 10 MB because it can change
        between the two measurements.
        """
        tolerance = 10 * 1024 * 1024  # 10 MB
        ps_parts = psutil.disk_partitions(all=True)
        wmi_parts = wmi.WMI().Win32_LogicalDisk()
        for ps_part in ps_parts:
            for wmi_part in wmi_parts:
                if ps_part.device.replace('\\', '') != wmi_part.DeviceID:
                    continue
                if not ps_part.mountpoint:
                    # this is usually a CD-ROM with no disk inserted
                    break
                if 'cdrom' in ps_part.opts:
                    break
                if ps_part.mountpoint.startswith('A:'):
                    break  # floppy
                try:
                    usage = psutil.disk_usage(ps_part.mountpoint)
                except FileNotFoundError:
                    # usually this is the floppy
                    break
                self.assertEqual(usage.total, int(wmi_part.Size))
                wmi_free = int(wmi_part.FreeSpace)
                # An exact comparison here would make the tolerance
                # below unreachable, so only the tolerance is enforced.
                if abs(usage.free - wmi_free) > tolerance:
                    self.fail("psutil=%s, wmi=%s" % (
                        usage.free, wmi_free))
                break
            else:
                # No WMI disk matched this psutil partition.
                self.fail("can't find partition %s" % repr(ps_part))

    @retry_on_failure()
    def test_disk_usage(self):
        for disk in psutil.disk_partitions():
            if 'cdrom' in disk.opts:
                continue
            sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint)
            psutil_value = psutil.disk_usage(disk.mountpoint)
            self.assertAlmostEqual(sys_value[0], psutil_value.free,
                                   delta=TOLERANCE_DISK_USAGE)
            self.assertAlmostEqual(sys_value[1], psutil_value.total,
                                   delta=TOLERANCE_DISK_USAGE)
            self.assertEqual(psutil_value.used,
                             psutil_value.total - psutil_value.free)

    def test_disk_partitions(self):
        # Drives starting with "A:" are left out on both sides.
        drives = win32api.GetLogicalDriveStrings().split("\\\x00")
        sys_value = [
            x + '\\' for x in drives
            if x and not x.startswith('A:')]
        psutil_value = [
            x.mountpoint for x in psutil.disk_partitions(all=True)
            if not x.mountpoint.startswith('A:')]
        self.assertEqual(sys_value, psutil_value)

    def test_net_if_stats(self):
        # psutil names may match either the adapter name or its
        # connection id; at least one common entry is required.
        ps_names = set(cext.net_if_stats())
        wmi_names = set()
        for adapter in wmi.WMI().Win32_NetworkAdapter():
            wmi_names.add(adapter.Name)
            wmi_names.add(adapter.NetConnectionID)
        self.assertTrue(ps_names & wmi_names,
                        "no common entries in %s, %s" % (ps_names, wmi_names))

    def test_boot_time(self):
        wmi_os = wmi.WMI().Win32_OperatingSystem()
        # Drop the fractional part / UTC offset after the first dot.
        wmi_btime_str = wmi_os[0].LastBootUpTime.split('.')[0]
        wmi_btime_dt = datetime.datetime.strptime(
            wmi_btime_str, "%Y%m%d%H%M%S")
        psutil_dt = datetime.datetime.fromtimestamp(psutil.boot_time())
        diff = abs((wmi_btime_dt - psutil_dt).total_seconds())
        self.assertLessEqual(diff, 3)

    def test_boot_time_fluctuation(self):
        # https://github.com/giampaolo/psutil/issues/1007
        # Each pair is (value returned by the C extension, value psutil
        # is expected to report): off-by-one readings keep the first
        # value, a large jump is reported as is.
        for mocked, expected in ((5, 5), (4, 5), (6, 5), (333, 333)):
            with mock.patch('psutil._pswindows.cext.boot_time',
                            return_value=mocked):
                self.assertEqual(psutil.boot_time(), expected)
240 | |
241 | |
242 # =================================================================== | |
243 # sensors_battery() | |
244 # =================================================================== | |
245 | |
246 | |
class TestSensorsBattery(WindowsTestCase):
    """Check psutil.sensors_battery() against win32api, WMI and mocks."""

    @staticmethod
    def _fake_battery(*raw):
        # Patch the C extension so that it returns the given raw tuple.
        return mock.patch("psutil._pswindows.cext.sensors_battery",
                          return_value=raw)

    @staticmethod
    def _wmi_battery():
        # First Win32_Battery row returned by WMI.
        return wmi.WMI().query('select * from Win32_Battery')[0]

    def test_has_battery(self):
        present = win32api.GetPwrCapabilities()['SystemBatteriesPresent']
        if not present:
            self.assertIsNone(psutil.sensors_battery())
        else:
            self.assertIsNotNone(psutil.sensors_battery())

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_percent(self):
        from_wmi = self._wmi_battery()
        from_psutil = psutil.sensors_battery()
        self.assertAlmostEqual(
            from_psutil.percent, from_wmi.EstimatedChargeRemaining,
            delta=1)

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_power_plugged(self):
        from_wmi = self._wmi_battery()
        from_psutil = psutil.sensors_battery()
        # Status codes:
        # https://msdn.microsoft.com/en-us/library/aa394074(v=vs.85).aspx
        plugged = from_wmi.BatteryStatus == 2
        self.assertEqual(from_psutil.power_plugged, plugged)

    def test_emulate_no_battery(self):
        with self._fake_battery(0, 128, 0, 0) as m:
            self.assertIsNone(psutil.sensors_battery())
            assert m.called

    def test_emulate_power_connected(self):
        with self._fake_battery(1, 0, 0, 0) as m:
            self.assertEqual(psutil.sensors_battery().secsleft,
                             psutil.POWER_TIME_UNLIMITED)
            assert m.called

    def test_emulate_power_charging(self):
        with self._fake_battery(0, 8, 0, 0) as m:
            self.assertEqual(psutil.sensors_battery().secsleft,
                             psutil.POWER_TIME_UNLIMITED)
            assert m.called

    def test_emulate_secs_left_unknown(self):
        with self._fake_battery(0, 0, 0, -1) as m:
            self.assertEqual(psutil.sensors_battery().secsleft,
                             psutil.POWER_TIME_UNKNOWN)
            assert m.called
300 | |
301 | |
302 # =================================================================== | |
303 # Process APIs | |
304 # =================================================================== | |
305 | |
306 | |
class TestProcess(WindowsTestCase):
    """Process API tests, mostly cross-checked against pywin32/ctypes.

    A single test subprocess is spawned for the whole class and its pid is
    stored in ``cls.pid``; it is terminated in tearDownClass.
    """

    @classmethod
    def setUpClass(cls):
        cls.pid = spawn_testproc().pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    def _query_handle(self, pid):
        # Open `pid` with PROCESS_QUERY_INFORMATION and make sure the
        # handle gets closed when the test ends.
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, pid)
        self.addCleanup(win32api.CloseHandle, handle)
        return handle

    def test_issue_24(self):
        p = psutil.Process(0)
        self.assertRaises(psutil.AccessDenied, p.kill)

    def test_special_pid(self):
        p = psutil.Process(4)
        self.assertEqual(p.name(), 'System')
        # use __str__ to access all common Process properties to check
        # that nothing strange happens
        str(p)
        p.username()
        self.assertTrue(p.create_time() >= 0.0)
        try:
            rss, vms = p.memory_info()[:2]
        except psutil.AccessDenied:
            # expected on Windows Vista and Windows 7
            if platform.uname()[1] not in ('vista', 'win-7', 'win7'):
                raise
        else:
            self.assertTrue(rss > 0)

    def test_send_signal(self):
        p = psutil.Process(self.pid)
        self.assertRaises(ValueError, p.send_signal, signal.SIGINT)

    def test_num_handles_increment(self):
        """Opening a process handle must bump num_handles() by one."""
        p = psutil.Process(os.getpid())
        before = p.num_handles()
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, os.getpid())
        try:
            after = p.num_handles()
            self.assertEqual(after, before + 1)
        finally:
            # Close the handle even if the assertion above fails, so it
            # is not leaked into the remaining tests.
            win32api.CloseHandle(handle)
        self.assertEqual(p.num_handles(), before)

    @unittest.skipIf(sys.version_info < (2, 7),
                     "CTRL_* signals not supported")
    def test_ctrl_signals(self):
        p = psutil.Process(self.spawn_testproc().pid)
        p.send_signal(signal.CTRL_C_EVENT)
        p.send_signal(signal.CTRL_BREAK_EVENT)
        p.kill()
        p.wait()
        # Once the process is gone both signals must be rejected.
        for sig in (signal.CTRL_C_EVENT, signal.CTRL_BREAK_EVENT):
            self.assertRaises(psutil.NoSuchProcess, p.send_signal, sig)

    def test_username(self):
        self.assertEqual(psutil.Process().username(),
                         win32api.GetUserNameEx(win32con.NameSamCompatible))

    def test_cmdline(self):
        # Collapse runs of spaces so both sides use single separators.
        sys_value = re.sub(' +', ' ', win32api.GetCommandLine()).strip()
        psutil_value = ' '.join(psutil.Process().cmdline())
        self.assertEqual(sys_value, psutil_value)

    # XXX - occasional failures

    # def test_cpu_times(self):
    #     handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
    #                                   win32con.FALSE, os.getpid())
    #     self.addCleanup(win32api.CloseHandle, handle)
    #     sys_value = win32process.GetProcessTimes(handle)
    #     psutil_value = psutil.Process().cpu_times()
    #     self.assertAlmostEqual(
    #         psutil_value.user, sys_value['UserTime'] / 10000000.0,
    #         delta=0.2)
    #     self.assertAlmostEqual(
    #         psutil_value.user, sys_value['KernelTime'] / 10000000.0,
    #         delta=0.2)

    def test_nice(self):
        handle = self._query_handle(os.getpid())
        sys_value = win32process.GetPriorityClass(handle)
        psutil_value = psutil.Process().nice()
        self.assertEqual(psutil_value, sys_value)

    def test_memory_info(self):
        handle = self._query_handle(self.pid)
        sys_value = win32process.GetProcessMemoryInfo(handle)
        psutil_value = psutil.Process(self.pid).memory_info()
        # (GetProcessMemoryInfo key, psutil memory_info field)
        pairs = (
            ('PeakWorkingSetSize', 'peak_wset'),
            ('WorkingSetSize', 'wset'),
            ('QuotaPeakPagedPoolUsage', 'peak_paged_pool'),
            ('QuotaPagedPoolUsage', 'paged_pool'),
            ('QuotaPeakNonPagedPoolUsage', 'peak_nonpaged_pool'),
            ('QuotaNonPagedPoolUsage', 'nonpaged_pool'),
            ('PagefileUsage', 'pagefile'),
            ('PeakPagefileUsage', 'peak_pagefile'),
        )
        for win_key, field in pairs:
            self.assertEqual(
                sys_value[win_key], getattr(psutil_value, field), msg=field)

        # rss / vms are aliases of wset / pagefile.
        self.assertEqual(psutil_value.rss, psutil_value.wset)
        self.assertEqual(psutil_value.vms, psutil_value.pagefile)

    def test_wait(self):
        # NOTE: this terminates the class-wide test process.
        handle = self._query_handle(self.pid)
        p = psutil.Process(self.pid)
        p.terminate()
        psutil_value = p.wait()
        sys_value = win32process.GetExitCodeProcess(handle)
        self.assertEqual(psutil_value, sys_value)

    def test_cpu_affinity(self):
        def from_bitmask(x):
            # Indexes (0-63) of the bits set in `x`.
            return [i for i in range(64) if (1 << i) & x]

        handle = self._query_handle(self.pid)
        sys_value = from_bitmask(
            win32process.GetProcessAffinityMask(handle)[0])
        psutil_value = psutil.Process(self.pid).cpu_affinity()
        self.assertEqual(psutil_value, sys_value)

    def test_io_counters(self):
        handle = self._query_handle(os.getpid())
        sys_value = win32process.GetProcessIoCounters(handle)
        psutil_value = psutil.Process().io_counters()
        # (psutil io_counters field, GetProcessIoCounters key)
        pairs = (
            ('read_count', 'ReadOperationCount'),
            ('write_count', 'WriteOperationCount'),
            ('read_bytes', 'ReadTransferCount'),
            ('write_bytes', 'WriteTransferCount'),
            ('other_count', 'OtherOperationCount'),
            ('other_bytes', 'OtherTransferCount'),
        )
        for field, win_key in pairs:
            self.assertEqual(
                getattr(psutil_value, field), sys_value[win_key], msg=field)

    def test_num_handles(self):
        import ctypes
        import ctypes.wintypes
        PROCESS_QUERY_INFORMATION = 0x400
        handle = ctypes.windll.kernel32.OpenProcess(
            PROCESS_QUERY_INFORMATION, 0, self.pid)
        self.addCleanup(ctypes.windll.kernel32.CloseHandle, handle)

        hndcnt = ctypes.wintypes.DWORD()
        ctypes.windll.kernel32.GetProcessHandleCount(
            handle, ctypes.byref(hndcnt))
        sys_value = hndcnt.value
        psutil_value = psutil.Process(self.pid).num_handles()
        self.assertEqual(psutil_value, sys_value)

    def test_error_partial_copy(self):
        # https://github.com/giampaolo/psutil/issues/875
        # winerror 299 from proc_cwd must end up as AccessDenied after
        # time.sleep (mocked here) has been called at least 5 times.
        exc = WindowsError()
        exc.winerror = 299
        with mock.patch("psutil._psplatform.cext.proc_cwd", side_effect=exc):
            with mock.patch("time.sleep") as m:
                p = psutil.Process()
                self.assertRaises(psutil.AccessDenied, p.cwd)
        self.assertGreaterEqual(m.call_count, 5)

    def test_exe(self):
        # NtQuerySystemInformation succeeds if process is gone. Make sure
        # it raises NSP for a non existent pid.
        pid = psutil.pids()[-1] + 99999
        proc = psutil._psplatform.Process(pid)
        self.assertRaises(psutil.NoSuchProcess, proc.exe)
496 | |
497 | |
class TestProcessWMI(WindowsTestCase):
    """Compare Process API results with WMI."""

    @classmethod
    def setUpClass(cls):
        cls.pid = spawn_testproc().pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    def _wmi_process(self):
        # Win32_Process row describing the class-wide test process.
        return wmi.WMI().Win32_Process(ProcessId=self.pid)[0]

    def test_name(self):
        wmi_proc = self._wmi_process()
        proc = psutil.Process(self.pid)
        self.assertEqual(proc.name(), wmi_proc.Caption)

    # This fail on github because using virtualenv for test environment
    @unittest.skipIf(GITHUB_WHEELS, "unreliable path on GITHUB_WHEELS")
    def test_exe(self):
        wmi_proc = self._wmi_process()
        proc = psutil.Process(self.pid)
        # Note: wmi reports the exe as a lower case string.
        # Being Windows paths case-insensitive we ignore that.
        self.assertEqual(proc.exe().lower(),
                         wmi_proc.ExecutablePath.lower())

    def test_cmdline(self):
        wmi_proc = self._wmi_process()
        proc = psutil.Process(self.pid)
        # Double quotes are stripped from the WMI command line first.
        expected = wmi_proc.CommandLine.replace('"', '')
        self.assertEqual(' '.join(proc.cmdline()), expected)

    def test_username(self):
        wmi_proc = self._wmi_process()
        proc = psutil.Process(self.pid)
        domain, _, user = wmi_proc.GetOwner()
        expected = "%s\\%s" % (domain, user)
        self.assertEqual(proc.username(), expected)

    @retry_on_failure()
    def test_memory_rss(self):
        wmi_proc = self._wmi_process()
        proc = psutil.Process(self.pid)
        self.assertEqual(proc.memory_info().rss,
                         int(wmi_proc.WorkingSetSize))

    @retry_on_failure()
    def test_memory_vms(self):
        wmi_proc = self._wmi_process()
        proc = psutil.Process(self.pid)
        vms = proc.memory_info().vms
        # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
        # ...claims that PageFileUsage is represented in Kilo
        # bytes but funnily enough on certain platforms bytes are
        # returned instead.
        wmi_usage = int(wmi_proc.PageFileUsage)
        if vms not in (wmi_usage, wmi_usage * 1024):
            self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))

    def test_create_time(self):
        wmi_proc = self._wmi_process()
        proc = psutil.Process(self.pid)
        # Keep only the part before the first dot of the WMI timestamp.
        wmic_create = str(wmi_proc.CreationDate.split('.')[0])
        local_ctime = time.localtime(proc.create_time())
        psutil_create = time.strftime("%Y%m%d%H%M%S", local_ctime)
        self.assertEqual(wmic_create, psutil_create)
563 | |
564 | |
565 # --- | |
566 | |
567 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestDualProcessImplementation(PsutilTestCase):
    """
    Certain APIs on Windows have 2 internal implementations, one
    based on documented Windows APIs, another one based
    NtQuerySystemInformation() which gets called as fallback in
    case the first fails because of limited permission error.
    Here we test that the two methods return the exact same value,
    see:
    https://github.com/giampaolo/psutil/issues/304
    """

    @classmethod
    def setUpClass(cls):
        cls.pid = spawn_testproc().pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    @staticmethod
    def _deny(cext_name):
        # Make the given C extension function fail with EPERM so that
        # psutil has to take its alternative code path.
        return mock.patch("psutil._psplatform.cext." + cext_name,
                          side_effect=OSError(errno.EPERM, "msg"))

    def test_memory_info(self):
        regular = psutil.Process(self.pid).memory_info()
        with self._deny("proc_memory_info") as fun:
            fallback = psutil.Process(self.pid).memory_info()
            self.assertEqual(len(regular), len(fallback))
            for first, second in zip(regular, fallback):
                self.assertGreaterEqual(first, 0)
                self.assertGreaterEqual(second, 0)
                self.assertAlmostEqual(first, second, delta=512)
            assert fun.called

    def test_create_time(self):
        regular = psutil.Process(self.pid).create_time()
        with self._deny("proc_times") as fun:
            fallback = psutil.Process(self.pid).create_time()
            self.assertEqual(fallback, regular)
            assert fun.called

    def test_cpu_times(self):
        regular = psutil.Process(self.pid).cpu_times()
        with self._deny("proc_times") as fun:
            fallback = psutil.Process(self.pid).cpu_times()
            assert fun.called
            self.assertAlmostEqual(
                regular.user, fallback.user, delta=0.01)
            self.assertAlmostEqual(
                regular.system, fallback.system, delta=0.01)

    def test_io_counters(self):
        regular = psutil.Process(self.pid).io_counters()
        with self._deny("proc_io_counters") as fun:
            fallback = psutil.Process(self.pid).io_counters()
            for idx in range(len(regular)):
                self.assertAlmostEqual(
                    regular[idx], fallback[idx], delta=5)
            assert fun.called

    def test_num_handles(self):
        regular = psutil.Process(self.pid).num_handles()
        with self._deny("proc_num_handles") as fun:
            fallback = psutil.Process(self.pid).num_handles()
            self.assertEqual(fallback, regular)
            assert fun.called

    def test_cmdline(self):
        # Both ways of reading the command line (use_peb on / off) must
        # agree for every pid; AccessDenied / NoSuchProcess are tolerated.
        from psutil._pswindows import convert_oserror
        for pid in psutil.pids():
            try:
                with_peb = cext.proc_cmdline(pid, use_peb=True)
                without_peb = cext.proc_cmdline(pid, use_peb=False)
            except OSError as err:
                err = convert_oserror(err)
                tolerated = (psutil.AccessDenied, psutil.NoSuchProcess)
                if not isinstance(err, tolerated):
                    raise
            else:
                self.assertEqual(with_peb, without_peb)
635 | |
636 def test_cmdline(self): | |
637 from psutil._pswindows import convert_oserror | |
638 for pid in psutil.pids(): | |
639 try: | |
640 a = cext.proc_cmdline(pid, use_peb=True) | |
641 b = cext.proc_cmdline(pid, use_peb=False) | |
642 except OSError as err: | |
643 err = convert_oserror(err) | |
644 if not isinstance(err, (psutil.AccessDenied, | |
645 psutil.NoSuchProcess)): | |
646 raise | |
647 else: | |
648 self.assertEqual(a, b) | |
649 | |
650 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class RemoteProcessTestCase(PsutilTestCase):
    """Certain functions require calling ReadProcessMemory.
    This trivially works when called on the current process.
    Check that this works on other processes, especially when they
    have a different bitness.
    """

    @staticmethod
    def find_other_interpreter():
        """Return the path of a Python with the opposite bitness.

        Every ``C:\\Python*\\python.exe`` is asked whether it is a 64-bit
        build; the first one whose answer differs from IS_64BIT is
        returned. Returns None when no such interpreter is found.
        """
        # find a python interpreter that is of the opposite bitness from us
        code = "import sys; sys.stdout.write(str(sys.maxsize > 2**32))"
        wanted = str(not IS_64BIT)

        # XXX: a different and probably more stable approach might be to access
        # the registry but accessing 64 bit paths from a 32 bit process
        for filename in glob.glob(r"C:\Python*\python.exe"):
            # universal_newlines=True makes communicate() return text on
            # both Python 2 and 3; with bytes the comparison below could
            # never succeed on Python 3.
            proc = subprocess.Popen(args=[filename, "-c", code],
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT,
                                    universal_newlines=True)
            output, _ = proc.communicate()
            if output.strip() == wanted:
                return filename
        return None

    # Child command line: block on stdin until it gets closed.
    test_args = ["-c", "import sys; sys.stdin.read()"]

    def setUp(self):
        """Spawn one 32-bit and one 64-bit child waiting on stdin.

        The whole test is skipped when no interpreter of the opposite
        bitness is available.
        """
        super().setUp()

        other_python = self.find_other_interpreter()
        if other_python is None:
            raise unittest.SkipTest(
                "could not find interpreter with opposite bitness")
        if IS_64BIT:
            self.python64 = sys.executable
            self.python32 = other_python
        else:
            self.python64 = other_python
            self.python32 = sys.executable

        # Marker variable looked up later by test_environ_32.
        env = os.environ.copy()
        env["THINK_OF_A_NUMBER"] = str(os.getpid())
        self.proc32 = self.spawn_testproc(
            [self.python32] + self.test_args,
            env=env,
            stdin=subprocess.PIPE)
        self.proc64 = self.spawn_testproc(
            [self.python64] + self.test_args,
            env=env,
            stdin=subprocess.PIPE)

    def tearDown(self):
        super().tearDown()
        self.proc32.communicate()
        self.proc64.communicate()

    def test_cmdline_32(self):
        p = psutil.Process(self.proc32.pid)
        self.assertEqual(len(p.cmdline()), 3)
        self.assertEqual(p.cmdline()[1:], self.test_args)

    def test_cmdline_64(self):
        p = psutil.Process(self.proc64.pid)
        self.assertEqual(len(p.cmdline()), 3)
        self.assertEqual(p.cmdline()[1:], self.test_args)

    def test_cwd_32(self):
        p = psutil.Process(self.proc32.pid)
        self.assertEqual(p.cwd(), os.getcwd())

    def test_cwd_64(self):
        p = psutil.Process(self.proc64.pid)
        self.assertEqual(p.cwd(), os.getcwd())

    def test_environ_32(self):
        p = psutil.Process(self.proc32.pid)
        e = p.environ()
        self.assertIn("THINK_OF_A_NUMBER", e)
        # assertEqual instead of the deprecated assertEquals alias.
        self.assertEqual(e["THINK_OF_A_NUMBER"], str(os.getpid()))

    def test_environ_64(self):
        # Only checks that the call either works or is denied.
        p = psutil.Process(self.proc64.pid)
        try:
            p.environ()
        except psutil.AccessDenied:
            pass
737 | |
738 | |
739 # =================================================================== | |
740 # Windows services | |
741 # =================================================================== | |
742 | |
743 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestServices(PsutilTestCase):
    """Tests for psutil.win_service_iter() and psutil.win_service_get()."""

    def test_win_service_iter(self):
        """Sanity-check the as_dict() data of every Windows service."""
        valid_start_types = set([
            "automatic",
            "manual",
            "disabled",
        ])
        valid_statuses = set([
            "running",
            "paused",
            "start_pending",
            "pause_pending",
            "continue_pending",
            "stop_pending",
            "stopped"
        ])
        for serv in psutil.win_service_iter():
            data = serv.as_dict()
            self.assertIsInstance(data['name'], str)
            self.assertNotEqual(data['name'].strip(), "")
            self.assertIsInstance(data['display_name'], str)
            self.assertIsInstance(data['username'], str)
            self.assertIn(data['status'], valid_statuses)
            if data['pid'] is not None:
                # Instantiating Process is the check: it must not raise.
                psutil.Process(data['pid'])
            self.assertIsInstance(data['binpath'], str)
            self.assertIsInstance(data['start_type'], str)
            self.assertIn(data['start_type'], valid_start_types)
            self.assertIsInstance(data['description'], str)
            pid = serv.pid()
            if pid is not None:
                p = psutil.Process(pid)
                self.assertTrue(p.is_running())
            # win_service_get
            s = psutil.win_service_get(serv.name())
            # test __eq__
            self.assertEqual(serv, s)

    def test_win_service_get(self):
        """Check error translation and str()/repr() of a service."""
        ERROR_SERVICE_DOES_NOT_EXIST = \
            psutil._psplatform.cext.ERROR_SERVICE_DOES_NOT_EXIST
        ERROR_ACCESS_DENIED = psutil._psplatform.cext.ERROR_ACCESS_DENIED

        def make_error(code):
            # WindowsError takes the Windows error code in a different
            # position on Python 3 than on Python 2.
            if PY3:
                args = (0, "msg", 0, code)
            else:
                args = (code, "msg")
            return WindowsError(*args)

        name = next(psutil.win_service_iter()).name()
        with self.assertRaises(psutil.NoSuchProcess) as cm:
            psutil.win_service_get(name + '???')
        self.assertEqual(cm.exception.name, name + '???')

        service = psutil.win_service_get(name)
        # (Windows error code, psutil exception it must be turned into)
        cases = (
            (ERROR_SERVICE_DOES_NOT_EXIST, psutil.NoSuchProcess),
            (ERROR_ACCESS_DENIED, psutil.AccessDenied),
        )
        for code, expected in cases:
            exc = make_error(code)
            with mock.patch("psutil._psplatform.cext.winservice_query_status",
                            side_effect=exc):
                self.assertRaises(expected, service.status)
            with mock.patch("psutil._psplatform.cext.winservice_query_config",
                            side_effect=exc):
                self.assertRaises(expected, service.username)

        # test __str__ and __repr__
        self.assertIn(service.name(), str(service))
        self.assertIn(service.display_name(), str(service))
        self.assertIn(service.name(), repr(service))
        self.assertIn(service.display_name(), repr(service))
837 | |
838 | |
if __name__ == '__main__':
    # Running this file directly hands it over to psutil's test runner.
    from psutil.tests import runner
    runner.run_from_name(__file__)