comparison planemo/lib/python3.7/site-packages/psutil/tests/test_testutils.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
5 # Use of this source code is governed by a BSD-style license that can be
6 # found in the LICENSE file.
7
8 """
9 Tests for testing utils (psutil.tests namespace).
10 """
11
12 import collections
13 import contextlib
14 import errno
15 import os
16 import socket
17 import stat
18 import subprocess
19
20 from psutil import FREEBSD
21 from psutil import NETBSD
22 from psutil import POSIX
23 from psutil._common import open_binary
24 from psutil._common import open_text
25 from psutil._common import supports_ipv6
26 from psutil.tests import bind_socket
27 from psutil.tests import bind_unix_socket
28 from psutil.tests import call_until
29 from psutil.tests import chdir
30 from psutil.tests import CI_TESTING
31 from psutil.tests import create_sockets
32 from psutil.tests import get_free_port
33 from psutil.tests import HAS_CONNECTIONS_UNIX
34 from psutil.tests import is_namedtuple
35 from psutil.tests import mock
36 from psutil.tests import process_namespace
37 from psutil.tests import PsutilTestCase
38 from psutil.tests import PYTHON_EXE
39 from psutil.tests import reap_children
40 from psutil.tests import retry
41 from psutil.tests import retry_on_failure
42 from psutil.tests import safe_mkdir
43 from psutil.tests import safe_rmpath
44 from psutil.tests import serialrun
45 from psutil.tests import system_namespace
46 from psutil.tests import tcp_socketpair
47 from psutil.tests import terminate
48 from psutil.tests import TestMemoryLeak
49 from psutil.tests import unittest
50 from psutil.tests import unix_socketpair
51 from psutil.tests import wait_for_file
52 from psutil.tests import wait_for_pid
53 import psutil
54 import psutil.tests
55
56 # ===================================================================
57 # --- Unit tests for test utilities.
58 # ===================================================================
59
60
61 class TestRetryDecorator(PsutilTestCase):
62
63 @mock.patch('time.sleep')
64 def test_retry_success(self, sleep):
65 # Fail 3 times out of 5; make sure the decorated fun returns.
66
67 @retry(retries=5, interval=1, logfun=None)
68 def foo():
69 while queue:
70 queue.pop()
71 1 / 0
72 return 1
73
74 queue = list(range(3))
75 self.assertEqual(foo(), 1)
76 self.assertEqual(sleep.call_count, 3)
77
78 @mock.patch('time.sleep')
79 def test_retry_failure(self, sleep):
80 # Fail 6 times out of 5; th function is supposed to raise exc.
81 @retry(retries=5, interval=1, logfun=None)
82 def foo():
83 while queue:
84 queue.pop()
85 1 / 0
86 return 1
87
88 queue = list(range(6))
89 self.assertRaises(ZeroDivisionError, foo)
90 self.assertEqual(sleep.call_count, 5)
91
92 @mock.patch('time.sleep')
93 def test_exception_arg(self, sleep):
94 @retry(exception=ValueError, interval=1)
95 def foo():
96 raise TypeError
97
98 self.assertRaises(TypeError, foo)
99 self.assertEqual(sleep.call_count, 0)
100
101 @mock.patch('time.sleep')
102 def test_no_interval_arg(self, sleep):
103 # if interval is not specified sleep is not supposed to be called
104
105 @retry(retries=5, interval=None, logfun=None)
106 def foo():
107 1 / 0
108
109 self.assertRaises(ZeroDivisionError, foo)
110 self.assertEqual(sleep.call_count, 0)
111
112 @mock.patch('time.sleep')
113 def test_retries_arg(self, sleep):
114
115 @retry(retries=5, interval=1, logfun=None)
116 def foo():
117 1 / 0
118
119 self.assertRaises(ZeroDivisionError, foo)
120 self.assertEqual(sleep.call_count, 5)
121
122 @mock.patch('time.sleep')
123 def test_retries_and_timeout_args(self, sleep):
124 self.assertRaises(ValueError, retry, retries=5, timeout=1)
125
126
127 class TestSyncTestUtils(PsutilTestCase):
128
129 def test_wait_for_pid(self):
130 wait_for_pid(os.getpid())
131 nopid = max(psutil.pids()) + 99999
132 with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
133 self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid)
134
135 def test_wait_for_file(self):
136 testfn = self.get_testfn()
137 with open(testfn, 'w') as f:
138 f.write('foo')
139 wait_for_file(testfn)
140 assert not os.path.exists(testfn)
141
142 def test_wait_for_file_empty(self):
143 testfn = self.get_testfn()
144 with open(testfn, 'w'):
145 pass
146 wait_for_file(testfn, empty=True)
147 assert not os.path.exists(testfn)
148
149 def test_wait_for_file_no_file(self):
150 testfn = self.get_testfn()
151 with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
152 self.assertRaises(IOError, wait_for_file, testfn)
153
154 def test_wait_for_file_no_delete(self):
155 testfn = self.get_testfn()
156 with open(testfn, 'w') as f:
157 f.write('foo')
158 wait_for_file(testfn, delete=False)
159 assert os.path.exists(testfn)
160
161 def test_call_until(self):
162 ret = call_until(lambda: 1, "ret == 1")
163 self.assertEqual(ret, 1)
164
165
166 class TestFSTestUtils(PsutilTestCase):
167
168 def test_open_text(self):
169 with open_text(__file__) as f:
170 self.assertEqual(f.mode, 'rt')
171
172 def test_open_binary(self):
173 with open_binary(__file__) as f:
174 self.assertEqual(f.mode, 'rb')
175
176 def test_safe_mkdir(self):
177 testfn = self.get_testfn()
178 safe_mkdir(testfn)
179 assert os.path.isdir(testfn)
180 safe_mkdir(testfn)
181 assert os.path.isdir(testfn)
182
183 def test_safe_rmpath(self):
184 # test file is removed
185 testfn = self.get_testfn()
186 open(testfn, 'w').close()
187 safe_rmpath(testfn)
188 assert not os.path.exists(testfn)
189 # test no exception if path does not exist
190 safe_rmpath(testfn)
191 # test dir is removed
192 os.mkdir(testfn)
193 safe_rmpath(testfn)
194 assert not os.path.exists(testfn)
195 # test other exceptions are raised
196 with mock.patch('psutil.tests.os.stat',
197 side_effect=OSError(errno.EINVAL, "")) as m:
198 with self.assertRaises(OSError):
199 safe_rmpath(testfn)
200 assert m.called
201
202 def test_chdir(self):
203 testfn = self.get_testfn()
204 base = os.getcwd()
205 os.mkdir(testfn)
206 with chdir(testfn):
207 self.assertEqual(os.getcwd(), os.path.join(base, testfn))
208 self.assertEqual(os.getcwd(), base)
209
210
211 class TestProcessUtils(PsutilTestCase):
212
213 def test_reap_children(self):
214 subp = self.spawn_testproc()
215 p = psutil.Process(subp.pid)
216 assert p.is_running()
217 reap_children()
218 assert not p.is_running()
219 assert not psutil.tests._pids_started
220 assert not psutil.tests._subprocesses_started
221
222 def test_spawn_children_pair(self):
223 child, grandchild = self.spawn_children_pair()
224 self.assertNotEqual(child.pid, grandchild.pid)
225 assert child.is_running()
226 assert grandchild.is_running()
227 children = psutil.Process().children()
228 self.assertEqual(children, [child])
229 children = psutil.Process().children(recursive=True)
230 self.assertEqual(len(children), 2)
231 self.assertIn(child, children)
232 self.assertIn(grandchild, children)
233 self.assertEqual(child.ppid(), os.getpid())
234 self.assertEqual(grandchild.ppid(), child.pid)
235
236 terminate(child)
237 assert not child.is_running()
238 assert grandchild.is_running()
239
240 terminate(grandchild)
241 assert not grandchild.is_running()
242
243 @unittest.skipIf(not POSIX, "POSIX only")
244 def test_spawn_zombie(self):
245 parent, zombie = self.spawn_zombie()
246 self.assertEqual(zombie.status(), psutil.STATUS_ZOMBIE)
247
248 def test_terminate(self):
249 # by subprocess.Popen
250 p = self.spawn_testproc()
251 terminate(p)
252 self.assertProcessGone(p)
253 terminate(p)
254 # by psutil.Process
255 p = psutil.Process(self.spawn_testproc().pid)
256 terminate(p)
257 self.assertProcessGone(p)
258 terminate(p)
259 # by psutil.Popen
260 cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
261 p = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
262 terminate(p)
263 self.assertProcessGone(p)
264 terminate(p)
265 # by PID
266 pid = self.spawn_testproc().pid
267 terminate(pid)
268 self.assertProcessGone(p)
269 terminate(pid)
270 # zombie
271 if POSIX:
272 parent, zombie = self.spawn_zombie()
273 terminate(parent)
274 terminate(zombie)
275 self.assertProcessGone(parent)
276 self.assertProcessGone(zombie)
277
278
279 class TestNetUtils(PsutilTestCase):
280
281 def bind_socket(self):
282 port = get_free_port()
283 with contextlib.closing(bind_socket(addr=('', port))) as s:
284 self.assertEqual(s.getsockname()[1], port)
285
286 @unittest.skipIf(not POSIX, "POSIX only")
287 def test_bind_unix_socket(self):
288 name = self.get_testfn()
289 sock = bind_unix_socket(name)
290 with contextlib.closing(sock):
291 self.assertEqual(sock.family, socket.AF_UNIX)
292 self.assertEqual(sock.type, socket.SOCK_STREAM)
293 self.assertEqual(sock.getsockname(), name)
294 assert os.path.exists(name)
295 assert stat.S_ISSOCK(os.stat(name).st_mode)
296 # UDP
297 name = self.get_testfn()
298 sock = bind_unix_socket(name, type=socket.SOCK_DGRAM)
299 with contextlib.closing(sock):
300 self.assertEqual(sock.type, socket.SOCK_DGRAM)
301
302 def tcp_tcp_socketpair(self):
303 addr = ("127.0.0.1", get_free_port())
304 server, client = tcp_socketpair(socket.AF_INET, addr=addr)
305 with contextlib.closing(server):
306 with contextlib.closing(client):
307 # Ensure they are connected and the positions are
308 # correct.
309 self.assertEqual(server.getsockname(), addr)
310 self.assertEqual(client.getpeername(), addr)
311 self.assertNotEqual(client.getsockname(), addr)
312
313 @unittest.skipIf(not POSIX, "POSIX only")
314 @unittest.skipIf(NETBSD or FREEBSD,
315 "/var/run/log UNIX socket opened by default")
316 def test_unix_socketpair(self):
317 p = psutil.Process()
318 num_fds = p.num_fds()
319 assert not p.connections(kind='unix')
320 name = self.get_testfn()
321 server, client = unix_socketpair(name)
322 try:
323 assert os.path.exists(name)
324 assert stat.S_ISSOCK(os.stat(name).st_mode)
325 self.assertEqual(p.num_fds() - num_fds, 2)
326 self.assertEqual(len(p.connections(kind='unix')), 2)
327 self.assertEqual(server.getsockname(), name)
328 self.assertEqual(client.getpeername(), name)
329 finally:
330 client.close()
331 server.close()
332
333 def test_create_sockets(self):
334 with create_sockets() as socks:
335 fams = collections.defaultdict(int)
336 types = collections.defaultdict(int)
337 for s in socks:
338 fams[s.family] += 1
339 # work around http://bugs.python.org/issue30204
340 types[s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)] += 1
341 self.assertGreaterEqual(fams[socket.AF_INET], 2)
342 if supports_ipv6():
343 self.assertGreaterEqual(fams[socket.AF_INET6], 2)
344 if POSIX and HAS_CONNECTIONS_UNIX:
345 self.assertGreaterEqual(fams[socket.AF_UNIX], 2)
346 self.assertGreaterEqual(types[socket.SOCK_STREAM], 2)
347 self.assertGreaterEqual(types[socket.SOCK_DGRAM], 2)
348
349
350 @serialrun
351 class TestMemLeakClass(TestMemoryLeak):
352
353 def test_times(self):
354 def fun():
355 cnt['cnt'] += 1
356 cnt = {'cnt': 0}
357 self.execute(fun, times=10, warmup_times=15)
358 self.assertEqual(cnt['cnt'], 26)
359
360 def test_param_err(self):
361 self.assertRaises(ValueError, self.execute, lambda: 0, times=0)
362 self.assertRaises(ValueError, self.execute, lambda: 0, times=-1)
363 self.assertRaises(ValueError, self.execute, lambda: 0, warmup_times=-1)
364 self.assertRaises(ValueError, self.execute, lambda: 0, tolerance=-1)
365 self.assertRaises(ValueError, self.execute, lambda: 0, retries=-1)
366
367 @retry_on_failure()
368 @unittest.skipIf(CI_TESTING, "skipped on CI")
369 def test_leak_mem(self):
370 ls = []
371
372 def fun(ls=ls):
373 ls.append("x" * 24 * 1024)
374
375 try:
376 # will consume around 3M in total
377 self.assertRaisesRegex(AssertionError, "extra-mem",
378 self.execute, fun, times=50)
379 finally:
380 del ls
381
382 def test_unclosed_files(self):
383 def fun():
384 f = open(__file__)
385 self.addCleanup(f.close)
386 box.append(f)
387
388 box = []
389 kind = "fd" if POSIX else "handle"
390 self.assertRaisesRegex(AssertionError, "unclosed " + kind,
391 self.execute, fun)
392
393 def test_tolerance(self):
394 def fun():
395 ls.append("x" * 24 * 1024)
396 ls = []
397 times = 100
398 self.execute(fun, times=times, warmup_times=0,
399 tolerance=200 * 1024 * 1024)
400 self.assertEqual(len(ls), times + 1)
401
402 def test_execute_w_exc(self):
403 def fun():
404 1 / 0
405 self.execute_w_exc(ZeroDivisionError, fun)
406 with self.assertRaises(ZeroDivisionError):
407 self.execute_w_exc(OSError, fun)
408
409 def fun():
410 pass
411 with self.assertRaises(AssertionError):
412 self.execute_w_exc(ZeroDivisionError, fun)
413
414
415 class TestTestingUtils(PsutilTestCase):
416
417 def test_process_namespace(self):
418 p = psutil.Process()
419 ns = process_namespace(p)
420 ns.test()
421 fun = [x for x in ns.iter(ns.getters) if x[1] == 'ppid'][0][0]
422 self.assertEqual(fun(), p.ppid())
423
424 def test_system_namespace(self):
425 ns = system_namespace()
426 fun = [x for x in ns.iter(ns.getters) if x[1] == 'net_if_addrs'][0][0]
427 self.assertEqual(fun(), psutil.net_if_addrs())
428
429
430 class TestOtherUtils(PsutilTestCase):
431
432 def test_is_namedtuple(self):
433 assert is_namedtuple(collections.namedtuple('foo', 'a b c')(1, 2, 3))
434 assert not is_namedtuple(tuple())
435
436
437 if __name__ == '__main__':
438 from psutil.tests.runner import run_from_name
439 run_from_name(__file__)