Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/psutil/tests/test_windows.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/psutil/tests/test_windows.py Sat May 02 07:14:21 2020 -0400 @@ -0,0 +1,870 @@ +#!/usr/bin/env python3 +# -*- coding: UTF-8 -* + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Windows specific tests.""" + +import datetime +import errno +import glob +import os +import platform +import re +import signal +import subprocess +import sys +import time +import warnings + +import psutil +from psutil import WINDOWS +from psutil._compat import FileNotFoundError +from psutil.tests import APPVEYOR +from psutil.tests import get_test_subprocess +from psutil.tests import HAS_BATTERY +from psutil.tests import mock +from psutil.tests import PY3 +from psutil.tests import PYPY +from psutil.tests import reap_children +from psutil.tests import retry_on_failure +from psutil.tests import sh +from psutil.tests import unittest + + +if WINDOWS and not PYPY: + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + import win32api # requires "pip install pypiwin32" + import win32con + import win32process + import wmi # requires "pip install wmi" / "make setup-dev-env" + + +cext = psutil._psplatform.cext + +# are we a 64 bit process +IS_64_BIT = sys.maxsize > 2**32 + + +def wrap_exceptions(fun): + def wrapper(self, *args, **kwargs): + try: + return fun(self, *args, **kwargs) + except OSError as err: + from psutil._pswindows import ACCESS_DENIED_SET + if err.errno in ACCESS_DENIED_SET: + raise psutil.AccessDenied(None, None) + if err.errno == errno.ESRCH: + raise psutil.NoSuchProcess(None, None) + raise + return wrapper + + +@unittest.skipIf(PYPY, "pywin32 not available on PYPY") # skip whole module +class TestCase(unittest.TestCase): + pass + + +# =================================================================== +# System APIs +# =================================================================== + + +@unittest.skipIf(not WINDOWS, "WINDOWS only") +class TestCpuAPIs(TestCase): + + @unittest.skipIf('NUMBER_OF_PROCESSORS' not in os.environ, + 'NUMBER_OF_PROCESSORS env var is not available') + def test_cpu_count_vs_NUMBER_OF_PROCESSORS(self): + # Will likely fail on many-cores systems: + # https://stackoverflow.com/questions/31209256 + num_cpus = int(os.environ['NUMBER_OF_PROCESSORS']) + self.assertEqual(num_cpus, psutil.cpu_count()) + + def test_cpu_count_vs_GetSystemInfo(self): + # Will likely fail on many-cores systems: + # https://stackoverflow.com/questions/31209256 + sys_value = win32api.GetSystemInfo()[5] + psutil_value = psutil.cpu_count() + self.assertEqual(sys_value, psutil_value) + + def test_cpu_count_logical_vs_wmi(self): + w = wmi.WMI() + proc = w.Win32_Processor()[0] + self.assertEqual(psutil.cpu_count(), proc.NumberOfLogicalProcessors) + + def test_cpu_count_phys_vs_wmi(self): + w = wmi.WMI() + proc = w.Win32_Processor()[0] + self.assertEqual(psutil.cpu_count(logical=False), proc.NumberOfCores) + + def test_cpu_count_vs_cpu_times(self): + self.assertEqual(psutil.cpu_count(), + len(psutil.cpu_times(percpu=True))) + + def test_cpu_freq(self): + w = wmi.WMI() + proc = w.Win32_Processor()[0] + self.assertEqual(proc.CurrentClockSpeed, psutil.cpu_freq().current) + self.assertEqual(proc.MaxClockSpeed, psutil.cpu_freq().max) + + +@unittest.skipIf(not WINDOWS, "WINDOWS only") +class TestSystemAPIs(TestCase): + + def test_nic_names(self): + out = sh('ipconfig /all') + nics = psutil.net_io_counters(pernic=True).keys() + for nic in nics: + if "pseudo-interface" in nic.replace(' ', '-').lower(): + continue + if nic not in out: + self.fail( + "%r nic wasn't found in 'ipconfig /all' output" % nic) + + def test_total_phymem(self): + w = wmi.WMI().Win32_ComputerSystem()[0] + self.assertEqual(int(w.TotalPhysicalMemory), + psutil.virtual_memory().total) + + # @unittest.skipIf(wmi is None, "wmi module is not installed") + # def test__UPTIME(self): + # # _UPTIME constant is not public but it is used internally + # # as value to return for pid 0 creation time. + # # WMI behaves the same. + # w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + # p = psutil.Process(0) + # wmic_create = str(w.CreationDate.split('.')[0]) + # psutil_create = time.strftime("%Y%m%d%H%M%S", + # time.localtime(p.create_time())) + + # Note: this test is not very reliable + @unittest.skipIf(APPVEYOR, "test not relieable on appveyor") + @retry_on_failure() + def test_pids(self): + # Note: this test might fail if the OS is starting/killing + # other processes in the meantime + w = wmi.WMI().Win32_Process() + wmi_pids = set([x.ProcessId for x in w]) + psutil_pids = set(psutil.pids()) + self.assertEqual(wmi_pids, psutil_pids) + + @retry_on_failure() + def test_disks(self): + ps_parts = psutil.disk_partitions(all=True) + wmi_parts = wmi.WMI().Win32_LogicalDisk() + for ps_part in ps_parts: + for wmi_part in wmi_parts: + if ps_part.device.replace('\\', '') == wmi_part.DeviceID: + if not ps_part.mountpoint: + # this is usually a CD-ROM with no disk inserted + break + if 'cdrom' in ps_part.opts: + break + try: + usage = psutil.disk_usage(ps_part.mountpoint) + except FileNotFoundError: + # usually this is the floppy + break + self.assertEqual(usage.total, int(wmi_part.Size)) + wmi_free = int(wmi_part.FreeSpace) + self.assertEqual(usage.free, wmi_free) + # 10 MB tollerance + if abs(usage.free - wmi_free) > 10 * 1024 * 1024: + self.fail("psutil=%s, wmi=%s" % ( + usage.free, wmi_free)) + break + else: + self.fail("can't find partition %s" % repr(ps_part)) + + def test_disk_usage(self): + for disk in psutil.disk_partitions(): + if 'cdrom' in disk.opts: + continue + sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint) + psutil_value = psutil.disk_usage(disk.mountpoint) + self.assertAlmostEqual(sys_value[0], psutil_value.free, + delta=1024 * 1024) + self.assertAlmostEqual(sys_value[1], psutil_value.total, + delta=1024 * 1024) + self.assertEqual(psutil_value.used, + psutil_value.total - psutil_value.free) + + def test_disk_partitions(self): + sys_value = [ + x + '\\' for x in win32api.GetLogicalDriveStrings().split("\\\x00") + if x and not x.startswith('A:')] + psutil_value = [x.mountpoint for x in psutil.disk_partitions(all=True)] + self.assertEqual(sys_value, psutil_value) + + def test_net_if_stats(self): + ps_names = set(cext.net_if_stats()) + wmi_adapters = wmi.WMI().Win32_NetworkAdapter() + wmi_names = set() + for wmi_adapter in wmi_adapters: + wmi_names.add(wmi_adapter.Name) + wmi_names.add(wmi_adapter.NetConnectionID) + self.assertTrue(ps_names & wmi_names, + "no common entries in %s, %s" % (ps_names, wmi_names)) + + def test_boot_time(self): + wmi_os = wmi.WMI().Win32_OperatingSystem() + wmi_btime_str = wmi_os[0].LastBootUpTime.split('.')[0] + wmi_btime_dt = datetime.datetime.strptime( + wmi_btime_str, "%Y%m%d%H%M%S") + psutil_dt = datetime.datetime.fromtimestamp(psutil.boot_time()) + diff = abs((wmi_btime_dt - psutil_dt).total_seconds()) + self.assertLessEqual(diff, 3) + + def test_boot_time_fluctuation(self): + # https://github.com/giampaolo/psutil/issues/1007 + with mock.patch('psutil._pswindows.cext.boot_time', return_value=5): + self.assertEqual(psutil.boot_time(), 5) + with mock.patch('psutil._pswindows.cext.boot_time', return_value=4): + self.assertEqual(psutil.boot_time(), 5) + with mock.patch('psutil._pswindows.cext.boot_time', return_value=6): + self.assertEqual(psutil.boot_time(), 5) + with mock.patch('psutil._pswindows.cext.boot_time', return_value=333): + self.assertEqual(psutil.boot_time(), 333) + + +# =================================================================== +# sensors_battery() +# =================================================================== + + +@unittest.skipIf(not WINDOWS, "WINDOWS only") +class TestSensorsBattery(TestCase): + + def test_has_battery(self): + if win32api.GetPwrCapabilities()['SystemBatteriesPresent']: + self.assertIsNotNone(psutil.sensors_battery()) + else: + self.assertIsNone(psutil.sensors_battery()) + + @unittest.skipIf(not HAS_BATTERY, "no battery") + def test_percent(self): + w = wmi.WMI() + battery_wmi = w.query('select * from Win32_Battery')[0] + battery_psutil = psutil.sensors_battery() + self.assertAlmostEqual( + battery_psutil.percent, battery_wmi.EstimatedChargeRemaining, + delta=1) + + @unittest.skipIf(not HAS_BATTERY, "no battery") + def test_power_plugged(self): + w = wmi.WMI() + battery_wmi = w.query('select * from Win32_Battery')[0] + battery_psutil = psutil.sensors_battery() + # Status codes: + # https://msdn.microsoft.com/en-us/library/aa394074(v=vs.85).aspx + self.assertEqual(battery_psutil.power_plugged, + battery_wmi.BatteryStatus == 2) + + def test_emulate_no_battery(self): + with mock.patch("psutil._pswindows.cext.sensors_battery", + return_value=(0, 128, 0, 0)) as m: + self.assertIsNone(psutil.sensors_battery()) + assert m.called + + def test_emulate_power_connected(self): + with mock.patch("psutil._pswindows.cext.sensors_battery", + return_value=(1, 0, 0, 0)) as m: + self.assertEqual(psutil.sensors_battery().secsleft, + psutil.POWER_TIME_UNLIMITED) + assert m.called + + def test_emulate_power_charging(self): + with mock.patch("psutil._pswindows.cext.sensors_battery", + return_value=(0, 8, 0, 0)) as m: + self.assertEqual(psutil.sensors_battery().secsleft, + psutil.POWER_TIME_UNLIMITED) + assert m.called + + def test_emulate_secs_left_unknown(self): + with mock.patch("psutil._pswindows.cext.sensors_battery", + return_value=(0, 0, 0, -1)) as m: + self.assertEqual(psutil.sensors_battery().secsleft, + psutil.POWER_TIME_UNKNOWN) + assert m.called + + +# =================================================================== +# Process APIs +# =================================================================== + + +@unittest.skipIf(not WINDOWS, "WINDOWS only") +class TestProcess(TestCase): + + @classmethod + def setUpClass(cls): + cls.pid = get_test_subprocess().pid + + @classmethod + def tearDownClass(cls): + reap_children() + + def test_issue_24(self): + p = psutil.Process(0) + self.assertRaises(psutil.AccessDenied, p.kill) + + def test_special_pid(self): + p = psutil.Process(4) + self.assertEqual(p.name(), 'System') + # use __str__ to access all common Process properties to check + # that nothing strange happens + str(p) + p.username() + self.assertTrue(p.create_time() >= 0.0) + try: + rss, vms = p.memory_info()[:2] + except psutil.AccessDenied: + # expected on Windows Vista and Windows 7 + if not platform.uname()[1] in ('vista', 'win-7', 'win7'): + raise + else: + self.assertTrue(rss > 0) + + def test_send_signal(self): + p = psutil.Process(self.pid) + self.assertRaises(ValueError, p.send_signal, signal.SIGINT) + + def test_num_handles_increment(self): + p = psutil.Process(os.getpid()) + before = p.num_handles() + handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, + win32con.FALSE, os.getpid()) + after = p.num_handles() + self.assertEqual(after, before + 1) + win32api.CloseHandle(handle) + self.assertEqual(p.num_handles(), before) + + def test_handles_leak(self): + # Call all Process methods and make sure no handles are left + # open. This is here mainly to make sure functions using + # OpenProcess() always call CloseHandle(). + def call(p, attr): + attr = getattr(p, name, None) + if attr is not None and callable(attr): + attr() + else: + attr + + p = psutil.Process(self.pid) + failures = [] + for name in dir(psutil.Process): + if name.startswith('_') \ + or name in ('terminate', 'kill', 'suspend', 'resume', + 'nice', 'send_signal', 'wait', 'children', + 'as_dict', 'memory_info_ex'): + continue + else: + try: + call(p, name) + num1 = p.num_handles() + call(p, name) + num2 = p.num_handles() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + else: + if num2 > num1: + fail = \ + "failure while processing Process.%s method " \ + "(before=%s, after=%s)" % (name, num1, num2) + failures.append(fail) + if failures: + self.fail('\n' + '\n'.join(failures)) + + @unittest.skipIf(not sys.version_info >= (2, 7), + "CTRL_* signals not supported") + def test_ctrl_signals(self): + p = psutil.Process(get_test_subprocess().pid) + p.send_signal(signal.CTRL_C_EVENT) + p.send_signal(signal.CTRL_BREAK_EVENT) + p.kill() + p.wait() + self.assertRaises(psutil.NoSuchProcess, + p.send_signal, signal.CTRL_C_EVENT) + self.assertRaises(psutil.NoSuchProcess, + p.send_signal, signal.CTRL_BREAK_EVENT) + + def test_username(self): + self.assertEqual(psutil.Process().username(), + win32api.GetUserNameEx(win32con.NameSamCompatible)) + + def test_cmdline(self): + sys_value = re.sub(' +', ' ', win32api.GetCommandLine()).strip() + psutil_value = ' '.join(psutil.Process().cmdline()) + self.assertEqual(sys_value, psutil_value) + + # XXX - occasional failures + + # def test_cpu_times(self): + # handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, + # win32con.FALSE, os.getpid()) + # self.addCleanup(win32api.CloseHandle, handle) + # sys_value = win32process.GetProcessTimes(handle) + # psutil_value = psutil.Process().cpu_times() + # self.assertAlmostEqual( + # psutil_value.user, sys_value['UserTime'] / 10000000.0, + # delta=0.2) + # self.assertAlmostEqual( + # psutil_value.user, sys_value['KernelTime'] / 10000000.0, + # delta=0.2) + + def test_nice(self): + handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, + win32con.FALSE, os.getpid()) + self.addCleanup(win32api.CloseHandle, handle) + sys_value = win32process.GetPriorityClass(handle) + psutil_value = psutil.Process().nice() + self.assertEqual(psutil_value, sys_value) + + def test_memory_info(self): + handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, + win32con.FALSE, self.pid) + self.addCleanup(win32api.CloseHandle, handle) + sys_value = win32process.GetProcessMemoryInfo(handle) + psutil_value = psutil.Process(self.pid).memory_info() + self.assertEqual( + sys_value['PeakWorkingSetSize'], psutil_value.peak_wset) + self.assertEqual( + sys_value['WorkingSetSize'], psutil_value.wset) + self.assertEqual( + sys_value['QuotaPeakPagedPoolUsage'], psutil_value.peak_paged_pool) + self.assertEqual( + sys_value['QuotaPagedPoolUsage'], psutil_value.paged_pool) + self.assertEqual( + sys_value['QuotaPeakNonPagedPoolUsage'], + psutil_value.peak_nonpaged_pool) + self.assertEqual( + sys_value['QuotaNonPagedPoolUsage'], psutil_value.nonpaged_pool) + self.assertEqual( + sys_value['PagefileUsage'], psutil_value.pagefile) + self.assertEqual( + sys_value['PeakPagefileUsage'], psutil_value.peak_pagefile) + + self.assertEqual(psutil_value.rss, psutil_value.wset) + self.assertEqual(psutil_value.vms, psutil_value.pagefile) + + def test_wait(self): + handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, + win32con.FALSE, self.pid) + self.addCleanup(win32api.CloseHandle, handle) + p = psutil.Process(self.pid) + p.terminate() + psutil_value = p.wait() + sys_value = win32process.GetExitCodeProcess(handle) + self.assertEqual(psutil_value, sys_value) + + def test_cpu_affinity(self): + def from_bitmask(x): + return [i for i in range(64) if (1 << i) & x] + + handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, + win32con.FALSE, self.pid) + self.addCleanup(win32api.CloseHandle, handle) + sys_value = from_bitmask( + win32process.GetProcessAffinityMask(handle)[0]) + psutil_value = psutil.Process(self.pid).cpu_affinity() + self.assertEqual(psutil_value, sys_value) + + def test_io_counters(self): + handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, + win32con.FALSE, os.getpid()) + self.addCleanup(win32api.CloseHandle, handle) + sys_value = win32process.GetProcessIoCounters(handle) + psutil_value = psutil.Process().io_counters() + self.assertEqual( + psutil_value.read_count, sys_value['ReadOperationCount']) + self.assertEqual( + psutil_value.write_count, sys_value['WriteOperationCount']) + self.assertEqual( + psutil_value.read_bytes, sys_value['ReadTransferCount']) + self.assertEqual( + psutil_value.write_bytes, sys_value['WriteTransferCount']) + self.assertEqual( + psutil_value.other_count, sys_value['OtherOperationCount']) + self.assertEqual( + psutil_value.other_bytes, sys_value['OtherTransferCount']) + + def test_num_handles(self): + import ctypes + import ctypes.wintypes + PROCESS_QUERY_INFORMATION = 0x400 + handle = ctypes.windll.kernel32.OpenProcess( + PROCESS_QUERY_INFORMATION, 0, self.pid) + self.addCleanup(ctypes.windll.kernel32.CloseHandle, handle) + + hndcnt = ctypes.wintypes.DWORD() + ctypes.windll.kernel32.GetProcessHandleCount( + handle, ctypes.byref(hndcnt)) + sys_value = hndcnt.value + psutil_value = psutil.Process(self.pid).num_handles() + self.assertEqual(psutil_value, sys_value) + + def test_error_partial_copy(self): + # https://github.com/giampaolo/psutil/issues/875 + exc = WindowsError() + exc.winerror = 299 + with mock.patch("psutil._psplatform.cext.proc_cwd", side_effect=exc): + with mock.patch("time.sleep") as m: + p = psutil.Process() + self.assertRaises(psutil.AccessDenied, p.cwd) + self.assertGreaterEqual(m.call_count, 5) + + def test_exe(self): + # NtQuerySystemInformation succeeds if process is gone. Make sure + # it raises NSP for a non existent pid. + pid = psutil.pids()[-1] + 99999 + proc = psutil._psplatform.Process(pid) + self.assertRaises(psutil.NoSuchProcess, proc.exe) + + +@unittest.skipIf(not WINDOWS, "WINDOWS only") +class TestProcessWMI(TestCase): + """Compare Process API results with WMI.""" + + @classmethod + def setUpClass(cls): + cls.pid = get_test_subprocess().pid + + @classmethod + def tearDownClass(cls): + reap_children() + + def test_name(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + self.assertEqual(p.name(), w.Caption) + + def test_exe(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + # Note: wmi reports the exe as a lower case string. + # Being Windows paths case-insensitive we ignore that. + self.assertEqual(p.exe().lower(), w.ExecutablePath.lower()) + + def test_cmdline(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + self.assertEqual(' '.join(p.cmdline()), + w.CommandLine.replace('"', '')) + + def test_username(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + domain, _, username = w.GetOwner() + username = "%s\\%s" % (domain, username) + self.assertEqual(p.username(), username) + + def test_memory_rss(self): + time.sleep(0.1) + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + rss = p.memory_info().rss + self.assertEqual(rss, int(w.WorkingSetSize)) + + def test_memory_vms(self): + time.sleep(0.1) + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + vms = p.memory_info().vms + # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx + # ...claims that PageFileUsage is represented in Kilo + # bytes but funnily enough on certain platforms bytes are + # returned instead. + wmi_usage = int(w.PageFileUsage) + if (vms != wmi_usage) and (vms != wmi_usage * 1024): + self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) + + def test_create_time(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + wmic_create = str(w.CreationDate.split('.')[0]) + psutil_create = time.strftime("%Y%m%d%H%M%S", + time.localtime(p.create_time())) + self.assertEqual(wmic_create, psutil_create) + + +@unittest.skipIf(not WINDOWS, "WINDOWS only") +class TestDualProcessImplementation(TestCase): + """ + Certain APIs on Windows have 2 internal implementations, one + based on documented Windows APIs, another one based + NtQuerySystemInformation() which gets called as fallback in + case the first fails because of limited permission error. + Here we test that the two methods return the exact same value, + see: + https://github.com/giampaolo/psutil/issues/304 + """ + + @classmethod + def setUpClass(cls): + cls.pid = get_test_subprocess().pid + + @classmethod + def tearDownClass(cls): + reap_children() + + def test_memory_info(self): + mem_1 = psutil.Process(self.pid).memory_info() + with mock.patch("psutil._psplatform.cext.proc_memory_info", + side_effect=OSError(errno.EPERM, "msg")) as fun: + mem_2 = psutil.Process(self.pid).memory_info() + self.assertEqual(len(mem_1), len(mem_2)) + for i in range(len(mem_1)): + self.assertGreaterEqual(mem_1[i], 0) + self.assertGreaterEqual(mem_2[i], 0) + self.assertAlmostEqual(mem_1[i], mem_2[i], delta=512) + assert fun.called + + def test_create_time(self): + ctime = psutil.Process(self.pid).create_time() + with mock.patch("psutil._psplatform.cext.proc_times", + side_effect=OSError(errno.EPERM, "msg")) as fun: + self.assertEqual(psutil.Process(self.pid).create_time(), ctime) + assert fun.called + + def test_cpu_times(self): + cpu_times_1 = psutil.Process(self.pid).cpu_times() + with mock.patch("psutil._psplatform.cext.proc_times", + side_effect=OSError(errno.EPERM, "msg")) as fun: + cpu_times_2 = psutil.Process(self.pid).cpu_times() + assert fun.called + self.assertAlmostEqual( + cpu_times_1.user, cpu_times_2.user, delta=0.01) + self.assertAlmostEqual( + cpu_times_1.system, cpu_times_2.system, delta=0.01) + + def test_io_counters(self): + io_counters_1 = psutil.Process(self.pid).io_counters() + with mock.patch("psutil._psplatform.cext.proc_io_counters", + side_effect=OSError(errno.EPERM, "msg")) as fun: + io_counters_2 = psutil.Process(self.pid).io_counters() + for i in range(len(io_counters_1)): + self.assertAlmostEqual( + io_counters_1[i], io_counters_2[i], delta=5) + assert fun.called + + def test_num_handles(self): + num_handles = psutil.Process(self.pid).num_handles() + with mock.patch("psutil._psplatform.cext.proc_num_handles", + side_effect=OSError(errno.EPERM, "msg")) as fun: + self.assertEqual(psutil.Process(self.pid).num_handles(), + num_handles) + assert fun.called + + def test_cmdline(self): + from psutil._pswindows import convert_oserror + for pid in psutil.pids(): + try: + a = cext.proc_cmdline(pid, use_peb=True) + b = cext.proc_cmdline(pid, use_peb=False) + except OSError as err: + err = convert_oserror(err) + if not isinstance(err, (psutil.AccessDenied, + psutil.NoSuchProcess)): + raise + else: + self.assertEqual(a, b) + + +@unittest.skipIf(not WINDOWS, "WINDOWS only") +class RemoteProcessTestCase(TestCase): + """Certain functions require calling ReadProcessMemory. + This trivially works when called on the current process. + Check that this works on other processes, especially when they + have a different bitness. + """ + + @staticmethod + def find_other_interpreter(): + # find a python interpreter that is of the opposite bitness from us + code = "import sys; sys.stdout.write(str(sys.maxsize > 2**32))" + + # XXX: a different and probably more stable approach might be to access + # the registry but accessing 64 bit paths from a 32 bit process + for filename in glob.glob(r"C:\Python*\python.exe"): + proc = subprocess.Popen(args=[filename, "-c", code], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + output, _ = proc.communicate() + if output == str(not IS_64_BIT): + return filename + + @classmethod + def setUpClass(cls): + other_python = cls.find_other_interpreter() + + if other_python is None: + raise unittest.SkipTest( + "could not find interpreter with opposite bitness") + + if IS_64_BIT: + cls.python64 = sys.executable + cls.python32 = other_python + else: + cls.python64 = other_python + cls.python32 = sys.executable + + test_args = ["-c", "import sys; sys.stdin.read()"] + + def setUp(self): + env = os.environ.copy() + env["THINK_OF_A_NUMBER"] = str(os.getpid()) + self.proc32 = get_test_subprocess([self.python32] + self.test_args, + env=env, + stdin=subprocess.PIPE) + self.proc64 = get_test_subprocess([self.python64] + self.test_args, + env=env, + stdin=subprocess.PIPE) + + def tearDown(self): + self.proc32.communicate() + self.proc64.communicate() + reap_children() + + @classmethod + def tearDownClass(cls): + reap_children() + + def test_cmdline_32(self): + p = psutil.Process(self.proc32.pid) + self.assertEqual(len(p.cmdline()), 3) + self.assertEqual(p.cmdline()[1:], self.test_args) + + def test_cmdline_64(self): + p = psutil.Process(self.proc64.pid) + self.assertEqual(len(p.cmdline()), 3) + self.assertEqual(p.cmdline()[1:], self.test_args) + + def test_cwd_32(self): + p = psutil.Process(self.proc32.pid) + self.assertEqual(p.cwd(), os.getcwd()) + + def test_cwd_64(self): + p = psutil.Process(self.proc64.pid) + self.assertEqual(p.cwd(), os.getcwd()) + + def test_environ_32(self): + p = psutil.Process(self.proc32.pid) + e = p.environ() + self.assertIn("THINK_OF_A_NUMBER", e) + self.assertEquals(e["THINK_OF_A_NUMBER"], str(os.getpid())) + + def test_environ_64(self): + p = psutil.Process(self.proc64.pid) + try: + p.environ() + except psutil.AccessDenied: + pass + + +# =================================================================== +# Windows services +# =================================================================== + + +@unittest.skipIf(not WINDOWS, "WINDOWS only") +class TestServices(TestCase): + + def test_win_service_iter(self): + valid_statuses = set([ + "running", + "paused", + "start", + "pause", + "continue", + "stop", + "stopped", + ]) + valid_start_types = set([ + "automatic", + "manual", + "disabled", + ]) + valid_statuses = set([ + "running", + "paused", + "start_pending", + "pause_pending", + "continue_pending", + "stop_pending", + "stopped" + ]) + for serv in psutil.win_service_iter(): + data = serv.as_dict() + self.assertIsInstance(data['name'], str) + self.assertNotEqual(data['name'].strip(), "") + self.assertIsInstance(data['display_name'], str) + self.assertIsInstance(data['username'], str) + self.assertIn(data['status'], valid_statuses) + if data['pid'] is not None: + psutil.Process(data['pid']) + self.assertIsInstance(data['binpath'], str) + self.assertIsInstance(data['username'], str) + self.assertIsInstance(data['start_type'], str) + self.assertIn(data['start_type'], valid_start_types) + self.assertIn(data['status'], valid_statuses) + self.assertIsInstance(data['description'], str) + pid = serv.pid() + if pid is not None: + p = psutil.Process(pid) + self.assertTrue(p.is_running()) + # win_service_get + s = psutil.win_service_get(serv.name()) + # test __eq__ + self.assertEqual(serv, s) + + def test_win_service_get(self): + ERROR_SERVICE_DOES_NOT_EXIST = \ + psutil._psplatform.cext.ERROR_SERVICE_DOES_NOT_EXIST + ERROR_ACCESS_DENIED = psutil._psplatform.cext.ERROR_ACCESS_DENIED + + name = next(psutil.win_service_iter()).name() + with self.assertRaises(psutil.NoSuchProcess) as cm: + psutil.win_service_get(name + '???') + self.assertEqual(cm.exception.name, name + '???') + + # test NoSuchProcess + service = psutil.win_service_get(name) + if PY3: + args = (0, "msg", 0, ERROR_SERVICE_DOES_NOT_EXIST) + else: + args = (ERROR_SERVICE_DOES_NOT_EXIST, "msg") + exc = WindowsError(*args) + with mock.patch("psutil._psplatform.cext.winservice_query_status", + side_effect=exc): + self.assertRaises(psutil.NoSuchProcess, service.status) + with mock.patch("psutil._psplatform.cext.winservice_query_config", + side_effect=exc): + self.assertRaises(psutil.NoSuchProcess, service.username) + + # test AccessDenied + if PY3: + args = (0, "msg", 0, ERROR_ACCESS_DENIED) + else: + args = (ERROR_ACCESS_DENIED, "msg") + exc = WindowsError(*args) + with mock.patch("psutil._psplatform.cext.winservice_query_status", + side_effect=exc): + self.assertRaises(psutil.AccessDenied, service.status) + with mock.patch("psutil._psplatform.cext.winservice_query_config", + side_effect=exc): + self.assertRaises(psutil.AccessDenied, service.username) + + # test __str__ and __repr__ + self.assertIn(service.name(), str(service)) + self.assertIn(service.display_name(), str(service)) + self.assertIn(service.name(), repr(service)) + self.assertIn(service.display_name(), repr(service)) + + +if __name__ == '__main__': + from psutil.tests.runner import run + run(__file__)