Mercurial > repos > guerler > hhblits
comparison lib/python3.8/site-packages/pip/_internal/utils/temp_dir.py @ 0:9e54283cc701 draft
"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
| author | guerler |
|---|---|
| date | Mon, 27 Jul 2020 03:47:31 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:9e54283cc701 |
|---|---|
| 1 from __future__ import absolute_import | |
| 2 | |
| 3 import errno | |
| 4 import itertools | |
| 5 import logging | |
| 6 import os.path | |
| 7 import tempfile | |
| 8 from contextlib import contextmanager | |
| 9 | |
| 10 from pip._vendor.contextlib2 import ExitStack | |
| 11 | |
| 12 from pip._internal.utils.misc import rmtree | |
| 13 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
| 14 | |
| 15 if MYPY_CHECK_RUNNING: | |
| 16 from typing import Any, Dict, Iterator, Optional, TypeVar | |
| 17 | |
| 18 _T = TypeVar('_T', bound='TempDirectory') | |
| 19 | |
| 20 | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# The ExitStack that globally managed TempDirectory instances are entered
# into. It is None until global_tempdir_manager() is active; that context
# manager installs a fresh stack here and puts the previous value back when
# it exits.
_tempdir_manager = None  # type: Optional[ExitStack]
| 25 | |
| 26 | |
@contextmanager
def global_tempdir_manager():
    # type: () -> Iterator[None]
    """Install a fresh ExitStack as the module-wide tempdir manager.

    For the duration of the ``with`` block, TempDirectory instances created
    with ``globally_managed=True`` are entered into this stack. On exit the
    previously installed manager (possibly None) is put back first, and then
    the stack itself is closed, which exits everything entered into it.
    """
    global _tempdir_manager
    with ExitStack() as stack:
        # Remember whatever was installed before so nested uses unwind
        # correctly.
        previous_manager = _tempdir_manager
        _tempdir_manager = stack
        try:
            yield
        finally:
            _tempdir_manager = previous_manager
| 37 | |
| 38 | |
class TempDirectoryTypeRegistry(object):
    """Records, per temp directory kind, whether auto-deletion is wanted."""

    def __init__(self):
        # type: () -> None
        # Maps a kind name to its auto-delete flag; kinds that were never
        # configured are absent from the mapping.
        self._should_delete = dict()  # type: Dict[str, bool]

    def set_delete(self, kind, value):
        # type: (str, bool) -> None
        """Store the auto-delete flag for TempDirectory objects of ``kind``,
        replacing any flag stored earlier for the same kind.
        """
        self._should_delete.update({kind: value})

    def get_delete(self, kind):
        # type: (str) -> bool
        """Return the auto-delete flag stored for ``kind``.

        Kinds that were never configured default to True.
        """
        try:
            return self._should_delete[kind]
        except KeyError:
            return True
| 60 | |
| 61 | |
# The registry that TempDirectory.__exit__ consults when an instance's own
# ``delete`` attribute is None. It is None until tempdir_registry() is
# active; that context manager installs a new registry here and restores the
# previous value when it exits.
_tempdir_registry = None  # type: Optional[TempDirectoryTypeRegistry]
| 63 | |
| 64 | |
@contextmanager
def tempdir_registry():
    # type: () -> Iterator[TempDirectoryTypeRegistry]
    """Install a new, empty TempDirectoryTypeRegistry as the module-wide
    registry and yield it.

    Callers use the yielded registry to say which kinds of temp directory
    should be deleted. The registry that was installed before (possibly
    None) is restored when the context exits, even on error.
    """
    global _tempdir_registry
    previous_registry = _tempdir_registry
    fresh_registry = TempDirectoryTypeRegistry()
    _tempdir_registry = fresh_registry
    try:
        yield fresh_registry
    finally:
        _tempdir_registry = previous_registry
| 78 | |
| 79 | |
class TempDirectory(object):
    """Helper class that owns and cleans up a temporary directory.

    This class can be used as a context manager or as an OO representation of a
    temporary directory.

    Attributes:
        path
            Location to the created temporary directory. Reading it after
            cleanup() has run raises AssertionError.
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager). None means the decision is
            taken at exit time from the active tempdir registry.
        kind
            Label used in the generated directory name and as the key for
            the registry lookup.

    Methods:
        cleanup()
            Deletes the temporary directory

    When used as a context manager, if the delete attribute is True, on
    exiting the context the temporary directory is deleted.
    """

    def __init__(
        self,
        path=None,  # type: Optional[str]
        delete=None,  # type: Optional[bool]
        kind="temp",  # type: str
        globally_managed=False,  # type: bool
    ):
        """Wrap ``path``, or create a new temporary directory if it is None.

        With ``globally_managed=True`` the instance is entered into the
        module-wide tempdir manager, which must already be installed
        (see global_tempdir_manager()).
        """
        super(TempDirectory, self).__init__()

        # If we were given an explicit directory, resolve delete option now.
        # Otherwise we wait until cleanup and see what tempdir_registry says.
        if path is not None and delete is None:
            delete = False

        if path is None:
            path = self._create(kind)

        self._path = path
        self._deleted = False
        self.delete = delete
        self.kind = kind

        if globally_managed:
            assert _tempdir_manager is not None
            _tempdir_manager.enter_context(self)

    @property
    def path(self):
        # type: () -> str
        """The directory path; asserts that cleanup() has not run yet."""
        assert not self._deleted, (
            "Attempted to access deleted path: {}".format(self._path)
        )
        return self._path

    def __repr__(self):
        # type: () -> str
        # Read _path directly rather than the guarded ``path`` property, so
        # that repr() keeps working (e.g. in logs and debuggers) after the
        # directory has been cleaned up.
        return "<{} {!r}>".format(self.__class__.__name__, self._path)

    def __enter__(self):
        # type: (_T) -> _T
        return self

    def __exit__(self, exc, value, tb):
        # type: (Any, Any, Any) -> None
        """Delete the directory if deletion is enabled for this instance.

        An explicit ``delete`` attribute wins; otherwise the active registry
        is asked about ``kind``; with no registry the directory is deleted.
        """
        if self.delete is not None:
            delete = self.delete
        elif _tempdir_registry is not None:
            delete = _tempdir_registry.get_delete(self.kind)
        else:
            delete = True

        if delete:
            self.cleanup()

    def _create(self, kind):
        # type: (str) -> str
        """Create a temporary directory named ``pip-<kind>-*`` and return
        its path.
        """
        # We realpath here because some systems have their default tmpdir
        # symlinked to another directory.  This tends to confuse build
        # scripts, so we canonicalize the path by traversing potential
        # symlinks here.
        path = os.path.realpath(
            tempfile.mkdtemp(prefix="pip-{}-".format(kind))
        )
        # Pass the path as a logging argument so the message is only
        # formatted when debug logging is actually enabled.
        logger.debug("Created temporary directory: %s", path)
        return path

    def cleanup(self):
        # type: () -> None
        """Remove the temporary directory created and reset state
        """
        # Mark as deleted first so later ``path`` reads fail even if the
        # removal below raises.
        self._deleted = True
        if os.path.exists(self._path):
            rmtree(self._path)
| 176 | |
| 177 | |
class AdjacentTempDirectory(TempDirectory):
    """Temporary directory created as a sibling of an existing directory.

    Attributes:
        original
            The directory the temporary one is created next to, with any
            trailing path separators removed.
        path
            Full path of the temporary directory.
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager)

    """
    # Characters that may follow the leading '~' in a generated name; they
    # are cycled through until a free name is found.
    # pkg_resources raises a different error for .dist-info folder
    # with leading '-' and invalid metadata
    LEADING_CHARS = "-~.=%0123456789"

    def __init__(self, original, delete=None):
        # type: (str, Optional[bool]) -> None
        # ``original`` has to be set before the base initializer runs,
        # because that initializer calls _create(), which reads it.
        self.original = original.rstrip('/\\')
        super(AdjacentTempDirectory, self).__init__(delete=delete)

    @classmethod
    def _generate_names(cls, name):
        # type: (str) -> Iterator[str]
        """Yield candidate directory names derived from ``name``.

        The leading characters of ``name`` are swapped for ones that are
        valid filesystem characters, but are not valid package names (for
        both Python and pip definitions of package). Candidates of the
        same length as ``name`` come first, then longer ones that prefix
        the whole name. ``name`` itself is never yielded.
        """
        alphabet = cls.LEADING_CHARS
        fillers = itertools.combinations_with_replacement

        # Same-length candidates: '~' plus (cut - 1) filler characters
        # replace the first ``cut`` characters of the name.
        for cut in range(1, len(name)):
            tail = name[cut:]
            for chars in fillers(alphabet, cut - 1):
                renamed = '~' + ''.join(chars) + tail
                if renamed == name:
                    continue
                yield renamed

        # Nothing shorter was accepted by the caller, so fall back to
        # longer names: '~' plus filler characters in front of the
        # complete name.
        for extra in range(len(alphabet)):
            for chars in fillers(alphabet, extra):
                renamed = '~' + ''.join(chars) + name
                if renamed == name:
                    continue
                yield renamed

    def _create(self, kind):
        # type: (str) -> str
        """Create the directory next to ``self.original`` and return its
        real path, using the first generated name that does not exist yet.
        """
        parent, basename = os.path.split(self.original)
        for candidate in self._generate_names(basename):
            target = os.path.join(parent, candidate)
            try:
                os.mkdir(target)
            except OSError as err:
                # A name that is already taken just means "try the next
                # one"; any other failure is propagated.
                if err.errno == errno.EEXIST:
                    continue
                raise
            created = os.path.realpath(target)
            break
        else:
            # Every candidate was taken: use a regular temp directory.
            created = os.path.realpath(
                tempfile.mkdtemp(prefix="pip-{}-".format(kind))
            )

        logger.debug("Created temporary directory: {}".format(created))
        return created
