Mercurial > repos > guerler > hhblits
comparison lib/python3.8/site-packages/pip/_internal/utils/unpacking.py @ 0:9e54283cc701 draft
"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author | guerler |
---|---|
date | Mon, 27 Jul 2020 03:47:31 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:9e54283cc701 |
---|---|
1 """Utilities related to archives. | |
2 """ | |
3 | |
4 # The following comment should be removed at some point in the future. | |
5 # mypy: strict-optional=False | |
6 # mypy: disallow-untyped-defs=False | |
7 | |
8 from __future__ import absolute_import | |
9 | |
10 import logging | |
11 import os | |
12 import shutil | |
13 import stat | |
14 import tarfile | |
15 import zipfile | |
16 | |
17 from pip._internal.exceptions import InstallationError | |
18 from pip._internal.utils.filetypes import ( | |
19 BZ2_EXTENSIONS, | |
20 TAR_EXTENSIONS, | |
21 XZ_EXTENSIONS, | |
22 ZIP_EXTENSIONS, | |
23 ) | |
24 from pip._internal.utils.misc import ensure_dir | |
25 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
26 | |
27 if MYPY_CHECK_RUNNING: | |
28 from typing import Iterable, List, Optional, Text, Union | |
29 | |
30 | |
logger = logging.getLogger(__name__)


# Extensions handled unconditionally (zip and tar); the optional compression
# formats below are appended only when their module can be imported.
SUPPORTED_EXTENSIONS = ZIP_EXTENSIONS + TAR_EXTENSIONS

try:
    import bz2  # noqa
except ImportError:
    logger.debug('bz2 module is not available')
else:
    SUPPORTED_EXTENSIONS += BZ2_EXTENSIONS

try:
    # Only for Python 3.3+
    import lzma  # noqa
except ImportError:
    logger.debug('lzma module is not available')
else:
    SUPPORTED_EXTENSIONS += XZ_EXTENSIONS
48 | |
49 | |
def current_umask():
    """Return the process's current umask.

    ``os.umask`` only reports the mask as a side effect of replacing it, so
    the mask is set to 0 and then immediately put back.
    """
    previous = os.umask(0)
    # Restore the mask that was in effect before the probe above.
    os.umask(previous)
    return previous
55 | |
56 | |
def split_leading_dir(path):
    # type: (Union[str, Text]) -> List[Union[str, Text]]
    """Split ``path`` into ``[first_component, remainder]``.

    Leading ``/`` characters and then leading ``\\`` characters are stripped
    first. The split happens at whichever of ``/`` or ``\\`` occurs earliest
    in the stripped path; when neither is present the remainder is ``''``.
    """
    stripped = path.lstrip('/').lstrip('\\')
    slash_at = stripped.find('/')
    backslash_at = stripped.find('\\')

    # A forward slash wins when it is the only separator or comes first.
    if slash_at != -1 and (backslash_at == -1 or slash_at < backslash_at):
        return stripped.split('/', 1)
    if backslash_at != -1:
        return stripped.split('\\', 1)
    return [stripped, '']
71 | |
72 | |
def has_leading_dir(paths):
    # type: (Iterable[Union[str, Text]]) -> bool
    """Tell whether every path starts with the same first component.

    In archive terms: everything lives under one top-level directory.
    Returns False as soon as a path has an empty first component or a first
    component that differs from the first one seen. An empty iterable
    yields True.
    """
    shared = None
    for entry in paths:
        head = split_leading_dir(entry)[0]
        if not head:
            return False
        if shared is None:
            # First path seen: it defines the prefix the others must match.
            shared = head
            continue
        if head != shared:
            return False
    return True
87 | |
88 | |
def is_within_directory(directory, target):
    # type: ((Union[str, Text]), (Union[str, Text])) -> bool
    """
    Return true if the absolute path of target is within the directory

    Both paths are normalised with ``os.path.abspath`` (which collapses
    ``..`` but does not resolve symlinks). ``target`` counts as inside when
    it equals ``directory`` or lies below it.

    The comparison is made on whole path components: a sibling such as
    ``/tmp/pkg-evil`` is *not* inside ``/tmp/pkg`` even though the two share
    a character prefix (``os.path.commonprefix`` would wrongly accept it).
    """
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)

    if abs_target == abs_directory:
        return True

    # Terminate the directory with a separator so that only real children
    # match; a filesystem root already ends with one.
    if abs_directory.endswith(os.sep):
        dir_prefix = abs_directory
    else:
        dir_prefix = abs_directory + os.sep
    return abs_target.startswith(dir_prefix)
99 | |
100 | |
def unzip_file(filename, location, flatten=True):
    # type: (str, str, bool) -> None
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    When `flatten` is true and every member shares one leading directory,
    that leading directory is stripped from the extracted paths.

    Raises InstallationError if a member would be written outside
    `location`.
    """
    ensure_dir(location)
    with open(filename, 'rb') as zipfp:
        with zipfile.ZipFile(zipfp, allowZip64=True) as archive:
            leading = has_leading_dir(archive.namelist()) and flatten
            for info in archive.infolist():
                name = info.filename
                target = name
                if leading:
                    target = split_leading_dir(name)[1]
                target = os.path.join(location, target)
                if not is_within_directory(location, target):
                    message = (
                        'The zip file ({}) has a file ({}) trying to install '
                        'outside target directory ({})'
                    )
                    raise InstallationError(
                        message.format(filename, target, location)
                    )
                if target.endswith(('/', '\\')):
                    # A directory
                    ensure_dir(target)
                    continue
                ensure_dir(os.path.dirname(target))
                # Don't use read() to avoid allocating an arbitrarily large
                # chunk of memory for the file's content
                with archive.open(name) as member_fp:
                    with open(target, 'wb') as destfp:
                        shutil.copyfileobj(member_fp, destfp)
                # The upper 16 bits of external_attr carry the Unix mode.
                mode = info.external_attr >> 16
                # if mode and regular file and any execute permissions for
                # user/group/world?
                if mode and stat.S_ISREG(mode) and mode & 0o111:
                    # make dest file have execute for user/group/world
                    # (chmod +x) no-op on windows per python docs
                    os.chmod(target, (0o777 - current_umask() | 0o111))
151 | |
152 | |
def untar_file(filename, location):
    # type: (str, str) -> None
    """
    Untar the file (with path `filename`) to the destination `location`.
    All files are written based on system defaults and umask (i.e. permissions
    are not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written.  Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    The compression mode is chosen from the file extension; when it cannot
    be determined a warning is logged and tarfile auto-detects it. If every
    member shares one leading directory, that directory is stripped.
    Members that cannot be extracted (bad symlinks, or members for which
    tarfile provides no data stream) are skipped with a warning.

    Raises InstallationError if a member would be written outside
    `location`.
    """
    ensure_dir(location)
    lowered = filename.lower()
    if lowered.endswith('.gz') or lowered.endswith('.tgz'):
        mode = 'r:gz'
    elif lowered.endswith(BZ2_EXTENSIONS):
        mode = 'r:bz2'
    elif lowered.endswith(XZ_EXTENSIONS):
        mode = 'r:xz'
    elif lowered.endswith('.tar'):
        mode = 'r'
    else:
        logger.warning(
            'Cannot determine compression type for file %s', filename,
        )
        mode = 'r:*'
    with tarfile.open(filename, mode) as tar:
        members = tar.getmembers()
        leading = has_leading_dir([member.name for member in members])
        for member in members:
            fn = member.name
            if leading:
                # https://github.com/python/mypy/issues/1174
                fn = split_leading_dir(fn)[1]  # type: ignore
            path = os.path.join(location, fn)
            if not is_within_directory(location, path):
                message = (
                    'The tar file ({}) has a file ({}) trying to install '
                    'outside target directory ({})'
                )
                raise InstallationError(
                    message.format(filename, path, location)
                )
            if member.isdir():
                ensure_dir(path)
            elif member.issym():
                # NOTE(review): the link target itself is not validated
                # here; only the link's own path is checked above.
                try:
                    # https://github.com/python/typeshed/issues/2673
                    tar._extract_member(member, path)  # type: ignore
                except Exception as exc:
                    # Some corrupt tar files seem to produce this
                    # (specifically bad symlinks)
                    logger.warning(
                        'In the tar file %s the member %s is invalid: %s',
                        filename, member.name, exc,
                    )
                    continue
            else:
                try:
                    fp = tar.extractfile(member)
                except (KeyError, AttributeError) as exc:
                    # Some corrupt tar files seem to produce this
                    # (specifically bad symlinks)
                    logger.warning(
                        'In the tar file %s the member %s is invalid: %s',
                        filename, member.name, exc,
                    )
                    continue
                if fp is None:
                    # extractfile() returns None for members that are
                    # neither regular files nor links (e.g. FIFOs, device
                    # nodes); there is nothing to copy, so skip them rather
                    # than crash after creating an empty file.
                    logger.warning(
                        'In the tar file %s the member %s is invalid: %s',
                        filename, member.name,
                        'not a regular file or link',
                    )
                    continue
                ensure_dir(os.path.dirname(path))
                try:
                    with open(path, 'wb') as destfp:
                        shutil.copyfileobj(fp, destfp)
                finally:
                    # Release the member stream even if the copy fails.
                    fp.close()
                # Update the timestamp (useful for cython compiled files)
                # https://github.com/python/typeshed/issues/2673
                tar.utime(member, path)  # type: ignore
                # member have any execute permissions for user/group/world?
                if member.mode & 0o111:
                    # make dest file have execute for user/group/world
                    # no-op on windows per python docs
                    os.chmod(path, (0o777 - current_umask() | 0o111))
235 | |
236 | |
def unpack_file(
    filename,  # type: str
    location,  # type: str
    content_type=None,  # type: Optional[str]
):
    # type: (...) -> None
    """Unpack the archive `filename` into the directory `location`.

    The archive type is detected from `content_type`, the file extension,
    or by probing the file with zipfile/tarfile. Zip archives are handed to
    unzip_file (flattened unless the name ends with '.whl'); tar archives
    are handed to untar_file.

    Raises InstallationError if the archive format cannot be determined.
    """
    filename = os.path.realpath(filename)
    if (
        content_type == 'application/zip' or
        filename.lower().endswith(ZIP_EXTENSIONS) or
        zipfile.is_zipfile(filename)
    ):
        unzip_file(
            filename,
            location,
            flatten=not filename.endswith('.whl')
        )
    elif (
        content_type == 'application/x-gzip' or
        tarfile.is_tarfile(filename) or
        filename.lower().endswith(
            TAR_EXTENSIONS + BZ2_EXTENSIONS + XZ_EXTENSIONS
        )
    ):
        untar_file(filename, location)
    else:
        # FIXME: handle?
        # FIXME: magic signatures?
        # `location` is the destination directory, not the download source,
        # so it is reported as such; the archive in question is `filename`.
        logger.critical(
            'Cannot unpack file %s (unpacking to %s, content-type: %s); '
            'cannot detect archive format',
            filename, location, content_type,
        )
        raise InstallationError(
            'Cannot determine archive format of {}'.format(filename)
        )