Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/future/tests/base.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:18:57 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d30785e31577 |
---|---|
1 from __future__ import print_function, absolute_import | |
2 import os | |
3 import tempfile | |
4 import unittest | |
5 import sys | |
6 import re | |
7 import warnings | |
8 import io | |
9 from textwrap import dedent | |
10 | |
11 from future.utils import bind_method, PY26, PY3, PY2, PY27 | |
12 from future.moves.subprocess import check_output, STDOUT, CalledProcessError | |
13 | |
14 if PY26: | |
15 import unittest2 as unittest | |
16 | |
17 | |
18 def reformat_code(code): | |
19 """ | |
20 Removes any leading \n and dedents. | |
21 """ | |
22 if code.startswith('\n'): | |
23 code = code[1:] | |
24 return dedent(code) | |
25 | |
26 | |
27 def order_future_lines(code): | |
28 """ | |
29 Returns the code block with any ``__future__`` import lines sorted, and | |
30 then any ``future`` import lines sorted, then any ``builtins`` import lines | |
31 sorted. | |
32 | |
33 This only sorts the lines within the expected blocks. | |
34 | |
35 See test_order_future_lines() for an example. | |
36 """ | |
37 | |
38 # We need .splitlines(keepends=True), which doesn't exist on Py2, | |
39 # so we use this instead: | |
40 lines = code.split('\n') | |
41 | |
42 uufuture_line_numbers = [i for i, line in enumerate(lines) | |
43 if line.startswith('from __future__ import ')] | |
44 | |
45 future_line_numbers = [i for i, line in enumerate(lines) | |
46 if line.startswith('from future') | |
47 or line.startswith('from past')] | |
48 | |
49 builtins_line_numbers = [i for i, line in enumerate(lines) | |
50 if line.startswith('from builtins')] | |
51 | |
52 assert code.lstrip() == code, ('internal usage error: ' | |
53 'dedent the code before calling order_future_lines()') | |
54 | |
55 def mymax(numbers): | |
56 return max(numbers) if len(numbers) > 0 else 0 | |
57 | |
58 def mymin(numbers): | |
59 return min(numbers) if len(numbers) > 0 else float('inf') | |
60 | |
61 assert mymax(uufuture_line_numbers) <= mymin(future_line_numbers), \ | |
62 'the __future__ and future imports are out of order' | |
63 | |
64 # assert mymax(future_line_numbers) <= mymin(builtins_line_numbers), \ | |
65 # 'the future and builtins imports are out of order' | |
66 | |
67 uul = sorted([lines[i] for i in uufuture_line_numbers]) | |
68 sorted_uufuture_lines = dict(zip(uufuture_line_numbers, uul)) | |
69 | |
70 fl = sorted([lines[i] for i in future_line_numbers]) | |
71 sorted_future_lines = dict(zip(future_line_numbers, fl)) | |
72 | |
73 bl = sorted([lines[i] for i in builtins_line_numbers]) | |
74 sorted_builtins_lines = dict(zip(builtins_line_numbers, bl)) | |
75 | |
76 # Replace the old unsorted "from __future__ import ..." lines with the | |
77 # new sorted ones: | |
78 new_lines = [] | |
79 for i in range(len(lines)): | |
80 if i in uufuture_line_numbers: | |
81 new_lines.append(sorted_uufuture_lines[i]) | |
82 elif i in future_line_numbers: | |
83 new_lines.append(sorted_future_lines[i]) | |
84 elif i in builtins_line_numbers: | |
85 new_lines.append(sorted_builtins_lines[i]) | |
86 else: | |
87 new_lines.append(lines[i]) | |
88 return '\n'.join(new_lines) | |
89 | |
90 | |
91 class VerboseCalledProcessError(CalledProcessError): | |
92 """ | |
93 Like CalledProcessError, but it displays more information (message and | |
94 script output) for diagnosing test failures etc. | |
95 """ | |
96 def __init__(self, msg, returncode, cmd, output=None): | |
97 self.msg = msg | |
98 self.returncode = returncode | |
99 self.cmd = cmd | |
100 self.output = output | |
101 | |
102 def __str__(self): | |
103 return ("Command '%s' failed with exit status %d\nMessage: %s\nOutput: %s" | |
104 % (self.cmd, self.returncode, self.msg, self.output)) | |
105 | |
106 class FuturizeError(VerboseCalledProcessError): | |
107 pass | |
108 | |
109 class PasteurizeError(VerboseCalledProcessError): | |
110 pass | |
111 | |
112 | |
113 class CodeHandler(unittest.TestCase): | |
114 """ | |
115 Handy mixin for test classes for writing / reading / futurizing / | |
116 running .py files in the test suite. | |
117 """ | |
118 def setUp(self): | |
119 """ | |
120 The outputs from the various futurize stages should have the | |
121 following headers: | |
122 """ | |
123 # After stage1: | |
124 # TODO: use this form after implementing a fixer to consolidate | |
125 # __future__ imports into a single line: | |
126 # self.headers1 = """ | |
127 # from __future__ import absolute_import, division, print_function | |
128 # """ | |
129 self.headers1 = reformat_code(""" | |
130 from __future__ import absolute_import | |
131 from __future__ import division | |
132 from __future__ import print_function | |
133 """) | |
134 | |
135 # After stage2 --all-imports: | |
136 # TODO: use this form after implementing a fixer to consolidate | |
137 # __future__ imports into a single line: | |
138 # self.headers2 = """ | |
139 # from __future__ import (absolute_import, division, | |
140 # print_function, unicode_literals) | |
141 # from future import standard_library | |
142 # from future.builtins import * | |
143 # """ | |
144 self.headers2 = reformat_code(""" | |
145 from __future__ import absolute_import | |
146 from __future__ import division | |
147 from __future__ import print_function | |
148 from __future__ import unicode_literals | |
149 from future import standard_library | |
150 standard_library.install_aliases() | |
151 from builtins import * | |
152 """) | |
153 self.interpreters = [sys.executable] | |
154 self.tempdir = tempfile.mkdtemp() + os.path.sep | |
155 pypath = os.getenv('PYTHONPATH') | |
156 if pypath: | |
157 self.env = {'PYTHONPATH': os.getcwd() + os.pathsep + pypath} | |
158 else: | |
159 self.env = {'PYTHONPATH': os.getcwd()} | |
160 | |
161 def convert(self, code, stages=(1, 2), all_imports=False, from3=False, | |
162 reformat=True, run=True, conservative=False): | |
163 """ | |
164 Converts the code block using ``futurize`` and returns the | |
165 resulting code. | |
166 | |
167 Passing stages=[1] or stages=[2] passes the flag ``--stage1`` or | |
168 ``stage2`` to ``futurize``. Passing both stages runs ``futurize`` | |
169 with both stages by default. | |
170 | |
171 If from3 is False, runs ``futurize``, converting from Python 2 to | |
172 both 2 and 3. If from3 is True, runs ``pasteurize`` to convert | |
173 from Python 3 to both 2 and 3. | |
174 | |
175 Optionally reformats the code block first using the reformat() function. | |
176 | |
177 If run is True, runs the resulting code under all Python | |
178 interpreters in self.interpreters. | |
179 """ | |
180 if reformat: | |
181 code = reformat_code(code) | |
182 self._write_test_script(code) | |
183 self._futurize_test_script(stages=stages, all_imports=all_imports, | |
184 from3=from3, conservative=conservative) | |
185 output = self._read_test_script() | |
186 if run: | |
187 for interpreter in self.interpreters: | |
188 _ = self._run_test_script(interpreter=interpreter) | |
189 return output | |
190 | |
191 def compare(self, output, expected, ignore_imports=True): | |
192 """ | |
193 Compares whether the code blocks are equal. If not, raises an | |
194 exception so the test fails. Ignores any trailing whitespace like | |
195 blank lines. | |
196 | |
197 If ignore_imports is True, passes the code blocks into the | |
198 strip_future_imports method. | |
199 | |
200 If one code block is a unicode string and the other a | |
201 byte-string, it assumes the byte-string is encoded as utf-8. | |
202 """ | |
203 if ignore_imports: | |
204 output = self.strip_future_imports(output) | |
205 expected = self.strip_future_imports(expected) | |
206 if isinstance(output, bytes) and not isinstance(expected, bytes): | |
207 output = output.decode('utf-8') | |
208 if isinstance(expected, bytes) and not isinstance(output, bytes): | |
209 expected = expected.decode('utf-8') | |
210 self.assertEqual(order_future_lines(output.rstrip()), | |
211 expected.rstrip()) | |
212 | |
213 def strip_future_imports(self, code): | |
214 """ | |
215 Strips any of these import lines: | |
216 | |
217 from __future__ import <anything> | |
218 from future <anything> | |
219 from future.<anything> | |
220 from builtins <anything> | |
221 | |
222 or any line containing: | |
223 install_hooks() | |
224 or: | |
225 install_aliases() | |
226 | |
227 Limitation: doesn't handle imports split across multiple lines like | |
228 this: | |
229 | |
230 from __future__ import (absolute_import, division, print_function, | |
231 unicode_literals) | |
232 """ | |
233 output = [] | |
234 # We need .splitlines(keepends=True), which doesn't exist on Py2, | |
235 # so we use this instead: | |
236 for line in code.split('\n'): | |
237 if not (line.startswith('from __future__ import ') | |
238 or line.startswith('from future ') | |
239 or line.startswith('from builtins ') | |
240 or 'install_hooks()' in line | |
241 or 'install_aliases()' in line | |
242 # but don't match "from future_builtins" :) | |
243 or line.startswith('from future.')): | |
244 output.append(line) | |
245 return '\n'.join(output) | |
246 | |
247 def convert_check(self, before, expected, stages=(1, 2), all_imports=False, | |
248 ignore_imports=True, from3=False, run=True, | |
249 conservative=False): | |
250 """ | |
251 Convenience method that calls convert() and compare(). | |
252 | |
253 Reformats the code blocks automatically using the reformat_code() | |
254 function. | |
255 | |
256 If all_imports is passed, we add the appropriate import headers | |
257 for the stage(s) selected to the ``expected`` code-block, so they | |
258 needn't appear repeatedly in the test code. | |
259 | |
260 If ignore_imports is True, ignores the presence of any lines | |
261 beginning: | |
262 | |
263 from __future__ import ... | |
264 from future import ... | |
265 | |
266 for the purpose of the comparison. | |
267 """ | |
268 output = self.convert(before, stages=stages, all_imports=all_imports, | |
269 from3=from3, run=run, conservative=conservative) | |
270 if all_imports: | |
271 headers = self.headers2 if 2 in stages else self.headers1 | |
272 else: | |
273 headers = '' | |
274 | |
275 reformatted = reformat_code(expected) | |
276 if headers in reformatted: | |
277 headers = '' | |
278 | |
279 self.compare(output, headers + reformatted, | |
280 ignore_imports=ignore_imports) | |
281 | |
282 def unchanged(self, code, **kwargs): | |
283 """ | |
284 Convenience method to ensure the code is unchanged by the | |
285 futurize process. | |
286 """ | |
287 self.convert_check(code, code, **kwargs) | |
288 | |
289 def _write_test_script(self, code, filename='mytestscript.py'): | |
290 """ | |
291 Dedents the given code (a multiline string) and writes it out to | |
292 a file in a temporary folder like /tmp/tmpUDCn7x/mytestscript.py. | |
293 """ | |
294 if isinstance(code, bytes): | |
295 code = code.decode('utf-8') | |
296 # Be explicit about encoding the temp file as UTF-8 (issue #63): | |
297 with io.open(self.tempdir + filename, 'wt', encoding='utf-8') as f: | |
298 f.write(dedent(code)) | |
299 | |
300 def _read_test_script(self, filename='mytestscript.py'): | |
301 with io.open(self.tempdir + filename, 'rt', encoding='utf-8') as f: | |
302 newsource = f.read() | |
303 return newsource | |
304 | |
305 def _futurize_test_script(self, filename='mytestscript.py', stages=(1, 2), | |
306 all_imports=False, from3=False, | |
307 conservative=False): | |
308 params = [] | |
309 stages = list(stages) | |
310 if all_imports: | |
311 params.append('--all-imports') | |
312 if from3: | |
313 script = 'pasteurize.py' | |
314 else: | |
315 script = 'futurize.py' | |
316 if stages == [1]: | |
317 params.append('--stage1') | |
318 elif stages == [2]: | |
319 params.append('--stage2') | |
320 else: | |
321 assert stages == [1, 2] | |
322 if conservative: | |
323 params.append('--conservative') | |
324 # No extra params needed | |
325 | |
326 # Absolute file path: | |
327 fn = self.tempdir + filename | |
328 call_args = [sys.executable, script] + params + ['-w', fn] | |
329 try: | |
330 output = check_output(call_args, stderr=STDOUT, env=self.env) | |
331 except CalledProcessError as e: | |
332 with open(fn) as f: | |
333 msg = ( | |
334 'Error running the command %s\n' | |
335 '%s\n' | |
336 'Contents of file %s:\n' | |
337 '\n' | |
338 '%s') % ( | |
339 ' '.join(call_args), | |
340 'env=%s' % self.env, | |
341 fn, | |
342 '----\n%s\n----' % f.read(), | |
343 ) | |
344 ErrorClass = (FuturizeError if 'futurize' in script else PasteurizeError) | |
345 | |
346 if not hasattr(e, 'output'): | |
347 # The attribute CalledProcessError.output doesn't exist on Py2.6 | |
348 e.output = None | |
349 raise ErrorClass(msg, e.returncode, e.cmd, output=e.output) | |
350 return output | |
351 | |
352 def _run_test_script(self, filename='mytestscript.py', | |
353 interpreter=sys.executable): | |
354 # Absolute file path: | |
355 fn = self.tempdir + filename | |
356 try: | |
357 output = check_output([interpreter, fn], | |
358 env=self.env, stderr=STDOUT) | |
359 except CalledProcessError as e: | |
360 with open(fn) as f: | |
361 msg = ( | |
362 'Error running the command %s\n' | |
363 '%s\n' | |
364 'Contents of file %s:\n' | |
365 '\n' | |
366 '%s') % ( | |
367 ' '.join([interpreter, fn]), | |
368 'env=%s' % self.env, | |
369 fn, | |
370 '----\n%s\n----' % f.read(), | |
371 ) | |
372 if not hasattr(e, 'output'): | |
373 # The attribute CalledProcessError.output doesn't exist on Py2.6 | |
374 e.output = None | |
375 raise VerboseCalledProcessError(msg, e.returncode, e.cmd, output=e.output) | |
376 return output | |
377 | |
378 | |
379 # Decorator to skip some tests on Python 2.6 ... | |
380 skip26 = unittest.skipIf(PY26, "this test is known to fail on Py2.6") | |
381 | |
382 | |
383 def expectedFailurePY3(func): | |
384 if not PY3: | |
385 return func | |
386 return unittest.expectedFailure(func) | |
387 | |
388 def expectedFailurePY26(func): | |
389 if not PY26: | |
390 return func | |
391 return unittest.expectedFailure(func) | |
392 | |
393 | |
394 def expectedFailurePY27(func): | |
395 if not PY27: | |
396 return func | |
397 return unittest.expectedFailure(func) | |
398 | |
399 | |
400 def expectedFailurePY2(func): | |
401 if not PY2: | |
402 return func | |
403 return unittest.expectedFailure(func) | |
404 | |
405 | |
406 # Renamed in Py3.3: | |
407 if not hasattr(unittest.TestCase, 'assertRaisesRegex'): | |
408 unittest.TestCase.assertRaisesRegex = unittest.TestCase.assertRaisesRegexp | |
409 | |
410 # From Py3.3: | |
411 def assertRegex(self, text, expected_regex, msg=None): | |
412 """Fail the test unless the text matches the regular expression.""" | |
413 if isinstance(expected_regex, (str, unicode)): | |
414 assert expected_regex, "expected_regex must not be empty." | |
415 expected_regex = re.compile(expected_regex) | |
416 if not expected_regex.search(text): | |
417 msg = msg or "Regex didn't match" | |
418 msg = '%s: %r not found in %r' % (msg, expected_regex.pattern, text) | |
419 raise self.failureException(msg) | |
420 | |
421 if not hasattr(unittest.TestCase, 'assertRegex'): | |
422 bind_method(unittest.TestCase, 'assertRegex', assertRegex) | |
423 | |
424 class _AssertRaisesBaseContext(object): | |
425 | |
426 def __init__(self, expected, test_case, callable_obj=None, | |
427 expected_regex=None): | |
428 self.expected = expected | |
429 self.test_case = test_case | |
430 if callable_obj is not None: | |
431 try: | |
432 self.obj_name = callable_obj.__name__ | |
433 except AttributeError: | |
434 self.obj_name = str(callable_obj) | |
435 else: | |
436 self.obj_name = None | |
437 if isinstance(expected_regex, (bytes, str)): | |
438 expected_regex = re.compile(expected_regex) | |
439 self.expected_regex = expected_regex | |
440 self.msg = None | |
441 | |
442 def _raiseFailure(self, standardMsg): | |
443 msg = self.test_case._formatMessage(self.msg, standardMsg) | |
444 raise self.test_case.failureException(msg) | |
445 | |
446 def handle(self, name, callable_obj, args, kwargs): | |
447 """ | |
448 If callable_obj is None, assertRaises/Warns is being used as a | |
449 context manager, so check for a 'msg' kwarg and return self. | |
450 If callable_obj is not None, call it passing args and kwargs. | |
451 """ | |
452 if callable_obj is None: | |
453 self.msg = kwargs.pop('msg', None) | |
454 return self | |
455 with self: | |
456 callable_obj(*args, **kwargs) | |
457 | |
458 class _AssertWarnsContext(_AssertRaisesBaseContext): | |
459 """A context manager used to implement TestCase.assertWarns* methods.""" | |
460 | |
461 def __enter__(self): | |
462 # The __warningregistry__'s need to be in a pristine state for tests | |
463 # to work properly. | |
464 for v in sys.modules.values(): | |
465 if getattr(v, '__warningregistry__', None): | |
466 v.__warningregistry__ = {} | |
467 self.warnings_manager = warnings.catch_warnings(record=True) | |
468 self.warnings = self.warnings_manager.__enter__() | |
469 warnings.simplefilter("always", self.expected) | |
470 return self | |
471 | |
472 def __exit__(self, exc_type, exc_value, tb): | |
473 self.warnings_manager.__exit__(exc_type, exc_value, tb) | |
474 if exc_type is not None: | |
475 # let unexpected exceptions pass through | |
476 return | |
477 try: | |
478 exc_name = self.expected.__name__ | |
479 except AttributeError: | |
480 exc_name = str(self.expected) | |
481 first_matching = None | |
482 for m in self.warnings: | |
483 w = m.message | |
484 if not isinstance(w, self.expected): | |
485 continue | |
486 if first_matching is None: | |
487 first_matching = w | |
488 if (self.expected_regex is not None and | |
489 not self.expected_regex.search(str(w))): | |
490 continue | |
491 # store warning for later retrieval | |
492 self.warning = w | |
493 self.filename = m.filename | |
494 self.lineno = m.lineno | |
495 return | |
496 # Now we simply try to choose a helpful failure message | |
497 if first_matching is not None: | |
498 self._raiseFailure('"{}" does not match "{}"'.format( | |
499 self.expected_regex.pattern, str(first_matching))) | |
500 if self.obj_name: | |
501 self._raiseFailure("{} not triggered by {}".format(exc_name, | |
502 self.obj_name)) | |
503 else: | |
504 self._raiseFailure("{} not triggered".format(exc_name)) | |
505 | |
506 | |
507 def assertWarns(self, expected_warning, callable_obj=None, *args, **kwargs): | |
508 """Fail unless a warning of class warnClass is triggered | |
509 by callable_obj when invoked with arguments args and keyword | |
510 arguments kwargs. If a different type of warning is | |
511 triggered, it will not be handled: depending on the other | |
512 warning filtering rules in effect, it might be silenced, printed | |
513 out, or raised as an exception. | |
514 | |
515 If called with callable_obj omitted or None, will return a | |
516 context object used like this:: | |
517 | |
518 with self.assertWarns(SomeWarning): | |
519 do_something() | |
520 | |
521 An optional keyword argument 'msg' can be provided when assertWarns | |
522 is used as a context object. | |
523 | |
524 The context manager keeps a reference to the first matching | |
525 warning as the 'warning' attribute; similarly, the 'filename' | |
526 and 'lineno' attributes give you information about the line | |
527 of Python code from which the warning was triggered. | |
528 This allows you to inspect the warning after the assertion:: | |
529 | |
530 with self.assertWarns(SomeWarning) as cm: | |
531 do_something() | |
532 the_warning = cm.warning | |
533 self.assertEqual(the_warning.some_attribute, 147) | |
534 """ | |
535 context = _AssertWarnsContext(expected_warning, self, callable_obj) | |
536 return context.handle('assertWarns', callable_obj, args, kwargs) | |
537 | |
538 if not hasattr(unittest.TestCase, 'assertWarns'): | |
539 bind_method(unittest.TestCase, 'assertWarns', assertWarns) |