comparison lib/python3.8/site-packages/pip/_internal/utils/subprocess.py @ 0:9e54283cc701 draft

"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author guerler
date Mon, 27 Jul 2020 03:47:31 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:9e54283cc701
1 # The following comment should be removed at some point in the future.
2 # mypy: strict-optional=False
3
4 from __future__ import absolute_import
5
6 import logging
7 import os
8 import subprocess
9
10 from pip._vendor.six.moves import shlex_quote
11
12 from pip._internal.exceptions import InstallationError
13 from pip._internal.utils.compat import console_to_str, str_to_display
14 from pip._internal.utils.logging import subprocess_logger
15 from pip._internal.utils.misc import HiddenText, path_to_display
16 from pip._internal.utils.typing import MYPY_CHECK_RUNNING
17 from pip._internal.utils.ui import open_spinner
18
19 if MYPY_CHECK_RUNNING:
20 from typing import (
21 Any, Callable, Iterable, List, Mapping, Optional, Text, Union,
22 )
23 from pip._internal.utils.ui import SpinnerInterface
24
25 CommandArgs = List[Union[str, HiddenText]]
26
27
28 LOG_DIVIDER = '----------------------------------------'
29
30
def make_command(*args):
    # type: (Union[str, HiddenText, CommandArgs]) -> CommandArgs
    """
    Build a flat CommandArgs list from the given positional arguments.

    Each argument is either a single item (str or HiddenText), which is
    appended as-is, or a list of such items, whose elements are spliced
    into the result in order.
    """
    flattened = []  # type: CommandArgs
    for item in args:
        # CommandArgs only exists while type-checking, so the runtime
        # test has to be against plain ``list``.
        if not isinstance(item, list):
            # A lone str or HiddenText.
            flattened.append(item)
            continue
        flattened.extend(item)

    return flattened
47
48
def format_command_args(args):
    # type: (Union[List[str], CommandArgs]) -> str
    """
    Return the command arguments as a single shell-quoted display string.

    HiddenText arguments appear in their redacted form.
    """
    quoted_parts = []
    for arg in args:
        # Only HiddenText goes through str(), which yields its redacted
        # form. Other arguments are passed to shlex_quote() untouched:
        # calling str() on a Python 2 unicode value containing non-ascii
        # characters can raise UnicodeDecodeError. (The type checker
        # doesn't ensure the annotations are correct in all cases.)
        if isinstance(arg, HiddenText):
            shown = str(arg)
        else:
            shown = arg
        quoted_parts.append(shlex_quote(shown))
    return ' '.join(quoted_parts)
63
64
def reveal_command_args(args):
    # type: (Union[List[str], CommandArgs]) -> List[str]
    """
    Return a new list holding the raw, unredacted value of each argument.
    """
    def _unwrap(arg):
        # HiddenText carries the real value in ``secret``; anything else
        # is already in its raw form.
        if isinstance(arg, HiddenText):
            return arg.secret
        return arg

    return [_unwrap(arg) for arg in args]
73
74
def make_subprocess_output_error(
    cmd_args,     # type: Union[List[str], CommandArgs]
    cwd,          # type: Optional[str]
    lines,        # type: List[Text]
    exit_status,  # type: int
):
    # type: (...) -> Text
    """
    Build the message used to log a failed subprocess together with the
    output it produced.

    :param cmd_args: The command that was run; it is formatted for display
        with format_command_args().
    :param cwd: The working directory the command was run in.
    :param lines: A list of lines, each ending with a newline.
    :param exit_status: The exit status reported for the command.
    """
    # The template is a unicode string so that formatting does not raise
    # "UnicodeEncodeError: 'ascii' codec can't encode character ..." in
    # Python 2 when a field (e.g. the output) has a non-ascii character.
    template = (
        u'Command errored out with exit status {exit_status}:\n'
        ' command: {command_display}\n'
        ' cwd: {cwd_display}\n'
        'Complete output ({line_count} lines):\n{output}{divider}'
    )

    # The command and cwd are converted to text (unicode in Python 2)
    # before being substituted into the unicode template; otherwise a
    # non-ascii character in either one causes "UnicodeDecodeError:
    # 'ascii' codec can't decode byte ..." in Python 2.
    fields = {
        'exit_status': exit_status,
        'command_display': str_to_display(
            format_command_args(cmd_args), desc='command bytes',
        ),
        'cwd_display': path_to_display(cwd),
        'line_count': len(lines),
        # Every line ends with a newline, so the joined output does too
        # and the divider starts on a line of its own.
        'output': ''.join(lines),
        'divider': LOG_DIVIDER,
    }
    return template.format(**fields)
115
116
def call_subprocess(
    cmd,  # type: Union[List[str], CommandArgs]
    show_stdout=False,  # type: bool
    cwd=None,  # type: Optional[str]
    on_returncode='raise',  # type: str
    extra_ok_returncodes=None,  # type: Optional[Iterable[int]]
    command_desc=None,  # type: Optional[str]
    extra_environ=None,  # type: Optional[Mapping[str, Any]]
    unset_environ=None,  # type: Optional[Iterable[str]]
    spinner=None,  # type: Optional[SpinnerInterface]
    log_failed_cmd=True  # type: Optional[bool]
):
    # type: (...) -> Text
    """
    Run a command, log its output as it arrives, and return that output.

    Args:
      cmd: the command to run; HiddenText items are passed to the child
        in their unredacted form but logged in their redacted form.
      show_stdout: if true, use INFO to log the subprocess's stderr and
        stdout streams.  Otherwise, use DEBUG.  Defaults to False.
      cwd: working directory handed to subprocess.Popen().
      on_returncode: what to do when the command exits with a return code
        that is neither 0 nor in extra_ok_returncodes: 'raise' raises
        InstallationError, 'warn' logs a warning, 'ignore' does nothing.
        Any other value raises ValueError (only checked on failure).
      extra_ok_returncodes: an iterable of integer return codes that are
        acceptable, in addition to 0.  Defaults to None, which means [].
      command_desc: text used for the command in log and error messages.
        Defaults to the formatted form of cmd.
      extra_environ: mapping merged into a copy of os.environ for the
        child process.
      unset_environ: an iterable of environment variable names to unset
        prior to calling subprocess.Popen().
      spinner: spinner to advance once per output line; only used when
        the output is not being shown at the current log level.
      log_failed_cmd: if false, failed commands are not logged, only
        raised.

    Returns:
      The combined stdout/stderr of the command, one right-stripped line
      per line of output, each terminated with a newline.
    """
    ok_codes = [] if extra_ok_returncodes is None else extra_ok_returncodes
    names_to_unset = [] if unset_environ is None else unset_environ

    # Most callers leave show_stdout=False, in which case:
    #
    # - The child's stderr is merged into its stdout and both are read
    #   from one pipe.
    # - Each line is logged at DEBUG level as soon as it is received.
    # - When DEBUG output isn't being shown (e.g. --verbose wasn't
    #   requested), a spinner lets the user see the subprocess is still
    #   in progress.
    # - When the subprocess fails, its output is logged at ERROR level,
    #   but only if it wasn't already displayed -- so it is never shown
    #   on the console twice.
    #
    # With show_stdout=True the same applies with INFO in place of DEBUG.
    # Logging at DEBUG also ensures the output reaches the log file (aka
    # user_log), if enabled.
    used_level = logging.INFO if show_stdout else logging.DEBUG
    log_subprocess = (
        subprocess_logger.info if show_stdout else subprocess_logger.debug
    )

    # Whether the subprocess output will be visible in the console.
    showing_subprocess = subprocess_logger.getEffectiveLevel() <= used_level

    # A spinner is only useful when one was supplied and the output
    # itself is not being displayed.
    use_spinner = spinner is not None and not showing_subprocess

    if command_desc is None:
        command_desc = format_command_args(cmd)

    log_subprocess("Running command %s", command_desc)

    # Child environment: a copy of ours, plus the extras, minus the
    # names that were asked to be unset.
    env = os.environ.copy()
    if extra_environ:
        env.update(extra_environ)
    for name in names_to_unset:
        env.pop(name, None)

    try:
        proc = subprocess.Popen(
            # HiddenText objects are converted to the underlying str.
            reveal_command_args(cmd),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            cwd=cwd,
            env=env,
        )
        # Nothing is ever written to the child, so close its stdin now.
        proc.stdin.close()
    except Exception as exc:
        if log_failed_cmd:
            subprocess_logger.critical(
                "Error %s while executing command %s", exc, command_desc,
            )
        raise

    captured = []
    # The decoded value is a unicode string in Python 2.  An empty value
    # marks the end of the child's output.
    raw_line = console_to_str(proc.stdout.readline())
    while raw_line:
        text = raw_line.rstrip()
        captured.append(text + '\n')

        # Show the line immediately.
        log_subprocess(text)
        # Update the spinner.
        if use_spinner:
            spinner.spin()

        raw_line = console_to_str(proc.stdout.readline())

    try:
        proc.wait()
    finally:
        if proc.stdout:
            proc.stdout.close()

    exit_code = proc.returncode
    # Falsy for exit code 0 and for any explicitly accepted code.
    failed = exit_code and exit_code not in ok_codes

    if use_spinner:
        spinner.finish("error" if failed else "done")

    if failed:
        if on_returncode == 'raise':
            if log_failed_cmd and not showing_subprocess:
                # The subprocess streams haven't been logged to the
                # console yet, so include them with the error.
                subprocess_logger.error(
                    make_subprocess_output_error(
                        cmd_args=cmd,
                        cwd=cwd,
                        lines=captured,
                        exit_status=exit_code,
                    )
                )
            exc_msg = (
                'Command errored out with exit status {}: {} '
                'Check the logs for full command output.'
            ).format(exit_code, command_desc)
            raise InstallationError(exc_msg)

        if on_returncode == 'warn':
            subprocess_logger.warning(
                'Command "%s" had error code %s in %s',
                command_desc, exit_code, cwd,
            )
        elif on_returncode != 'ignore':
            raise ValueError('Invalid value: on_returncode=%s' %
                             repr(on_returncode))

    return ''.join(captured)
254
255
def runner_with_spinner_message(message):
    # type: (str) -> Callable[..., None]
    """Return a subprocess_runner that displays a spinner with `message`.

    The returned callable is meant to be handed to pep517's
    Pep517HookCaller, so its signature follows what
    Pep517HookCaller.subprocess_runner expects.
    """

    def runner(
        cmd,  # type: List[str]
        cwd=None,  # type: Optional[str]
        extra_environ=None  # type: Optional[Mapping[str, Any]]
    ):
        # type: (...) -> None
        # The command's output is deliberately discarded; only the
        # spinner (and any error raised by call_subprocess) is of
        # interest to the caller.
        subprocess_kwargs = {'cwd': cwd, 'extra_environ': extra_environ}
        with open_spinner(message) as active_spinner:
            call_subprocess(cmd, spinner=active_spinner, **subprocess_kwargs)

    return runner