diff planemo/lib/python3.7/site-packages/psutil/tests/runner.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/psutil/tests/runner.py	Fri Jul 31 00:32:28 2020 -0400
@@ -0,0 +1,348 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""
+Unit test runner, providing new features on top of unittest module:
+- colourized output
+- parallel run (UNIX only)
+- print failures/tracebacks on CTRL+C
+- re-run failed tests only (make test-failed)
+
+Invocation examples:
+- make test
+- make test-failed
+
+Parallel:
+- make test-parallel
+- make test-process ARGS=--parallel
+"""
+
+from __future__ import print_function
+import atexit
+import optparse
+import os
+import sys
+import textwrap
+import time
+import unittest
+try:
+    import ctypes
+except ImportError:
+    ctypes = None
+
+try:
+    import concurrencytest  # pip install concurrencytest
+except ImportError:
+    concurrencytest = None
+
+import psutil
+from psutil._common import hilite
+from psutil._common import print_color
+from psutil._common import term_supports_colors
+from psutil._compat import super
+from psutil.tests import CI_TESTING
+from psutil.tests import import_module_by_path
+from psutil.tests import print_sysinfo
+from psutil.tests import reap_children
+from psutil.tests import safe_rmpath
+
+
+VERBOSITY = 2
+FAILED_TESTS_FNAME = '.failed-tests.txt'
+NWORKERS = psutil.cpu_count() or 1
+USE_COLORS = not CI_TESTING and term_supports_colors()
+
+HERE = os.path.abspath(os.path.dirname(__file__))
+loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
+
+
+def cprint(msg, color, bold=False, file=None):
+    if file is None:
+        file = sys.stderr if color == 'red' else sys.stdout
+    if USE_COLORS:
+        print_color(msg, color, bold=bold, file=file)
+    else:
+        print(msg, file=file)
+
+
+class TestLoader:
+
+    testdir = HERE
+    skip_files = ['test_memleaks.py']
+    if "WHEELHOUSE_UPLOADER_USERNAME" in os.environ:
+        skip_files.extend(['test_osx.py', 'test_linux.py', 'test_posix.py'])
+
+    def _get_testmods(self):
+        return [os.path.join(self.testdir, x)
+                for x in os.listdir(self.testdir)
+                if x.startswith('test_') and x.endswith('.py') and
+                x not in self.skip_files]
+
+    def _iter_testmod_classes(self):
+        """Iterate over all test files in this directory and return
+        all TestCase classes in them.
+        """
+        for path in self._get_testmods():
+            mod = import_module_by_path(path)
+            for name in dir(mod):
+                obj = getattr(mod, name)
+                if isinstance(obj, type) and \
+                        issubclass(obj, unittest.TestCase):
+                    yield obj
+
+    def all(self):
+        suite = unittest.TestSuite()
+        for obj in self._iter_testmod_classes():
+            test = loadTestsFromTestCase(obj)
+            suite.addTest(test)
+        return suite
+
+    def last_failed(self):
+        # ...from previously failed test run
+        suite = unittest.TestSuite()
+        if not os.path.isfile(FAILED_TESTS_FNAME):
+            return suite
+        with open(FAILED_TESTS_FNAME, 'rt') as f:
+            names = f.read().split()
+        for n in names:
+            test = unittest.defaultTestLoader.loadTestsFromName(n)
+            suite.addTest(test)
+        return suite
+
+    def from_name(self, name):
+        if name.endswith('.py'):
+            name = os.path.splitext(os.path.basename(name))[0]
+        return unittest.defaultTestLoader.loadTestsFromName(name)
+
+
+class ColouredResult(unittest.TextTestResult):
+
+    def addSuccess(self, test):
+        unittest.TestResult.addSuccess(self, test)
+        cprint("OK", "green")
+
+    def addError(self, test, err):
+        unittest.TestResult.addError(self, test, err)
+        cprint("ERROR", "red", bold=True)
+
+    def addFailure(self, test, err):
+        unittest.TestResult.addFailure(self, test, err)
+        cprint("FAIL", "red")
+
+    def addSkip(self, test, reason):
+        unittest.TestResult.addSkip(self, test, reason)
+        cprint("skipped: %s" % reason.strip(), "brown")
+
+    def printErrorList(self, flavour, errors):
+        flavour = hilite(flavour, "red", bold=flavour == 'ERROR')
+        super().printErrorList(flavour, errors)
+
+
+class ColouredTextRunner(unittest.TextTestRunner):
+    """
+    A coloured text runner which also prints failed tests on KeyboardInterrupt
+    and save failed tests in a file so that they can be re-run.
+    """
+    resultclass = ColouredResult if USE_COLORS else unittest.TextTestResult
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.failed_tnames = set()
+
+    def _makeResult(self):
+        # Store result instance so that it can be accessed on
+        # KeyboardInterrupt.
+        self.result = super()._makeResult()
+        return self.result
+
+    def _write_last_failed(self):
+        if self.failed_tnames:
+            with open(FAILED_TESTS_FNAME, 'wt') as f:
+                for tname in self.failed_tnames:
+                    f.write(tname + '\n')
+
+    def _save_result(self, result):
+        if not result.wasSuccessful():
+            for t in result.errors + result.failures:
+                tname = t[0].id()
+                self.failed_tnames.add(tname)
+
+    def _run(self, suite):
+        try:
+            result = super().run(suite)
+        except (KeyboardInterrupt, SystemExit):
+            result = self.runner.result
+            result.printErrors()
+            raise sys.exit(1)
+        else:
+            self._save_result(result)
+            return result
+
+    def _exit(self, success):
+        if success:
+            cprint("SUCCESS", "green", bold=True)
+            safe_rmpath(FAILED_TESTS_FNAME)
+            sys.exit(0)
+        else:
+            cprint("FAILED", "red", bold=True)
+            self._write_last_failed()
+            sys.exit(1)
+
+    def run(self, suite):
+        result = self._run(suite)
+        self._exit(result.wasSuccessful())
+
+
+class ParallelRunner(ColouredTextRunner):
+
+    @staticmethod
+    def _parallelize(suite):
+        def fdopen(fd, mode, *kwds):
+            stream = orig_fdopen(fd, mode)
+            atexit.register(stream.close)
+            return stream
+
+        # Monkey patch concurrencytest lib bug (fdopen() stream not closed).
+        # https://github.com/cgoldberg/concurrencytest/issues/11
+        orig_fdopen = os.fdopen
+        concurrencytest.os.fdopen = fdopen
+        forker = concurrencytest.fork_for_tests(NWORKERS)
+        return concurrencytest.ConcurrentTestSuite(suite, forker)
+
+    @staticmethod
+    def _split_suite(suite):
+        serial = unittest.TestSuite()
+        parallel = unittest.TestSuite()
+        for test in suite:
+            if test.countTestCases() == 0:
+                continue
+            elif isinstance(test, unittest.TestSuite):
+                test_class = test._tests[0].__class__
+            elif isinstance(test, unittest.TestCase):
+                test_class = test
+            else:
+                raise TypeError("can't recognize type %r" % test)
+
+            if getattr(test_class, '_serialrun', False):
+                serial.addTest(test)
+            else:
+                parallel.addTest(test)
+        return (serial, parallel)
+
+    def run(self, suite):
+        ser_suite, par_suite = self._split_suite(suite)
+        par_suite = self._parallelize(par_suite)
+
+        # run parallel
+        cprint("starting parallel tests using %s workers" % NWORKERS,
+               "green", bold=True)
+        t = time.time()
+        par = self._run(par_suite)
+        par_elapsed = time.time() - t
+
+        # At this point we should have N zombies (the workers), which
+        # will disappear with wait().
+        orphans = psutil.Process().children()
+        gone, alive = psutil.wait_procs(orphans, timeout=1)
+        if alive:
+            cprint("alive processes %s" % alive, "red")
+            reap_children()
+
+        # run serial
+        t = time.time()
+        ser = self._run(ser_suite)
+        ser_elapsed = time.time() - t
+
+        # print
+        if not par.wasSuccessful() and ser_suite.countTestCases() > 0:
+            par.printErrors()  # print them again at the bottom
+        par_fails, par_errs, par_skips = map(len, (par.failures,
+                                                   par.errors,
+                                                   par.skipped))
+        ser_fails, ser_errs, ser_skips = map(len, (ser.failures,
+                                                   ser.errors,
+                                                   ser.skipped))
+        print(textwrap.dedent("""
+            +----------+----------+----------+----------+----------+----------+
+            |          |    total | failures |   errors |  skipped |     time |
+            +----------+----------+----------+----------+----------+----------+
+            | parallel |      %3s |      %3s |      %3s |      %3s |    %.2fs |
+            +----------+----------+----------+----------+----------+----------+
+            | serial   |      %3s |      %3s |      %3s |      %3s |    %.2fs |
+            +----------+----------+----------+----------+----------+----------+
+            """ % (par.testsRun, par_fails, par_errs, par_skips, par_elapsed,
+                   ser.testsRun, ser_fails, ser_errs, ser_skips, ser_elapsed)))
+        print("Ran %s tests in %.3fs using %s workers" % (
+            par.testsRun + ser.testsRun, par_elapsed + ser_elapsed, NWORKERS))
+        ok = par.wasSuccessful() and ser.wasSuccessful()
+        self._exit(ok)
+
+
+def get_runner(parallel=False):
+    def warn(msg):
+        cprint(msg + " Running serial tests instead.", "red")
+    if parallel:
+        if psutil.WINDOWS:
+            warn("Can't run parallel tests on Windows.")
+        elif concurrencytest is None:
+            warn("concurrencytest module is not installed.")
+        elif NWORKERS == 1:
+            warn("Only 1 CPU available.")
+        else:
+            return ParallelRunner(verbosity=VERBOSITY)
+    return ColouredTextRunner(verbosity=VERBOSITY)
+
+
+# Used by test_*,py modules.
+def run_from_name(name):
+    suite = TestLoader().from_name(name)
+    runner = get_runner()
+    runner.run(suite)
+
+
+def setup():
+    if 'PSUTIL_TESTING' not in os.environ:
+        # This won't work on Windows but set_testing() below will do it.
+        os.environ['PSUTIL_TESTING'] = '1'
+    psutil._psplatform.cext.set_testing()
+
+
+def main():
+    setup()
+    usage = "python3 -m psutil.tests [opts] [test-name]"
+    parser = optparse.OptionParser(usage=usage, description="run unit tests")
+    parser.add_option("--last-failed",
+                      action="store_true", default=False,
+                      help="only run last failed tests")
+    parser.add_option("--parallel",
+                      action="store_true", default=False,
+                      help="run tests in parallel")
+    opts, args = parser.parse_args()
+
+    if not opts.last_failed:
+        safe_rmpath(FAILED_TESTS_FNAME)
+
+    # loader
+    loader = TestLoader()
+    if args:
+        if len(args) > 1:
+            parser.print_usage()
+            return sys.exit(1)
+        else:
+            suite = loader.from_name(args[0])
+    elif opts.last_failed:
+        suite = loader.last_failed()
+    else:
+        suite = loader.all()
+
+    if CI_TESTING:
+        print_sysinfo()
+    runner = get_runner(opts.parallel)
+    runner.run(suite)
+
+
+if __name__ == '__main__':
+    main()