Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/psutil/tests/test_windows.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26e78fe6e8c4 |
---|---|
1 #!/usr/bin/env python3 | |
2 # -*- coding: UTF-8 -*- | |
3 | |
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
5 # Use of this source code is governed by a BSD-style license that can be | |
6 # found in the LICENSE file. | |
7 | |
8 """Windows specific tests.""" | |
9 | |
10 import datetime | |
11 import errno | |
12 import glob | |
13 import os | |
14 import platform | |
15 import re | |
16 import signal | |
17 import subprocess | |
18 import sys | |
19 import time | |
20 import warnings | |
21 | |
22 import psutil | |
23 from psutil import WINDOWS | |
24 from psutil._compat import FileNotFoundError | |
25 from psutil.tests import APPVEYOR | |
26 from psutil.tests import get_test_subprocess | |
27 from psutil.tests import HAS_BATTERY | |
28 from psutil.tests import mock | |
29 from psutil.tests import PY3 | |
30 from psutil.tests import PYPY | |
31 from psutil.tests import reap_children | |
32 from psutil.tests import retry_on_failure | |
33 from psutil.tests import sh | |
34 from psutil.tests import unittest | |
35 | |
36 | |
37 if WINDOWS and not PYPY: | |
38 with warnings.catch_warnings(): | |
39 warnings.simplefilter("ignore") | |
40 import win32api # requires "pip install pypiwin32" | |
41 import win32con | |
42 import win32process | |
43 import wmi # requires "pip install wmi" / "make setup-dev-env" | |
44 | |
45 | |
# Shortcut to the `cext` attribute of psutil's platform module; tests below
# call some of its functions directly (e.g. net_if_stats, proc_cmdline).
cext = psutil._psplatform.cext

# True when the running interpreter's sys.maxsize exceeds 2**32,
# i.e. we are a 64 bit process.
IS_64_BIT = sys.maxsize > (1 << 32)
50 | |
51 | |
def wrap_exceptions(fun):
    """Decorator translating OSError into psutil exceptions.

    The decorated callable is expected to be a method (it receives
    ``self`` as first argument).  When it raises OSError:

    - an errno contained in psutil._pswindows.ACCESS_DENIED_SET becomes
      psutil.AccessDenied;
    - errno.ESRCH becomes psutil.NoSuchProcess;
    - any other OSError is re-raised untouched.

    Exceptions other than OSError propagate unchanged.  The wrapper
    preserves the name and docstring of the decorated function.
    """
    # Local import so the block does not depend on a new module-level name.
    import functools

    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            return fun(self, *args, **kwargs)
        except OSError as err:
            # Imported lazily, only when an OSError actually occurs.
            from psutil._pswindows import ACCESS_DENIED_SET
            if err.errno in ACCESS_DENIED_SET:
                raise psutil.AccessDenied(None, None)
            if err.errno == errno.ESRCH:
                raise psutil.NoSuchProcess(None, None)
            raise
    return wrapper
64 | |
65 | |
@unittest.skipIf(PYPY, "pywin32 not available on PYPY")  # skip whole module
class TestCase(unittest.TestCase):
    """Base class of every test class in this module.

    It adds no behavior of its own; it only carries the PYPY skip
    decorator so that all subclasses are skipped together.
    """
69 | |
70 | |
71 # =================================================================== | |
72 # System APIs | |
73 # =================================================================== | |
74 | |
75 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestCpuAPIs(TestCase):
    """Compare psutil CPU figures with the environment, win32api and WMI."""

    @unittest.skipIf('NUMBER_OF_PROCESSORS' not in os.environ,
                     'NUMBER_OF_PROCESSORS env var is not available')
    def test_cpu_count_vs_NUMBER_OF_PROCESSORS(self):
        # Will likely fail on many-cores systems:
        # https://stackoverflow.com/questions/31209256
        env_count = int(os.environ['NUMBER_OF_PROCESSORS'])
        self.assertEqual(env_count, psutil.cpu_count())

    def test_cpu_count_vs_GetSystemInfo(self):
        # Will likely fail on many-cores systems:
        # https://stackoverflow.com/questions/31209256
        system_info = win32api.GetSystemInfo()
        # Item 5 of the GetSystemInfo() result is what gets compared
        # with the logical CPU count.
        self.assertEqual(system_info[5], psutil.cpu_count())

    def test_cpu_count_logical_vs_wmi(self):
        first_cpu = wmi.WMI().Win32_Processor()[0]
        logical = psutil.cpu_count()
        self.assertEqual(logical, first_cpu.NumberOfLogicalProcessors)

    def test_cpu_count_phys_vs_wmi(self):
        first_cpu = wmi.WMI().Win32_Processor()[0]
        physical = psutil.cpu_count(logical=False)
        self.assertEqual(physical, first_cpu.NumberOfCores)

    def test_cpu_count_vs_cpu_times(self):
        # There must be exactly one per-CPU times entry per logical CPU.
        logical = psutil.cpu_count()
        per_cpu = psutil.cpu_times(percpu=True)
        self.assertEqual(logical, len(per_cpu))

    def test_cpu_freq(self):
        first_cpu = wmi.WMI().Win32_Processor()[0]
        # cpu_freq() is deliberately queried once per assertion.
        wmi_current = first_cpu.CurrentClockSpeed
        self.assertEqual(wmi_current, psutil.cpu_freq().current)
        wmi_max = first_cpu.MaxClockSpeed
        self.assertEqual(wmi_max, psutil.cpu_freq().max)
113 | |
114 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestSystemAPIs(TestCase):
    """Compare psutil system-wide APIs (network, memory, pids, disks,
    boot time) with ipconfig, WMI and win32api.
    """

    def test_nic_names(self):
        # Every NIC reported by psutil must show up in "ipconfig /all",
        # except for pseudo interfaces.
        out = sh('ipconfig /all')
        for nic in psutil.net_io_counters(pernic=True):
            if "pseudo-interface" in nic.replace(' ', '-').lower():
                continue
            if nic not in out:
                self.fail(
                    "%r nic wasn't found in 'ipconfig /all' output" % nic)

    def test_total_phymem(self):
        w = wmi.WMI().Win32_ComputerSystem()[0]
        self.assertEqual(int(w.TotalPhysicalMemory),
                         psutil.virtual_memory().total)

    # @unittest.skipIf(wmi is None, "wmi module is not installed")
    # def test__UPTIME(self):
    #     # _UPTIME constant is not public but it is used internally
    #     # as value to return for pid 0 creation time.
    #     # WMI behaves the same.
    #     w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    #     p = psutil.Process(0)
    #     wmic_create = str(w.CreationDate.split('.')[0])
    #     psutil_create = time.strftime("%Y%m%d%H%M%S",
    #                                   time.localtime(p.create_time()))

    # Note: this test is not very reliable
    @unittest.skipIf(APPVEYOR, "test not relieable on appveyor")
    @retry_on_failure()
    def test_pids(self):
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        w = wmi.WMI().Win32_Process()
        wmi_pids = set(x.ProcessId for x in w)
        psutil_pids = set(psutil.pids())
        self.assertEqual(wmi_pids, psutil_pids)

    @retry_on_failure()
    def test_disks(self):
        # For each psutil partition find the WMI logical disk with the
        # same device id and compare total and free space.
        ps_parts = psutil.disk_partitions(all=True)
        wmi_parts = wmi.WMI().Win32_LogicalDisk()
        for ps_part in ps_parts:
            for wmi_part in wmi_parts:
                if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
                    if not ps_part.mountpoint:
                        # this is usually a CD-ROM with no disk inserted
                        break
                    if 'cdrom' in ps_part.opts:
                        break
                    try:
                        usage = psutil.disk_usage(ps_part.mountpoint)
                    except FileNotFoundError:
                        # usually this is the floppy
                        break
                    self.assertEqual(usage.total, int(wmi_part.Size))
                    wmi_free = int(wmi_part.FreeSpace)
                    # 10 MB tolerance: free space may change between the
                    # two readings, so no exact equality is required
                    # (a strict assertEqual here used to make this
                    # tolerance check unreachable).
                    if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
                        self.fail("psutil=%s, wmi=%s" % (
                            usage.free, wmi_free))
                    break
            else:
                # Inner loop ended without a break: no WMI disk matched.
                self.fail("can't find partition %s" % repr(ps_part))

    def test_disk_usage(self):
        for disk in psutil.disk_partitions():
            if 'cdrom' in disk.opts:
                continue
            sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint)
            psutil_value = psutil.disk_usage(disk.mountpoint)
            # Items 0 and 1 of the win32api result are compared with
            # free and total respectively, with a 1 MB tolerance.
            self.assertAlmostEqual(sys_value[0], psutil_value.free,
                                   delta=1024 * 1024)
            self.assertAlmostEqual(sys_value[1], psutil_value.total,
                                   delta=1024 * 1024)
            self.assertEqual(psutil_value.used,
                             psutil_value.total - psutil_value.free)

    def test_disk_partitions(self):
        # The drive strings are split on backslash + NUL; empty chunks
        # and the A: drive are ignored.
        sys_value = [
            x + '\\' for x in win32api.GetLogicalDriveStrings().split("\\\x00")
            if x and not x.startswith('A:')]
        psutil_value = [x.mountpoint for x in psutil.disk_partitions(all=True)]
        self.assertEqual(sys_value, psutil_value)

    def test_net_if_stats(self):
        # It is enough that psutil and WMI share at least one adapter
        # name (either the WMI Name or its NetConnectionID).
        ps_names = set(cext.net_if_stats())
        wmi_adapters = wmi.WMI().Win32_NetworkAdapter()
        wmi_names = set()
        for wmi_adapter in wmi_adapters:
            wmi_names.add(wmi_adapter.Name)
            wmi_names.add(wmi_adapter.NetConnectionID)
        self.assertTrue(ps_names & wmi_names,
                        "no common entries in %s, %s" % (ps_names, wmi_names))

    def test_boot_time(self):
        wmi_os = wmi.WMI().Win32_OperatingSystem()
        # Drop everything after the first dot before parsing the date.
        wmi_btime_str = wmi_os[0].LastBootUpTime.split('.')[0]
        wmi_btime_dt = datetime.datetime.strptime(
            wmi_btime_str, "%Y%m%d%H%M%S")
        psutil_dt = datetime.datetime.fromtimestamp(psutil.boot_time())
        diff = abs((wmi_btime_dt - psutil_dt).total_seconds())
        self.assertLessEqual(diff, 3)

    def test_boot_time_fluctuation(self):
        # https://github.com/giampaolo/psutil/issues/1007
        # A low-level value drifting by 1 second must not change the
        # value returned by psutil.boot_time(); a big jump must.
        with mock.patch('psutil._pswindows.cext.boot_time', return_value=5):
            self.assertEqual(psutil.boot_time(), 5)
        with mock.patch('psutil._pswindows.cext.boot_time', return_value=4):
            self.assertEqual(psutil.boot_time(), 5)
        with mock.patch('psutil._pswindows.cext.boot_time', return_value=6):
            self.assertEqual(psutil.boot_time(), 5)
        with mock.patch('psutil._pswindows.cext.boot_time', return_value=333):
            self.assertEqual(psutil.boot_time(), 333)
232 | |
233 | |
234 # =================================================================== | |
235 # sensors_battery() | |
236 # =================================================================== | |
237 | |
238 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestSensorsBattery(TestCase):
    """Check psutil.sensors_battery() against win32api, WMI and mocked
    return values of the underlying cext call.
    """

    @staticmethod
    def _patched_battery(raw):
        # Patch the low-level call so that it hands back the `raw` tuple.
        return mock.patch("psutil._pswindows.cext.sensors_battery",
                          return_value=raw)

    @staticmethod
    def _wmi_battery():
        # First battery record as reported by WMI.
        return wmi.WMI().query('select * from Win32_Battery')[0]

    def test_has_battery(self):
        batteries_present = \
            win32api.GetPwrCapabilities()['SystemBatteriesPresent']
        battery = psutil.sensors_battery()
        if batteries_present:
            self.assertIsNotNone(battery)
        else:
            self.assertIsNone(battery)

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_percent(self):
        from_wmi = self._wmi_battery()
        from_psutil = psutil.sensors_battery()
        self.assertAlmostEqual(
            from_psutil.percent, from_wmi.EstimatedChargeRemaining,
            delta=1)

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_power_plugged(self):
        from_wmi = self._wmi_battery()
        from_psutil = psutil.sensors_battery()
        # Status codes:
        # https://msdn.microsoft.com/en-us/library/aa394074(v=vs.85).aspx
        plugged_per_wmi = from_wmi.BatteryStatus == 2
        self.assertEqual(from_psutil.power_plugged, plugged_per_wmi)

    def test_emulate_no_battery(self):
        with self._patched_battery((0, 128, 0, 0)) as fake:
            self.assertIsNone(psutil.sensors_battery())
            assert fake.called

    def test_emulate_power_connected(self):
        with self._patched_battery((1, 0, 0, 0)) as fake:
            secsleft = psutil.sensors_battery().secsleft
            self.assertEqual(secsleft, psutil.POWER_TIME_UNLIMITED)
            assert fake.called

    def test_emulate_power_charging(self):
        with self._patched_battery((0, 8, 0, 0)) as fake:
            secsleft = psutil.sensors_battery().secsleft
            self.assertEqual(secsleft, psutil.POWER_TIME_UNLIMITED)
            assert fake.called

    def test_emulate_secs_left_unknown(self):
        with self._patched_battery((0, 0, 0, -1)) as fake:
            secsleft = psutil.sensors_battery().secsleft
            self.assertEqual(secsleft, psutil.POWER_TIME_UNKNOWN)
            assert fake.called
293 | |
294 | |
295 # =================================================================== | |
296 # Process APIs | |
297 # =================================================================== | |
298 | |
299 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestProcess(TestCase):
    """Process API tests, mostly comparing psutil.Process results with
    pywin32 / ctypes calls.  A test subprocess is spawned once for the
    whole class and its pid is stored in ``cls.pid``.
    """

    @classmethod
    def setUpClass(cls):
        cls.pid = get_test_subprocess().pid

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def test_issue_24(self):
        p = psutil.Process(0)
        self.assertRaises(psutil.AccessDenied, p.kill)

    def test_special_pid(self):
        p = psutil.Process(4)
        self.assertEqual(p.name(), 'System')
        # use __str__ to access all common Process properties to check
        # that nothing strange happens
        str(p)
        p.username()
        self.assertTrue(p.create_time() >= 0.0)
        try:
            rss, _ = p.memory_info()[:2]
        except psutil.AccessDenied:
            # expected on Windows Vista and Windows 7
            # NOTE(review): platform.uname()[1] is the network node name,
            # not the OS release - confirm this check is intended.
            if platform.uname()[1] not in ('vista', 'win-7', 'win7'):
                raise
        else:
            self.assertTrue(rss > 0)

    def test_send_signal(self):
        p = psutil.Process(self.pid)
        self.assertRaises(ValueError, p.send_signal, signal.SIGINT)

    def test_num_handles_increment(self):
        # Opening a handle to ourselves must bump the count by exactly
        # one; closing it must bring the count back.
        p = psutil.Process(os.getpid())
        before = p.num_handles()
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, os.getpid())
        after = p.num_handles()
        self.assertEqual(after, before + 1)
        win32api.CloseHandle(handle)
        self.assertEqual(p.num_handles(), before)

    def test_handles_leak(self):
        # Call all Process methods and make sure no handles are left
        # open. This is here mainly to make sure functions using
        # OpenProcess() always call CloseHandle().
        def call(p, attr):
            # `attr` is the attribute name; look it up on the process and
            # invoke it when it is callable.  (The lookup previously used
            # the enclosing loop variable instead of this parameter.)
            member = getattr(p, attr, None)
            if member is not None and callable(member):
                member()

        p = psutil.Process(self.pid)
        failures = []
        for name in dir(psutil.Process):
            if name.startswith('_') \
                    or name in ('terminate', 'kill', 'suspend', 'resume',
                                'nice', 'send_signal', 'wait', 'children',
                                'as_dict', 'memory_info_ex'):
                continue
            else:
                try:
                    # The handle count is sampled after a first and after
                    # a second call; growth between the two is a leak.
                    call(p, name)
                    num1 = p.num_handles()
                    call(p, name)
                    num2 = p.num_handles()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
                else:
                    if num2 > num1:
                        fail = \
                            "failure while processing Process.%s method " \
                            "(before=%s, after=%s)" % (name, num1, num2)
                        failures.append(fail)
        if failures:
            self.fail('\n' + '\n'.join(failures))

    @unittest.skipIf(not sys.version_info >= (2, 7),
                     "CTRL_* signals not supported")
    def test_ctrl_signals(self):
        p = psutil.Process(get_test_subprocess().pid)
        p.send_signal(signal.CTRL_C_EVENT)
        p.send_signal(signal.CTRL_BREAK_EVENT)
        p.kill()
        p.wait()
        # Once the process is gone both signals must raise NoSuchProcess.
        self.assertRaises(psutil.NoSuchProcess,
                          p.send_signal, signal.CTRL_C_EVENT)
        self.assertRaises(psutil.NoSuchProcess,
                          p.send_signal, signal.CTRL_BREAK_EVENT)

    def test_username(self):
        self.assertEqual(psutil.Process().username(),
                         win32api.GetUserNameEx(win32con.NameSamCompatible))

    def test_cmdline(self):
        # Runs of spaces are collapsed before comparing.
        sys_value = re.sub(' +', ' ', win32api.GetCommandLine()).strip()
        psutil_value = ' '.join(psutil.Process().cmdline())
        self.assertEqual(sys_value, psutil_value)

    # XXX - occasional failures

    # def test_cpu_times(self):
    #     handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
    #                                   win32con.FALSE, os.getpid())
    #     self.addCleanup(win32api.CloseHandle, handle)
    #     sys_value = win32process.GetProcessTimes(handle)
    #     psutil_value = psutil.Process().cpu_times()
    #     self.assertAlmostEqual(
    #         psutil_value.user, sys_value['UserTime'] / 10000000.0,
    #         delta=0.2)
    #     self.assertAlmostEqual(
    #         psutil_value.user, sys_value['KernelTime'] / 10000000.0,
    #         delta=0.2)

    def test_nice(self):
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, os.getpid())
        self.addCleanup(win32api.CloseHandle, handle)
        sys_value = win32process.GetPriorityClass(handle)
        psutil_value = psutil.Process().nice()
        self.assertEqual(psutil_value, sys_value)

    def test_memory_info(self):
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, self.pid)
        self.addCleanup(win32api.CloseHandle, handle)
        sys_value = win32process.GetProcessMemoryInfo(handle)
        psutil_value = psutil.Process(self.pid).memory_info()
        self.assertEqual(
            sys_value['PeakWorkingSetSize'], psutil_value.peak_wset)
        self.assertEqual(
            sys_value['WorkingSetSize'], psutil_value.wset)
        self.assertEqual(
            sys_value['QuotaPeakPagedPoolUsage'], psutil_value.peak_paged_pool)
        self.assertEqual(
            sys_value['QuotaPagedPoolUsage'], psutil_value.paged_pool)
        self.assertEqual(
            sys_value['QuotaPeakNonPagedPoolUsage'],
            psutil_value.peak_nonpaged_pool)
        self.assertEqual(
            sys_value['QuotaNonPagedPoolUsage'], psutil_value.nonpaged_pool)
        self.assertEqual(
            sys_value['PagefileUsage'], psutil_value.pagefile)
        self.assertEqual(
            sys_value['PeakPagefileUsage'], psutil_value.peak_pagefile)

        # rss and vms must mirror wset and pagefile.
        self.assertEqual(psutil_value.rss, psutil_value.wset)
        self.assertEqual(psutil_value.vms, psutil_value.pagefile)

    def test_wait(self):
        # The handle is opened before terminating so that the exit code
        # can still be queried afterwards.
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, self.pid)
        self.addCleanup(win32api.CloseHandle, handle)
        p = psutil.Process(self.pid)
        p.terminate()
        psutil_value = p.wait()
        sys_value = win32process.GetExitCodeProcess(handle)
        self.assertEqual(psutil_value, sys_value)

    def test_cpu_affinity(self):
        def from_bitmask(x):
            # Indexes (0-63) of the bits set in x.
            return [i for i in range(64) if (1 << i) & x]

        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, self.pid)
        self.addCleanup(win32api.CloseHandle, handle)
        sys_value = from_bitmask(
            win32process.GetProcessAffinityMask(handle)[0])
        psutil_value = psutil.Process(self.pid).cpu_affinity()
        self.assertEqual(psutil_value, sys_value)

    def test_io_counters(self):
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, os.getpid())
        self.addCleanup(win32api.CloseHandle, handle)
        sys_value = win32process.GetProcessIoCounters(handle)
        psutil_value = psutil.Process().io_counters()
        self.assertEqual(
            psutil_value.read_count, sys_value['ReadOperationCount'])
        self.assertEqual(
            psutil_value.write_count, sys_value['WriteOperationCount'])
        self.assertEqual(
            psutil_value.read_bytes, sys_value['ReadTransferCount'])
        self.assertEqual(
            psutil_value.write_bytes, sys_value['WriteTransferCount'])
        self.assertEqual(
            psutil_value.other_count, sys_value['OtherOperationCount'])
        self.assertEqual(
            psutil_value.other_bytes, sys_value['OtherTransferCount'])

    def test_num_handles(self):
        import ctypes
        import ctypes.wintypes
        PROCESS_QUERY_INFORMATION = 0x400
        handle = ctypes.windll.kernel32.OpenProcess(
            PROCESS_QUERY_INFORMATION, 0, self.pid)
        self.addCleanup(ctypes.windll.kernel32.CloseHandle, handle)

        # GetProcessHandleCount() writes its result into hndcnt.
        hndcnt = ctypes.wintypes.DWORD()
        ctypes.windll.kernel32.GetProcessHandleCount(
            handle, ctypes.byref(hndcnt))
        sys_value = hndcnt.value
        psutil_value = psutil.Process(self.pid).num_handles()
        self.assertEqual(psutil_value, sys_value)

    def test_error_partial_copy(self):
        # https://github.com/giampaolo/psutil/issues/875
        # With proc_cwd failing with winerror 299, cwd() must end up
        # raising AccessDenied after sleeping at least 5 times
        # (time.sleep is mocked).
        exc = WindowsError()
        exc.winerror = 299
        with mock.patch("psutil._psplatform.cext.proc_cwd", side_effect=exc):
            with mock.patch("time.sleep") as m:
                p = psutil.Process()
                self.assertRaises(psutil.AccessDenied, p.cwd)
        self.assertGreaterEqual(m.call_count, 5)

    def test_exe(self):
        # NtQuerySystemInformation succeeds if process is gone. Make sure
        # it raises NSP for a non existent pid.
        pid = psutil.pids()[-1] + 99999
        proc = psutil._psplatform.Process(pid)
        self.assertRaises(psutil.NoSuchProcess, proc.exe)
526 | |
527 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestProcessWMI(TestCase):
    """Compare Process API results with WMI."""

    @classmethod
    def setUpClass(cls):
        cls.pid = get_test_subprocess().pid

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def _wmi_process(self):
        # WMI record of the test subprocess spawned in setUpClass().
        return wmi.WMI().Win32_Process(ProcessId=self.pid)[0]

    def test_name(self):
        record = self._wmi_process()
        proc = psutil.Process(self.pid)
        self.assertEqual(proc.name(), record.Caption)

    def test_exe(self):
        record = self._wmi_process()
        proc = psutil.Process(self.pid)
        # Note: wmi reports the exe as a lower case string.
        # Being Windows paths case-insensitive we ignore that.
        psutil_exe = proc.exe().lower()
        self.assertEqual(psutil_exe, record.ExecutablePath.lower())

    def test_cmdline(self):
        record = self._wmi_process()
        proc = psutil.Process(self.pid)
        # Double quotes are stripped from the WMI string before comparing.
        joined = ' '.join(proc.cmdline())
        self.assertEqual(joined, record.CommandLine.replace('"', ''))

    def test_username(self):
        record = self._wmi_process()
        proc = psutil.Process(self.pid)
        domain, _, user = record.GetOwner()
        expected = "%s\\%s" % (domain, user)
        self.assertEqual(proc.username(), expected)

    def test_memory_rss(self):
        time.sleep(0.1)
        record = self._wmi_process()
        proc = psutil.Process(self.pid)
        psutil_rss = proc.memory_info().rss
        self.assertEqual(psutil_rss, int(record.WorkingSetSize))

    def test_memory_vms(self):
        time.sleep(0.1)
        record = self._wmi_process()
        proc = psutil.Process(self.pid)
        psutil_vms = proc.memory_info().vms
        # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
        # ...claims that PageFileUsage is represented in Kilo
        # bytes but funnily enough on certain platforms bytes are
        # returned instead.
        wmi_usage = int(record.PageFileUsage)
        if psutil_vms not in (wmi_usage, wmi_usage * 1024):
            self.fail("wmi=%s, psutil=%s" % (wmi_usage, psutil_vms))

    def test_create_time(self):
        record = self._wmi_process()
        proc = psutil.Process(self.pid)
        # Only the part of the WMI date before the first dot is compared.
        wmi_stamp = str(record.CreationDate.split('.')[0])
        local_time = time.localtime(proc.create_time())
        psutil_stamp = time.strftime("%Y%m%d%H%M%S", local_time)
        self.assertEqual(wmi_stamp, psutil_stamp)
592 | |
593 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestDualProcessImplementation(TestCase):
    """
    Certain APIs on Windows have 2 internal implementations, one
    based on documented Windows APIs, another one based
    NtQuerySystemInformation() which gets called as fallback in
    case the first fails because of limited permission error.
    Here we test that the two methods return the exact same value,
    see:
    https://github.com/giampaolo/psutil/issues/304
    """

    @classmethod
    def setUpClass(cls):
        cls.pid = get_test_subprocess().pid

    @classmethod
    def tearDownClass(cls):
        reap_children()

    @staticmethod
    def _deny(cext_function):
        # Make the given cext function raise EPERM so that the caller
        # has to go through its fallback implementation.
        return mock.patch("psutil._psplatform.cext." + cext_function,
                          side_effect=OSError(errno.EPERM, "msg"))

    def test_memory_info(self):
        direct = psutil.Process(self.pid).memory_info()
        with self._deny("proc_memory_info") as patched:
            fallback = psutil.Process(self.pid).memory_info()
            self.assertEqual(len(direct), len(fallback))
            # Lengths were just asserted equal, so pairing is lossless.
            for first, second in zip(direct, fallback):
                self.assertGreaterEqual(first, 0)
                self.assertGreaterEqual(second, 0)
                self.assertAlmostEqual(first, second, delta=512)
            assert patched.called

    def test_create_time(self):
        expected = psutil.Process(self.pid).create_time()
        with self._deny("proc_times") as patched:
            fallback = psutil.Process(self.pid).create_time()
            self.assertEqual(fallback, expected)
            assert patched.called

    def test_cpu_times(self):
        direct = psutil.Process(self.pid).cpu_times()
        with self._deny("proc_times") as patched:
            fallback = psutil.Process(self.pid).cpu_times()
            assert patched.called
            self.assertAlmostEqual(direct.user, fallback.user, delta=0.01)
            self.assertAlmostEqual(
                direct.system, fallback.system, delta=0.01)

    def test_io_counters(self):
        direct = psutil.Process(self.pid).io_counters()
        with self._deny("proc_io_counters") as patched:
            fallback = psutil.Process(self.pid).io_counters()
            for index, value in enumerate(direct):
                self.assertAlmostEqual(value, fallback[index], delta=5)
            assert patched.called

    def test_num_handles(self):
        expected = psutil.Process(self.pid).num_handles()
        with self._deny("proc_num_handles") as patched:
            fallback = psutil.Process(self.pid).num_handles()
            self.assertEqual(fallback, expected)
            assert patched.called

    def test_cmdline(self):
        from psutil._pswindows import convert_oserror
        tolerated = (psutil.AccessDenied, psutil.NoSuchProcess)
        for pid in psutil.pids():
            try:
                via_peb = cext.proc_cmdline(pid, use_peb=True)
                without_peb = cext.proc_cmdline(pid, use_peb=False)
            except OSError as err:
                # Processes we cannot access, or which are gone, are
                # skipped; any other OSError is re-raised.
                if isinstance(convert_oserror(err), tolerated):
                    continue
                raise
            self.assertEqual(via_peb, without_peb)
675 | |
676 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class RemoteProcessTestCase(TestCase):
    """Certain functions require calling ReadProcessMemory.
    This trivially works when called on the current process.
    Check that this works on other processes, especially when they
    have a different bitness.
    """

    @staticmethod
    def find_other_interpreter():
        """Return the path of a Python interpreter whose bitness is the
        opposite of ours, or None if none is found under C:\\Python*.
        """
        # find a python interpreter that is of the opposite bitness from us
        code = "import sys; sys.stdout.write(str(sys.maxsize > 2**32))"
        wanted = str(not IS_64_BIT)

        # XXX: a different and probably more stable approach might be to access
        # the registry but accessing 64 bit paths from a 32 bit process
        for filename in glob.glob(r"C:\Python*\python.exe"):
            # universal_newlines=True makes communicate() return text on
            # Python 3 as well; comparing bytes with str never matched
            # there, so the whole class was always skipped.
            proc = subprocess.Popen(args=[filename, "-c", code],
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT,
                                    universal_newlines=True)
            output, _ = proc.communicate()
            if output.strip() == wanted:
                return filename
        return None

    @classmethod
    def setUpClass(cls):
        other_python = cls.find_other_interpreter()

        if other_python is None:
            raise unittest.SkipTest(
                "could not find interpreter with opposite bitness")

        # One of the two interpreters is always the one running the tests.
        if IS_64_BIT:
            cls.python64 = sys.executable
            cls.python32 = other_python
        else:
            cls.python64 = other_python
            cls.python32 = sys.executable

    # Arguments making the child block reading stdin until it is closed.
    test_args = ["-c", "import sys; sys.stdin.read()"]

    def setUp(self):
        # Both children get an env var holding our pid, checked by the
        # environ tests below.
        env = os.environ.copy()
        env["THINK_OF_A_NUMBER"] = str(os.getpid())
        self.proc32 = get_test_subprocess([self.python32] + self.test_args,
                                          env=env,
                                          stdin=subprocess.PIPE)
        self.proc64 = get_test_subprocess([self.python64] + self.test_args,
                                          env=env,
                                          stdin=subprocess.PIPE)

    def tearDown(self):
        self.proc32.communicate()
        self.proc64.communicate()
        reap_children()

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def test_cmdline_32(self):
        p = psutil.Process(self.proc32.pid)
        self.assertEqual(len(p.cmdline()), 3)
        self.assertEqual(p.cmdline()[1:], self.test_args)

    def test_cmdline_64(self):
        p = psutil.Process(self.proc64.pid)
        self.assertEqual(len(p.cmdline()), 3)
        self.assertEqual(p.cmdline()[1:], self.test_args)

    def test_cwd_32(self):
        p = psutil.Process(self.proc32.pid)
        self.assertEqual(p.cwd(), os.getcwd())

    def test_cwd_64(self):
        p = psutil.Process(self.proc64.pid)
        self.assertEqual(p.cwd(), os.getcwd())

    def test_environ_32(self):
        p = psutil.Process(self.proc32.pid)
        e = p.environ()
        self.assertIn("THINK_OF_A_NUMBER", e)
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(e["THINK_OF_A_NUMBER"], str(os.getpid()))

    def test_environ_64(self):
        p = psutil.Process(self.proc64.pid)
        try:
            p.environ()
        except psutil.AccessDenied:
            # Deliberately tolerated: only other failures make this
            # test fail.
            pass
766 | |
767 | |
768 # =================================================================== | |
769 # Windows services | |
770 # =================================================================== | |
771 | |
772 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestServices(TestCase):
    """Tests for psutil.win_service_iter() and psutil.win_service_get()."""

    def test_win_service_iter(self):
        # Sanity-check the fields of every service returned by
        # win_service_iter().  (An earlier, immediately overwritten
        # valid_statuses set and duplicated assertions were removed.)
        valid_start_types = set([
            "automatic",
            "manual",
            "disabled",
        ])
        valid_statuses = set([
            "running",
            "paused",
            "start_pending",
            "pause_pending",
            "continue_pending",
            "stop_pending",
            "stopped"
        ])
        for serv in psutil.win_service_iter():
            data = serv.as_dict()
            self.assertIsInstance(data['name'], str)
            self.assertNotEqual(data['name'].strip(), "")
            self.assertIsInstance(data['display_name'], str)
            self.assertIsInstance(data['username'], str)
            self.assertIn(data['status'], valid_statuses)
            if data['pid'] is not None:
                psutil.Process(data['pid'])
            self.assertIsInstance(data['binpath'], str)
            self.assertIsInstance(data['start_type'], str)
            self.assertIn(data['start_type'], valid_start_types)
            self.assertIsInstance(data['description'], str)
            pid = serv.pid()
            if pid is not None:
                p = psutil.Process(pid)
                self.assertTrue(p.is_running())
            # win_service_get
            s = psutil.win_service_get(serv.name())
            # test __eq__
            self.assertEqual(serv, s)

    def test_win_service_get(self):
        ERROR_SERVICE_DOES_NOT_EXIST = \
            psutil._psplatform.cext.ERROR_SERVICE_DOES_NOT_EXIST
        ERROR_ACCESS_DENIED = psutil._psplatform.cext.ERROR_ACCESS_DENIED

        # A made-up service name must raise NoSuchProcess carrying
        # that name.
        name = next(psutil.win_service_iter()).name()
        with self.assertRaises(psutil.NoSuchProcess) as cm:
            psutil.win_service_get(name + '???')
        self.assertEqual(cm.exception.name, name + '???')

        # test NoSuchProcess
        service = psutil.win_service_get(name)
        # WindowsError is built with a different argument layout
        # depending on the Python major version.
        if PY3:
            args = (0, "msg", 0, ERROR_SERVICE_DOES_NOT_EXIST)
        else:
            args = (ERROR_SERVICE_DOES_NOT_EXIST, "msg")
        exc = WindowsError(*args)
        with mock.patch("psutil._psplatform.cext.winservice_query_status",
                        side_effect=exc):
            self.assertRaises(psutil.NoSuchProcess, service.status)
        with mock.patch("psutil._psplatform.cext.winservice_query_config",
                        side_effect=exc):
            self.assertRaises(psutil.NoSuchProcess, service.username)

        # test AccessDenied
        if PY3:
            args = (0, "msg", 0, ERROR_ACCESS_DENIED)
        else:
            args = (ERROR_ACCESS_DENIED, "msg")
        exc = WindowsError(*args)
        with mock.patch("psutil._psplatform.cext.winservice_query_status",
                        side_effect=exc):
            self.assertRaises(psutil.AccessDenied, service.status)
        with mock.patch("psutil._psplatform.cext.winservice_query_config",
                        side_effect=exc):
            self.assertRaises(psutil.AccessDenied, service.username)

        # test __str__ and __repr__
        self.assertIn(service.name(), str(service))
        self.assertIn(service.display_name(), str(service))
        self.assertIn(service.name(), repr(service))
        self.assertIn(service.display_name(), repr(service))
866 | |
867 | |
if __name__ == "__main__":
    # Run directly: hand this very file over to psutil's test runner.
    from psutil.tests.runner import run as run_tests
    run_tests(__file__)