comparison env/lib/python3.7/site-packages/psutil/tests/test_windows.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 #!/usr/bin/env python3
2 # -*- coding: UTF-8 -*
3
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
5 # Use of this source code is governed by a BSD-style license that can be
6 # found in the LICENSE file.
7
8 """Windows specific tests."""
9
10 import datetime
11 import errno
12 import glob
13 import os
14 import platform
15 import re
16 import signal
17 import subprocess
18 import sys
19 import time
20 import warnings
21
22 import psutil
23 from psutil import WINDOWS
24 from psutil._compat import FileNotFoundError
25 from psutil.tests import APPVEYOR
26 from psutil.tests import get_test_subprocess
27 from psutil.tests import HAS_BATTERY
28 from psutil.tests import mock
29 from psutil.tests import PY3
30 from psutil.tests import PYPY
31 from psutil.tests import reap_children
32 from psutil.tests import retry_on_failure
33 from psutil.tests import sh
34 from psutil.tests import unittest
35
36
# The pywin32 / wmi helpers are imported only on Windows and only when not
# running under PYPY; warnings raised while importing them are silenced.
if WINDOWS and not PYPY:
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        import win32api  # requires "pip install pypiwin32"
        import win32con
        import win32process
        import wmi  # requires "pip install wmi" / "make setup-dev-env"


# Shortcut to the `cext` attribute of psutil's platform module.
cext = psutil._psplatform.cext

# True when the running interpreter is a 64 bit process.
IS_64_BIT = sys.maxsize > (2 ** 32)
50
51
def wrap_exceptions(fun):
    """Decorator translating OSError into psutil exceptions.

    The decorated callable is invoked unchanged.  If it raises OSError:

    * an errno found in ``psutil._pswindows.ACCESS_DENIED_SET`` is turned
      into ``psutil.AccessDenied(None, None)``;
    * ``errno.ESRCH`` is turned into ``psutil.NoSuchProcess(None, None)``;
    * any other OSError is re-raised as is.

    Exceptions which are not OSError propagate untouched.
    """
    # Local import so that the decorator does not depend on a file-level
    # import of functools.
    import functools

    @functools.wraps(fun)  # keep fun's __name__ / __doc__ on the wrapper
    def wrapper(self, *args, **kwargs):
        try:
            return fun(self, *args, **kwargs)
        except OSError as err:
            # Looked up only on failure, as in the original code.
            from psutil._pswindows import ACCESS_DENIED_SET
            if err.errno in ACCESS_DENIED_SET:
                raise psutil.AccessDenied(None, None)
            if err.errno == errno.ESRCH:
                raise psutil.NoSuchProcess(None, None)
            raise
    return wrapper
64
65
@unittest.skipIf(PYPY, "pywin32 not available on PYPY")  # skip whole module
class TestCase(unittest.TestCase):
    """Base class for the tests in this module; skipped on PYPY."""
69
70
71 # ===================================================================
72 # System APIs
73 # ===================================================================
74
75
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestCpuAPIs(TestCase):
    """Compare psutil CPU figures with the environment, win32api and WMI."""

    @unittest.skipIf('NUMBER_OF_PROCESSORS' not in os.environ,
                     'NUMBER_OF_PROCESSORS env var is not available')
    def test_cpu_count_vs_NUMBER_OF_PROCESSORS(self):
        # Will likely fail on many-cores systems:
        # https://stackoverflow.com/questions/31209256
        env_count = int(os.environ['NUMBER_OF_PROCESSORS'])
        self.assertEqual(env_count, psutil.cpu_count())

    def test_cpu_count_vs_GetSystemInfo(self):
        # Will likely fail on many-cores systems:
        # https://stackoverflow.com/questions/31209256
        system_info = win32api.GetSystemInfo()
        expected = system_info[5]
        actual = psutil.cpu_count()
        self.assertEqual(expected, actual)

    def test_cpu_count_logical_vs_wmi(self):
        first_cpu = wmi.WMI().Win32_Processor()[0]
        self.assertEqual(psutil.cpu_count(),
                         first_cpu.NumberOfLogicalProcessors)

    def test_cpu_count_phys_vs_wmi(self):
        first_cpu = wmi.WMI().Win32_Processor()[0]
        self.assertEqual(psutil.cpu_count(logical=False),
                         first_cpu.NumberOfCores)

    def test_cpu_count_vs_cpu_times(self):
        logical_count = psutil.cpu_count()
        per_cpu_times = psutil.cpu_times(percpu=True)
        self.assertEqual(logical_count, len(per_cpu_times))

    def test_cpu_freq(self):
        first_cpu = wmi.WMI().Win32_Processor()[0]
        # psutil.cpu_freq() is queried once per compared field.
        wmi_current = first_cpu.CurrentClockSpeed
        self.assertEqual(wmi_current, psutil.cpu_freq().current)
        wmi_max = first_cpu.MaxClockSpeed
        self.assertEqual(wmi_max, psutil.cpu_freq().max)
113
114
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestSystemAPIs(TestCase):
    """Compare psutil system-wide APIs with ipconfig, WMI and win32api."""

    def test_nic_names(self):
        # Every NIC name known to psutil (pseudo interfaces aside) must
        # appear in the output of 'ipconfig /all'.
        out = sh('ipconfig /all')
        for nic in psutil.net_io_counters(pernic=True):
            if "pseudo-interface" in nic.replace(' ', '-').lower():
                continue
            if nic not in out:
                self.fail(
                    "%r nic wasn't found in 'ipconfig /all' output" % nic)

    def test_total_phymem(self):
        w = wmi.WMI().Win32_ComputerSystem()[0]
        self.assertEqual(int(w.TotalPhysicalMemory),
                         psutil.virtual_memory().total)

    # @unittest.skipIf(wmi is None, "wmi module is not installed")
    # def test__UPTIME(self):
    #     # _UPTIME constant is not public but it is used internally
    #     # as value to return for pid 0 creation time.
    #     # WMI behaves the same.
    #     w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    #     p = psutil.Process(0)
    #     wmic_create = str(w.CreationDate.split('.')[0])
    #     psutil_create = time.strftime("%Y%m%d%H%M%S",
    #                                   time.localtime(p.create_time()))

    # Note: this test is not very reliable
    @unittest.skipIf(APPVEYOR, "test not relieable on appveyor")
    @retry_on_failure()
    def test_pids(self):
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        wmi_pids = {x.ProcessId for x in wmi.WMI().Win32_Process()}
        psutil_pids = set(psutil.pids())
        self.assertEqual(wmi_pids, psutil_pids)

    @retry_on_failure()
    def test_disks(self):
        ps_parts = psutil.disk_partitions(all=True)
        wmi_parts = wmi.WMI().Win32_LogicalDisk()
        for ps_part in ps_parts:
            for wmi_part in wmi_parts:
                if ps_part.device.replace('\\', '') != wmi_part.DeviceID:
                    continue
                if not ps_part.mountpoint:
                    # this is usually a CD-ROM with no disk inserted
                    break
                if 'cdrom' in ps_part.opts:
                    break
                try:
                    usage = psutil.disk_usage(ps_part.mountpoint)
                except FileNotFoundError:
                    # usually this is the floppy
                    break
                self.assertEqual(usage.total, int(wmi_part.Size))
                wmi_free = int(wmi_part.FreeSpace)
                # 10 MB tolerance.  The free space is read at two different
                # moments, hence it is not compared for exact equality
                # (doing so would make this tolerance pointless).
                if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
                    self.fail("psutil=%s, wmi=%s" % (
                        usage.free, wmi_free))
                break
            else:
                # No WMI logical disk matched this psutil partition.
                self.fail("can't find partition %s" % repr(ps_part))

    def test_disk_usage(self):
        for disk in psutil.disk_partitions():
            if 'cdrom' in disk.opts:
                continue
            sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint)
            psutil_value = psutil.disk_usage(disk.mountpoint)
            self.assertAlmostEqual(sys_value[0], psutil_value.free,
                                   delta=1024 * 1024)
            self.assertAlmostEqual(sys_value[1], psutil_value.total,
                                   delta=1024 * 1024)
            self.assertEqual(psutil_value.used,
                             psutil_value.total - psutil_value.free)

    def test_disk_partitions(self):
        # Drive strings are separated by "\\\x00"; drives starting with
        # 'A:' are left out of the comparison.
        sys_value = [
            x + '\\' for x in win32api.GetLogicalDriveStrings().split("\\\x00")
            if x and not x.startswith('A:')]
        psutil_value = [x.mountpoint for x in psutil.disk_partitions(all=True)]
        self.assertEqual(sys_value, psutil_value)

    def test_net_if_stats(self):
        ps_names = set(cext.net_if_stats())
        wmi_names = set()
        for wmi_adapter in wmi.WMI().Win32_NetworkAdapter():
            wmi_names.add(wmi_adapter.Name)
            wmi_names.add(wmi_adapter.NetConnectionID)
        # At least one name must be known to both sources.
        self.assertTrue(ps_names & wmi_names,
                        "no common entries in %s, %s" % (ps_names, wmi_names))

    def test_boot_time(self):
        wmi_os = wmi.WMI().Win32_OperatingSystem()
        # Only the part before the first '.' is parsed.
        wmi_btime_str = wmi_os[0].LastBootUpTime.split('.')[0]
        wmi_btime_dt = datetime.datetime.strptime(
            wmi_btime_str, "%Y%m%d%H%M%S")
        psutil_dt = datetime.datetime.fromtimestamp(psutil.boot_time())
        diff = abs((wmi_btime_dt - psutil_dt).total_seconds())
        self.assertLessEqual(diff, 3)

    def test_boot_time_fluctuation(self):
        # https://github.com/giampaolo/psutil/issues/1007
        # After 5 has been returned once, raw values of 4 and 6 are still
        # expected to be reported as 5; 333 is expected to go through.
        with mock.patch('psutil._pswindows.cext.boot_time', return_value=5):
            self.assertEqual(psutil.boot_time(), 5)
        with mock.patch('psutil._pswindows.cext.boot_time', return_value=4):
            self.assertEqual(psutil.boot_time(), 5)
        with mock.patch('psutil._pswindows.cext.boot_time', return_value=6):
            self.assertEqual(psutil.boot_time(), 5)
        with mock.patch('psutil._pswindows.cext.boot_time', return_value=333):
            self.assertEqual(psutil.boot_time(), 333)
232
233
234 # ===================================================================
235 # sensors_battery()
236 # ===================================================================
237
238
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestSensorsBattery(TestCase):
    """Tests for psutil.sensors_battery(), real and emulated."""

    def test_has_battery(self):
        capabilities = win32api.GetPwrCapabilities()
        if not capabilities['SystemBatteriesPresent']:
            self.assertIsNone(psutil.sensors_battery())
        else:
            self.assertIsNotNone(psutil.sensors_battery())

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_percent(self):
        wmi_battery = wmi.WMI().query('select * from Win32_Battery')[0]
        ps_battery = psutil.sensors_battery()
        self.assertAlmostEqual(
            ps_battery.percent,
            wmi_battery.EstimatedChargeRemaining,
            delta=1)

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_power_plugged(self):
        wmi_battery = wmi.WMI().query('select * from Win32_Battery')[0]
        ps_battery = psutil.sensors_battery()
        # Status codes:
        # https://msdn.microsoft.com/en-us/library/aa394074(v=vs.85).aspx
        self.assertEqual(ps_battery.power_plugged,
                         wmi_battery.BatteryStatus == 2)

    def test_emulate_no_battery(self):
        fake_status = (0, 128, 0, 0)
        with mock.patch("psutil._pswindows.cext.sensors_battery",
                        return_value=fake_status) as patched:
            self.assertIsNone(psutil.sensors_battery())
            assert patched.called

    def test_emulate_power_connected(self):
        fake_status = (1, 0, 0, 0)
        with mock.patch("psutil._pswindows.cext.sensors_battery",
                        return_value=fake_status) as patched:
            self.assertEqual(psutil.sensors_battery().secsleft,
                             psutil.POWER_TIME_UNLIMITED)
            assert patched.called

    def test_emulate_power_charging(self):
        fake_status = (0, 8, 0, 0)
        with mock.patch("psutil._pswindows.cext.sensors_battery",
                        return_value=fake_status) as patched:
            self.assertEqual(psutil.sensors_battery().secsleft,
                             psutil.POWER_TIME_UNLIMITED)
            assert patched.called

    def test_emulate_secs_left_unknown(self):
        fake_status = (0, 0, 0, -1)
        with mock.patch("psutil._pswindows.cext.sensors_battery",
                        return_value=fake_status) as patched:
            self.assertEqual(psutil.sensors_battery().secsleft,
                             psutil.POWER_TIME_UNKNOWN)
            assert patched.called
293
294
295 # ===================================================================
296 # Process APIs
297 # ===================================================================
298
299
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestProcess(TestCase):
    """Check psutil.Process against win32api, win32process and ctypes.

    A test subprocess is spawned once for the whole class; its pid is
    available as ``cls.pid``.
    """

    @classmethod
    def setUpClass(cls):
        cls.pid = get_test_subprocess().pid

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def test_issue_24(self):
        p = psutil.Process(0)
        self.assertRaises(psutil.AccessDenied, p.kill)

    def test_special_pid(self):
        p = psutil.Process(4)
        self.assertEqual(p.name(), 'System')
        # use __str__ to access all common Process properties to check
        # that nothing strange happens
        str(p)
        p.username()
        self.assertTrue(p.create_time() >= 0.0)
        try:
            rss, _ = p.memory_info()[:2]
        except psutil.AccessDenied:
            # expected on Windows Vista and Windows 7
            if platform.uname()[1] not in ('vista', 'win-7', 'win7'):
                raise
        else:
            self.assertTrue(rss > 0)

    def test_send_signal(self):
        p = psutil.Process(self.pid)
        self.assertRaises(ValueError, p.send_signal, signal.SIGINT)

    def test_num_handles_increment(self):
        p = psutil.Process(os.getpid())
        before = p.num_handles()
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, os.getpid())
        try:
            after = p.num_handles()
            self.assertEqual(after, before + 1)
        finally:
            # Close the handle even if the assertion above fails, so a
            # failure does not leave it open for the tests that follow.
            win32api.CloseHandle(handle)
        self.assertEqual(p.num_handles(), before)

    def test_handles_leak(self):
        # Call all Process methods and make sure no handles are left
        # open. This is here mainly to make sure functions using
        # OpenProcess() always call CloseHandle().
        def call(proc, attr_name):
            # Look the attribute up by the name received as argument
            # (not through the enclosing loop variable) and invoke it
            # when it is callable.
            attr = getattr(proc, attr_name, None)
            if attr is not None and callable(attr):
                attr()

        excluded = ('terminate', 'kill', 'suspend', 'resume',
                    'nice', 'send_signal', 'wait', 'children',
                    'as_dict', 'memory_info_ex')
        p = psutil.Process(self.pid)
        failures = []
        for name in dir(psutil.Process):
            if name.startswith('_') or name in excluded:
                continue
            try:
                call(p, name)
                num1 = p.num_handles()
                call(p, name)
                num2 = p.num_handles()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
            else:
                # A second call must not increase the handle count.
                if num2 > num1:
                    fail = \
                        "failure while processing Process.%s method " \
                        "(before=%s, after=%s)" % (name, num1, num2)
                    failures.append(fail)
        if failures:
            self.fail('\n' + '\n'.join(failures))

    @unittest.skipIf(not sys.version_info >= (2, 7),
                     "CTRL_* signals not supported")
    def test_ctrl_signals(self):
        p = psutil.Process(get_test_subprocess().pid)
        p.send_signal(signal.CTRL_C_EVENT)
        p.send_signal(signal.CTRL_BREAK_EVENT)
        p.kill()
        p.wait()
        self.assertRaises(psutil.NoSuchProcess,
                          p.send_signal, signal.CTRL_C_EVENT)
        self.assertRaises(psutil.NoSuchProcess,
                          p.send_signal, signal.CTRL_BREAK_EVENT)

    def test_username(self):
        self.assertEqual(psutil.Process().username(),
                         win32api.GetUserNameEx(win32con.NameSamCompatible))

    def test_cmdline(self):
        # Runs of spaces are collapsed before comparing.
        sys_value = re.sub(' +', ' ', win32api.GetCommandLine()).strip()
        psutil_value = ' '.join(psutil.Process().cmdline())
        self.assertEqual(sys_value, psutil_value)

    # XXX - occasional failures

    # def test_cpu_times(self):
    #     handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
    #                                   win32con.FALSE, os.getpid())
    #     self.addCleanup(win32api.CloseHandle, handle)
    #     sys_value = win32process.GetProcessTimes(handle)
    #     psutil_value = psutil.Process().cpu_times()
    #     self.assertAlmostEqual(
    #         psutil_value.user, sys_value['UserTime'] / 10000000.0,
    #         delta=0.2)
    #     self.assertAlmostEqual(
    #         psutil_value.user, sys_value['KernelTime'] / 10000000.0,
    #         delta=0.2)

    def test_nice(self):
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, os.getpid())
        self.addCleanup(win32api.CloseHandle, handle)
        sys_value = win32process.GetPriorityClass(handle)
        psutil_value = psutil.Process().nice()
        self.assertEqual(psutil_value, sys_value)

    def test_memory_info(self):
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, self.pid)
        self.addCleanup(win32api.CloseHandle, handle)
        sys_value = win32process.GetProcessMemoryInfo(handle)
        psutil_value = psutil.Process(self.pid).memory_info()
        self.assertEqual(
            sys_value['PeakWorkingSetSize'], psutil_value.peak_wset)
        self.assertEqual(
            sys_value['WorkingSetSize'], psutil_value.wset)
        self.assertEqual(
            sys_value['QuotaPeakPagedPoolUsage'], psutil_value.peak_paged_pool)
        self.assertEqual(
            sys_value['QuotaPagedPoolUsage'], psutil_value.paged_pool)
        self.assertEqual(
            sys_value['QuotaPeakNonPagedPoolUsage'],
            psutil_value.peak_nonpaged_pool)
        self.assertEqual(
            sys_value['QuotaNonPagedPoolUsage'], psutil_value.nonpaged_pool)
        self.assertEqual(
            sys_value['PagefileUsage'], psutil_value.pagefile)
        self.assertEqual(
            sys_value['PeakPagefileUsage'], psutil_value.peak_pagefile)

        # rss / vms are expected to mirror wset / pagefile.
        self.assertEqual(psutil_value.rss, psutil_value.wset)
        self.assertEqual(psutil_value.vms, psutil_value.pagefile)

    def test_wait(self):
        # The handle is opened before terminating the process so that the
        # exit code can still be queried afterwards.
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, self.pid)
        self.addCleanup(win32api.CloseHandle, handle)
        p = psutil.Process(self.pid)
        p.terminate()
        psutil_value = p.wait()
        sys_value = win32process.GetExitCodeProcess(handle)
        self.assertEqual(psutil_value, sys_value)

    def test_cpu_affinity(self):
        def from_bitmask(x):
            # Indexes (0-63) of the bits set in x.
            return [i for i in range(64) if (1 << i) & x]

        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, self.pid)
        self.addCleanup(win32api.CloseHandle, handle)
        sys_value = from_bitmask(
            win32process.GetProcessAffinityMask(handle)[0])
        psutil_value = psutil.Process(self.pid).cpu_affinity()
        self.assertEqual(psutil_value, sys_value)

    def test_io_counters(self):
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, os.getpid())
        self.addCleanup(win32api.CloseHandle, handle)
        sys_value = win32process.GetProcessIoCounters(handle)
        psutil_value = psutil.Process().io_counters()
        self.assertEqual(
            psutil_value.read_count, sys_value['ReadOperationCount'])
        self.assertEqual(
            psutil_value.write_count, sys_value['WriteOperationCount'])
        self.assertEqual(
            psutil_value.read_bytes, sys_value['ReadTransferCount'])
        self.assertEqual(
            psutil_value.write_bytes, sys_value['WriteTransferCount'])
        self.assertEqual(
            psutil_value.other_count, sys_value['OtherOperationCount'])
        self.assertEqual(
            psutil_value.other_bytes, sys_value['OtherTransferCount'])

    def test_num_handles(self):
        import ctypes
        import ctypes.wintypes
        PROCESS_QUERY_INFORMATION = 0x400
        handle = ctypes.windll.kernel32.OpenProcess(
            PROCESS_QUERY_INFORMATION, 0, self.pid)
        self.addCleanup(ctypes.windll.kernel32.CloseHandle, handle)

        hndcnt = ctypes.wintypes.DWORD()
        ctypes.windll.kernel32.GetProcessHandleCount(
            handle, ctypes.byref(hndcnt))
        sys_value = hndcnt.value
        psutil_value = psutil.Process(self.pid).num_handles()
        self.assertEqual(psutil_value, sys_value)

    def test_error_partial_copy(self):
        # https://github.com/giampaolo/psutil/issues/875
        exc = WindowsError()
        exc.winerror = 299
        with mock.patch("psutil._psplatform.cext.proc_cwd", side_effect=exc):
            # time.sleep is mocked so its calls can be counted.
            with mock.patch("time.sleep") as m:
                p = psutil.Process()
                self.assertRaises(psutil.AccessDenied, p.cwd)
        self.assertGreaterEqual(m.call_count, 5)

    def test_exe(self):
        # NtQuerySystemInformation succeeds if process is gone. Make sure
        # it raises NSP for a non existent pid.
        pid = psutil.pids()[-1] + 99999
        proc = psutil._psplatform.Process(pid)
        self.assertRaises(psutil.NoSuchProcess, proc.exe)
526
527
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestProcessWMI(TestCase):
    """Compare Process API results with WMI."""

    @classmethod
    def setUpClass(cls):
        subproc = get_test_subprocess()
        cls.pid = subproc.pid

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def test_name(self):
        wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        ps_proc = psutil.Process(self.pid)
        self.assertEqual(ps_proc.name(), wmi_proc.Caption)

    def test_exe(self):
        wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        ps_proc = psutil.Process(self.pid)
        # Note: wmi reports the exe as a lower case string.
        # Being Windows paths case-insensitive we ignore that.
        ps_exe = ps_proc.exe().lower()
        wmi_exe = wmi_proc.ExecutablePath.lower()
        self.assertEqual(ps_exe, wmi_exe)

    def test_cmdline(self):
        wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        ps_proc = psutil.Process(self.pid)
        # Double quotes are dropped from the WMI command line.
        ps_cmdline = ' '.join(ps_proc.cmdline())
        wmi_cmdline = wmi_proc.CommandLine.replace('"', '')
        self.assertEqual(ps_cmdline, wmi_cmdline)

    def test_username(self):
        wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        ps_proc = psutil.Process(self.pid)
        domain, _, user = wmi_proc.GetOwner()
        expected = "%s\\%s" % (domain, user)
        self.assertEqual(ps_proc.username(), expected)

    def test_memory_rss(self):
        time.sleep(0.1)
        wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        ps_proc = psutil.Process(self.pid)
        ps_rss = ps_proc.memory_info().rss
        self.assertEqual(ps_rss, int(wmi_proc.WorkingSetSize))

    def test_memory_vms(self):
        time.sleep(0.1)
        wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        ps_proc = psutil.Process(self.pid)
        vms = ps_proc.memory_info().vms
        # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
        # ...claims that PageFileUsage is represented in Kilo
        # bytes but funnily enough on certain platforms bytes are
        # returned instead.
        wmi_usage = int(wmi_proc.PageFileUsage)
        if vms not in (wmi_usage, wmi_usage * 1024):
            self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))

    def test_create_time(self):
        wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        ps_proc = psutil.Process(self.pid)
        # Only the part of CreationDate before the first '.' is compared.
        wmi_created = str(wmi_proc.CreationDate.split('.')[0])
        local_created = time.localtime(ps_proc.create_time())
        ps_created = time.strftime("%Y%m%d%H%M%S", local_created)
        self.assertEqual(wmi_created, ps_created)
592
593
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestDualProcessImplementation(TestCase):
    """
    Certain APIs on Windows have 2 internal implementations, one
    based on documented Windows APIs, another one based
    NtQuerySystemInformation() which gets called as fallback in
    case the first fails because of limited permission error.
    Here we test that the two methods return the exact same value,
    see:
    https://github.com/giampaolo/psutil/issues/304
    """

    @classmethod
    def setUpClass(cls):
        subproc = get_test_subprocess()
        cls.pid = subproc.pid

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def test_memory_info(self):
        regular = psutil.Process(self.pid).memory_info()
        denied = OSError(errno.EPERM, "msg")
        with mock.patch("psutil._psplatform.cext.proc_memory_info",
                        side_effect=denied) as fun:
            fallback = psutil.Process(self.pid).memory_info()
            self.assertEqual(len(regular), len(fallback))
            # Lengths were just asserted equal, so pairing is lossless.
            for first, second in zip(regular, fallback):
                self.assertGreaterEqual(first, 0)
                self.assertGreaterEqual(second, 0)
                self.assertAlmostEqual(first, second, delta=512)
            assert fun.called

    def test_create_time(self):
        expected = psutil.Process(self.pid).create_time()
        denied = OSError(errno.EPERM, "msg")
        with mock.patch("psutil._psplatform.cext.proc_times",
                        side_effect=denied) as fun:
            fallback = psutil.Process(self.pid).create_time()
            self.assertEqual(fallback, expected)
            assert fun.called

    def test_cpu_times(self):
        regular = psutil.Process(self.pid).cpu_times()
        denied = OSError(errno.EPERM, "msg")
        with mock.patch("psutil._psplatform.cext.proc_times",
                        side_effect=denied) as fun:
            fallback = psutil.Process(self.pid).cpu_times()
            assert fun.called
            self.assertAlmostEqual(regular.user, fallback.user, delta=0.01)
            self.assertAlmostEqual(
                regular.system, fallback.system, delta=0.01)

    def test_io_counters(self):
        regular = psutil.Process(self.pid).io_counters()
        denied = OSError(errno.EPERM, "msg")
        with mock.patch("psutil._psplatform.cext.proc_io_counters",
                        side_effect=denied) as fun:
            fallback = psutil.Process(self.pid).io_counters()
            for idx, value in enumerate(regular):
                self.assertAlmostEqual(value, fallback[idx], delta=5)
            assert fun.called

    def test_num_handles(self):
        expected = psutil.Process(self.pid).num_handles()
        denied = OSError(errno.EPERM, "msg")
        with mock.patch("psutil._psplatform.cext.proc_num_handles",
                        side_effect=denied) as fun:
            fallback = psutil.Process(self.pid).num_handles()
            self.assertEqual(fallback, expected)
            assert fun.called

    def test_cmdline(self):
        from psutil._pswindows import convert_oserror
        tolerated = (psutil.AccessDenied, psutil.NoSuchProcess)
        for pid in psutil.pids():
            try:
                with_peb = cext.proc_cmdline(pid, use_peb=True)
                without_peb = cext.proc_cmdline(pid, use_peb=False)
            except OSError as err:
                # Errors converting to AccessDenied / NoSuchProcess are
                # skipped; anything else re-raises the original OSError.
                if isinstance(convert_oserror(err), tolerated):
                    continue
                raise
            self.assertEqual(with_peb, without_peb)
675
676
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class RemoteProcessTestCase(TestCase):
    """Certain functions require calling ReadProcessMemory.
    This trivially works when called on the current process.
    Check that this works on other processes, especially when they
    have a different bitness.
    """

    # Arguments passed to both interpreters: the child blocks reading
    # stdin until the pipe is closed.
    test_args = ["-c", "import sys; sys.stdin.read()"]

    @staticmethod
    def find_other_interpreter():
        """Return the path of a Python interpreter whose bitness is the
        opposite of the running one, or None if none is found.
        """
        # find a python interpreter that is of the opposite bitness from us
        code = "import sys; sys.stdout.write(str(sys.maxsize > 2**32))"
        # communicate() returns bytes on Python 3, hence the expected
        # value is encoded before comparing (comparing bytes with str
        # never matches).  On Python 2 encode() still returns a str.
        expected = str(not IS_64_BIT).encode("ascii")

        # XXX: a different and probably more stable approach might be to access
        # the registry but accessing 64 bit paths from a 32 bit process
        for filename in glob.glob(r"C:\Python*\python.exe"):
            proc = subprocess.Popen(args=[filename, "-c", code],
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
            output, _ = proc.communicate()
            if output.strip() == expected:
                return filename
        return None

    @classmethod
    def setUpClass(cls):
        other_python = cls.find_other_interpreter()

        if other_python is None:
            raise unittest.SkipTest(
                "could not find interpreter with opposite bitness")

        if IS_64_BIT:
            cls.python64 = sys.executable
            cls.python32 = other_python
        else:
            cls.python64 = other_python
            cls.python32 = sys.executable

    def setUp(self):
        # The children get an extra env var whose value is our own pid;
        # test_environ_32 looks for it.
        env = os.environ.copy()
        env["THINK_OF_A_NUMBER"] = str(os.getpid())
        self.proc32 = get_test_subprocess([self.python32] + self.test_args,
                                          env=env,
                                          stdin=subprocess.PIPE)
        self.proc64 = get_test_subprocess([self.python64] + self.test_args,
                                          env=env,
                                          stdin=subprocess.PIPE)

    def tearDown(self):
        self.proc32.communicate()
        self.proc64.communicate()
        reap_children()

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def test_cmdline_32(self):
        p = psutil.Process(self.proc32.pid)
        self.assertEqual(len(p.cmdline()), 3)
        self.assertEqual(p.cmdline()[1:], self.test_args)

    def test_cmdline_64(self):
        p = psutil.Process(self.proc64.pid)
        self.assertEqual(len(p.cmdline()), 3)
        self.assertEqual(p.cmdline()[1:], self.test_args)

    def test_cwd_32(self):
        p = psutil.Process(self.proc32.pid)
        self.assertEqual(p.cwd(), os.getcwd())

    def test_cwd_64(self):
        p = psutil.Process(self.proc64.pid)
        self.assertEqual(p.cwd(), os.getcwd())

    def test_environ_32(self):
        p = psutil.Process(self.proc32.pid)
        e = p.environ()
        self.assertIn("THINK_OF_A_NUMBER", e)
        # assertEqual instead of the deprecated assertEquals alias.
        self.assertEqual(e["THINK_OF_A_NUMBER"], str(os.getpid()))

    def test_environ_64(self):
        # AccessDenied is tolerated here; nothing is asserted on the
        # returned value.
        p = psutil.Process(self.proc64.pid)
        try:
            p.environ()
        except psutil.AccessDenied:
            pass
766
767
768 # ===================================================================
769 # Windows services
770 # ===================================================================
771
772
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestServices(TestCase):
    """Tests for psutil.win_service_iter() and psutil.win_service_get()."""

    def test_win_service_iter(self):
        valid_start_types = {
            "automatic",
            "manual",
            "disabled",
        }
        valid_statuses = {
            "running",
            "paused",
            "start_pending",
            "pause_pending",
            "continue_pending",
            "stop_pending",
            "stopped",
        }
        for serv in psutil.win_service_iter():
            data = serv.as_dict()
            self.assertIsInstance(data['name'], str)
            self.assertNotEqual(data['name'].strip(), "")
            self.assertIsInstance(data['display_name'], str)
            self.assertIsInstance(data['username'], str)
            self.assertIn(data['status'], valid_statuses)
            if data['pid'] is not None:
                # Instantiating Process checks that the pid is usable.
                psutil.Process(data['pid'])
            self.assertIsInstance(data['binpath'], str)
            self.assertIsInstance(data['start_type'], str)
            self.assertIn(data['start_type'], valid_start_types)
            self.assertIsInstance(data['description'], str)
            pid = serv.pid()
            if pid is not None:
                p = psutil.Process(pid)
                self.assertTrue(p.is_running())
            # win_service_get
            s = psutil.win_service_get(serv.name())
            # test __eq__
            self.assertEqual(serv, s)

    def test_win_service_get(self):
        ERROR_SERVICE_DOES_NOT_EXIST = \
            psutil._psplatform.cext.ERROR_SERVICE_DOES_NOT_EXIST
        ERROR_ACCESS_DENIED = psutil._psplatform.cext.ERROR_ACCESS_DENIED

        name = next(psutil.win_service_iter()).name()
        with self.assertRaises(psutil.NoSuchProcess) as cm:
            psutil.win_service_get(name + '???')
        self.assertEqual(cm.exception.name, name + '???')

        # test NoSuchProcess
        service = psutil.win_service_get(name)
        # WindowsError takes the Windows error code in a different
        # position depending on the Python major version.
        if PY3:
            args = (0, "msg", 0, ERROR_SERVICE_DOES_NOT_EXIST)
        else:
            args = (ERROR_SERVICE_DOES_NOT_EXIST, "msg")
        exc = WindowsError(*args)
        with mock.patch("psutil._psplatform.cext.winservice_query_status",
                        side_effect=exc):
            self.assertRaises(psutil.NoSuchProcess, service.status)
        with mock.patch("psutil._psplatform.cext.winservice_query_config",
                        side_effect=exc):
            self.assertRaises(psutil.NoSuchProcess, service.username)

        # test AccessDenied
        if PY3:
            args = (0, "msg", 0, ERROR_ACCESS_DENIED)
        else:
            args = (ERROR_ACCESS_DENIED, "msg")
        exc = WindowsError(*args)
        with mock.patch("psutil._psplatform.cext.winservice_query_status",
                        side_effect=exc):
            self.assertRaises(psutil.AccessDenied, service.status)
        with mock.patch("psutil._psplatform.cext.winservice_query_config",
                        side_effect=exc):
            self.assertRaises(psutil.AccessDenied, service.username)

        # test __str__ and __repr__
        self.assertIn(service.name(), str(service))
        self.assertIn(service.display_name(), str(service))
        self.assertIn(service.name(), repr(service))
        self.assertIn(service.display_name(), repr(service))
866
867
# Run this module's tests through psutil's own runner when executed
# directly.
if __name__ == '__main__':
    from psutil.tests.runner import run as run_tests
    run_tests(__file__)