Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/psutil/tests/runner.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/psutil/tests/runner.py Fri Jul 31 00:32:28 2020 -0400 @@ -0,0 +1,348 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +""" +Unit test runner, providing new features on top of unittest module: +- colourized output +- parallel run (UNIX only) +- print failures/tracebacks on CTRL+C +- re-run failed tests only (make test-failed) + +Invocation examples: +- make test +- make test-failed + +Parallel: +- make test-parallel +- make test-process ARGS=--parallel +""" + +from __future__ import print_function +import atexit +import optparse +import os +import sys +import textwrap +import time +import unittest +try: + import ctypes +except ImportError: + ctypes = None + +try: + import concurrencytest # pip install concurrencytest +except ImportError: + concurrencytest = None + +import psutil +from psutil._common import hilite +from psutil._common import print_color +from psutil._common import term_supports_colors +from psutil._compat import super +from psutil.tests import CI_TESTING +from psutil.tests import import_module_by_path +from psutil.tests import print_sysinfo +from psutil.tests import reap_children +from psutil.tests import safe_rmpath + + +VERBOSITY = 2 +FAILED_TESTS_FNAME = '.failed-tests.txt' +NWORKERS = psutil.cpu_count() or 1 +USE_COLORS = not CI_TESTING and term_supports_colors() + +HERE = os.path.abspath(os.path.dirname(__file__)) +loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase + + +def cprint(msg, color, bold=False, file=None): + if file is None: + file = sys.stderr if color == 'red' else sys.stdout + if USE_COLORS: + print_color(msg, color, bold=bold, file=file) + else: + print(msg, file=file) + + +class TestLoader: + + testdir = HERE + skip_files = ['test_memleaks.py'] + if "WHEELHOUSE_UPLOADER_USERNAME" in os.environ: + skip_files.extend(['test_osx.py', 'test_linux.py', 'test_posix.py']) + + def _get_testmods(self): + return [os.path.join(self.testdir, x) + for x in os.listdir(self.testdir) + if x.startswith('test_') and x.endswith('.py') and + x not in self.skip_files] + + def _iter_testmod_classes(self): + """Iterate over all test files in this directory and return + all TestCase classes in them. + """ + for path in self._get_testmods(): + mod = import_module_by_path(path) + for name in dir(mod): + obj = getattr(mod, name) + if isinstance(obj, type) and \ + issubclass(obj, unittest.TestCase): + yield obj + + def all(self): + suite = unittest.TestSuite() + for obj in self._iter_testmod_classes(): + test = loadTestsFromTestCase(obj) + suite.addTest(test) + return suite + + def last_failed(self): + # ...from previously failed test run + suite = unittest.TestSuite() + if not os.path.isfile(FAILED_TESTS_FNAME): + return suite + with open(FAILED_TESTS_FNAME, 'rt') as f: + names = f.read().split() + for n in names: + test = unittest.defaultTestLoader.loadTestsFromName(n) + suite.addTest(test) + return suite + + def from_name(self, name): + if name.endswith('.py'): + name = os.path.splitext(os.path.basename(name))[0] + return unittest.defaultTestLoader.loadTestsFromName(name) + + +class ColouredResult(unittest.TextTestResult): + + def addSuccess(self, test): + unittest.TestResult.addSuccess(self, test) + cprint("OK", "green") + + def addError(self, test, err): + unittest.TestResult.addError(self, test, err) + cprint("ERROR", "red", bold=True) + + def addFailure(self, test, err): + unittest.TestResult.addFailure(self, test, err) + cprint("FAIL", "red") + + def addSkip(self, test, reason): + unittest.TestResult.addSkip(self, test, reason) + cprint("skipped: %s" % reason.strip(), "brown") + + def printErrorList(self, flavour, errors): + flavour = hilite(flavour, "red", bold=flavour == 'ERROR') + super().printErrorList(flavour, errors) + + +class ColouredTextRunner(unittest.TextTestRunner): + """ + A coloured text runner which also prints failed tests on KeyboardInterrupt + and save failed tests in a file so that they can be re-run. + """ + resultclass = ColouredResult if USE_COLORS else unittest.TextTestResult + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.failed_tnames = set() + + def _makeResult(self): + # Store result instance so that it can be accessed on + # KeyboardInterrupt. + self.result = super()._makeResult() + return self.result + + def _write_last_failed(self): + if self.failed_tnames: + with open(FAILED_TESTS_FNAME, 'wt') as f: + for tname in self.failed_tnames: + f.write(tname + '\n') + + def _save_result(self, result): + if not result.wasSuccessful(): + for t in result.errors + result.failures: + tname = t[0].id() + self.failed_tnames.add(tname) + + def _run(self, suite): + try: + result = super().run(suite) + except (KeyboardInterrupt, SystemExit): + result = self.runner.result + result.printErrors() + raise sys.exit(1) + else: + self._save_result(result) + return result + + def _exit(self, success): + if success: + cprint("SUCCESS", "green", bold=True) + safe_rmpath(FAILED_TESTS_FNAME) + sys.exit(0) + else: + cprint("FAILED", "red", bold=True) + self._write_last_failed() + sys.exit(1) + + def run(self, suite): + result = self._run(suite) + self._exit(result.wasSuccessful()) + + +class ParallelRunner(ColouredTextRunner): + + @staticmethod + def _parallelize(suite): + def fdopen(fd, mode, *kwds): + stream = orig_fdopen(fd, mode) + atexit.register(stream.close) + return stream + + # Monkey patch concurrencytest lib bug (fdopen() stream not closed). + # https://github.com/cgoldberg/concurrencytest/issues/11 + orig_fdopen = os.fdopen + concurrencytest.os.fdopen = fdopen + forker = concurrencytest.fork_for_tests(NWORKERS) + return concurrencytest.ConcurrentTestSuite(suite, forker) + + @staticmethod + def _split_suite(suite): + serial = unittest.TestSuite() + parallel = unittest.TestSuite() + for test in suite: + if test.countTestCases() == 0: + continue + elif isinstance(test, unittest.TestSuite): + test_class = test._tests[0].__class__ + elif isinstance(test, unittest.TestCase): + test_class = test + else: + raise TypeError("can't recognize type %r" % test) + + if getattr(test_class, '_serialrun', False): + serial.addTest(test) + else: + parallel.addTest(test) + return (serial, parallel) + + def run(self, suite): + ser_suite, par_suite = self._split_suite(suite) + par_suite = self._parallelize(par_suite) + + # run parallel + cprint("starting parallel tests using %s workers" % NWORKERS, + "green", bold=True) + t = time.time() + par = self._run(par_suite) + par_elapsed = time.time() - t + + # At this point we should have N zombies (the workers), which + # will disappear with wait(). + orphans = psutil.Process().children() + gone, alive = psutil.wait_procs(orphans, timeout=1) + if alive: + cprint("alive processes %s" % alive, "red") + reap_children() + + # run serial + t = time.time() + ser = self._run(ser_suite) + ser_elapsed = time.time() - t + + # print + if not par.wasSuccessful() and ser_suite.countTestCases() > 0: + par.printErrors() # print them again at the bottom + par_fails, par_errs, par_skips = map(len, (par.failures, + par.errors, + par.skipped)) + ser_fails, ser_errs, ser_skips = map(len, (ser.failures, + ser.errors, + ser.skipped)) + print(textwrap.dedent(""" + +----------+----------+----------+----------+----------+----------+ + | | total | failures | errors | skipped | time | + +----------+----------+----------+----------+----------+----------+ + | parallel | %3s | %3s | %3s | %3s | %.2fs | + +----------+----------+----------+----------+----------+----------+ + | serial | %3s | %3s | %3s | %3s | %.2fs | + +----------+----------+----------+----------+----------+----------+ + """ % (par.testsRun, par_fails, par_errs, par_skips, par_elapsed, + ser.testsRun, ser_fails, ser_errs, ser_skips, ser_elapsed))) + print("Ran %s tests in %.3fs using %s workers" % ( + par.testsRun + ser.testsRun, par_elapsed + ser_elapsed, NWORKERS)) + ok = par.wasSuccessful() and ser.wasSuccessful() + self._exit(ok) + + +def get_runner(parallel=False): + def warn(msg): + cprint(msg + " Running serial tests instead.", "red") + if parallel: + if psutil.WINDOWS: + warn("Can't run parallel tests on Windows.") + elif concurrencytest is None: + warn("concurrencytest module is not installed.") + elif NWORKERS == 1: + warn("Only 1 CPU available.") + else: + return ParallelRunner(verbosity=VERBOSITY) + return ColouredTextRunner(verbosity=VERBOSITY) + + +# Used by test_*,py modules. +def run_from_name(name): + suite = TestLoader().from_name(name) + runner = get_runner() + runner.run(suite) + + +def setup(): + if 'PSUTIL_TESTING' not in os.environ: + # This won't work on Windows but set_testing() below will do it. + os.environ['PSUTIL_TESTING'] = '1' + psutil._psplatform.cext.set_testing() + + +def main(): + setup() + usage = "python3 -m psutil.tests [opts] [test-name]" + parser = optparse.OptionParser(usage=usage, description="run unit tests") + parser.add_option("--last-failed", + action="store_true", default=False, + help="only run last failed tests") + parser.add_option("--parallel", + action="store_true", default=False, + help="run tests in parallel") + opts, args = parser.parse_args() + + if not opts.last_failed: + safe_rmpath(FAILED_TESTS_FNAME) + + # loader + loader = TestLoader() + if args: + if len(args) > 1: + parser.print_usage() + return sys.exit(1) + else: + suite = loader.from_name(args[0]) + elif opts.last_failed: + suite = loader.last_failed() + else: + suite = loader.all() + + if CI_TESTING: + print_sysinfo() + runner = get_runner(opts.parallel) + runner.run(suite) + + +if __name__ == '__main__': + main()