Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/humanfriendly/testing.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/humanfriendly/testing.py Sat May 02 07:14:21 2020 -0400 @@ -0,0 +1,669 @@ +# Human friendly input/output in Python. +# +# Author: Peter Odding <peter@peterodding.com> +# Last Change: March 6, 2020 +# URL: https://humanfriendly.readthedocs.io + +""" +Utility classes and functions that make it easy to write :mod:`unittest` compatible test suites. + +Over the years I've developed the habit of writing test suites for Python +projects using the :mod:`unittest` module. During those years I've come to know +:pypi:`pytest` and in fact I use :pypi:`pytest` to run my test suites (due to +its much better error reporting) but I've yet to publish a test suite that +*requires* :pypi:`pytest`. I have several reasons for doing so: + +- It's nice to keep my test suites as simple and accessible as possible and + not requiring a specific test runner is part of that attitude. + +- Whereas :mod:`unittest` is quite explicit, :pypi:`pytest` contains a lot of + magic, which kind of contradicts the Python mantra "explicit is better than + implicit" (IMHO). +""" + +# Standard library module +import functools +import logging +import os +import pipes +import shutil +import sys +import tempfile +import time +import unittest + +# Modules included in our package. +from humanfriendly.compat import StringIO +from humanfriendly.text import random_string + +# Initialize a logger for this module. +logger = logging.getLogger(__name__) + +# A unique object reference used to detect missing attributes. +NOTHING = object() + +# Public identifiers that require documentation. +__all__ = ( + 'CallableTimedOut', + 'CaptureBuffer', + 'CaptureOutput', + 'ContextManager', + 'CustomSearchPath', + 'MockedProgram', + 'PatchedAttribute', + 'PatchedItem', + 'TemporaryDirectory', + 'TestCase', + 'configure_logging', + 'make_dirs', + 'retry', + 'run_cli', + 'skip_on_raise', + 'touch', +) + + +def configure_logging(log_level=logging.DEBUG): + """configure_logging(log_level=logging.DEBUG) + Automatically configure logging to the terminal. + + :param log_level: The log verbosity (a number, defaults + to :mod:`logging.DEBUG <logging>`). + + When :mod:`coloredlogs` is installed :func:`coloredlogs.install()` will be + used to configure logging to the terminal. When this fails with an + :exc:`~exceptions.ImportError` then :func:`logging.basicConfig()` is used + as a fall back. + """ + try: + import coloredlogs + coloredlogs.install(level=log_level) + except ImportError: + logging.basicConfig( + level=log_level, + format='%(asctime)s %(name)s[%(process)d] %(levelname)s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + + +def make_dirs(pathname): + """ + Create missing directories. + + :param pathname: The pathname of a directory (a string). + """ + if not os.path.isdir(pathname): + os.makedirs(pathname) + + +def retry(func, timeout=60, exc_type=AssertionError): + """retry(func, timeout=60, exc_type=AssertionError) + Retry a function until assertions no longer fail. + + :param func: A callable. When the callable returns + :data:`False` it will also be retried. + :param timeout: The number of seconds after which to abort (a number, + defaults to 60). + :param exc_type: The type of exceptions to retry (defaults + to :exc:`~exceptions.AssertionError`). + :returns: The value returned by `func`. + :raises: Once the timeout has expired :func:`retry()` will raise the + previously retried assertion error. When `func` keeps returning + :data:`False` until `timeout` expires :exc:`CallableTimedOut` + will be raised. + + This function sleeps between retries to avoid claiming CPU cycles we don't + need. It starts by sleeping for 0.1 second but adjusts this to one second + as the number of retries grows. + """ + pause = 0.1 + timeout += time.time() + while True: + try: + result = func() + if result is not False: + return result + except exc_type: + if time.time() > timeout: + raise + else: + if time.time() > timeout: + raise CallableTimedOut() + time.sleep(pause) + if pause < 1: + pause *= 2 + + +def run_cli(entry_point, *arguments, **options): + """ + Test a command line entry point. + + :param entry_point: The function that implements the command line interface + (a callable). + :param arguments: Any positional arguments (strings) become the command + line arguments (:data:`sys.argv` items 1-N). + :param options: The following keyword arguments are supported: + + **capture** + Whether to use :class:`CaptureOutput`. Defaults + to :data:`True` but can be disabled by passing + :data:`False` instead. + **input** + Refer to :class:`CaptureOutput`. + **merged** + Refer to :class:`CaptureOutput`. + **program_name** + Used to set :data:`sys.argv` item 0. + :returns: A tuple with two values: + + 1. The return code (an integer). + 2. The captured output (a string). + """ + # Add the `program_name' option to the arguments. + arguments = list(arguments) + arguments.insert(0, options.pop('program_name', sys.executable)) + # Log the command line arguments (and the fact that we're about to call the + # command line entry point function). + logger.debug("Calling command line entry point with arguments: %s", arguments) + # Prepare to capture the return code and output even if the command line + # interface raises an exception (whether the exception type is SystemExit + # or something else). + returncode = 0 + stdout = None + stderr = None + try: + # Temporarily override sys.argv. + with PatchedAttribute(sys, 'argv', arguments): + # Manipulate the standard input/output/error streams? + options['enabled'] = options.pop('capture', True) + with CaptureOutput(**options) as capturer: + try: + # Call the command line interface. + entry_point() + finally: + # Get the output even if an exception is raised. + stdout = capturer.stdout.getvalue() + stderr = capturer.stderr.getvalue() + # Reconfigure logging to the terminal because it is very + # likely that the entry point function has changed the + # configured log level. + configure_logging() + except BaseException as e: + if isinstance(e, SystemExit): + logger.debug("Intercepting return code %s from SystemExit exception.", e.code) + returncode = e.code + else: + logger.warning("Defaulting return code to 1 due to raised exception.", exc_info=True) + returncode = 1 + else: + logger.debug("Command line entry point returned successfully!") + # Always log the output captured on stdout/stderr, to make it easier to + # diagnose test failures (but avoid duplicate logging when merged=True). + is_merged = options.get('merged', False) + merged_streams = [('merged streams', stdout)] + separate_streams = [('stdout', stdout), ('stderr', stderr)] + streams = merged_streams if is_merged else separate_streams + for name, value in streams: + if value: + logger.debug("Output on %s:\n%s", name, value) + else: + logger.debug("No output on %s.", name) + return returncode, stdout + + +def skip_on_raise(*exc_types): + """ + Decorate a test function to translation specific exception types to :exc:`unittest.SkipTest`. + + :param exc_types: One or more positional arguments give the exception + types to be translated to :exc:`unittest.SkipTest`. + :returns: A decorator function specialized to `exc_types`. + """ + def decorator(function): + @functools.wraps(function) + def wrapper(*args, **kw): + try: + return function(*args, **kw) + except exc_types as e: + logger.debug("Translating exception to unittest.SkipTest ..", exc_info=True) + raise unittest.SkipTest("skipping test because %s was raised" % type(e)) + return wrapper + return decorator + + +def touch(filename): + """ + The equivalent of the UNIX :man:`touch` program in Python. + + :param filename: The pathname of the file to touch (a string). + + Note that missing directories are automatically created using + :func:`make_dirs()`. + """ + make_dirs(os.path.dirname(filename)) + with open(filename, 'a'): + os.utime(filename, None) + + +class CallableTimedOut(Exception): + + """Raised by :func:`retry()` when the timeout expires.""" + + +class ContextManager(object): + + """Base class to enable composition of context managers.""" + + def __enter__(self): + """Enable use as context managers.""" + return self + + def __exit__(self, exc_type=None, exc_value=None, traceback=None): + """Enable use as context managers.""" + + +class PatchedAttribute(ContextManager): + + """Context manager that temporary replaces an object attribute using :func:`setattr()`.""" + + def __init__(self, obj, name, value): + """ + Initialize a :class:`PatchedAttribute` object. + + :param obj: The object to patch. + :param name: An attribute name. + :param value: The value to set. + """ + self.object_to_patch = obj + self.attribute_to_patch = name + self.patched_value = value + self.original_value = NOTHING + + def __enter__(self): + """ + Replace (patch) the attribute. + + :returns: The object whose attribute was patched. + """ + # Enable composition of context managers. + super(PatchedAttribute, self).__enter__() + # Patch the object's attribute. + self.original_value = getattr(self.object_to_patch, self.attribute_to_patch, NOTHING) + setattr(self.object_to_patch, self.attribute_to_patch, self.patched_value) + return self.object_to_patch + + def __exit__(self, exc_type=None, exc_value=None, traceback=None): + """Restore the attribute to its original value.""" + # Enable composition of context managers. + super(PatchedAttribute, self).__exit__(exc_type, exc_value, traceback) + # Restore the object's attribute. + if self.original_value is NOTHING: + delattr(self.object_to_patch, self.attribute_to_patch) + else: + setattr(self.object_to_patch, self.attribute_to_patch, self.original_value) + + +class PatchedItem(ContextManager): + + """Context manager that temporary replaces an object item using :meth:`~object.__setitem__()`.""" + + def __init__(self, obj, item, value): + """ + Initialize a :class:`PatchedItem` object. + + :param obj: The object to patch. + :param item: The item to patch. + :param value: The value to set. + """ + self.object_to_patch = obj + self.item_to_patch = item + self.patched_value = value + self.original_value = NOTHING + + def __enter__(self): + """ + Replace (patch) the item. + + :returns: The object whose item was patched. + """ + # Enable composition of context managers. + super(PatchedItem, self).__enter__() + # Patch the object's item. + try: + self.original_value = self.object_to_patch[self.item_to_patch] + except KeyError: + self.original_value = NOTHING + self.object_to_patch[self.item_to_patch] = self.patched_value + return self.object_to_patch + + def __exit__(self, exc_type=None, exc_value=None, traceback=None): + """Restore the item to its original value.""" + # Enable composition of context managers. + super(PatchedItem, self).__exit__(exc_type, exc_value, traceback) + # Restore the object's item. + if self.original_value is NOTHING: + del self.object_to_patch[self.item_to_patch] + else: + self.object_to_patch[self.item_to_patch] = self.original_value + + +class TemporaryDirectory(ContextManager): + + """ + Easy temporary directory creation & cleanup using the :keyword:`with` statement. + + Here's an example of how to use this: + + .. code-block:: python + + with TemporaryDirectory() as directory: + # Do something useful here. + assert os.path.isdir(directory) + """ + + def __init__(self, **options): + """ + Initialize a :class:`TemporaryDirectory` object. + + :param options: Any keyword arguments are passed on to + :func:`tempfile.mkdtemp()`. + """ + self.mkdtemp_options = options + self.temporary_directory = None + + def __enter__(self): + """ + Create the temporary directory using :func:`tempfile.mkdtemp()`. + + :returns: The pathname of the directory (a string). + """ + # Enable composition of context managers. + super(TemporaryDirectory, self).__enter__() + # Create the temporary directory. + self.temporary_directory = tempfile.mkdtemp(**self.mkdtemp_options) + return self.temporary_directory + + def __exit__(self, exc_type=None, exc_value=None, traceback=None): + """Cleanup the temporary directory using :func:`shutil.rmtree()`.""" + # Enable composition of context managers. + super(TemporaryDirectory, self).__exit__(exc_type, exc_value, traceback) + # Cleanup the temporary directory. + if self.temporary_directory is not None: + shutil.rmtree(self.temporary_directory) + self.temporary_directory = None + + +class MockedHomeDirectory(PatchedItem, TemporaryDirectory): + + """ + Context manager to temporarily change ``$HOME`` (the current user's profile directory). + + This class is a composition of the :class:`PatchedItem` and + :class:`TemporaryDirectory` context managers. + """ + + def __init__(self): + """Initialize a :class:`MockedHomeDirectory` object.""" + PatchedItem.__init__(self, os.environ, 'HOME', os.environ.get('HOME')) + TemporaryDirectory.__init__(self) + + def __enter__(self): + """ + Activate the custom ``$PATH``. + + :returns: The pathname of the directory that has + been added to ``$PATH`` (a string). + """ + # Get the temporary directory. + directory = TemporaryDirectory.__enter__(self) + # Override the value to patch now that we have + # the pathname of the temporary directory. + self.patched_value = directory + # Temporary patch $HOME. + PatchedItem.__enter__(self) + # Pass the pathname of the temporary directory to the caller. + return directory + + def __exit__(self, exc_type=None, exc_value=None, traceback=None): + """Deactivate the custom ``$HOME``.""" + super(MockedHomeDirectory, self).__exit__(exc_type, exc_value, traceback) + + +class CustomSearchPath(PatchedItem, TemporaryDirectory): + + """ + Context manager to temporarily customize ``$PATH`` (the executable search path). + + This class is a composition of the :class:`PatchedItem` and + :class:`TemporaryDirectory` context managers. + """ + + def __init__(self, isolated=False): + """ + Initialize a :class:`CustomSearchPath` object. + + :param isolated: :data:`True` to clear the original search path, + :data:`False` to add the temporary directory to the + start of the search path. + """ + # Initialize our own instance variables. + self.isolated_search_path = isolated + # Selectively initialize our superclasses. + PatchedItem.__init__(self, os.environ, 'PATH', self.current_search_path) + TemporaryDirectory.__init__(self) + + def __enter__(self): + """ + Activate the custom ``$PATH``. + + :returns: The pathname of the directory that has + been added to ``$PATH`` (a string). + """ + # Get the temporary directory. + directory = TemporaryDirectory.__enter__(self) + # Override the value to patch now that we have + # the pathname of the temporary directory. + self.patched_value = ( + directory if self.isolated_search_path + else os.pathsep.join([directory] + self.current_search_path.split(os.pathsep)) + ) + # Temporary patch the $PATH. + PatchedItem.__enter__(self) + # Pass the pathname of the temporary directory to the caller + # because they may want to `install' custom executables. + return directory + + def __exit__(self, exc_type=None, exc_value=None, traceback=None): + """Deactivate the custom ``$PATH``.""" + super(CustomSearchPath, self).__exit__(exc_type, exc_value, traceback) + + @property + def current_search_path(self): + """The value of ``$PATH`` or :data:`os.defpath` (a string).""" + return os.environ.get('PATH', os.defpath) + + +class MockedProgram(CustomSearchPath): + + """ + Context manager to mock the existence of a program (executable). + + This class extends the functionality of :class:`CustomSearchPath`. + """ + + def __init__(self, name, returncode=0, script=None): + """ + Initialize a :class:`MockedProgram` object. + + :param name: The name of the program (a string). + :param returncode: The return code that the program should emit (a + number, defaults to zero). + :param script: Shell script code to include in the mocked program (a + string or :data:`None`). This can be used to mock a + program that is expected to generate specific output. + """ + # Initialize our own instance variables. + self.program_name = name + self.program_returncode = returncode + self.program_script = script + self.program_signal_file = None + # Initialize our superclasses. + super(MockedProgram, self).__init__() + + def __enter__(self): + """ + Create the mock program. + + :returns: The pathname of the directory that has + been added to ``$PATH`` (a string). + """ + directory = super(MockedProgram, self).__enter__() + self.program_signal_file = os.path.join(directory, 'program-was-run-%s' % random_string(10)) + pathname = os.path.join(directory, self.program_name) + with open(pathname, 'w') as handle: + handle.write('#!/bin/sh\n') + handle.write('echo > %s\n' % pipes.quote(self.program_signal_file)) + if self.program_script: + handle.write('%s\n' % self.program_script.strip()) + handle.write('exit %i\n' % self.program_returncode) + os.chmod(pathname, 0o755) + return directory + + def __exit__(self, *args, **kw): + """ + Ensure that the mock program was run. + + :raises: :exc:`~exceptions.AssertionError` when + the mock program hasn't been run. + """ + try: + assert self.program_signal_file and os.path.isfile(self.program_signal_file), \ + ("It looks like %r was never run!" % self.program_name) + finally: + return super(MockedProgram, self).__exit__(*args, **kw) + + +class CaptureOutput(ContextManager): + + """ + Context manager that captures what's written to :data:`sys.stdout` and :data:`sys.stderr`. + + .. attribute:: stdin + + The :class:`~humanfriendly.compat.StringIO` object used to feed the standard input stream. + + .. attribute:: stdout + + The :class:`CaptureBuffer` object used to capture the standard output stream. + + .. attribute:: stderr + + The :class:`CaptureBuffer` object used to capture the standard error stream. + """ + + def __init__(self, merged=False, input='', enabled=True): + """ + Initialize a :class:`CaptureOutput` object. + + :param merged: :data:`True` to merge the streams, + :data:`False` to capture them separately. + :param input: The data that reads from :data:`sys.stdin` + should return (a string). + :param enabled: :data:`True` to enable capturing (the default), + :data:`False` otherwise. This makes it easy to + unconditionally use :class:`CaptureOutput` in + a :keyword:`with` block while preserving the + choice to opt out of capturing output. + """ + self.stdin = StringIO(input) + self.stdout = CaptureBuffer() + self.stderr = self.stdout if merged else CaptureBuffer() + self.patched_attributes = [] + if enabled: + self.patched_attributes.extend( + PatchedAttribute(sys, name, getattr(self, name)) + for name in ('stdin', 'stdout', 'stderr') + ) + + def __enter__(self): + """Start capturing what's written to :data:`sys.stdout` and :data:`sys.stderr`.""" + super(CaptureOutput, self).__enter__() + for context in self.patched_attributes: + context.__enter__() + return self + + def __exit__(self, exc_type=None, exc_value=None, traceback=None): + """Stop capturing what's written to :data:`sys.stdout` and :data:`sys.stderr`.""" + super(CaptureOutput, self).__exit__(exc_type, exc_value, traceback) + for context in self.patched_attributes: + context.__exit__(exc_type, exc_value, traceback) + + def get_lines(self): + """Get the contents of :attr:`stdout` split into separate lines.""" + return self.get_text().splitlines() + + def get_text(self): + """Get the contents of :attr:`stdout` as a Unicode string.""" + return self.stdout.get_text() + + def getvalue(self): + """Get the text written to :data:`sys.stdout`.""" + return self.stdout.getvalue() + + +class CaptureBuffer(StringIO): + + """ + Helper for :class:`CaptureOutput` to provide an easy to use API. + + The two methods defined by this subclass were specifically chosen to match + the names of the methods provided by my :pypi:`capturer` package which + serves a similar role as :class:`CaptureOutput` but knows how to simulate + an interactive terminal (tty). + """ + + def get_lines(self): + """Get the contents of the buffer split into separate lines.""" + return self.get_text().splitlines() + + def get_text(self): + """Get the contents of the buffer as a Unicode string.""" + return self.getvalue() + + +class TestCase(unittest.TestCase): + + """Subclass of :class:`unittest.TestCase` with automatic logging and other miscellaneous features.""" + + def __init__(self, *args, **kw): + """ + Initialize a :class:`TestCase` object. + + Any positional and/or keyword arguments are passed on to the + initializer of the superclass. + """ + super(TestCase, self).__init__(*args, **kw) + + def setUp(self, log_level=logging.DEBUG): + """setUp(log_level=logging.DEBUG) + Automatically configure logging to the terminal. + + :param log_level: Refer to :func:`configure_logging()`. + + The :func:`setUp()` method is automatically called by + :class:`unittest.TestCase` before each test method starts. + It does two things: + + - Logging to the terminal is configured using + :func:`configure_logging()`. + + - Before the test method starts a newline is emitted, to separate the + name of the test method (which will be printed to the terminal by + :mod:`unittest` or :pypi:`pytest`) from the first line of logging + output that the test method is likely going to generate. + """ + # Configure logging to the terminal. + configure_logging(log_level) + # Separate the name of the test method (printed by the superclass + # and/or py.test without a newline at the end) from the first line of + # logging output that the test method is likely going to generate. + sys.stderr.write("\n")