Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/runner.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
Comparison view between revision 0:d30785e31577 and revision 1:56ad4e20f292
(lines marked as equal / deleted / inserted / replaced by the diff viewer).
1 #!/usr/bin/env python3 | |
2 | |
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
4 # Use of this source code is governed by a BSD-style license that can be | |
5 # found in the LICENSE file. | |
6 | |
7 """ | |
8 Unit test runner, providing new features on top of unittest module: | |
9 - colourized output | |
10 - parallel run (UNIX only) | |
11 - print failures/tracebacks on CTRL+C | |
12 - re-run failed tests only (make test-failed) | |
13 | |
14 Invocation examples: | |
15 - make test | |
16 - make test-failed | |
17 | |
18 Parallel: | |
19 - make test-parallel | |
20 - make test-process ARGS=--parallel | |
21 """ | |
22 | |
23 from __future__ import print_function | |
24 import atexit | |
25 import optparse | |
26 import os | |
27 import sys | |
28 import textwrap | |
29 import time | |
30 import unittest | |
31 try: | |
32 import ctypes | |
33 except ImportError: | |
34 ctypes = None | |
35 | |
36 try: | |
37 import concurrencytest # pip install concurrencytest | |
38 except ImportError: | |
39 concurrencytest = None | |
40 | |
41 import psutil | |
42 from psutil._common import hilite | |
43 from psutil._common import print_color | |
44 from psutil._common import term_supports_colors | |
45 from psutil._compat import super | |
46 from psutil.tests import CI_TESTING | |
47 from psutil.tests import import_module_by_path | |
48 from psutil.tests import print_sysinfo | |
49 from psutil.tests import reap_children | |
50 from psutil.tests import safe_rmpath | |
51 | |
52 | |
53 VERBOSITY = 2 | |
54 FAILED_TESTS_FNAME = '.failed-tests.txt' | |
55 NWORKERS = psutil.cpu_count() or 1 | |
56 USE_COLORS = not CI_TESTING and term_supports_colors() | |
57 | |
58 HERE = os.path.abspath(os.path.dirname(__file__)) | |
59 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase | |
60 | |
61 | |
62 def cprint(msg, color, bold=False, file=None): | |
63 if file is None: | |
64 file = sys.stderr if color == 'red' else sys.stdout | |
65 if USE_COLORS: | |
66 print_color(msg, color, bold=bold, file=file) | |
67 else: | |
68 print(msg, file=file) | |
69 | |
70 | |
71 class TestLoader: | |
72 | |
73 testdir = HERE | |
74 skip_files = ['test_memleaks.py'] | |
75 if "WHEELHOUSE_UPLOADER_USERNAME" in os.environ: | |
76 skip_files.extend(['test_osx.py', 'test_linux.py', 'test_posix.py']) | |
77 | |
78 def _get_testmods(self): | |
79 return [os.path.join(self.testdir, x) | |
80 for x in os.listdir(self.testdir) | |
81 if x.startswith('test_') and x.endswith('.py') and | |
82 x not in self.skip_files] | |
83 | |
84 def _iter_testmod_classes(self): | |
85 """Iterate over all test files in this directory and return | |
86 all TestCase classes in them. | |
87 """ | |
88 for path in self._get_testmods(): | |
89 mod = import_module_by_path(path) | |
90 for name in dir(mod): | |
91 obj = getattr(mod, name) | |
92 if isinstance(obj, type) and \ | |
93 issubclass(obj, unittest.TestCase): | |
94 yield obj | |
95 | |
96 def all(self): | |
97 suite = unittest.TestSuite() | |
98 for obj in self._iter_testmod_classes(): | |
99 test = loadTestsFromTestCase(obj) | |
100 suite.addTest(test) | |
101 return suite | |
102 | |
103 def last_failed(self): | |
104 # ...from previously failed test run | |
105 suite = unittest.TestSuite() | |
106 if not os.path.isfile(FAILED_TESTS_FNAME): | |
107 return suite | |
108 with open(FAILED_TESTS_FNAME, 'rt') as f: | |
109 names = f.read().split() | |
110 for n in names: | |
111 test = unittest.defaultTestLoader.loadTestsFromName(n) | |
112 suite.addTest(test) | |
113 return suite | |
114 | |
115 def from_name(self, name): | |
116 if name.endswith('.py'): | |
117 name = os.path.splitext(os.path.basename(name))[0] | |
118 return unittest.defaultTestLoader.loadTestsFromName(name) | |
119 | |
120 | |
121 class ColouredResult(unittest.TextTestResult): | |
122 | |
123 def addSuccess(self, test): | |
124 unittest.TestResult.addSuccess(self, test) | |
125 cprint("OK", "green") | |
126 | |
127 def addError(self, test, err): | |
128 unittest.TestResult.addError(self, test, err) | |
129 cprint("ERROR", "red", bold=True) | |
130 | |
131 def addFailure(self, test, err): | |
132 unittest.TestResult.addFailure(self, test, err) | |
133 cprint("FAIL", "red") | |
134 | |
135 def addSkip(self, test, reason): | |
136 unittest.TestResult.addSkip(self, test, reason) | |
137 cprint("skipped: %s" % reason.strip(), "brown") | |
138 | |
139 def printErrorList(self, flavour, errors): | |
140 flavour = hilite(flavour, "red", bold=flavour == 'ERROR') | |
141 super().printErrorList(flavour, errors) | |
142 | |
143 | |
class ColouredTextRunner(unittest.TextTestRunner):
    """
    A coloured text runner which also prints failed tests on KeyboardInterrupt
    and saves failed tests in a file so that they can be re-run.
    """
    resultclass = ColouredResult if USE_COLORS else unittest.TextTestResult

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # IDs of the tests which failed/errored during this run.
        self.failed_tnames = set()

    def _makeResult(self):
        # Store result instance so that it can be accessed on
        # KeyboardInterrupt.
        self.result = super()._makeResult()
        return self.result

    def _write_last_failed(self):
        # Persist failed test IDs so that last_failed() can re-load them.
        if self.failed_tnames:
            with open(FAILED_TESTS_FNAME, 'wt') as f:
                for tname in self.failed_tnames:
                    f.write(tname + '\n')

    def _save_result(self, result):
        # Remember the IDs of the failed/errored tests in *result*.
        if not result.wasSuccessful():
            for t in result.errors + result.failures:
                tname = t[0].id()
                self.failed_tnames.add(tname)

    def _run(self, suite):
        """Run *suite*, printing collected errors if interrupted."""
        try:
            result = super().run(suite)
        except (KeyboardInterrupt, SystemExit):
            # Bug fix: this used to read "self.runner.result", but no
            # "runner" attribute exists on this class; the result instance
            # is stored by _makeResult() as self.result.
            result = self.result
            result.printErrors()
            # Bug fix: was "raise sys.exit(1)"; sys.exit() already raises
            # SystemExit so the extra "raise" was dead code.
            sys.exit(1)
        else:
            self._save_result(result)
            return result

    def _exit(self, success):
        """Print the final verdict and terminate the process."""
        if success:
            cprint("SUCCESS", "green", bold=True)
            safe_rmpath(FAILED_TESTS_FNAME)
            sys.exit(0)
        else:
            cprint("FAILED", "red", bold=True)
            self._write_last_failed()
            sys.exit(1)

    def run(self, suite):
        result = self._run(suite)
        self._exit(result.wasSuccessful())
197 | |
198 | |
class ParallelRunner(ColouredTextRunner):
    """Runner which executes most tests in NWORKERS forked processes
    (via the concurrencytest library) and the remaining ones serially.
    UNIX only: relies on fork_for_tests().
    """

    @staticmethod
    def _parallelize(suite):
        # Wrap *suite* so its tests are forked across NWORKERS processes.
        def fdopen(fd, mode, *kwds):
            stream = orig_fdopen(fd, mode)
            atexit.register(stream.close)
            return stream

        # Monkey patch concurrencytest lib bug (fdopen() stream not closed).
        # https://github.com/cgoldberg/concurrencytest/issues/11
        orig_fdopen = os.fdopen
        concurrencytest.os.fdopen = fdopen
        forker = concurrencytest.fork_for_tests(NWORKERS)
        return concurrencytest.ConcurrentTestSuite(suite, forker)

    @staticmethod
    def _split_suite(suite):
        # Split *suite* into (serial, parallel) sub-suites: test classes
        # flagged with a truthy "_serialrun" attribute must not be forked.
        serial = unittest.TestSuite()
        parallel = unittest.TestSuite()
        for test in suite:
            if test.countTestCases() == 0:
                continue
            elif isinstance(test, unittest.TestSuite):
                test_class = test._tests[0].__class__
            elif isinstance(test, unittest.TestCase):
                test_class = test
            else:
                raise TypeError("can't recognize type %r" % test)

            if getattr(test_class, '_serialrun', False):
                serial.addTest(test)
            else:
                parallel.addTest(test)
        return (serial, parallel)

    def run(self, suite):
        """Run the parallel part, then the serial part, print a summary
        table and exit with the combined status.
        """
        ser_suite, par_suite = self._split_suite(suite)
        par_suite = self._parallelize(par_suite)

        # run parallel
        cprint("starting parallel tests using %s workers" % NWORKERS,
               "green", bold=True)
        t = time.time()
        par = self._run(par_suite)
        par_elapsed = time.time() - t

        # At this point we should have N zombies (the workers), which
        # will disappear with wait().
        orphans = psutil.Process().children()
        gone, alive = psutil.wait_procs(orphans, timeout=1)
        if alive:
            # Workers still alive after the grace period: kill them.
            cprint("alive processes %s" % alive, "red")
            reap_children()

        # run serial
        t = time.time()
        ser = self._run(ser_suite)
        ser_elapsed = time.time() - t

        # print
        if not par.wasSuccessful() and ser_suite.countTestCases() > 0:
            par.printErrors()  # print them again at the bottom
        par_fails, par_errs, par_skips = map(len, (par.failures,
                                                   par.errors,
                                                   par.skipped))
        ser_fails, ser_errs, ser_skips = map(len, (ser.failures,
                                                   ser.errors,
                                                   ser.skipped))
        print(textwrap.dedent("""
            +----------+----------+----------+----------+----------+----------+
            |          |  total   | failures |  errors  | skipped  |   time   |
            +----------+----------+----------+----------+----------+----------+
            | parallel |   %3s    |    %3s   |    %3s   |    %3s   | %.2fs    |
            +----------+----------+----------+----------+----------+----------+
            | serial   |   %3s    |    %3s   |    %3s   |    %3s   | %.2fs    |
            +----------+----------+----------+----------+----------+----------+
            """ % (par.testsRun, par_fails, par_errs, par_skips, par_elapsed,
                   ser.testsRun, ser_fails, ser_errs, ser_skips, ser_elapsed)))
        print("Ran %s tests in %.3fs using %s workers" % (
            par.testsRun + ser.testsRun, par_elapsed + ser_elapsed, NWORKERS))
        ok = par.wasSuccessful() and ser.wasSuccessful()
        self._exit(ok)
282 | |
283 | |
def get_runner(parallel=False):
    """Return a runner instance: a ParallelRunner when *parallel* is
    requested and possible, otherwise a serial ColouredTextRunner
    (printing a warning if parallel mode had to be declined).
    """
    def fall_back(msg):
        cprint(msg + " Running serial tests instead.", "red")

    if parallel:
        if psutil.WINDOWS:
            fall_back("Can't run parallel tests on Windows.")
        elif concurrencytest is None:
            fall_back("concurrencytest module is not installed.")
        elif NWORKERS == 1:
            fall_back("Only 1 CPU available.")
        else:
            return ParallelRunner(verbosity=VERBOSITY)
    return ColouredTextRunner(verbosity=VERBOSITY)
297 | |
298 | |
# Used by test_*.py modules.
def run_from_name(name):
    """Load and run the test(s) identified by *name* (a dotted test
    name or a test file path).
    """
    suite = TestLoader().from_name(name)
    get_runner().run(suite)
304 | |
305 | |
def setup():
    """Flag this process as a psutil test run, both via the env var
    and via the C extension.
    """
    # This won't work on Windows but set_testing() below will do it.
    os.environ.setdefault('PSUTIL_TESTING', '1')
    psutil._psplatform.cext.set_testing()
311 | |
312 | |
def main():
    """Parse command line options, assemble the suite and run it."""
    setup()
    usage = "python3 -m psutil.tests [opts] [test-name]"
    parser = optparse.OptionParser(usage=usage, description="run unit tests")
    parser.add_option("--last-failed",
                      action="store_true", default=False,
                      help="only run last failed tests")
    parser.add_option("--parallel",
                      action="store_true", default=False,
                      help="run tests in parallel")
    opts, args = parser.parse_args()

    # Forget previously failed tests unless we're about to re-run them.
    if not opts.last_failed:
        safe_rmpath(FAILED_TESTS_FNAME)

    # Pick the suite: an explicit name, the last failures, or everything.
    loader = TestLoader()
    if args:
        if len(args) > 1:
            parser.print_usage()
            return sys.exit(1)
        suite = loader.from_name(args[0])
    elif opts.last_failed:
        suite = loader.last_failed()
    else:
        suite = loader.all()

    if CI_TESTING:
        print_sysinfo()
    get_runner(opts.parallel).run(suite)
345 | |
346 | |
# Script entry point.
if __name__ == '__main__':
    main()