Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_memleaks.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
4 # Use of this source code is governed by a BSD-style license that can be | |
5 # found in the LICENSE file. | |
6 | |
7 """ | |
8 Tests for detecting function memory leaks (typically the ones | |
9 implemented in C). It does so by calling a function many times and | |
10 checking whether process memory usage keeps increasing between | |
11 calls or over time. | |
12 Note that this may produce false positives (especially on Windows | |
13 for some reason). | |
14 PyPy appears to be completely unstable for this framework, probably | |
15 because of how its JIT handles memory, so tests are skipped. | |
16 """ | |
17 | |
18 from __future__ import print_function | |
19 import functools | |
20 import os | |
21 | |
22 import psutil | |
23 import psutil._common | |
24 from psutil import LINUX | |
25 from psutil import MACOS | |
26 from psutil import OPENBSD | |
27 from psutil import POSIX | |
28 from psutil import SUNOS | |
29 from psutil import WINDOWS | |
30 from psutil._compat import ProcessLookupError | |
31 from psutil._compat import super | |
32 from psutil.tests import create_sockets | |
33 from psutil.tests import get_testfn | |
34 from psutil.tests import HAS_CPU_AFFINITY | |
35 from psutil.tests import HAS_CPU_FREQ | |
36 from psutil.tests import HAS_ENVIRON | |
37 from psutil.tests import HAS_IONICE | |
38 from psutil.tests import HAS_MEMORY_MAPS | |
39 from psutil.tests import HAS_NET_IO_COUNTERS | |
40 from psutil.tests import HAS_PROC_CPU_NUM | |
41 from psutil.tests import HAS_PROC_IO_COUNTERS | |
42 from psutil.tests import HAS_RLIMIT | |
43 from psutil.tests import HAS_SENSORS_BATTERY | |
44 from psutil.tests import HAS_SENSORS_FANS | |
45 from psutil.tests import HAS_SENSORS_TEMPERATURES | |
46 from psutil.tests import process_namespace | |
47 from psutil.tests import skip_on_access_denied | |
48 from psutil.tests import spawn_testproc | |
49 from psutil.tests import system_namespace | |
50 from psutil.tests import terminate | |
51 from psutil.tests import TestMemoryLeak | |
52 from psutil.tests import TRAVIS | |
53 from psutil.tests import unittest | |
54 | |
55 | |
# Platform-specific C extension module; some tests below call its
# functions directly instead of going through the public psutil API.
cext = psutil._psplatform.cext
# Process object created without an explicit PID; shared by the tests
# as the default target and as the source of "current" values.
thisproc = psutil.Process()
# Reduced repetition count used for tests which do not need the
# default number of calls.
FEW_TIMES = 5
59 | |
60 | |
def fewtimes_if_linux():
    """Decorator factory for test methods whose Linux implementation is
    pure Python and which we therefore want to run faster.

    When LINUX is true the wrapped method runs with the test class'
    `times` attribute temporarily set to FEW_TIMES; the previous value
    is put back afterwards, even if the method raises. On any other
    platform the method is called unchanged.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(self, *args, **kwargs):
            if not LINUX:
                return fun(self, *args, **kwargs)
            klass = self.__class__
            saved = klass.times
            try:
                klass.times = FEW_TIMES
                return fun(self, *args, **kwargs)
            finally:
                # Always restore, so that other tests of the same class
                # see the original value.
                klass.times = saved
        return wrapper
    return decorator
79 | |
80 | |
81 # =================================================================== | |
82 # Process class | |
83 # =================================================================== | |
84 | |
85 | |
class TestProcessObjectLeaks(TestMemoryLeak):
    """Look for memory leaks in the methods of the Process class.

    Each test hands a Process method (or a small callable wrapping it)
    to self.execute(). The target process is the `proc` class
    attribute, which defaults to `thisproc`.
    """

    proc = thisproc

    def test_coverage(self):
        # Run the namespace helper's coverage check against this class,
        # over both the getters and the setters it lists.
        namespace = process_namespace(None)
        namespace.test_class_coverage(
            self, namespace.getters + namespace.setters)

    @fewtimes_if_linux()
    def test_name(self):
        self.execute(self.proc.name)

    @fewtimes_if_linux()
    def test_cmdline(self):
        self.execute(self.proc.cmdline)

    @fewtimes_if_linux()
    def test_exe(self):
        self.execute(self.proc.exe)

    @fewtimes_if_linux()
    def test_ppid(self):
        self.execute(self.proc.ppid)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_uids(self):
        self.execute(self.proc.uids)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_gids(self):
        self.execute(self.proc.gids)

    @fewtimes_if_linux()
    def test_status(self):
        self.execute(self.proc.status)

    def test_nice(self):
        self.execute(self.proc.nice)

    def test_nice_set(self):
        # Re-apply the value read from thisproc on every call.
        current = thisproc.nice()

        def set_nice():
            return self.proc.nice(current)

        self.execute(set_nice)

    @unittest.skipIf(not HAS_IONICE, "not supported")
    def test_ionice(self):
        self.execute(self.proc.ionice)

    @unittest.skipIf(not HAS_IONICE, "not supported")
    def test_ionice_set(self):
        if not WINDOWS:
            def set_ionice():
                return self.proc.ionice(psutil.IOPRIO_CLASS_NONE)

            self.execute(set_ionice)
            # Also call the C function directly with arguments for
            # which an OSError is expected.
            failing = functools.partial(
                cext.proc_ioprio_set, os.getpid(), -1, 0)
            self.execute_w_exc(OSError, failing)
        else:
            current = thisproc.ionice()

            def set_ionice():
                return self.proc.ionice(current)

            self.execute(set_ionice)

    @unittest.skipIf(not HAS_PROC_IO_COUNTERS, "not supported")
    @fewtimes_if_linux()
    def test_io_counters(self):
        self.execute(self.proc.io_counters)

    @unittest.skipIf(POSIX, "worthless on POSIX")
    def test_username(self):
        # On Windows 1 handle is always opened (only once), so call
        # the method one time before measuring.
        psutil.Process().username()
        self.execute(self.proc.username)

    @fewtimes_if_linux()
    def test_create_time(self):
        self.execute(self.proc.create_time)

    @fewtimes_if_linux()
    @skip_on_access_denied(only_if=OPENBSD)
    def test_num_threads(self):
        self.execute(self.proc.num_threads)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_num_handles(self):
        self.execute(self.proc.num_handles)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_num_fds(self):
        self.execute(self.proc.num_fds)

    @fewtimes_if_linux()
    def test_num_ctx_switches(self):
        self.execute(self.proc.num_ctx_switches)

    @fewtimes_if_linux()
    @skip_on_access_denied(only_if=OPENBSD)
    def test_threads(self):
        self.execute(self.proc.threads)

    @fewtimes_if_linux()
    def test_cpu_times(self):
        self.execute(self.proc.cpu_times)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
    def test_cpu_num(self):
        self.execute(self.proc.cpu_num)

    @fewtimes_if_linux()
    def test_memory_info(self):
        self.execute(self.proc.memory_info)

    @fewtimes_if_linux()
    def test_memory_full_info(self):
        self.execute(self.proc.memory_full_info)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_terminal(self):
        self.execute(self.proc.terminal)

    def test_resume(self):
        # Fewer repetitions on POSIX, the class default elsewhere.
        if POSIX:
            repetitions = FEW_TIMES
        else:
            repetitions = self.times
        self.execute(self.proc.resume, times=repetitions)

    @fewtimes_if_linux()
    def test_cwd(self):
        self.execute(self.proc.cwd)

    @unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
    def test_cpu_affinity(self):
        self.execute(self.proc.cpu_affinity)

    @unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
    def test_cpu_affinity_set(self):
        current = thisproc.cpu_affinity()

        def set_affinity():
            return self.proc.cpu_affinity(current)

        self.execute(set_affinity)
        if TRAVIS:
            return

        # An invalid CPU number is expected to raise ValueError.
        def set_invalid_affinity():
            return self.proc.cpu_affinity([-1])

        self.execute_w_exc(ValueError, set_invalid_affinity)

    @fewtimes_if_linux()
    def test_open_files(self):
        # Keep a test file open while open_files() is being called.
        path = get_testfn()
        with open(path, 'w'):
            self.execute(self.proc.open_files)

    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
    @fewtimes_if_linux()
    def test_memory_maps(self):
        self.execute(self.proc.memory_maps)

    @unittest.skipIf(not LINUX, "LINUX only")
    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit(self):
        def get_rlimit():
            return self.proc.rlimit(psutil.RLIMIT_NOFILE)

        self.execute(get_rlimit)

    @unittest.skipIf(not LINUX, "LINUX only")
    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit_set(self):
        current = thisproc.rlimit(psutil.RLIMIT_NOFILE)

        def set_rlimit():
            return self.proc.rlimit(psutil.RLIMIT_NOFILE, current)

        # An invalid resource number is expected to raise OSError.
        def get_invalid_rlimit():
            return self.proc.rlimit(-1)

        self.execute(set_rlimit)
        self.execute_w_exc(OSError, get_invalid_rlimit)

    @fewtimes_if_linux()
    # Windows implementation is based on a single system-wide
    # function (tested later).
    @unittest.skipIf(WINDOWS, "worthless on WINDOWS")
    def test_connections(self):
        # TODO: UNIX sockets are temporarily implemented by parsing
        # 'pfiles' cmd output; we don't want that part of the code to
        # be executed.
        if SUNOS:
            kind = 'inet'
        else:
            kind = 'all'

        def get_connections():
            return self.proc.connections(kind)

        with create_sockets():
            self.execute(get_connections)

    @unittest.skipIf(not HAS_ENVIRON, "not supported")
    def test_environ(self):
        self.execute(self.proc.environ)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_proc_info(self):
        def get_proc_info():
            return cext.proc_info(os.getpid())

        self.execute(get_proc_info)
267 | |
268 | |
class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
    """Run the parent class' tests again, this time against a process
    which has already been terminated, so that the methods raise
    NoSuchProcess. The C functions are still invoked but follow
    different code paths, and those are the ones checked here.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # Spawn a child, then kill it and wait for it, so that every
        # test in this class deals with a process which is gone.
        child = spawn_testproc()
        cls.subp = child
        target = psutil.Process(child.pid)
        cls.proc = target
        target.kill()
        target.wait()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()
        terminate(cls.subp)

    def call(self, fun):
        # NoSuchProcess is the expected outcome here: ignore it.
        try:
            fun()
        except psutil.NoSuchProcess:
            pass

    if WINDOWS:

        def test_kill(self):
            self.execute(self.proc.kill)

        def test_terminate(self):
            self.execute(self.proc.terminate)

        def test_suspend(self):
            self.execute(self.proc.suspend)

        def test_resume(self):
            self.execute(self.proc.resume)

        def test_wait(self):
            self.execute(self.proc.wait)

        def test_proc_info(self):
            # test dual implementation
            def proc_info_or_none():
                try:
                    return cext.proc_info(self.proc.pid)
                except ProcessLookupError:
                    return None

            self.execute(proc_info_or_none)
321 | |
322 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestProcessDualImplementation(TestMemoryLeak):
    """Call cext.proc_cmdline() for the current PID with `use_peb` set
    to True and to False, checking each variant for leaks.
    """

    def test_cmdline_peb_true(self):
        def cmdline_with_peb():
            return cext.proc_cmdline(os.getpid(), use_peb=True)

        self.execute(cmdline_with_peb)

    def test_cmdline_peb_false(self):
        def cmdline_without_peb():
            return cext.proc_cmdline(os.getpid(), use_peb=False)

        self.execute(cmdline_without_peb)
331 | |
332 | |
333 # =================================================================== | |
334 # system APIs | |
335 # =================================================================== | |
336 | |
337 | |
class TestModuleFunctionsLeaks(TestMemoryLeak):
    """Look for memory leaks in psutil's module-level functions.

    Each test hands a function (or a small callable wrapping it) to
    self.execute().
    """

    def test_coverage(self):
        # Run the namespace helper's coverage check against this class,
        # over everything it lists in `all`.
        namespace = system_namespace()
        namespace.test_class_coverage(self, namespace.all)

    # --- cpu

    @fewtimes_if_linux()
    def test_cpu_count(self):  # logical
        def count_logical():
            return psutil.cpu_count(logical=True)

        self.execute(count_logical)

    @fewtimes_if_linux()
    def test_cpu_count_physical(self):
        def count_physical():
            return psutil.cpu_count(logical=False)

        self.execute(count_physical)

    @fewtimes_if_linux()
    def test_cpu_times(self):
        self.execute(psutil.cpu_times)

    @fewtimes_if_linux()
    def test_per_cpu_times(self):
        def per_cpu_times():
            return psutil.cpu_times(percpu=True)

        self.execute(per_cpu_times)

    @fewtimes_if_linux()
    def test_cpu_stats(self):
        self.execute(psutil.cpu_stats)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
    def test_cpu_freq(self):
        self.execute(psutil.cpu_freq)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_getloadavg(self):
        # Call it once before the measured calls.
        psutil.getloadavg()
        self.execute(psutil.getloadavg)

    # --- mem

    def test_virtual_memory(self):
        self.execute(psutil.virtual_memory)

    # TODO: remove this skip when this gets fixed
    @unittest.skipIf(SUNOS, "worthless on SUNOS (uses a subprocess)")
    def test_swap_memory(self):
        self.execute(psutil.swap_memory)

    def test_pid_exists(self):
        # Fewer repetitions on POSIX, the class default elsewhere.
        if POSIX:
            repetitions = FEW_TIMES
        else:
            repetitions = self.times

        def own_pid_exists():
            return psutil.pid_exists(os.getpid())

        self.execute(own_pid_exists, times=repetitions)

    # --- disk

    def test_disk_usage(self):
        # Fewer repetitions on POSIX, the class default elsewhere.
        if POSIX:
            repetitions = FEW_TIMES
        else:
            repetitions = self.times

        def usage_of_cwd():
            return psutil.disk_usage('.')

        self.execute(usage_of_cwd, times=repetitions)

    def test_disk_partitions(self):
        self.execute(psutil.disk_partitions)

    @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
                     '/proc/diskstats not available on this Linux version')
    @fewtimes_if_linux()
    def test_disk_io_counters(self):
        def io_counters():
            return psutil.disk_io_counters(nowrap=False)

        self.execute(io_counters)

    # --- proc

    @fewtimes_if_linux()
    def test_pids(self):
        self.execute(psutil.pids)

    # --- net

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
    def test_net_io_counters(self):
        def io_counters():
            return psutil.net_io_counters(nowrap=False)

        self.execute(io_counters)

    @fewtimes_if_linux()
    @unittest.skipIf(MACOS and os.getuid() != 0, "need root access")
    def test_net_connections(self):
        # On Windows this always opens a handle (once), so call it one
        # time before measuring.
        psutil.net_connections(kind='all')

        def all_connections():
            return psutil.net_connections(kind='all')

        with create_sockets():
            self.execute(all_connections)

    def test_net_if_addrs(self):
        # Note: verified that on Windows this was a false positive,
        # hence the larger tolerance there.
        if WINDOWS:
            tolerance = 80 * 1024
        else:
            tolerance = self.tolerance
        self.execute(psutil.net_if_addrs, tolerance=tolerance)

    # @unittest.skipIf(TRAVIS, "EPERM on travis")
    def test_net_if_stats(self):
        self.execute(psutil.net_if_stats)

    # --- sensors

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    def test_sensors_battery(self):
        self.execute(psutil.sensors_battery)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    def test_sensors_temperatures(self):
        self.execute(psutil.sensors_temperatures)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    def test_sensors_fans(self):
        self.execute(psutil.sensors_fans)

    # --- others

    @fewtimes_if_linux()
    def test_boot_time(self):
        self.execute(psutil.boot_time)

    def test_users(self):
        self.execute(psutil.users)

    if WINDOWS:

        # --- win services

        def test_win_service_iter(self):
            self.execute(cext.winservice_enumerate)

        def test_win_service_get(self):
            # Intentionally empty.
            pass

        def test_win_service_get_config(self):
            # Use the first service returned by win_service_iter().
            service_name = next(psutil.win_service_iter()).name()

            def query_config():
                return cext.winservice_query_config(service_name)

            self.execute(query_config)

        def test_win_service_get_status(self):
            service_name = next(psutil.win_service_iter()).name()

            def query_status():
                return cext.winservice_query_status(service_name)

            self.execute(query_status)

        def test_win_service_get_description(self):
            service_name = next(psutil.win_service_iter()).name()

            def query_description():
                return cext.winservice_query_descr(service_name)

            self.execute(query_description)
483 | |
484 | |
if __name__ == '__main__':
    # The runner is only needed when this file is executed directly,
    # so it is imported here rather than at the top of the module.
    from psutil.tests import runner
    runner.run_from_name(__file__)