comparison planemo/lib/python3.7/site-packages/psutil/tests/runner.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
6
7 """
8 Unit test runner, providing new features on top of unittest module:
9 - colourized output
10 - parallel run (UNIX only)
11 - print failures/tracebacks on CTRL+C
12 - re-run failed tests only (make test-failed)
13
14 Invocation examples:
15 - make test
16 - make test-failed
17
18 Parallel:
19 - make test-parallel
20 - make test-process ARGS=--parallel
21 """
22
23 from __future__ import print_function
24 import atexit
25 import optparse
26 import os
27 import sys
28 import textwrap
29 import time
30 import unittest
31 try:
32 import ctypes
33 except ImportError:
34 ctypes = None
35
36 try:
37 import concurrencytest # pip install concurrencytest
38 except ImportError:
39 concurrencytest = None
40
41 import psutil
42 from psutil._common import hilite
43 from psutil._common import print_color
44 from psutil._common import term_supports_colors
45 from psutil._compat import super
46 from psutil.tests import CI_TESTING
47 from psutil.tests import import_module_by_path
48 from psutil.tests import print_sysinfo
49 from psutil.tests import reap_children
50 from psutil.tests import safe_rmpath
51
52
# Verbosity level handed to the unittest text runners created below.
VERBOSITY = 2
# File (relative to the current working directory) where the ids of
# failed tests are stored so that they can be re-run later.
FAILED_TESTS_FNAME = '.failed-tests.txt'
# Number of workers used for the parallel run; falls back to 1 when
# psutil.cpu_count() returns a falsy value.
NWORKERS = psutil.cpu_count() or 1
# Colours are used only outside of CI and when the terminal supports them.
USE_COLORS = not CI_TESTING and term_supports_colors()

# Absolute path of the directory containing this file.
HERE = os.path.abspath(os.path.dirname(__file__))
# Shortcut to the default loader's loadTestsFromTestCase() method.
loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
60
61
def cprint(msg, color, bold=False, file=None):
    """Print `msg`, coloured if USE_COLORS is true, plain otherwise.

    If `file` is not given, "red" messages are sent to stderr and
    everything else to stdout.
    """
    if file is None:
        if color == 'red':
            file = sys.stderr
        else:
            file = sys.stdout
    if not USE_COLORS:
        print(msg, file=file)
        return
    print_color(msg, color, bold=bold, file=file)
69
70
class TestLoader:
    """Build unittest suites out of the test_*.py files living in
    `testdir`, a previously saved list of failed tests, or a single
    test name.
    """

    testdir = HERE
    skip_files = ['test_memleaks.py']
    if "WHEELHOUSE_UPLOADER_USERNAME" in os.environ:
        skip_files.extend(['test_osx.py', 'test_linux.py', 'test_posix.py'])

    def _get_testmods(self):
        """Return the paths of the test_*.py files in `testdir` which
        are not listed in `skip_files`.
        """
        paths = []
        for fname in os.listdir(self.testdir):
            if not (fname.startswith('test_') and fname.endswith('.py')):
                continue
            if fname in self.skip_files:
                continue
            paths.append(os.path.join(self.testdir, fname))
        return paths

    def _iter_testmod_classes(self):
        """Import every test module and yield each unittest.TestCase
        subclass found among its attributes.
        """
        for path in self._get_testmods():
            module = import_module_by_path(path)
            for attr_name in dir(module):
                candidate = getattr(module, attr_name)
                if not isinstance(candidate, type):
                    continue
                if issubclass(candidate, unittest.TestCase):
                    yield candidate

    def all(self):
        """Return a suite with the tests of all discovered classes."""
        suite = unittest.TestSuite()
        suite.addTests(loadTestsFromTestCase(klass)
                       for klass in self._iter_testmod_classes())
        return suite

    def last_failed(self):
        """Return a suite made of the test names listed in
        FAILED_TESTS_FNAME (written by a previous failed run). The
        suite is empty if that file does not exist.
        """
        suite = unittest.TestSuite()
        if os.path.isfile(FAILED_TESTS_FNAME):
            with open(FAILED_TESTS_FNAME, 'rt') as f:
                test_names = f.read().split()
            load = unittest.defaultTestLoader.loadTestsFromName
            for test_name in test_names:
                suite.addTest(load(test_name))
        return suite

    def from_name(self, name):
        """Return a suite loaded from a dotted test name. A path ending
        with '.py' is first reduced to its bare module name.
        """
        if name.endswith('.py'):
            name, _ = os.path.splitext(os.path.basename(name))
        return unittest.defaultTestLoader.loadTestsFromName(name)
119
120
class ColouredResult(unittest.TextTestResult):
    """A text result which prints the outcome of each test via cprint().

    The add*() hooks call the unittest.TestResult implementation
    directly (instead of the TextTestResult one) and then print the
    coloured outcome themselves.
    """

    def addSuccess(self, test):
        """Record a success and print a green "OK"."""
        unittest.TestResult.addSuccess(self, test)
        cprint("OK", "green")

    def addError(self, test, err):
        """Record an error and print a bold red "ERROR"."""
        unittest.TestResult.addError(self, test, err)
        cprint("ERROR", "red", bold=True)

    def addFailure(self, test, err):
        """Record a failure and print a red "FAIL"."""
        unittest.TestResult.addFailure(self, test, err)
        cprint("FAIL", "red")

    def addSkip(self, test, reason):
        """Record a skip and print the (stripped) reason in brown."""
        unittest.TestResult.addSkip(self, test, reason)
        message = "skipped: %s" % reason.strip()
        cprint(message, "brown")

    def printErrorList(self, flavour, errors):
        """Print the error list with a red heading (bold for 'ERROR')."""
        is_error = flavour == 'ERROR'
        coloured = hilite(flavour, "red", bold=is_error)
        super().printErrorList(coloured, errors)
142
143
class ColouredTextRunner(unittest.TextTestRunner):
    """
    A coloured text runner which also prints failed tests on KeyboardInterrupt
    and saves failed tests in a file so that they can be re-run.
    """
    resultclass = ColouredResult if USE_COLORS else unittest.TextTestResult

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Ids of the tests which failed or errored during this run.
        self.failed_tnames = set()
        # Set by _makeResult(); None until a run has actually started.
        self.result = None

    def _makeResult(self):
        # Store result instance so that it can be accessed on
        # KeyboardInterrupt.
        self.result = super()._makeResult()
        return self.result

    def _write_last_failed(self):
        """Write the collected failed test ids to FAILED_TESTS_FNAME,
        one per line. Nothing is written if there are no failures.
        """
        if self.failed_tnames:
            with open(FAILED_TESTS_FNAME, 'wt') as f:
                # Sorted so that the file content is deterministic.
                for tname in sorted(self.failed_tnames):
                    f.write(tname + '\n')

    def _save_result(self, result):
        """Remember the ids of the tests listed in `result.errors` and
        `result.failures`.
        """
        if not result.wasSuccessful():
            for t in result.errors + result.failures:
                tname = t[0].id()
                self.failed_tnames.add(tname)

    def _run(self, suite):
        """Run `suite` and return its result.

        On KeyboardInterrupt / SystemExit the errors collected so far
        are printed and the process exits with status 1.
        """
        try:
            result = super().run(suite)
        except (KeyboardInterrupt, SystemExit):
            # The partial result lives on the runner itself (see
            # _makeResult()); it is None if the run was interrupted
            # before the result object was created.
            result = self.result
            if result is not None:
                result.printErrors()
            sys.exit(1)
        else:
            self._save_result(result)
            return result

    def _exit(self, success):
        """Print the final outcome and terminate the process.

        On success the failed-tests file is removed and the exit status
        is 0; otherwise the failed tests are saved and the status is 1.
        """
        if success:
            cprint("SUCCESS", "green", bold=True)
            safe_rmpath(FAILED_TESTS_FNAME)
            sys.exit(0)
        else:
            cprint("FAILED", "red", bold=True)
            self._write_last_failed()
            sys.exit(1)

    def run(self, suite):
        """Run `suite`, then exit the process with the proper status."""
        result = self._run(suite)
        self._exit(result.wasSuccessful())
197
198
class ParallelRunner(ColouredTextRunner):
    """A runner which executes the tests in parallel by using the
    concurrencytest module, except for the tests flagged with a true
    `_serialrun` attribute, which are run serially afterwards.
    """

    @staticmethod
    def _parallelize(suite):
        """Wrap `suite` into a concurrencytest.ConcurrentTestSuite which
        forks NWORKERS workers.
        """
        def fdopen(fd, mode, *kwds):
            # Extra positional arguments are accepted but deliberately
            # not forwarded to the original os.fdopen().
            stream = orig_fdopen(fd, mode)
            atexit.register(stream.close)
            return stream

        # Monkey patch concurrencytest lib bug (fdopen() stream not closed).
        # https://github.com/cgoldberg/concurrencytest/issues/11
        orig_fdopen = os.fdopen
        concurrencytest.os.fdopen = fdopen
        forker = concurrencytest.fork_for_tests(NWORKERS)
        return concurrencytest.ConcurrentTestSuite(suite, forker)

    @staticmethod
    def _split_suite(suite):
        """Split `suite` into a (serial, parallel) tuple of suites.

        A test ends up in the serial suite if its class (or the class
        of the first test of a nested suite) has a true `_serialrun`
        attribute. Empty entries are dropped. TypeError is raised for
        entries which are neither a TestSuite nor a TestCase.
        """
        serial = unittest.TestSuite()
        parallel = unittest.TestSuite()
        for test in suite:
            if test.countTestCases() == 0:
                continue
            elif isinstance(test, unittest.TestSuite):
                test_class = test._tests[0].__class__
            elif isinstance(test, unittest.TestCase):
                # An instance is fine here: getattr() below also sees
                # class level attributes.
                test_class = test
            else:
                raise TypeError("can't recognize type %r" % test)

            if getattr(test_class, '_serialrun', False):
                serial.addTest(test)
            else:
                parallel.addTest(test)
        return (serial, parallel)

    def run(self, suite):
        """Run the parallel tests first, then the serial ones, print a
        summary table and exit the process (status 0 only if both runs
        were successful).
        """
        ser_suite, par_suite = self._split_suite(suite)
        par_suite = self._parallelize(par_suite)

        # run parallel
        cprint("starting parallel tests using %s workers" % NWORKERS,
               "green", bold=True)
        t = time.time()
        par = self._run(par_suite)
        par_elapsed = time.time() - t

        # At this point we should have N zombies (the workers), which
        # will disappear with wait().
        orphans = psutil.Process().children()
        _, alive = psutil.wait_procs(orphans, timeout=1)
        if alive:
            cprint("alive processes %s" % alive, "red")
            reap_children()

        # run serial
        t = time.time()
        ser = self._run(ser_suite)
        ser_elapsed = time.time() - t

        # print
        if not par.wasSuccessful() and ser_suite.countTestCases() > 0:
            par.printErrors()  # print them again at the bottom
        par_fails, par_errs, par_skips = map(len, (par.failures,
                                                   par.errors,
                                                   par.skipped))
        ser_fails, ser_errs, ser_skips = map(len, (ser.failures,
                                                   ser.errors,
                                                   ser.skipped))
        # Every cell is padded to the 10 characters of the borders so
        # that the columns line up.
        print(textwrap.dedent("""
            +----------+----------+----------+----------+----------+----------+
            |          |    total | failures |   errors |  skipped |     time |
            +----------+----------+----------+----------+----------+----------+
            | parallel |      %3s |      %3s |      %3s |      %3s |    %.2fs |
            +----------+----------+----------+----------+----------+----------+
            | serial   |      %3s |      %3s |      %3s |      %3s |    %.2fs |
            +----------+----------+----------+----------+----------+----------+
            """ % (par.testsRun, par_fails, par_errs, par_skips, par_elapsed,
                   ser.testsRun, ser_fails, ser_errs, ser_skips, ser_elapsed)))
        print("Ran %s tests in %.3fs using %s workers" % (
            par.testsRun + ser.testsRun, par_elapsed + ser_elapsed, NWORKERS))
        ok = par.wasSuccessful() and ser.wasSuccessful()
        self._exit(ok)
282
283
def get_runner(parallel=False):
    """Return the runner to use.

    A ParallelRunner is returned only if `parallel` is true and
    parallel execution is possible; otherwise the reason is printed
    (when `parallel` was requested) and a ColouredTextRunner is
    returned.
    """
    if not parallel:
        return ColouredTextRunner(verbosity=VERBOSITY)

    if psutil.WINDOWS:
        reason = "Can't run parallel tests on Windows."
    elif concurrencytest is None:
        reason = "concurrencytest module is not installed."
    elif NWORKERS == 1:
        reason = "Only 1 CPU available."
    else:
        return ParallelRunner(verbosity=VERBOSITY)

    cprint(reason + " Running serial tests instead.", "red")
    return ColouredTextRunner(verbosity=VERBOSITY)
297
298
# Used by test_*.py modules.
def run_from_name(name):
    """Load the tests identified by `name` (a test name or a .py path)
    and run them with the default (non parallel) runner.
    """
    tests = TestLoader().from_name(name)
    get_runner().run(tests)
304
305
def setup():
    """Turn psutil's testing mode on: set the PSUTIL_TESTING environment
    variable (unless already set) and call the C extension's
    set_testing().
    """
    # Setting the variable won't work on Windows, but the set_testing()
    # call below will do it.
    os.environ.setdefault('PSUTIL_TESTING', '1')
    psutil._psplatform.cext.set_testing()
311
312
def main():
    """Command line entry point: parse the options, build the suite
    (a single named test, the last failed tests or all tests) and run
    it.
    """
    setup()
    parser = optparse.OptionParser(
        usage="python3 -m psutil.tests [opts] [test-name]",
        description="run unit tests")
    parser.add_option("--last-failed",
                      action="store_true", default=False,
                      help="only run last failed tests")
    parser.add_option("--parallel",
                      action="store_true", default=False,
                      help="run tests in parallel")
    opts, args = parser.parse_args()

    # The list of failed tests is kept only if it's about to be re-run.
    if not opts.last_failed:
        safe_rmpath(FAILED_TESTS_FNAME)

    # loader
    loader = TestLoader()
    if len(args) > 1:
        # At most one test name is accepted.
        parser.print_usage()
        return sys.exit(1)
    if args:
        suite = loader.from_name(args[0])
    elif opts.last_failed:
        suite = loader.last_failed()
    else:
        suite = loader.all()

    if CI_TESTING:
        print_sysinfo()
    get_runner(opts.parallel).run(suite)
345
346
# Run the test suite only when this file is executed as a script.
if __name__ == '__main__':
    main()