Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/humanfriendly/testing.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |
comparison
equal
deleted
inserted
replaced
| 4:79f47841a781 | 5:9b1c78e6ba9c |
|---|---|
| 1 # Human friendly input/output in Python. | |
| 2 # | |
| 3 # Author: Peter Odding <peter@peterodding.com> | |
| 4 # Last Change: March 6, 2020 | |
| 5 # URL: https://humanfriendly.readthedocs.io | |
| 6 | |
| 7 """ | |
| 8 Utility classes and functions that make it easy to write :mod:`unittest` compatible test suites. | |
| 9 | |
| 10 Over the years I've developed the habit of writing test suites for Python | |
| 11 projects using the :mod:`unittest` module. During those years I've come to know | |
| 12 :pypi:`pytest` and in fact I use :pypi:`pytest` to run my test suites (due to | |
| 13 its much better error reporting) but I've yet to publish a test suite that | |
| 14 *requires* :pypi:`pytest`. I have several reasons for not requiring it: | |
| 15 | |
| 16 - It's nice to keep my test suites as simple and accessible as possible and | |
| 17 not requiring a specific test runner is part of that attitude. | |
| 18 | |
| 19 - Whereas :mod:`unittest` is quite explicit, :pypi:`pytest` contains a lot of | |
| 20 magic, which kind of contradicts the Python mantra "explicit is better than | |
| 21 implicit" (IMHO). | |
| 22 """ | |
| 23 | |
| 24 # Standard library module | |
| 25 import functools | |
| 26 import logging | |
| 27 import os | |
| 28 import pipes | |
| 29 import shutil | |
| 30 import sys | |
| 31 import tempfile | |
| 32 import time | |
| 33 import unittest | |
| 34 | |
| 35 # Modules included in our package. | |
| 36 from humanfriendly.compat import StringIO | |
| 37 from humanfriendly.text import random_string | |
| 38 | |
# Initialize a logger for this module.
logger = logging.getLogger(__name__)

# A unique object reference used to detect missing attributes and items
# (None can't be used for this because it's a perfectly valid value).
NOTHING = object()

# Public identifiers that require documentation.
__all__ = (
    'CallableTimedOut',
    'CaptureBuffer',
    'CaptureOutput',
    'ContextManager',
    'CustomSearchPath',
    'MockedHomeDirectory',
    'MockedProgram',
    'PatchedAttribute',
    'PatchedItem',
    'TemporaryDirectory',
    'TestCase',
    'configure_logging',
    'make_dirs',
    'retry',
    'run_cli',
    'skip_on_raise',
    'touch',
)
| 64 | |
| 65 | |
def configure_logging(log_level=logging.DEBUG):
    """configure_logging(log_level=logging.DEBUG)
    Set up logging to the terminal.

    :param log_level: The log verbosity (a number, defaults
                      to :mod:`logging.DEBUG <logging>`).

    The :mod:`coloredlogs` package is preferred: when it can be imported
    :func:`coloredlogs.install()` is called with the requested level. When
    that raises :exc:`~exceptions.ImportError` the standard library's
    :func:`logging.basicConfig()` is used instead.
    """
    # Options for the standard library fall back.
    fallback_options = dict(
        level=log_level,
        format='%(asctime)s %(name)s[%(process)d] %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    try:
        import coloredlogs
        coloredlogs.install(level=log_level)
    except ImportError:
        logging.basicConfig(**fallback_options)
| 86 | |
| 87 | |
def make_dirs(pathname):
    """
    Create missing directories.

    :param pathname: The pathname of a directory (a string).
    :raises: :exc:`~exceptions.OSError` when the directory can't be created
             and doesn't exist afterwards (for example because `pathname`
             refers to a regular file).

    Calling this function for a directory that already exists is not an error.
    """
    try:
        os.makedirs(pathname)
    except OSError:
        # Creating first and checking afterwards avoids the race condition of
        # "check, then create": if another process (or thread) created the
        # directory in the mean time that is fine, any other failure is not.
        if not os.path.isdir(pathname):
            raise
| 96 | |
| 97 | |
def retry(func, timeout=60, exc_type=AssertionError):
    """retry(func, timeout=60, exc_type=AssertionError)
    Retry a function until assertions no longer fail.

    :param func: A callable. When the callable returns
                 :data:`False` it will also be retried.
    :param timeout: The number of seconds after which to abort (a number,
                    defaults to 60).
    :param exc_type: The type of exceptions to retry (defaults
                     to :exc:`~exceptions.AssertionError`).
    :returns: The value returned by `func`.
    :raises: Once the timeout has expired :func:`retry()` will raise the
             previously retried assertion error. When `func` keeps returning
             :data:`False` until `timeout` expires :exc:`CallableTimedOut`
             will be raised.

    This function sleeps between retries to avoid claiming CPU cycles we don't
    need. It starts by sleeping for 0.1 second and doubles the pause after
    each retry until it reaches one second, where it stays.
    """
    # Prefer a monotonic clock so that adjustments of the system clock can't
    # shorten or stretch the timeout; fall back to wall clock time on Python
    # versions that don't provide time.monotonic().
    clock = getattr(time, 'monotonic', time.time)
    deadline = clock() + timeout
    pause = 0.1
    while True:
        try:
            result = func()
            if result is not False:
                return result
        except exc_type:
            # Re-raise the most recent failure once we've run out of time.
            if clock() > deadline:
                raise
        else:
            # We only get here when func() returned False.
            if clock() > deadline:
                raise CallableTimedOut()
        time.sleep(pause)
        # Back off exponentially, but never sleep longer than one second.
        pause = min(pause * 2, 1.0)
| 134 | |
| 135 | |
def run_cli(entry_point, *arguments, **options):
    """
    Test a command line entry point.

    :param entry_point: The function that implements the command line interface
                        (a callable).
    :param arguments: Any positional arguments (strings) become the command
                      line arguments (:data:`sys.argv` items 1-N).
    :param options: The following keyword arguments are supported:

                    **capture**
                     Whether to use :class:`CaptureOutput`. Defaults
                     to :data:`True` but can be disabled by passing
                     :data:`False` instead.
                    **input**
                     Refer to :class:`CaptureOutput`.
                    **merged**
                     Refer to :class:`CaptureOutput`.
                    **program_name**
                     Used to set :data:`sys.argv` item 0.
    :returns: A tuple with two values:

              1. The return code (an integer).
              2. The captured output (a string).

    Exceptions raised by `entry_point` are intercepted: the code of a
    :exc:`~exceptions.SystemExit` exception is translated to a return code the
    same way the Python interpreter does it (:data:`None` becomes 0, integers
    are used as is, anything else becomes 1) and any other exception results
    in return code 1. Only the output captured on the standard output stream
    is returned, the standard error stream is logged but not returned (unless
    the streams are merged).
    """
    # Add the `program_name' option to the arguments.
    arguments = list(arguments)
    arguments.insert(0, options.pop('program_name', sys.executable))
    # Log the command line arguments (and the fact that we're about to call the
    # command line entry point function).
    logger.debug("Calling command line entry point with arguments: %s", arguments)
    # Prepare to capture the return code and output even if the command line
    # interface raises an exception (whether the exception type is SystemExit
    # or something else).
    returncode = 0
    stdout = None
    stderr = None
    try:
        # Temporarily override sys.argv.
        with PatchedAttribute(sys, 'argv', arguments):
            # Manipulate the standard input/output/error streams?
            options['enabled'] = options.pop('capture', True)
            with CaptureOutput(**options) as capturer:
                try:
                    # Call the command line interface.
                    entry_point()
                finally:
                    # Get the output even if an exception is raised.
                    stdout = capturer.stdout.getvalue()
                    stderr = capturer.stderr.getvalue()
                    # Reconfigure logging to the terminal because it is very
                    # likely that the entry point function has changed the
                    # configured log level.
                    configure_logging()
    except BaseException as e:
        if isinstance(e, SystemExit):
            logger.debug("Intercepting return code %s from SystemExit exception.", e.code)
            if e.code is None:
                # A bare sys.exit() signals success.
                returncode = 0
            elif isinstance(e.code, int):
                returncode = e.code
            else:
                # sys.exit("message") makes the interpreter print the
                # message and exit with status 1, so we do the same.
                logger.debug("Translating non-integer exit code %r to return code 1.", e.code)
                returncode = 1
        else:
            logger.warning("Defaulting return code to 1 due to raised exception.", exc_info=True)
            returncode = 1
    else:
        logger.debug("Command line entry point returned successfully!")
    # Always log the output captured on stdout/stderr, to make it easier to
    # diagnose test failures (but avoid duplicate logging when merged=True).
    is_merged = options.get('merged', False)
    merged_streams = [('merged streams', stdout)]
    separate_streams = [('stdout', stdout), ('stderr', stderr)]
    streams = merged_streams if is_merged else separate_streams
    for name, value in streams:
        if value:
            logger.debug("Output on %s:\n%s", name, value)
        else:
            logger.debug("No output on %s.", name)
    return returncode, stdout
| 211 | |
| 212 | |
def skip_on_raise(*exc_types):
    """
    Decorate a test function to translate specific exception types to :exc:`unittest.SkipTest`.

    :param exc_types: One or more positional arguments give the exception
                      types to be translated to :exc:`unittest.SkipTest`.
    :returns: A decorator function specialized to `exc_types`.

    Exceptions of other types raised by the decorated function are not
    intercepted.
    """
    def decorator(function):
        @functools.wraps(function)
        def translating_wrapper(*args, **kw):
            try:
                outcome = function(*args, **kw)
            except exc_types as e:
                logger.debug("Translating exception to unittest.SkipTest ..", exc_info=True)
                raise unittest.SkipTest("skipping test because %s was raised" % type(e))
            else:
                return outcome
        return translating_wrapper
    return decorator
| 231 | |
| 232 | |
def touch(filename):
    """
    The equivalent of the UNIX :man:`touch` program in Python.

    :param filename: The pathname of the file to touch (a string).

    The file is created when it doesn't exist yet and its access and
    modification times are set to the current time. Note that missing
    directories are automatically created using :func:`make_dirs()`.
    """
    directory = os.path.dirname(filename)
    # A bare filename (relative to the working directory) has no directory
    # component and asking os.makedirs() to create '' raises an exception.
    if directory:
        make_dirs(directory)
    # Opening in append mode creates the file without truncating it.
    with open(filename, 'a'):
        os.utime(filename, None)
| 245 | |
| 246 | |
class CallableTimedOut(Exception):

    """Raised by :func:`retry()` when the callable keeps returning :data:`False` until the timeout expires."""
| 250 | |
| 251 | |
class ContextManager(object):

    """
    Base class to enable composition of context managers.

    Both methods are deliberately trivial: they give subclasses that combine
    several context managers a common end point for their :func:`super()`
    calls.
    """

    def __enter__(self):
        """Enter the context (returns the context manager itself)."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Leave the context (does nothing, exceptions are not suppressed)."""
        return None
| 262 | |
| 263 | |
class PatchedAttribute(ContextManager):

    """Context manager that temporarily replaces an object attribute using :func:`setattr()`."""

    def __init__(self, obj, name, value):
        """
        Initialize a :class:`PatchedAttribute` object.

        :param obj: The object to patch.
        :param name: An attribute name.
        :param value: The value to set.
        """
        self.object_to_patch = obj
        self.attribute_to_patch = name
        self.patched_value = value
        # Filled in by __enter__(), NOTHING means "attribute didn't exist".
        self.original_value = NOTHING

    def __enter__(self):
        """
        Replace (patch) the attribute.

        :returns: The object whose attribute was patched.
        """
        # Enable composition of context managers.
        super(PatchedAttribute, self).__enter__()
        target, attribute = self.object_to_patch, self.attribute_to_patch
        # Remember the current value (if any) before overwriting it.
        self.original_value = getattr(target, attribute, NOTHING)
        setattr(target, attribute, self.patched_value)
        return target

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Restore the attribute to its original value (or delete it when it didn't exist)."""
        # Enable composition of context managers.
        super(PatchedAttribute, self).__exit__(exc_type, exc_value, traceback)
        target, attribute = self.object_to_patch, self.attribute_to_patch
        if self.original_value is not NOTHING:
            setattr(target, attribute, self.original_value)
        else:
            delattr(target, attribute)
| 303 | |
| 304 | |
class PatchedItem(ContextManager):

    """Context manager that temporarily replaces an object item using :meth:`~object.__setitem__()`."""

    def __init__(self, obj, item, value):
        """
        Initialize a :class:`PatchedItem` object.

        :param obj: The object to patch.
        :param item: The item to patch.
        :param value: The value to set.
        """
        self.object_to_patch = obj
        self.item_to_patch = item
        self.patched_value = value
        # Filled in by __enter__(), NOTHING means "item didn't exist".
        self.original_value = NOTHING

    def __enter__(self):
        """
        Replace (patch) the item.

        :returns: The object whose item was patched.
        """
        # Enable composition of context managers.
        super(PatchedItem, self).__enter__()
        container, key = self.object_to_patch, self.item_to_patch
        # Remember the current value (if any) before overwriting it.
        try:
            previous = container[key]
        except KeyError:
            previous = NOTHING
        self.original_value = previous
        container[key] = self.patched_value
        return container

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Restore the item to its original value (or delete it when it didn't exist)."""
        # Enable composition of context managers.
        super(PatchedItem, self).__exit__(exc_type, exc_value, traceback)
        container, key = self.object_to_patch, self.item_to_patch
        if self.original_value is not NOTHING:
            container[key] = self.original_value
        else:
            del container[key]
| 347 | |
| 348 | |
class TemporaryDirectory(ContextManager):

    """
    Easy temporary directory creation & cleanup using the :keyword:`with` statement.

    Here's an example of how to use this:

    .. code-block:: python

       with TemporaryDirectory() as directory:
           # Do something useful here.
           assert os.path.isdir(directory)
    """

    def __init__(self, **options):
        """
        Initialize a :class:`TemporaryDirectory` object.

        :param options: Any keyword arguments are passed on to
                        :func:`tempfile.mkdtemp()`.
        """
        self.mkdtemp_options = options
        # The pathname of the directory, only set while the context is active.
        self.temporary_directory = None

    def __enter__(self):
        """
        Create the temporary directory using :func:`tempfile.mkdtemp()`.

        :returns: The pathname of the directory (a string).
        """
        # Enable composition of context managers.
        super(TemporaryDirectory, self).__enter__()
        created = tempfile.mkdtemp(**self.mkdtemp_options)
        self.temporary_directory = created
        return created

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Cleanup the temporary directory using :func:`shutil.rmtree()`."""
        # Enable composition of context managers.
        super(TemporaryDirectory, self).__exit__(exc_type, exc_value, traceback)
        directory = self.temporary_directory
        if directory is None:
            # Nothing was created (or it was already cleaned up).
            return
        shutil.rmtree(directory)
        self.temporary_directory = None
| 393 | |
| 394 | |
class MockedHomeDirectory(PatchedItem, TemporaryDirectory):

    """
    Context manager to temporarily change ``$HOME`` (the current user's profile directory).

    This class is a composition of the :class:`PatchedItem` and
    :class:`TemporaryDirectory` context managers.
    """

    def __init__(self):
        """Initialize a :class:`MockedHomeDirectory` object."""
        # The value to patch is a placeholder here, the real value is only
        # known once the temporary directory has been created.
        PatchedItem.__init__(self, os.environ, 'HOME', os.environ.get('HOME'))
        TemporaryDirectory.__init__(self)

    def __enter__(self):
        """
        Activate the custom ``$HOME``.

        :returns: The pathname of the temporary directory that
                  ``$HOME`` has been changed to (a string).
        """
        # Get the temporary directory.
        directory = TemporaryDirectory.__enter__(self)
        # Override the value to patch now that we have
        # the pathname of the temporary directory.
        self.patched_value = directory
        # Temporarily patch $HOME. This is done here instead of by calling
        # PatchedItem.__enter__() because the super() call in that method
        # resolves to TemporaryDirectory.__enter__() for instances of this
        # class, which would create a second temporary directory and leak
        # the one that was created above.
        try:
            self.original_value = self.object_to_patch[self.item_to_patch]
        except KeyError:
            self.original_value = NOTHING
        self.object_to_patch[self.item_to_patch] = self.patched_value
        # Pass the pathname of the temporary directory to the caller.
        return directory

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Deactivate the custom ``$HOME`` and remove the temporary directory."""
        super(MockedHomeDirectory, self).__exit__(exc_type, exc_value, traceback)
| 429 | |
| 430 | |
class CustomSearchPath(PatchedItem, TemporaryDirectory):

    """
    Context manager to temporarily customize ``$PATH`` (the executable search path).

    This class is a composition of the :class:`PatchedItem` and
    :class:`TemporaryDirectory` context managers.
    """

    def __init__(self, isolated=False):
        """
        Initialize a :class:`CustomSearchPath` object.

        :param isolated: :data:`True` to clear the original search path,
                         :data:`False` to add the temporary directory to the
                         start of the search path.
        """
        # Initialize our own instance variables.
        self.isolated_search_path = isolated
        # Selectively initialize our superclasses. The value to patch is a
        # placeholder here, the real value is only known once the temporary
        # directory has been created.
        PatchedItem.__init__(self, os.environ, 'PATH', self.current_search_path)
        TemporaryDirectory.__init__(self)

    def __enter__(self):
        """
        Activate the custom ``$PATH``.

        :returns: The pathname of the directory that has
                  been added to ``$PATH`` (a string).
        """
        # Get the temporary directory.
        directory = TemporaryDirectory.__enter__(self)
        # Override the value to patch now that we have
        # the pathname of the temporary directory.
        if self.isolated_search_path:
            self.patched_value = directory
        else:
            self.patched_value = os.pathsep.join([directory, self.current_search_path])
        # Temporarily patch the $PATH. This is done here instead of by
        # calling PatchedItem.__enter__() because the super() call in that
        # method resolves to TemporaryDirectory.__enter__() for instances of
        # this class, which would create a second temporary directory and
        # leak the one that was created above.
        try:
            self.original_value = self.object_to_patch[self.item_to_patch]
        except KeyError:
            self.original_value = NOTHING
        self.object_to_patch[self.item_to_patch] = self.patched_value
        # Pass the pathname of the temporary directory to the caller
        # because they may want to `install' custom executables.
        return directory

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Deactivate the custom ``$PATH`` and remove the temporary directory."""
        super(CustomSearchPath, self).__exit__(exc_type, exc_value, traceback)

    @property
    def current_search_path(self):
        """The value of ``$PATH`` or :data:`os.defpath` (a string)."""
        return os.environ.get('PATH', os.defpath)
| 483 | |
| 484 | |
class MockedProgram(CustomSearchPath):

    """
    Context manager to mock the existence of a program (executable).

    This class extends the functionality of :class:`CustomSearchPath`.
    """

    def __init__(self, name, returncode=0, script=None):
        """
        Initialize a :class:`MockedProgram` object.

        :param name: The name of the program (a string).
        :param returncode: The return code that the program should emit (a
                           number, defaults to zero).
        :param script: Shell script code to include in the mocked program (a
                       string or :data:`None`). This can be used to mock a
                       program that is expected to generate specific output.
        """
        # Initialize our own instance variables.
        self.program_name = name
        self.program_returncode = returncode
        self.program_script = script
        # The pathname of the file that the mocked program creates when it
        # is run, only known once the temporary directory exists.
        self.program_signal_file = None
        # Initialize our superclasses.
        super(MockedProgram, self).__init__()

    def __enter__(self):
        """
        Create the mock program.

        :returns: The pathname of the directory that has
                  been added to ``$PATH`` (a string).
        """
        directory = super(MockedProgram, self).__enter__()
        self.program_signal_file = os.path.join(directory, 'program-was-run-%s' % random_string(10))
        pathname = os.path.join(directory, self.program_name)
        with open(pathname, 'w') as handle:
            handle.write('#!/bin/sh\n')
            # Creating the signal file is how the program reports that it was run.
            handle.write('echo > %s\n' % pipes.quote(self.program_signal_file))
            if self.program_script:
                handle.write('%s\n' % self.program_script.strip())
            handle.write('exit %i\n' % self.program_returncode)
        os.chmod(pathname, 0o755)
        return directory

    def __exit__(self, *args, **kw):
        """
        Ensure that the mock program was run.

        :raises: :exc:`~exceptions.AssertionError` when
                 the mock program hasn't been run.

        The check is skipped when the :keyword:`with` block is being left
        because of an exception, to avoid masking that exception.
        """
        # Look for the signal file before cleaning up, because the cleanup
        # is expected to remove the directory that contains the file.
        was_run = bool(self.program_signal_file and os.path.isfile(self.program_signal_file))
        # Always restore $PATH and remove the temporary directory. The result
        # is deliberately not returned from a `finally' block because that
        # would silently discard the assertion error raised below.
        result = super(MockedProgram, self).__exit__(*args, **kw)
        exc_type = args[0] if args else kw.get('exc_type')
        if exc_type is None and not was_run:
            raise AssertionError("It looks like %r was never run!" % self.program_name)
        return result
| 543 | |
| 544 | |
class CaptureOutput(ContextManager):

    """
    Context manager that captures what's written to :data:`sys.stdout` and :data:`sys.stderr`.

    .. attribute:: stdin

       The :class:`~humanfriendly.compat.StringIO` object used to feed the standard input stream.

    .. attribute:: stdout

       The :class:`CaptureBuffer` object used to capture the standard output stream.

    .. attribute:: stderr

       The :class:`CaptureBuffer` object used to capture the standard error stream.
    """

    def __init__(self, merged=False, input='', enabled=True):
        """
        Initialize a :class:`CaptureOutput` object.

        :param merged: :data:`True` to merge the streams,
                       :data:`False` to capture them separately.
        :param input: The data that reads from :data:`sys.stdin`
                      should return (a string).
        :param enabled: :data:`True` to enable capturing (the default),
                        :data:`False` otherwise. This makes it easy to
                        unconditionally use :class:`CaptureOutput` in
                        a :keyword:`with` block while preserving the
                        choice to opt out of capturing output.
        """
        self.stdin = StringIO(input)
        self.stdout = CaptureBuffer()
        if merged:
            # Both streams end up in the same buffer.
            self.stderr = self.stdout
        else:
            self.stderr = CaptureBuffer()
        # One PatchedAttribute per standard stream (none when disabled).
        self.patched_attributes = []
        if enabled:
            for stream_name in ('stdin', 'stdout', 'stderr'):
                replacement = getattr(self, stream_name)
                self.patched_attributes.append(PatchedAttribute(sys, stream_name, replacement))

    def __enter__(self):
        """Start capturing what's written to :data:`sys.stdout` and :data:`sys.stderr`."""
        super(CaptureOutput, self).__enter__()
        for patch in self.patched_attributes:
            patch.__enter__()
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Stop capturing what's written to :data:`sys.stdout` and :data:`sys.stderr`."""
        super(CaptureOutput, self).__exit__(exc_type, exc_value, traceback)
        for patch in self.patched_attributes:
            patch.__exit__(exc_type, exc_value, traceback)

    def get_lines(self):
        """Get the contents of :attr:`stdout` split into separate lines."""
        text = self.get_text()
        return text.splitlines()

    def get_text(self):
        """Get the contents of :attr:`stdout` as a Unicode string."""
        buffer = self.stdout
        return buffer.get_text()

    def getvalue(self):
        """Get the text written to :data:`sys.stdout`."""
        buffer = self.stdout
        return buffer.getvalue()
| 611 | |
| 612 | |
class CaptureBuffer(StringIO):

    """
    Helper for :class:`CaptureOutput` to provide an easy to use API.

    The two methods defined by this subclass were specifically chosen to match
    the names of the methods provided by my :pypi:`capturer` package which
    serves a similar role as :class:`CaptureOutput` but knows how to simulate
    an interactive terminal (tty).
    """

    def get_lines(self):
        """Get the contents of the buffer as a list of lines (without line endings)."""
        text = self.get_text()
        return text.splitlines()

    def get_text(self):
        """Get the contents of the buffer as a Unicode string."""
        contents = self.getvalue()
        return contents
| 631 | |
| 632 | |
class TestCase(unittest.TestCase):

    """Subclass of :class:`unittest.TestCase` with automatic logging and other miscellaneous features."""

    def __init__(self, *args, **kw):
        """
        Initialize a :class:`TestCase` object.

        All positional and keyword arguments are handed to the
        initializer of the superclass unchanged.
        """
        super(TestCase, self).__init__(*args, **kw)

    def setUp(self, log_level=logging.DEBUG):
        """setUp(log_level=logging.DEBUG)
        Automatically configure logging to the terminal.

        :param log_level: Refer to :func:`configure_logging()`.

        The :func:`setUp()` method is automatically called by
        :class:`unittest.TestCase` before each test method starts.
        It does two things:

        - Logging to the terminal is configured using
          :func:`configure_logging()`.

        - A newline is written to :data:`sys.stderr`, to separate the name of
          the test method (which will be printed to the terminal by
          :mod:`unittest` or :pypi:`pytest`) from the first line of logging
          output that the test method is likely going to generate.
        """
        # Step 1: (re)configure logging to the terminal.
        configure_logging(log_level)
        # Step 2: the test runner prints the name of the test method without
        # a trailing newline, so we terminate that line before the test
        # method starts generating logging output.
        separator = "\n"
        sys.stderr.write(separator)
