Mercurial > repos > guerler > hhblits
comparison lib/python3.8/site-packages/pip/_internal/utils/subprocess.py @ 1:64071f2a4cf0 draft default tip
Deleted selected files
| author | guerler |
|---|---|
| date | Mon, 27 Jul 2020 03:55:49 -0400 |
| parents | 9e54283cc701 |
| children |
comparison
equal
deleted
inserted
replaced
| 0:9e54283cc701 | 1:64071f2a4cf0 |
|---|---|
| 1 # The following comment should be removed at some point in the future. | |
| 2 # mypy: strict-optional=False | |
| 3 | |
| 4 from __future__ import absolute_import | |
| 5 | |
| 6 import logging | |
| 7 import os | |
| 8 import subprocess | |
| 9 | |
| 10 from pip._vendor.six.moves import shlex_quote | |
| 11 | |
| 12 from pip._internal.exceptions import InstallationError | |
| 13 from pip._internal.utils.compat import console_to_str, str_to_display | |
| 14 from pip._internal.utils.logging import subprocess_logger | |
| 15 from pip._internal.utils.misc import HiddenText, path_to_display | |
| 16 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
| 17 from pip._internal.utils.ui import open_spinner | |
| 18 | |
| 19 if MYPY_CHECK_RUNNING: | |
| 20 from typing import ( | |
| 21 Any, Callable, Iterable, List, Mapping, Optional, Text, Union, | |
| 22 ) | |
| 23 from pip._internal.utils.ui import SpinnerInterface | |
| 24 | |
| 25 CommandArgs = List[Union[str, HiddenText]] | |
| 26 | |
| 27 | |
| 28 LOG_DIVIDER = '----------------------------------------' | |
| 29 | |
| 30 | |
| 31 def make_command(*args): | |
| 32 # type: (Union[str, HiddenText, CommandArgs]) -> CommandArgs | |
| 33 """ | |
| 34 Create a CommandArgs object. | |
| 35 """ | |
| 36 command_args = [] # type: CommandArgs | |
| 37 for arg in args: | |
| 38 # Check for list instead of CommandArgs since CommandArgs is | |
| 39 # only known during type-checking. | |
| 40 if isinstance(arg, list): | |
| 41 command_args.extend(arg) | |
| 42 else: | |
| 43 # Otherwise, arg is str or HiddenText. | |
| 44 command_args.append(arg) | |
| 45 | |
| 46 return command_args | |
| 47 | |
| 48 | |
| 49 def format_command_args(args): | |
| 50 # type: (Union[List[str], CommandArgs]) -> str | |
| 51 """ | |
| 52 Format command arguments for display. | |
| 53 """ | |
| 54 # For HiddenText arguments, display the redacted form by calling str(). | |
| 55 # Also, we don't apply str() to arguments that aren't HiddenText since | |
| 56 # this can trigger a UnicodeDecodeError in Python 2 if the argument | |
| 57 # has type unicode and includes a non-ascii character. (The type | |
| 58 # checker doesn't ensure the annotations are correct in all cases.) | |
| 59 return ' '.join( | |
| 60 shlex_quote(str(arg)) if isinstance(arg, HiddenText) | |
| 61 else shlex_quote(arg) for arg in args | |
| 62 ) | |
| 63 | |
| 64 | |
| 65 def reveal_command_args(args): | |
| 66 # type: (Union[List[str], CommandArgs]) -> List[str] | |
| 67 """ | |
| 68 Return the arguments in their raw, unredacted form. | |
| 69 """ | |
| 70 return [ | |
| 71 arg.secret if isinstance(arg, HiddenText) else arg for arg in args | |
| 72 ] | |
| 73 | |
| 74 | |
| 75 def make_subprocess_output_error( | |
| 76 cmd_args, # type: Union[List[str], CommandArgs] | |
| 77 cwd, # type: Optional[str] | |
| 78 lines, # type: List[Text] | |
| 79 exit_status, # type: int | |
| 80 ): | |
| 81 # type: (...) -> Text | |
| 82 """ | |
| 83 Create and return the error message to use to log a subprocess error | |
| 84 with command output. | |
| 85 | |
| 86 :param lines: A list of lines, each ending with a newline. | |
| 87 """ | |
| 88 command = format_command_args(cmd_args) | |
| 89 # Convert `command` and `cwd` to text (unicode in Python 2) so we can use | |
| 90 # them as arguments in the unicode format string below. This avoids | |
| 91 # "UnicodeDecodeError: 'ascii' codec can't decode byte ..." in Python 2 | |
| 92 # if either contains a non-ascii character. | |
| 93 command_display = str_to_display(command, desc='command bytes') | |
| 94 cwd_display = path_to_display(cwd) | |
| 95 | |
| 96 # We know the joined output value ends in a newline. | |
| 97 output = ''.join(lines) | |
| 98 msg = ( | |
| 99 # Use a unicode string to avoid "UnicodeEncodeError: 'ascii' | |
| 100 # codec can't encode character ..." in Python 2 when a format | |
| 101 # argument (e.g. `output`) has a non-ascii character. | |
| 102 u'Command errored out with exit status {exit_status}:\n' | |
| 103 ' command: {command_display}\n' | |
| 104 ' cwd: {cwd_display}\n' | |
| 105 'Complete output ({line_count} lines):\n{output}{divider}' | |
| 106 ).format( | |
| 107 exit_status=exit_status, | |
| 108 command_display=command_display, | |
| 109 cwd_display=cwd_display, | |
| 110 line_count=len(lines), | |
| 111 output=output, | |
| 112 divider=LOG_DIVIDER, | |
| 113 ) | |
| 114 return msg | |
| 115 | |
| 116 | |
| 117 def call_subprocess( | |
| 118 cmd, # type: Union[List[str], CommandArgs] | |
| 119 show_stdout=False, # type: bool | |
| 120 cwd=None, # type: Optional[str] | |
| 121 on_returncode='raise', # type: str | |
| 122 extra_ok_returncodes=None, # type: Optional[Iterable[int]] | |
| 123 command_desc=None, # type: Optional[str] | |
| 124 extra_environ=None, # type: Optional[Mapping[str, Any]] | |
| 125 unset_environ=None, # type: Optional[Iterable[str]] | |
| 126 spinner=None, # type: Optional[SpinnerInterface] | |
| 127 log_failed_cmd=True # type: Optional[bool] | |
| 128 ): | |
| 129 # type: (...) -> Text | |
| 130 """ | |
| 131 Args: | |
| 132 show_stdout: if true, use INFO to log the subprocess's stderr and | |
| 133 stdout streams. Otherwise, use DEBUG. Defaults to False. | |
| 134 extra_ok_returncodes: an iterable of integer return codes that are | |
| 135 acceptable, in addition to 0. Defaults to None, which means []. | |
| 136 unset_environ: an iterable of environment variable names to unset | |
| 137 prior to calling subprocess.Popen(). | |
| 138 log_failed_cmd: if false, failed commands are not logged, only raised. | |
| 139 """ | |
| 140 if extra_ok_returncodes is None: | |
| 141 extra_ok_returncodes = [] | |
| 142 if unset_environ is None: | |
| 143 unset_environ = [] | |
| 144 # Most places in pip use show_stdout=False. What this means is-- | |
| 145 # | |
| 146 # - We connect the child's output (combined stderr and stdout) to a | |
| 147 # single pipe, which we read. | |
| 148 # - We log this output to stderr at DEBUG level as it is received. | |
| 149 # - If DEBUG logging isn't enabled (e.g. if --verbose logging wasn't | |
| 150 # requested), then we show a spinner so the user can still see the | |
| 151 # subprocess is in progress. | |
| 152 # - If the subprocess exits with an error, we log the output to stderr | |
| 153 # at ERROR level if it hasn't already been displayed to the console | |
| 154 # (e.g. if --verbose logging wasn't enabled). This way we don't log | |
| 155 # the output to the console twice. | |
| 156 # | |
| 157 # If show_stdout=True, then the above is still done, but with DEBUG | |
| 158 # replaced by INFO. | |
| 159 if show_stdout: | |
| 160 # Then log the subprocess output at INFO level. | |
| 161 log_subprocess = subprocess_logger.info | |
| 162 used_level = logging.INFO | |
| 163 else: | |
| 164 # Then log the subprocess output using DEBUG. This also ensures | |
| 165 # it will be logged to the log file (aka user_log), if enabled. | |
| 166 log_subprocess = subprocess_logger.debug | |
| 167 used_level = logging.DEBUG | |
| 168 | |
| 169 # Whether the subprocess will be visible in the console. | |
| 170 showing_subprocess = subprocess_logger.getEffectiveLevel() <= used_level | |
| 171 | |
| 172 # Only use the spinner if we're not showing the subprocess output | |
| 173 # and we have a spinner. | |
| 174 use_spinner = not showing_subprocess and spinner is not None | |
| 175 | |
| 176 if command_desc is None: | |
| 177 command_desc = format_command_args(cmd) | |
| 178 | |
| 179 log_subprocess("Running command %s", command_desc) | |
| 180 env = os.environ.copy() | |
| 181 if extra_environ: | |
| 182 env.update(extra_environ) | |
| 183 for name in unset_environ: | |
| 184 env.pop(name, None) | |
| 185 try: | |
| 186 proc = subprocess.Popen( | |
| 187 # Convert HiddenText objects to the underlying str. | |
| 188 reveal_command_args(cmd), | |
| 189 stderr=subprocess.STDOUT, stdin=subprocess.PIPE, | |
| 190 stdout=subprocess.PIPE, cwd=cwd, env=env, | |
| 191 ) | |
| 192 proc.stdin.close() | |
| 193 except Exception as exc: | |
| 194 if log_failed_cmd: | |
| 195 subprocess_logger.critical( | |
| 196 "Error %s while executing command %s", exc, command_desc, | |
| 197 ) | |
| 198 raise | |
| 199 all_output = [] | |
| 200 while True: | |
| 201 # The "line" value is a unicode string in Python 2. | |
| 202 line = console_to_str(proc.stdout.readline()) | |
| 203 if not line: | |
| 204 break | |
| 205 line = line.rstrip() | |
| 206 all_output.append(line + '\n') | |
| 207 | |
| 208 # Show the line immediately. | |
| 209 log_subprocess(line) | |
| 210 # Update the spinner. | |
| 211 if use_spinner: | |
| 212 spinner.spin() | |
| 213 try: | |
| 214 proc.wait() | |
| 215 finally: | |
| 216 if proc.stdout: | |
| 217 proc.stdout.close() | |
| 218 proc_had_error = ( | |
| 219 proc.returncode and proc.returncode not in extra_ok_returncodes | |
| 220 ) | |
| 221 if use_spinner: | |
| 222 if proc_had_error: | |
| 223 spinner.finish("error") | |
| 224 else: | |
| 225 spinner.finish("done") | |
| 226 if proc_had_error: | |
| 227 if on_returncode == 'raise': | |
| 228 if not showing_subprocess and log_failed_cmd: | |
| 229 # Then the subprocess streams haven't been logged to the | |
| 230 # console yet. | |
| 231 msg = make_subprocess_output_error( | |
| 232 cmd_args=cmd, | |
| 233 cwd=cwd, | |
| 234 lines=all_output, | |
| 235 exit_status=proc.returncode, | |
| 236 ) | |
| 237 subprocess_logger.error(msg) | |
| 238 exc_msg = ( | |
| 239 'Command errored out with exit status {}: {} ' | |
| 240 'Check the logs for full command output.' | |
| 241 ).format(proc.returncode, command_desc) | |
| 242 raise InstallationError(exc_msg) | |
| 243 elif on_returncode == 'warn': | |
| 244 subprocess_logger.warning( | |
| 245 'Command "%s" had error code %s in %s', | |
| 246 command_desc, proc.returncode, cwd, | |
| 247 ) | |
| 248 elif on_returncode == 'ignore': | |
| 249 pass | |
| 250 else: | |
| 251 raise ValueError('Invalid value: on_returncode=%s' % | |
| 252 repr(on_returncode)) | |
| 253 return ''.join(all_output) | |
| 254 | |
| 255 | |
| 256 def runner_with_spinner_message(message): | |
| 257 # type: (str) -> Callable[..., None] | |
| 258 """Provide a subprocess_runner that shows a spinner message. | |
| 259 | |
| 260 Intended for use with for pep517's Pep517HookCaller. Thus, the runner has | |
| 261 an API that matches what's expected by Pep517HookCaller.subprocess_runner. | |
| 262 """ | |
| 263 | |
| 264 def runner( | |
| 265 cmd, # type: List[str] | |
| 266 cwd=None, # type: Optional[str] | |
| 267 extra_environ=None # type: Optional[Mapping[str, Any]] | |
| 268 ): | |
| 269 # type: (...) -> None | |
| 270 with open_spinner(message) as spinner: | |
| 271 call_subprocess( | |
| 272 cmd, | |
| 273 cwd=cwd, | |
| 274 extra_environ=extra_environ, | |
| 275 spinner=spinner, | |
| 276 ) | |
| 277 | |
| 278 return runner |
