Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/click/testing.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
comparison
equal
deleted
inserted
replaced
4:79f47841a781 | 5:9b1c78e6ba9c |
---|---|
1 import contextlib | |
2 import os | |
3 import shlex | |
4 import shutil | |
5 import sys | |
6 import tempfile | |
7 | |
8 from . import formatting | |
9 from . import termui | |
10 from . import utils | |
11 from ._compat import iteritems | |
12 from ._compat import PY2 | |
13 from ._compat import string_types | |
14 | |
15 | |
16 if PY2: | |
17 from cStringIO import StringIO | |
18 else: | |
19 import io | |
20 from ._compat import _find_binary_reader | |
21 | |
22 | |
class EchoingStdin(object):
    """Wrap an input stream so that everything read through it is echoed.

    Data returned by :meth:`read`, :meth:`readline`, :meth:`readlines` or
    by iterating over the wrapper is first written to ``output`` and then
    returned to the caller unchanged.  Every other attribute is looked up
    on the wrapped input stream.

    :param input: the stream that data is read from.
    :param output: the stream that receives a copy of everything read.
    """

    def __init__(self, input, output):
        self._input = input
        self._output = output

    def __getattr__(self, x):
        # Only reached for names not defined on the wrapper itself, so the
        # echoing methods below always win over the wrapped stream's own.
        return getattr(self._input, x)

    def _echo(self, rv):
        """Write ``rv`` to the output stream and return it unchanged."""
        self._output.write(rv)
        return rv

    def read(self, n=-1):
        """Read up to ``n`` units from the input and echo them."""
        return self._echo(self._input.read(n))

    def readline(self, n=-1):
        """Read a single line (at most ``n`` units) and echo it."""
        return self._echo(self._input.readline(n))

    def readlines(self, hint=-1):
        """Read the remaining lines, echoing each one.

        :param hint: optional size hint forwarded to the wrapped stream's
                     ``readlines``; the default of ``-1`` is passed through
                     unchanged.
        """
        return [self._echo(x) for x in self._input.readlines(hint)]

    def __iter__(self):
        # A generator expression is already an iterator; lines are echoed
        # lazily as the caller consumes them.
        return (self._echo(x) for x in self._input)

    def __repr__(self):
        return repr(self._input)
49 | |
50 | |
def make_input_stream(input, charset):
    """Turn ``input`` into a stream suitable for use as stdin.

    :param input: ``None``, a bytes object, an object with an ``encode``
                  method (such as a text string), or an object that already
                  has a ``read`` attribute.
    :param charset: the encoding used when ``input`` has to be encoded.
    :raises TypeError: on Python 3, when ``input`` is a stream for which no
                       binary reader could be found.
    """
    if not hasattr(input, "read"):
        # Plain data: normalise it to bytes and wrap it in an in-memory
        # stream.
        if input is None:
            payload = b""
        elif isinstance(input, bytes):
            payload = input
        else:
            payload = input.encode(charset)
        return StringIO(payload) if PY2 else io.BytesIO(payload)

    # Already an input stream.
    if PY2:
        return input
    binary_reader = _find_binary_reader(input)
    if binary_reader is None:
        raise TypeError("Could not find binary reader for input stream.")
    return binary_reader
68 | |
69 | |
class Result(object):
    """Holds the captured result of an invoked CLI script.

    :param runner: the runner that created the result; its ``charset``
                   attribute is used to decode the captured bytes.
    :param stdout_bytes: the captured standard output as bytes.
    :param stderr_bytes: the captured standard error as bytes, or ``None``
                         if it was not captured separately.
    :param exit_code: the exit code as integer.
    :param exception: the exception that happened, if one did.
    :param exc_info: the traceback information, if available.
    """

    def __init__(
        self, runner, stdout_bytes, stderr_bytes, exit_code, exception, exc_info=None
    ):
        #: The runner that created the result
        self.runner = runner
        #: The standard output as bytes.
        self.stdout_bytes = stdout_bytes
        #: The standard error as bytes, or None if not available
        self.stderr_bytes = stderr_bytes
        #: The exit code as integer.
        self.exit_code = exit_code
        #: The exception that happened if one did.
        self.exception = exception
        #: The traceback
        self.exc_info = exc_info

    def _decode(self, data):
        """Decode captured bytes with the runner's charset.

        Undecodable bytes are replaced and ``\\r\\n`` line endings are
        normalised to ``\\n``.
        """
        return data.decode(self.runner.charset, "replace").replace("\r\n", "\n")

    @property
    def output(self):
        """The (standard) output as unicode string."""
        return self.stdout

    @property
    def stdout(self):
        """The standard output as unicode string."""
        return self._decode(self.stdout_bytes)

    @property
    def stderr(self):
        """The standard error as unicode string.

        :raises ValueError: if stderr was not captured separately.
        """
        if self.stderr_bytes is None:
            raise ValueError("stderr not separately captured")
        return self._decode(self.stderr_bytes)

    def __repr__(self):
        # Compare against None explicitly: an exception instance may be
        # falsy (for example when its class defines ``__len__``) and must
        # still be reported instead of "okay".
        if self.exception is not None:
            status = repr(self.exception)
        else:
            status = "okay"
        return "<{} {}>".format(type(self).__name__, status)
114 | |
115 | |
class CliRunner(object):
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in a isolated environment.  This only
    works in single-threaded systems without any concurrency as it changes the
    global interpreter state.

    :param charset: the character set for the input and output data.  This is
                    UTF-8 by default and should not be changed currently as
                    the reporting to Click only works in Python 2 properly.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from stdin writes
                       to stdout.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    :param mix_stderr: if this is set to `False`, then stdout and stderr are
                       preserved as independent streams.  This is useful for
                       Unix-philosophy apps that have predictable stdout and
                       noisy stderr, such that each may be measured
                       independently
    """

    def __init__(self, charset=None, env=None, echo_stdin=False, mix_stderr=True):
        if charset is None:
            charset = "utf-8"
        self.charset = charset
        self.env = env or {}
        self.echo_stdin = echo_stdin
        self.mix_stderr = mix_stderr

    def get_default_prog_name(self, cli):
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or "root"

    def make_env(self, overrides=None):
        """Returns the environment overrides for invoking a script.

        The runner's own ``env`` is copied and then updated with
        ``overrides``; neither input dictionary is modified.
        """
        rv = dict(self.env)
        if overrides:
            rv.update(overrides)
        return rv

    @staticmethod
    def _set_or_unset_env(key, value):
        """Set ``os.environ[key]`` to ``value``, or remove it for ``None``."""
        if value is None:
            # Best effort: the variable may simply not be set.
            try:
                del os.environ[key]
            except Exception:
                pass
        else:
            os.environ[key] = value

    @contextlib.contextmanager
    def isolation(self, input=None, env=None, color=False):
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up stdin with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        The context manager yields a tuple ``(bytes_output, bytes_error)``
        of the in-memory streams that capture stdout and stderr.  When
        ``mix_stderr`` is enabled the second item is ``False`` and stderr
        is written to the stdout capture instead.

        All replaced global state is restored on exit, including when the
        setup itself fails part-way through.

        This is automatically done in the :meth:`invoke` method.

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param input: the input stream to put into sys.stdin.
        :param env: the environment overrides as dictionary.
        :param color: whether the output should contain color codes.  The
                      application can still override this explicitly.
        """
        input = make_input_stream(input, self.charset)
        env = self.make_env(env)

        # Snapshot every piece of global state before touching any of it so
        # the ``finally`` clause below can always restore the full set.
        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_forced_width = formatting.FORCED_WIDTH
        old_visible_prompt_func = termui.visible_prompt_func
        old_hidden_prompt_func = termui.hidden_prompt_func
        old__getchar_func = termui._getchar
        old_should_strip_ansi = utils.should_strip_ansi
        old_env = {}

        try:
            formatting.FORCED_WIDTH = 80

            if PY2:
                bytes_output = StringIO()
                if self.echo_stdin:
                    input = EchoingStdin(input, bytes_output)
                sys.stdout = bytes_output
                if not self.mix_stderr:
                    bytes_error = StringIO()
                    sys.stderr = bytes_error
            else:
                bytes_output = io.BytesIO()
                if self.echo_stdin:
                    input = EchoingStdin(input, bytes_output)
                input = io.TextIOWrapper(input, encoding=self.charset)
                sys.stdout = io.TextIOWrapper(bytes_output, encoding=self.charset)
                if not self.mix_stderr:
                    bytes_error = io.BytesIO()
                    sys.stderr = io.TextIOWrapper(bytes_error, encoding=self.charset)

            if self.mix_stderr:
                sys.stderr = sys.stdout

            sys.stdin = input

            def visible_input(prompt=None):
                # Mimic a terminal: show the prompt followed by the typed
                # value.
                sys.stdout.write(prompt or "")
                val = input.readline().rstrip("\r\n")
                sys.stdout.write("{}\n".format(val))
                sys.stdout.flush()
                return val

            def hidden_input(prompt=None):
                # Hidden prompts show the prompt only, never the value.
                sys.stdout.write("{}\n".format(prompt or ""))
                sys.stdout.flush()
                return input.readline().rstrip("\r\n")

            def _getchar(echo):
                char = sys.stdin.read(1)
                if echo:
                    sys.stdout.write(char)
                    sys.stdout.flush()
                return char

            default_color = color

            def should_strip_ansi(stream=None, color=None):
                if color is None:
                    return not default_color
                return not color

            termui.visible_prompt_func = visible_input
            termui.hidden_prompt_func = hidden_input
            termui._getchar = _getchar
            utils.should_strip_ansi = should_strip_ansi

            for key, value in iteritems(env):
                # Remember the previous value (None if unset) for restoring.
                old_env[key] = os.environ.get(key)
                self._set_or_unset_env(key, value)
            yield (bytes_output, not self.mix_stderr and bytes_error)
        finally:
            for key, value in iteritems(old_env):
                self._set_or_unset_env(key, value)
            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            termui.visible_prompt_func = old_visible_prompt_func
            termui.hidden_prompt_func = old_hidden_prompt_func
            termui._getchar = old__getchar_func
            utils.should_strip_ansi = old_should_strip_ansi
            formatting.FORCED_WIDTH = old_forced_width

    def invoke(
        self,
        cli,
        args=None,
        input=None,
        env=None,
        catch_exceptions=True,
        color=False,
        **extra
    ):
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        .. versionadded:: 3.0
           The ``catch_exceptions`` parameter was added.

        .. versionchanged:: 3.0
           The result object now has an `exc_info` attribute with the
           traceback if available.

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``.
        :param extra: the keyword arguments to pass to :meth:`main`.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.
        """
        exc_info = None
        with self.isolation(input=input, env=env, color=color) as outstreams:
            exception = None
            exit_code = 0

            if isinstance(args, string_types):
                args = shlex.split(args)

            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)

            try:
                cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                exit_code = e.code
                if exit_code is None:
                    exit_code = 0

                if exit_code != 0:
                    exception = e

                if not isinstance(exit_code, int):
                    # ``sys.exit("message")``: report the message and map
                    # it to exit code 1.
                    sys.stdout.write(str(exit_code))
                    sys.stdout.write("\n")
                    exit_code = 1

            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                # Flush both text wrappers before reading the underlying
                # byte buffers.  Without the stderr flush, text written to a
                # separately captured stderr could still sit in the
                # wrapper's buffer and be missing from the result.
                sys.stdout.flush()
                sys.stderr.flush()
                stdout = outstreams[0].getvalue()
                if self.mix_stderr:
                    stderr = None
                else:
                    stderr = outstreams[1].getvalue()

        return Result(
            runner=self,
            stdout_bytes=stdout,
            stderr_bytes=stderr,
            exit_code=exit_code,
            exception=exception,
            exc_info=exc_info,
        )

    @contextlib.contextmanager
    def isolated_filesystem(self):
        """A context manager that creates a temporary folder and changes
        the current working directory to it for isolated filesystem tests.

        The path of the temporary folder is yielded.  On exit the previous
        working directory is restored and the folder is removed on a
        best-effort basis.
        """
        cwd = os.getcwd()
        t = tempfile.mkdtemp()
        os.chdir(t)
        try:
            yield t
        finally:
            os.chdir(cwd)
            try:
                shutil.rmtree(t)
            except (OSError, IOError):  # noqa: B014
                pass