Mercurial > repos > guerler > hhblits
comparison lib/python3.8/site-packages/pip/_internal/utils/subprocess.py @ 0:9e54283cc701 draft
"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author | guerler |
---|---|
date | Mon, 27 Jul 2020 03:47:31 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:9e54283cc701 |
---|---|
1 # The following comment should be removed at some point in the future. | |
2 # mypy: strict-optional=False | |
3 | |
4 from __future__ import absolute_import | |
5 | |
6 import logging | |
7 import os | |
8 import subprocess | |
9 | |
10 from pip._vendor.six.moves import shlex_quote | |
11 | |
12 from pip._internal.exceptions import InstallationError | |
13 from pip._internal.utils.compat import console_to_str, str_to_display | |
14 from pip._internal.utils.logging import subprocess_logger | |
15 from pip._internal.utils.misc import HiddenText, path_to_display | |
16 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
17 from pip._internal.utils.ui import open_spinner | |
18 | |
19 if MYPY_CHECK_RUNNING: | |
20 from typing import ( | |
21 Any, Callable, Iterable, List, Mapping, Optional, Text, Union, | |
22 ) | |
23 from pip._internal.utils.ui import SpinnerInterface | |
24 | |
25 CommandArgs = List[Union[str, HiddenText]] | |
26 | |
27 | |
28 LOG_DIVIDER = '----------------------------------------' | |
29 | |
30 | |
31 def make_command(*args): | |
32 # type: (Union[str, HiddenText, CommandArgs]) -> CommandArgs | |
33 """ | |
34 Create a CommandArgs object. | |
35 """ | |
36 command_args = [] # type: CommandArgs | |
37 for arg in args: | |
38 # Check for list instead of CommandArgs since CommandArgs is | |
39 # only known during type-checking. | |
40 if isinstance(arg, list): | |
41 command_args.extend(arg) | |
42 else: | |
43 # Otherwise, arg is str or HiddenText. | |
44 command_args.append(arg) | |
45 | |
46 return command_args | |
47 | |
48 | |
49 def format_command_args(args): | |
50 # type: (Union[List[str], CommandArgs]) -> str | |
51 """ | |
52 Format command arguments for display. | |
53 """ | |
54 # For HiddenText arguments, display the redacted form by calling str(). | |
55 # Also, we don't apply str() to arguments that aren't HiddenText since | |
56 # this can trigger a UnicodeDecodeError in Python 2 if the argument | |
57 # has type unicode and includes a non-ascii character. (The type | |
58 # checker doesn't ensure the annotations are correct in all cases.) | |
59 return ' '.join( | |
60 shlex_quote(str(arg)) if isinstance(arg, HiddenText) | |
61 else shlex_quote(arg) for arg in args | |
62 ) | |
63 | |
64 | |
65 def reveal_command_args(args): | |
66 # type: (Union[List[str], CommandArgs]) -> List[str] | |
67 """ | |
68 Return the arguments in their raw, unredacted form. | |
69 """ | |
70 return [ | |
71 arg.secret if isinstance(arg, HiddenText) else arg for arg in args | |
72 ] | |
73 | |
74 | |
75 def make_subprocess_output_error( | |
76 cmd_args, # type: Union[List[str], CommandArgs] | |
77 cwd, # type: Optional[str] | |
78 lines, # type: List[Text] | |
79 exit_status, # type: int | |
80 ): | |
81 # type: (...) -> Text | |
82 """ | |
83 Create and return the error message to use to log a subprocess error | |
84 with command output. | |
85 | |
86 :param lines: A list of lines, each ending with a newline. | |
87 """ | |
88 command = format_command_args(cmd_args) | |
89 # Convert `command` and `cwd` to text (unicode in Python 2) so we can use | |
90 # them as arguments in the unicode format string below. This avoids | |
91 # "UnicodeDecodeError: 'ascii' codec can't decode byte ..." in Python 2 | |
92 # if either contains a non-ascii character. | |
93 command_display = str_to_display(command, desc='command bytes') | |
94 cwd_display = path_to_display(cwd) | |
95 | |
96 # We know the joined output value ends in a newline. | |
97 output = ''.join(lines) | |
98 msg = ( | |
99 # Use a unicode string to avoid "UnicodeEncodeError: 'ascii' | |
100 # codec can't encode character ..." in Python 2 when a format | |
101 # argument (e.g. `output`) has a non-ascii character. | |
102 u'Command errored out with exit status {exit_status}:\n' | |
103 ' command: {command_display}\n' | |
104 ' cwd: {cwd_display}\n' | |
105 'Complete output ({line_count} lines):\n{output}{divider}' | |
106 ).format( | |
107 exit_status=exit_status, | |
108 command_display=command_display, | |
109 cwd_display=cwd_display, | |
110 line_count=len(lines), | |
111 output=output, | |
112 divider=LOG_DIVIDER, | |
113 ) | |
114 return msg | |
115 | |
116 | |
117 def call_subprocess( | |
118 cmd, # type: Union[List[str], CommandArgs] | |
119 show_stdout=False, # type: bool | |
120 cwd=None, # type: Optional[str] | |
121 on_returncode='raise', # type: str | |
122 extra_ok_returncodes=None, # type: Optional[Iterable[int]] | |
123 command_desc=None, # type: Optional[str] | |
124 extra_environ=None, # type: Optional[Mapping[str, Any]] | |
125 unset_environ=None, # type: Optional[Iterable[str]] | |
126 spinner=None, # type: Optional[SpinnerInterface] | |
127 log_failed_cmd=True # type: Optional[bool] | |
128 ): | |
129 # type: (...) -> Text | |
130 """ | |
131 Args: | |
132 show_stdout: if true, use INFO to log the subprocess's stderr and | |
133 stdout streams. Otherwise, use DEBUG. Defaults to False. | |
134 extra_ok_returncodes: an iterable of integer return codes that are | |
135 acceptable, in addition to 0. Defaults to None, which means []. | |
136 unset_environ: an iterable of environment variable names to unset | |
137 prior to calling subprocess.Popen(). | |
138 log_failed_cmd: if false, failed commands are not logged, only raised. | |
139 """ | |
140 if extra_ok_returncodes is None: | |
141 extra_ok_returncodes = [] | |
142 if unset_environ is None: | |
143 unset_environ = [] | |
144 # Most places in pip use show_stdout=False. What this means is-- | |
145 # | |
146 # - We connect the child's output (combined stderr and stdout) to a | |
147 # single pipe, which we read. | |
148 # - We log this output to stderr at DEBUG level as it is received. | |
149 # - If DEBUG logging isn't enabled (e.g. if --verbose logging wasn't | |
150 # requested), then we show a spinner so the user can still see the | |
151 # subprocess is in progress. | |
152 # - If the subprocess exits with an error, we log the output to stderr | |
153 # at ERROR level if it hasn't already been displayed to the console | |
154 # (e.g. if --verbose logging wasn't enabled). This way we don't log | |
155 # the output to the console twice. | |
156 # | |
157 # If show_stdout=True, then the above is still done, but with DEBUG | |
158 # replaced by INFO. | |
159 if show_stdout: | |
160 # Then log the subprocess output at INFO level. | |
161 log_subprocess = subprocess_logger.info | |
162 used_level = logging.INFO | |
163 else: | |
164 # Then log the subprocess output using DEBUG. This also ensures | |
165 # it will be logged to the log file (aka user_log), if enabled. | |
166 log_subprocess = subprocess_logger.debug | |
167 used_level = logging.DEBUG | |
168 | |
169 # Whether the subprocess will be visible in the console. | |
170 showing_subprocess = subprocess_logger.getEffectiveLevel() <= used_level | |
171 | |
172 # Only use the spinner if we're not showing the subprocess output | |
173 # and we have a spinner. | |
174 use_spinner = not showing_subprocess and spinner is not None | |
175 | |
176 if command_desc is None: | |
177 command_desc = format_command_args(cmd) | |
178 | |
179 log_subprocess("Running command %s", command_desc) | |
180 env = os.environ.copy() | |
181 if extra_environ: | |
182 env.update(extra_environ) | |
183 for name in unset_environ: | |
184 env.pop(name, None) | |
185 try: | |
186 proc = subprocess.Popen( | |
187 # Convert HiddenText objects to the underlying str. | |
188 reveal_command_args(cmd), | |
189 stderr=subprocess.STDOUT, stdin=subprocess.PIPE, | |
190 stdout=subprocess.PIPE, cwd=cwd, env=env, | |
191 ) | |
192 proc.stdin.close() | |
193 except Exception as exc: | |
194 if log_failed_cmd: | |
195 subprocess_logger.critical( | |
196 "Error %s while executing command %s", exc, command_desc, | |
197 ) | |
198 raise | |
199 all_output = [] | |
200 while True: | |
201 # The "line" value is a unicode string in Python 2. | |
202 line = console_to_str(proc.stdout.readline()) | |
203 if not line: | |
204 break | |
205 line = line.rstrip() | |
206 all_output.append(line + '\n') | |
207 | |
208 # Show the line immediately. | |
209 log_subprocess(line) | |
210 # Update the spinner. | |
211 if use_spinner: | |
212 spinner.spin() | |
213 try: | |
214 proc.wait() | |
215 finally: | |
216 if proc.stdout: | |
217 proc.stdout.close() | |
218 proc_had_error = ( | |
219 proc.returncode and proc.returncode not in extra_ok_returncodes | |
220 ) | |
221 if use_spinner: | |
222 if proc_had_error: | |
223 spinner.finish("error") | |
224 else: | |
225 spinner.finish("done") | |
226 if proc_had_error: | |
227 if on_returncode == 'raise': | |
228 if not showing_subprocess and log_failed_cmd: | |
229 # Then the subprocess streams haven't been logged to the | |
230 # console yet. | |
231 msg = make_subprocess_output_error( | |
232 cmd_args=cmd, | |
233 cwd=cwd, | |
234 lines=all_output, | |
235 exit_status=proc.returncode, | |
236 ) | |
237 subprocess_logger.error(msg) | |
238 exc_msg = ( | |
239 'Command errored out with exit status {}: {} ' | |
240 'Check the logs for full command output.' | |
241 ).format(proc.returncode, command_desc) | |
242 raise InstallationError(exc_msg) | |
243 elif on_returncode == 'warn': | |
244 subprocess_logger.warning( | |
245 'Command "%s" had error code %s in %s', | |
246 command_desc, proc.returncode, cwd, | |
247 ) | |
248 elif on_returncode == 'ignore': | |
249 pass | |
250 else: | |
251 raise ValueError('Invalid value: on_returncode=%s' % | |
252 repr(on_returncode)) | |
253 return ''.join(all_output) | |
254 | |
255 | |
256 def runner_with_spinner_message(message): | |
257 # type: (str) -> Callable[..., None] | |
258 """Provide a subprocess_runner that shows a spinner message. | |
259 | |
260 Intended for use with for pep517's Pep517HookCaller. Thus, the runner has | |
261 an API that matches what's expected by Pep517HookCaller.subprocess_runner. | |
262 """ | |
263 | |
264 def runner( | |
265 cmd, # type: List[str] | |
266 cwd=None, # type: Optional[str] | |
267 extra_environ=None # type: Optional[Mapping[str, Any]] | |
268 ): | |
269 # type: (...) -> None | |
270 with open_spinner(message) as spinner: | |
271 call_subprocess( | |
272 cmd, | |
273 cwd=cwd, | |
274 extra_environ=extra_environ, | |
275 spinner=spinner, | |
276 ) | |
277 | |
278 return runner |