Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/psutil/tests/test_testutils.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/psutil/tests/test_testutils.py Fri Jul 31 00:32:28 2020 -0400 @@ -0,0 +1,439 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +""" +Tests for testing utils (psutil.tests namespace). +""" + +import collections +import contextlib +import errno +import os +import socket +import stat +import subprocess + +from psutil import FREEBSD +from psutil import NETBSD +from psutil import POSIX +from psutil._common import open_binary +from psutil._common import open_text +from psutil._common import supports_ipv6 +from psutil.tests import bind_socket +from psutil.tests import bind_unix_socket +from psutil.tests import call_until +from psutil.tests import chdir +from psutil.tests import CI_TESTING +from psutil.tests import create_sockets +from psutil.tests import get_free_port +from psutil.tests import HAS_CONNECTIONS_UNIX +from psutil.tests import is_namedtuple +from psutil.tests import mock +from psutil.tests import process_namespace +from psutil.tests import PsutilTestCase +from psutil.tests import PYTHON_EXE +from psutil.tests import reap_children +from psutil.tests import retry +from psutil.tests import retry_on_failure +from psutil.tests import safe_mkdir +from psutil.tests import safe_rmpath +from psutil.tests import serialrun +from psutil.tests import system_namespace +from psutil.tests import tcp_socketpair +from psutil.tests import terminate +from psutil.tests import TestMemoryLeak +from psutil.tests import unittest +from psutil.tests import unix_socketpair +from psutil.tests import wait_for_file +from psutil.tests import wait_for_pid +import psutil +import psutil.tests + +# =================================================================== +# --- Unit tests for test utilities. +# =================================================================== + + +class TestRetryDecorator(PsutilTestCase): + + @mock.patch('time.sleep') + def test_retry_success(self, sleep): + # Fail 3 times out of 5; make sure the decorated fun returns. + + @retry(retries=5, interval=1, logfun=None) + def foo(): + while queue: + queue.pop() + 1 / 0 + return 1 + + queue = list(range(3)) + self.assertEqual(foo(), 1) + self.assertEqual(sleep.call_count, 3) + + @mock.patch('time.sleep') + def test_retry_failure(self, sleep): + # Fail 6 times out of 5; th function is supposed to raise exc. + @retry(retries=5, interval=1, logfun=None) + def foo(): + while queue: + queue.pop() + 1 / 0 + return 1 + + queue = list(range(6)) + self.assertRaises(ZeroDivisionError, foo) + self.assertEqual(sleep.call_count, 5) + + @mock.patch('time.sleep') + def test_exception_arg(self, sleep): + @retry(exception=ValueError, interval=1) + def foo(): + raise TypeError + + self.assertRaises(TypeError, foo) + self.assertEqual(sleep.call_count, 0) + + @mock.patch('time.sleep') + def test_no_interval_arg(self, sleep): + # if interval is not specified sleep is not supposed to be called + + @retry(retries=5, interval=None, logfun=None) + def foo(): + 1 / 0 + + self.assertRaises(ZeroDivisionError, foo) + self.assertEqual(sleep.call_count, 0) + + @mock.patch('time.sleep') + def test_retries_arg(self, sleep): + + @retry(retries=5, interval=1, logfun=None) + def foo(): + 1 / 0 + + self.assertRaises(ZeroDivisionError, foo) + self.assertEqual(sleep.call_count, 5) + + @mock.patch('time.sleep') + def test_retries_and_timeout_args(self, sleep): + self.assertRaises(ValueError, retry, retries=5, timeout=1) + + +class TestSyncTestUtils(PsutilTestCase): + + def test_wait_for_pid(self): + wait_for_pid(os.getpid()) + nopid = max(psutil.pids()) + 99999 + with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])): + self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid) + + def test_wait_for_file(self): + testfn = self.get_testfn() + with open(testfn, 'w') as f: + f.write('foo') + wait_for_file(testfn) + assert not os.path.exists(testfn) + + def test_wait_for_file_empty(self): + testfn = self.get_testfn() + with open(testfn, 'w'): + pass + wait_for_file(testfn, empty=True) + assert not os.path.exists(testfn) + + def test_wait_for_file_no_file(self): + testfn = self.get_testfn() + with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])): + self.assertRaises(IOError, wait_for_file, testfn) + + def test_wait_for_file_no_delete(self): + testfn = self.get_testfn() + with open(testfn, 'w') as f: + f.write('foo') + wait_for_file(testfn, delete=False) + assert os.path.exists(testfn) + + def test_call_until(self): + ret = call_until(lambda: 1, "ret == 1") + self.assertEqual(ret, 1) + + +class TestFSTestUtils(PsutilTestCase): + + def test_open_text(self): + with open_text(__file__) as f: + self.assertEqual(f.mode, 'rt') + + def test_open_binary(self): + with open_binary(__file__) as f: + self.assertEqual(f.mode, 'rb') + + def test_safe_mkdir(self): + testfn = self.get_testfn() + safe_mkdir(testfn) + assert os.path.isdir(testfn) + safe_mkdir(testfn) + assert os.path.isdir(testfn) + + def test_safe_rmpath(self): + # test file is removed + testfn = self.get_testfn() + open(testfn, 'w').close() + safe_rmpath(testfn) + assert not os.path.exists(testfn) + # test no exception if path does not exist + safe_rmpath(testfn) + # test dir is removed + os.mkdir(testfn) + safe_rmpath(testfn) + assert not os.path.exists(testfn) + # test other exceptions are raised + with mock.patch('psutil.tests.os.stat', + side_effect=OSError(errno.EINVAL, "")) as m: + with self.assertRaises(OSError): + safe_rmpath(testfn) + assert m.called + + def test_chdir(self): + testfn = self.get_testfn() + base = os.getcwd() + os.mkdir(testfn) + with chdir(testfn): + self.assertEqual(os.getcwd(), os.path.join(base, testfn)) + self.assertEqual(os.getcwd(), base) + + +class TestProcessUtils(PsutilTestCase): + + def test_reap_children(self): + subp = self.spawn_testproc() + p = psutil.Process(subp.pid) + assert p.is_running() + reap_children() + assert not p.is_running() + assert not psutil.tests._pids_started + assert not psutil.tests._subprocesses_started + + def test_spawn_children_pair(self): + child, grandchild = self.spawn_children_pair() + self.assertNotEqual(child.pid, grandchild.pid) + assert child.is_running() + assert grandchild.is_running() + children = psutil.Process().children() + self.assertEqual(children, [child]) + children = psutil.Process().children(recursive=True) + self.assertEqual(len(children), 2) + self.assertIn(child, children) + self.assertIn(grandchild, children) + self.assertEqual(child.ppid(), os.getpid()) + self.assertEqual(grandchild.ppid(), child.pid) + + terminate(child) + assert not child.is_running() + assert grandchild.is_running() + + terminate(grandchild) + assert not grandchild.is_running() + + @unittest.skipIf(not POSIX, "POSIX only") + def test_spawn_zombie(self): + parent, zombie = self.spawn_zombie() + self.assertEqual(zombie.status(), psutil.STATUS_ZOMBIE) + + def test_terminate(self): + # by subprocess.Popen + p = self.spawn_testproc() + terminate(p) + self.assertProcessGone(p) + terminate(p) + # by psutil.Process + p = psutil.Process(self.spawn_testproc().pid) + terminate(p) + self.assertProcessGone(p) + terminate(p) + # by psutil.Popen + cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"] + p = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + terminate(p) + self.assertProcessGone(p) + terminate(p) + # by PID + pid = self.spawn_testproc().pid + terminate(pid) + self.assertProcessGone(p) + terminate(pid) + # zombie + if POSIX: + parent, zombie = self.spawn_zombie() + terminate(parent) + terminate(zombie) + self.assertProcessGone(parent) + self.assertProcessGone(zombie) + + +class TestNetUtils(PsutilTestCase): + + def bind_socket(self): + port = get_free_port() + with contextlib.closing(bind_socket(addr=('', port))) as s: + self.assertEqual(s.getsockname()[1], port) + + @unittest.skipIf(not POSIX, "POSIX only") + def test_bind_unix_socket(self): + name = self.get_testfn() + sock = bind_unix_socket(name) + with contextlib.closing(sock): + self.assertEqual(sock.family, socket.AF_UNIX) + self.assertEqual(sock.type, socket.SOCK_STREAM) + self.assertEqual(sock.getsockname(), name) + assert os.path.exists(name) + assert stat.S_ISSOCK(os.stat(name).st_mode) + # UDP + name = self.get_testfn() + sock = bind_unix_socket(name, type=socket.SOCK_DGRAM) + with contextlib.closing(sock): + self.assertEqual(sock.type, socket.SOCK_DGRAM) + + def tcp_tcp_socketpair(self): + addr = ("127.0.0.1", get_free_port()) + server, client = tcp_socketpair(socket.AF_INET, addr=addr) + with contextlib.closing(server): + with contextlib.closing(client): + # Ensure they are connected and the positions are + # correct. + self.assertEqual(server.getsockname(), addr) + self.assertEqual(client.getpeername(), addr) + self.assertNotEqual(client.getsockname(), addr) + + @unittest.skipIf(not POSIX, "POSIX only") + @unittest.skipIf(NETBSD or FREEBSD, + "/var/run/log UNIX socket opened by default") + def test_unix_socketpair(self): + p = psutil.Process() + num_fds = p.num_fds() + assert not p.connections(kind='unix') + name = self.get_testfn() + server, client = unix_socketpair(name) + try: + assert os.path.exists(name) + assert stat.S_ISSOCK(os.stat(name).st_mode) + self.assertEqual(p.num_fds() - num_fds, 2) + self.assertEqual(len(p.connections(kind='unix')), 2) + self.assertEqual(server.getsockname(), name) + self.assertEqual(client.getpeername(), name) + finally: + client.close() + server.close() + + def test_create_sockets(self): + with create_sockets() as socks: + fams = collections.defaultdict(int) + types = collections.defaultdict(int) + for s in socks: + fams[s.family] += 1 + # work around http://bugs.python.org/issue30204 + types[s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)] += 1 + self.assertGreaterEqual(fams[socket.AF_INET], 2) + if supports_ipv6(): + self.assertGreaterEqual(fams[socket.AF_INET6], 2) + if POSIX and HAS_CONNECTIONS_UNIX: + self.assertGreaterEqual(fams[socket.AF_UNIX], 2) + self.assertGreaterEqual(types[socket.SOCK_STREAM], 2) + self.assertGreaterEqual(types[socket.SOCK_DGRAM], 2) + + +@serialrun +class TestMemLeakClass(TestMemoryLeak): + + def test_times(self): + def fun(): + cnt['cnt'] += 1 + cnt = {'cnt': 0} + self.execute(fun, times=10, warmup_times=15) + self.assertEqual(cnt['cnt'], 26) + + def test_param_err(self): + self.assertRaises(ValueError, self.execute, lambda: 0, times=0) + self.assertRaises(ValueError, self.execute, lambda: 0, times=-1) + self.assertRaises(ValueError, self.execute, lambda: 0, warmup_times=-1) + self.assertRaises(ValueError, self.execute, lambda: 0, tolerance=-1) + self.assertRaises(ValueError, self.execute, lambda: 0, retries=-1) + + @retry_on_failure() + @unittest.skipIf(CI_TESTING, "skipped on CI") + def test_leak_mem(self): + ls = [] + + def fun(ls=ls): + ls.append("x" * 24 * 1024) + + try: + # will consume around 3M in total + self.assertRaisesRegex(AssertionError, "extra-mem", + self.execute, fun, times=50) + finally: + del ls + + def test_unclosed_files(self): + def fun(): + f = open(__file__) + self.addCleanup(f.close) + box.append(f) + + box = [] + kind = "fd" if POSIX else "handle" + self.assertRaisesRegex(AssertionError, "unclosed " + kind, + self.execute, fun) + + def test_tolerance(self): + def fun(): + ls.append("x" * 24 * 1024) + ls = [] + times = 100 + self.execute(fun, times=times, warmup_times=0, + tolerance=200 * 1024 * 1024) + self.assertEqual(len(ls), times + 1) + + def test_execute_w_exc(self): + def fun(): + 1 / 0 + self.execute_w_exc(ZeroDivisionError, fun) + with self.assertRaises(ZeroDivisionError): + self.execute_w_exc(OSError, fun) + + def fun(): + pass + with self.assertRaises(AssertionError): + self.execute_w_exc(ZeroDivisionError, fun) + + +class TestTestingUtils(PsutilTestCase): + + def test_process_namespace(self): + p = psutil.Process() + ns = process_namespace(p) + ns.test() + fun = [x for x in ns.iter(ns.getters) if x[1] == 'ppid'][0][0] + self.assertEqual(fun(), p.ppid()) + + def test_system_namespace(self): + ns = system_namespace() + fun = [x for x in ns.iter(ns.getters) if x[1] == 'net_if_addrs'][0][0] + self.assertEqual(fun(), psutil.net_if_addrs()) + + +class TestOtherUtils(PsutilTestCase): + + def test_is_namedtuple(self): + assert is_namedtuple(collections.namedtuple('foo', 'a b c')(1, 2, 3)) + assert not is_namedtuple(tuple()) + + +if __name__ == '__main__': + from psutil.tests.runner import run_from_name + run_from_name(__file__)