Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_testutils.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 #!/usr/bin/env python3 | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
5 # Use of this source code is governed by a BSD-style license that can be | |
6 # found in the LICENSE file. | |
7 | |
8 """ | |
9 Tests for testing utils (psutil.tests namespace). | |
10 """ | |
11 | |
12 import collections | |
13 import contextlib | |
14 import errno | |
15 import os | |
16 import socket | |
17 import stat | |
18 import subprocess | |
19 | |
20 from psutil import FREEBSD | |
21 from psutil import NETBSD | |
22 from psutil import POSIX | |
23 from psutil._common import open_binary | |
24 from psutil._common import open_text | |
25 from psutil._common import supports_ipv6 | |
26 from psutil.tests import bind_socket | |
27 from psutil.tests import bind_unix_socket | |
28 from psutil.tests import call_until | |
29 from psutil.tests import chdir | |
30 from psutil.tests import CI_TESTING | |
31 from psutil.tests import create_sockets | |
32 from psutil.tests import get_free_port | |
33 from psutil.tests import HAS_CONNECTIONS_UNIX | |
34 from psutil.tests import is_namedtuple | |
35 from psutil.tests import mock | |
36 from psutil.tests import process_namespace | |
37 from psutil.tests import PsutilTestCase | |
38 from psutil.tests import PYTHON_EXE | |
39 from psutil.tests import reap_children | |
40 from psutil.tests import retry | |
41 from psutil.tests import retry_on_failure | |
42 from psutil.tests import safe_mkdir | |
43 from psutil.tests import safe_rmpath | |
44 from psutil.tests import serialrun | |
45 from psutil.tests import system_namespace | |
46 from psutil.tests import tcp_socketpair | |
47 from psutil.tests import terminate | |
48 from psutil.tests import TestMemoryLeak | |
49 from psutil.tests import unittest | |
50 from psutil.tests import unix_socketpair | |
51 from psutil.tests import wait_for_file | |
52 from psutil.tests import wait_for_pid | |
53 import psutil | |
54 import psutil.tests | |
55 | |
56 # =================================================================== | |
57 # --- Unit tests for test utilities. | |
58 # =================================================================== | |
59 | |
60 | |
61 class TestRetryDecorator(PsutilTestCase): | |
62 | |
63 @mock.patch('time.sleep') | |
64 def test_retry_success(self, sleep): | |
65 # Fail 3 times out of 5; make sure the decorated fun returns. | |
66 | |
67 @retry(retries=5, interval=1, logfun=None) | |
68 def foo(): | |
69 while queue: | |
70 queue.pop() | |
71 1 / 0 | |
72 return 1 | |
73 | |
74 queue = list(range(3)) | |
75 self.assertEqual(foo(), 1) | |
76 self.assertEqual(sleep.call_count, 3) | |
77 | |
78 @mock.patch('time.sleep') | |
79 def test_retry_failure(self, sleep): | |
80 # Fail 6 times out of 5; th function is supposed to raise exc. | |
81 @retry(retries=5, interval=1, logfun=None) | |
82 def foo(): | |
83 while queue: | |
84 queue.pop() | |
85 1 / 0 | |
86 return 1 | |
87 | |
88 queue = list(range(6)) | |
89 self.assertRaises(ZeroDivisionError, foo) | |
90 self.assertEqual(sleep.call_count, 5) | |
91 | |
92 @mock.patch('time.sleep') | |
93 def test_exception_arg(self, sleep): | |
94 @retry(exception=ValueError, interval=1) | |
95 def foo(): | |
96 raise TypeError | |
97 | |
98 self.assertRaises(TypeError, foo) | |
99 self.assertEqual(sleep.call_count, 0) | |
100 | |
101 @mock.patch('time.sleep') | |
102 def test_no_interval_arg(self, sleep): | |
103 # if interval is not specified sleep is not supposed to be called | |
104 | |
105 @retry(retries=5, interval=None, logfun=None) | |
106 def foo(): | |
107 1 / 0 | |
108 | |
109 self.assertRaises(ZeroDivisionError, foo) | |
110 self.assertEqual(sleep.call_count, 0) | |
111 | |
112 @mock.patch('time.sleep') | |
113 def test_retries_arg(self, sleep): | |
114 | |
115 @retry(retries=5, interval=1, logfun=None) | |
116 def foo(): | |
117 1 / 0 | |
118 | |
119 self.assertRaises(ZeroDivisionError, foo) | |
120 self.assertEqual(sleep.call_count, 5) | |
121 | |
122 @mock.patch('time.sleep') | |
123 def test_retries_and_timeout_args(self, sleep): | |
124 self.assertRaises(ValueError, retry, retries=5, timeout=1) | |
125 | |
126 | |
127 class TestSyncTestUtils(PsutilTestCase): | |
128 | |
129 def test_wait_for_pid(self): | |
130 wait_for_pid(os.getpid()) | |
131 nopid = max(psutil.pids()) + 99999 | |
132 with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])): | |
133 self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid) | |
134 | |
135 def test_wait_for_file(self): | |
136 testfn = self.get_testfn() | |
137 with open(testfn, 'w') as f: | |
138 f.write('foo') | |
139 wait_for_file(testfn) | |
140 assert not os.path.exists(testfn) | |
141 | |
142 def test_wait_for_file_empty(self): | |
143 testfn = self.get_testfn() | |
144 with open(testfn, 'w'): | |
145 pass | |
146 wait_for_file(testfn, empty=True) | |
147 assert not os.path.exists(testfn) | |
148 | |
149 def test_wait_for_file_no_file(self): | |
150 testfn = self.get_testfn() | |
151 with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])): | |
152 self.assertRaises(IOError, wait_for_file, testfn) | |
153 | |
154 def test_wait_for_file_no_delete(self): | |
155 testfn = self.get_testfn() | |
156 with open(testfn, 'w') as f: | |
157 f.write('foo') | |
158 wait_for_file(testfn, delete=False) | |
159 assert os.path.exists(testfn) | |
160 | |
161 def test_call_until(self): | |
162 ret = call_until(lambda: 1, "ret == 1") | |
163 self.assertEqual(ret, 1) | |
164 | |
165 | |
166 class TestFSTestUtils(PsutilTestCase): | |
167 | |
168 def test_open_text(self): | |
169 with open_text(__file__) as f: | |
170 self.assertEqual(f.mode, 'rt') | |
171 | |
172 def test_open_binary(self): | |
173 with open_binary(__file__) as f: | |
174 self.assertEqual(f.mode, 'rb') | |
175 | |
176 def test_safe_mkdir(self): | |
177 testfn = self.get_testfn() | |
178 safe_mkdir(testfn) | |
179 assert os.path.isdir(testfn) | |
180 safe_mkdir(testfn) | |
181 assert os.path.isdir(testfn) | |
182 | |
183 def test_safe_rmpath(self): | |
184 # test file is removed | |
185 testfn = self.get_testfn() | |
186 open(testfn, 'w').close() | |
187 safe_rmpath(testfn) | |
188 assert not os.path.exists(testfn) | |
189 # test no exception if path does not exist | |
190 safe_rmpath(testfn) | |
191 # test dir is removed | |
192 os.mkdir(testfn) | |
193 safe_rmpath(testfn) | |
194 assert not os.path.exists(testfn) | |
195 # test other exceptions are raised | |
196 with mock.patch('psutil.tests.os.stat', | |
197 side_effect=OSError(errno.EINVAL, "")) as m: | |
198 with self.assertRaises(OSError): | |
199 safe_rmpath(testfn) | |
200 assert m.called | |
201 | |
202 def test_chdir(self): | |
203 testfn = self.get_testfn() | |
204 base = os.getcwd() | |
205 os.mkdir(testfn) | |
206 with chdir(testfn): | |
207 self.assertEqual(os.getcwd(), os.path.join(base, testfn)) | |
208 self.assertEqual(os.getcwd(), base) | |
209 | |
210 | |
211 class TestProcessUtils(PsutilTestCase): | |
212 | |
213 def test_reap_children(self): | |
214 subp = self.spawn_testproc() | |
215 p = psutil.Process(subp.pid) | |
216 assert p.is_running() | |
217 reap_children() | |
218 assert not p.is_running() | |
219 assert not psutil.tests._pids_started | |
220 assert not psutil.tests._subprocesses_started | |
221 | |
222 def test_spawn_children_pair(self): | |
223 child, grandchild = self.spawn_children_pair() | |
224 self.assertNotEqual(child.pid, grandchild.pid) | |
225 assert child.is_running() | |
226 assert grandchild.is_running() | |
227 children = psutil.Process().children() | |
228 self.assertEqual(children, [child]) | |
229 children = psutil.Process().children(recursive=True) | |
230 self.assertEqual(len(children), 2) | |
231 self.assertIn(child, children) | |
232 self.assertIn(grandchild, children) | |
233 self.assertEqual(child.ppid(), os.getpid()) | |
234 self.assertEqual(grandchild.ppid(), child.pid) | |
235 | |
236 terminate(child) | |
237 assert not child.is_running() | |
238 assert grandchild.is_running() | |
239 | |
240 terminate(grandchild) | |
241 assert not grandchild.is_running() | |
242 | |
243 @unittest.skipIf(not POSIX, "POSIX only") | |
244 def test_spawn_zombie(self): | |
245 parent, zombie = self.spawn_zombie() | |
246 self.assertEqual(zombie.status(), psutil.STATUS_ZOMBIE) | |
247 | |
248 def test_terminate(self): | |
249 # by subprocess.Popen | |
250 p = self.spawn_testproc() | |
251 terminate(p) | |
252 self.assertProcessGone(p) | |
253 terminate(p) | |
254 # by psutil.Process | |
255 p = psutil.Process(self.spawn_testproc().pid) | |
256 terminate(p) | |
257 self.assertProcessGone(p) | |
258 terminate(p) | |
259 # by psutil.Popen | |
260 cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"] | |
261 p = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
262 terminate(p) | |
263 self.assertProcessGone(p) | |
264 terminate(p) | |
265 # by PID | |
266 pid = self.spawn_testproc().pid | |
267 terminate(pid) | |
268 self.assertProcessGone(p) | |
269 terminate(pid) | |
270 # zombie | |
271 if POSIX: | |
272 parent, zombie = self.spawn_zombie() | |
273 terminate(parent) | |
274 terminate(zombie) | |
275 self.assertProcessGone(parent) | |
276 self.assertProcessGone(zombie) | |
277 | |
278 | |
279 class TestNetUtils(PsutilTestCase): | |
280 | |
281 def bind_socket(self): | |
282 port = get_free_port() | |
283 with contextlib.closing(bind_socket(addr=('', port))) as s: | |
284 self.assertEqual(s.getsockname()[1], port) | |
285 | |
286 @unittest.skipIf(not POSIX, "POSIX only") | |
287 def test_bind_unix_socket(self): | |
288 name = self.get_testfn() | |
289 sock = bind_unix_socket(name) | |
290 with contextlib.closing(sock): | |
291 self.assertEqual(sock.family, socket.AF_UNIX) | |
292 self.assertEqual(sock.type, socket.SOCK_STREAM) | |
293 self.assertEqual(sock.getsockname(), name) | |
294 assert os.path.exists(name) | |
295 assert stat.S_ISSOCK(os.stat(name).st_mode) | |
296 # UDP | |
297 name = self.get_testfn() | |
298 sock = bind_unix_socket(name, type=socket.SOCK_DGRAM) | |
299 with contextlib.closing(sock): | |
300 self.assertEqual(sock.type, socket.SOCK_DGRAM) | |
301 | |
302 def tcp_tcp_socketpair(self): | |
303 addr = ("127.0.0.1", get_free_port()) | |
304 server, client = tcp_socketpair(socket.AF_INET, addr=addr) | |
305 with contextlib.closing(server): | |
306 with contextlib.closing(client): | |
307 # Ensure they are connected and the positions are | |
308 # correct. | |
309 self.assertEqual(server.getsockname(), addr) | |
310 self.assertEqual(client.getpeername(), addr) | |
311 self.assertNotEqual(client.getsockname(), addr) | |
312 | |
313 @unittest.skipIf(not POSIX, "POSIX only") | |
314 @unittest.skipIf(NETBSD or FREEBSD, | |
315 "/var/run/log UNIX socket opened by default") | |
316 def test_unix_socketpair(self): | |
317 p = psutil.Process() | |
318 num_fds = p.num_fds() | |
319 assert not p.connections(kind='unix') | |
320 name = self.get_testfn() | |
321 server, client = unix_socketpair(name) | |
322 try: | |
323 assert os.path.exists(name) | |
324 assert stat.S_ISSOCK(os.stat(name).st_mode) | |
325 self.assertEqual(p.num_fds() - num_fds, 2) | |
326 self.assertEqual(len(p.connections(kind='unix')), 2) | |
327 self.assertEqual(server.getsockname(), name) | |
328 self.assertEqual(client.getpeername(), name) | |
329 finally: | |
330 client.close() | |
331 server.close() | |
332 | |
333 def test_create_sockets(self): | |
334 with create_sockets() as socks: | |
335 fams = collections.defaultdict(int) | |
336 types = collections.defaultdict(int) | |
337 for s in socks: | |
338 fams[s.family] += 1 | |
339 # work around http://bugs.python.org/issue30204 | |
340 types[s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)] += 1 | |
341 self.assertGreaterEqual(fams[socket.AF_INET], 2) | |
342 if supports_ipv6(): | |
343 self.assertGreaterEqual(fams[socket.AF_INET6], 2) | |
344 if POSIX and HAS_CONNECTIONS_UNIX: | |
345 self.assertGreaterEqual(fams[socket.AF_UNIX], 2) | |
346 self.assertGreaterEqual(types[socket.SOCK_STREAM], 2) | |
347 self.assertGreaterEqual(types[socket.SOCK_DGRAM], 2) | |
348 | |
349 | |
350 @serialrun | |
351 class TestMemLeakClass(TestMemoryLeak): | |
352 | |
353 def test_times(self): | |
354 def fun(): | |
355 cnt['cnt'] += 1 | |
356 cnt = {'cnt': 0} | |
357 self.execute(fun, times=10, warmup_times=15) | |
358 self.assertEqual(cnt['cnt'], 26) | |
359 | |
360 def test_param_err(self): | |
361 self.assertRaises(ValueError, self.execute, lambda: 0, times=0) | |
362 self.assertRaises(ValueError, self.execute, lambda: 0, times=-1) | |
363 self.assertRaises(ValueError, self.execute, lambda: 0, warmup_times=-1) | |
364 self.assertRaises(ValueError, self.execute, lambda: 0, tolerance=-1) | |
365 self.assertRaises(ValueError, self.execute, lambda: 0, retries=-1) | |
366 | |
367 @retry_on_failure() | |
368 @unittest.skipIf(CI_TESTING, "skipped on CI") | |
369 def test_leak_mem(self): | |
370 ls = [] | |
371 | |
372 def fun(ls=ls): | |
373 ls.append("x" * 24 * 1024) | |
374 | |
375 try: | |
376 # will consume around 3M in total | |
377 self.assertRaisesRegex(AssertionError, "extra-mem", | |
378 self.execute, fun, times=50) | |
379 finally: | |
380 del ls | |
381 | |
382 def test_unclosed_files(self): | |
383 def fun(): | |
384 f = open(__file__) | |
385 self.addCleanup(f.close) | |
386 box.append(f) | |
387 | |
388 box = [] | |
389 kind = "fd" if POSIX else "handle" | |
390 self.assertRaisesRegex(AssertionError, "unclosed " + kind, | |
391 self.execute, fun) | |
392 | |
393 def test_tolerance(self): | |
394 def fun(): | |
395 ls.append("x" * 24 * 1024) | |
396 ls = [] | |
397 times = 100 | |
398 self.execute(fun, times=times, warmup_times=0, | |
399 tolerance=200 * 1024 * 1024) | |
400 self.assertEqual(len(ls), times + 1) | |
401 | |
402 def test_execute_w_exc(self): | |
403 def fun(): | |
404 1 / 0 | |
405 self.execute_w_exc(ZeroDivisionError, fun) | |
406 with self.assertRaises(ZeroDivisionError): | |
407 self.execute_w_exc(OSError, fun) | |
408 | |
409 def fun(): | |
410 pass | |
411 with self.assertRaises(AssertionError): | |
412 self.execute_w_exc(ZeroDivisionError, fun) | |
413 | |
414 | |
415 class TestTestingUtils(PsutilTestCase): | |
416 | |
417 def test_process_namespace(self): | |
418 p = psutil.Process() | |
419 ns = process_namespace(p) | |
420 ns.test() | |
421 fun = [x for x in ns.iter(ns.getters) if x[1] == 'ppid'][0][0] | |
422 self.assertEqual(fun(), p.ppid()) | |
423 | |
424 def test_system_namespace(self): | |
425 ns = system_namespace() | |
426 fun = [x for x in ns.iter(ns.getters) if x[1] == 'net_if_addrs'][0][0] | |
427 self.assertEqual(fun(), psutil.net_if_addrs()) | |
428 | |
429 | |
430 class TestOtherUtils(PsutilTestCase): | |
431 | |
432 def test_is_namedtuple(self): | |
433 assert is_namedtuple(collections.namedtuple('foo', 'a b c')(1, 2, 3)) | |
434 assert not is_namedtuple(tuple()) | |
435 | |
436 | |
437 if __name__ == '__main__': | |
438 from psutil.tests.runner import run_from_name | |
439 run_from_name(__file__) |