comparison lib/python3.8/site-packages/pip/_internal/utils/temp_dir.py @ 0:9e54283cc701 draft

"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author guerler
date Mon, 27 Jul 2020 03:47:31 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:9e54283cc701
1 from __future__ import absolute_import
2
3 import errno
4 import itertools
5 import logging
6 import os.path
7 import tempfile
8 from contextlib import contextmanager
9
10 from pip._vendor.contextlib2 import ExitStack
11
12 from pip._internal.utils.misc import rmtree
13 from pip._internal.utils.typing import MYPY_CHECK_RUNNING
14
15 if MYPY_CHECK_RUNNING:
16 from typing import Any, Dict, Iterator, Optional, TypeVar
17
18 _T = TypeVar('_T', bound='TempDirectory')
19
20
21 logger = logging.getLogger(__name__)
22
23
24 _tempdir_manager = None # type: Optional[ExitStack]
25
26
27 @contextmanager
28 def global_tempdir_manager():
29 # type: () -> Iterator[None]
30 global _tempdir_manager
31 with ExitStack() as stack:
32 old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
33 try:
34 yield
35 finally:
36 _tempdir_manager = old_tempdir_manager
37
38
39 class TempDirectoryTypeRegistry(object):
40 """Manages temp directory behavior
41 """
42
43 def __init__(self):
44 # type: () -> None
45 self._should_delete = {} # type: Dict[str, bool]
46
47 def set_delete(self, kind, value):
48 # type: (str, bool) -> None
49 """Indicate whether a TempDirectory of the given kind should be
50 auto-deleted.
51 """
52 self._should_delete[kind] = value
53
54 def get_delete(self, kind):
55 # type: (str) -> bool
56 """Get configured auto-delete flag for a given TempDirectory type,
57 default True.
58 """
59 return self._should_delete.get(kind, True)
60
61
62 _tempdir_registry = None # type: Optional[TempDirectoryTypeRegistry]
63
64
65 @contextmanager
66 def tempdir_registry():
67 # type: () -> Iterator[TempDirectoryTypeRegistry]
68 """Provides a scoped global tempdir registry that can be used to dictate
69 whether directories should be deleted.
70 """
71 global _tempdir_registry
72 old_tempdir_registry = _tempdir_registry
73 _tempdir_registry = TempDirectoryTypeRegistry()
74 try:
75 yield _tempdir_registry
76 finally:
77 _tempdir_registry = old_tempdir_registry
78
79
80 class TempDirectory(object):
81 """Helper class that owns and cleans up a temporary directory.
82
83 This class can be used as a context manager or as an OO representation of a
84 temporary directory.
85
86 Attributes:
87 path
88 Location to the created temporary directory
89 delete
90 Whether the directory should be deleted when exiting
91 (when used as a contextmanager)
92
93 Methods:
94 cleanup()
95 Deletes the temporary directory
96
97 When used as a context manager, if the delete attribute is True, on
98 exiting the context the temporary directory is deleted.
99 """
100
101 def __init__(
102 self,
103 path=None, # type: Optional[str]
104 delete=None, # type: Optional[bool]
105 kind="temp", # type: str
106 globally_managed=False, # type: bool
107 ):
108 super(TempDirectory, self).__init__()
109
110 # If we were given an explicit directory, resolve delete option now.
111 # Otherwise we wait until cleanup and see what tempdir_registry says.
112 if path is not None and delete is None:
113 delete = False
114
115 if path is None:
116 path = self._create(kind)
117
118 self._path = path
119 self._deleted = False
120 self.delete = delete
121 self.kind = kind
122
123 if globally_managed:
124 assert _tempdir_manager is not None
125 _tempdir_manager.enter_context(self)
126
127 @property
128 def path(self):
129 # type: () -> str
130 assert not self._deleted, (
131 "Attempted to access deleted path: {}".format(self._path)
132 )
133 return self._path
134
135 def __repr__(self):
136 # type: () -> str
137 return "<{} {!r}>".format(self.__class__.__name__, self.path)
138
139 def __enter__(self):
140 # type: (_T) -> _T
141 return self
142
143 def __exit__(self, exc, value, tb):
144 # type: (Any, Any, Any) -> None
145 if self.delete is not None:
146 delete = self.delete
147 elif _tempdir_registry:
148 delete = _tempdir_registry.get_delete(self.kind)
149 else:
150 delete = True
151
152 if delete:
153 self.cleanup()
154
155 def _create(self, kind):
156 # type: (str) -> str
157 """Create a temporary directory and store its path in self.path
158 """
159 # We realpath here because some systems have their default tmpdir
160 # symlinked to another directory. This tends to confuse build
161 # scripts, so we canonicalize the path by traversing potential
162 # symlinks here.
163 path = os.path.realpath(
164 tempfile.mkdtemp(prefix="pip-{}-".format(kind))
165 )
166 logger.debug("Created temporary directory: {}".format(path))
167 return path
168
169 def cleanup(self):
170 # type: () -> None
171 """Remove the temporary directory created and reset state
172 """
173 self._deleted = True
174 if os.path.exists(self._path):
175 rmtree(self._path)
176
177
178 class AdjacentTempDirectory(TempDirectory):
179 """Helper class that creates a temporary directory adjacent to a real one.
180
181 Attributes:
182 original
183 The original directory to create a temp directory for.
184 path
185 After calling create() or entering, contains the full
186 path to the temporary directory.
187 delete
188 Whether the directory should be deleted when exiting
189 (when used as a contextmanager)
190
191 """
192 # The characters that may be used to name the temp directory
193 # We always prepend a ~ and then rotate through these until
194 # a usable name is found.
195 # pkg_resources raises a different error for .dist-info folder
196 # with leading '-' and invalid metadata
197 LEADING_CHARS = "-~.=%0123456789"
198
199 def __init__(self, original, delete=None):
200 # type: (str, Optional[bool]) -> None
201 self.original = original.rstrip('/\\')
202 super(AdjacentTempDirectory, self).__init__(delete=delete)
203
204 @classmethod
205 def _generate_names(cls, name):
206 # type: (str) -> Iterator[str]
207 """Generates a series of temporary names.
208
209 The algorithm replaces the leading characters in the name
210 with ones that are valid filesystem characters, but are not
211 valid package names (for both Python and pip definitions of
212 package).
213 """
214 for i in range(1, len(name)):
215 for candidate in itertools.combinations_with_replacement(
216 cls.LEADING_CHARS, i - 1):
217 new_name = '~' + ''.join(candidate) + name[i:]
218 if new_name != name:
219 yield new_name
220
221 # If we make it this far, we will have to make a longer name
222 for i in range(len(cls.LEADING_CHARS)):
223 for candidate in itertools.combinations_with_replacement(
224 cls.LEADING_CHARS, i):
225 new_name = '~' + ''.join(candidate) + name
226 if new_name != name:
227 yield new_name
228
229 def _create(self, kind):
230 # type: (str) -> str
231 root, name = os.path.split(self.original)
232 for candidate in self._generate_names(name):
233 path = os.path.join(root, candidate)
234 try:
235 os.mkdir(path)
236 except OSError as ex:
237 # Continue if the name exists already
238 if ex.errno != errno.EEXIST:
239 raise
240 else:
241 path = os.path.realpath(path)
242 break
243 else:
244 # Final fallback on the default behavior.
245 path = os.path.realpath(
246 tempfile.mkdtemp(prefix="pip-{}-".format(kind))
247 )
248
249 logger.debug("Created temporary directory: {}".format(path))
250 return path