Mercurial > repos > guerler > hhblits
comparison lib/python3.8/site-packages/pip/_internal/utils/filesystem.py @ 0:9e54283cc701 draft
"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author | guerler |
---|---|
date | Mon, 27 Jul 2020 03:47:31 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:9e54283cc701 |
---|---|
1 import errno | |
2 import os | |
3 import os.path | |
4 import random | |
5 import shutil | |
6 import stat | |
7 import sys | |
8 from contextlib import contextmanager | |
9 from tempfile import NamedTemporaryFile | |
10 | |
11 # NOTE: retrying is not annotated in typeshed as on 2017-07-17, which is | |
12 # why we ignore the type on this import. | |
13 from pip._vendor.retrying import retry # type: ignore | |
14 from pip._vendor.six import PY2 | |
15 | |
16 from pip._internal.utils.compat import get_path_uid | |
17 from pip._internal.utils.typing import MYPY_CHECK_RUNNING, cast | |
18 | |
if MYPY_CHECK_RUNNING:
    # Everything in this branch is only evaluated when MYPY_CHECK_RUNNING is
    # truthy; it provides names that are referenced from type comments and
    # from the string passed to cast() further down in this module.
    from typing import BinaryIO, Iterator

    class NamedTemporaryFileResult(BinaryIO):
        """Typing stand-in for the object yielded by adjacent_tmp_file().

        Describes a binary file object that additionally exposes the wrapped
        file through a ``file`` attribute.
        """

        @property
        def file(self):
            # type: () -> BinaryIO
            # Declaration only; the body is intentionally empty.
            pass
27 | |
28 | |
def check_path_owner(path):
    # type: (str) -> bool
    """Report whether the current user may be treated as owner of *path*.

    *path* must be absolute. If it does not exist, the nearest existing
    ancestor is inspected instead.

    Returns True when ownership cannot be determined at all (Windows, or no
    os.geteuid()). For root, returns whether the inspected path's uid is 0.
    For any other user, returns whether the inspected path is writable.
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    euid_available = sys.platform != "win32" and hasattr(os, "geteuid")
    if not euid_available:
        return True

    assert os.path.isabs(path)

    candidate = path
    visited = None
    # Walk upwards until something exists; dirname() of the filesystem root
    # returns the root itself, which is what ends the loop.
    while candidate != visited:
        if not os.path.lexists(candidate):
            visited = candidate
            candidate = os.path.dirname(candidate)
            continue

        # Check if path is writable by current user.
        if os.geteuid() != 0:
            return os.access(candidate, os.W_OK)

        # Special handling for root user in order to handle properly
        # cases where users use sudo without -H flag.
        try:
            owner_uid = get_path_uid(candidate)
        except OSError:
            return False
        return owner_uid == 0

    return False  # assume we don't own the path
55 | |
56 | |
def copy2_fixed(src, dest):
    # type: (str, str) -> None
    """Copy *src* to *dest* with shutil.copy2(), reporting sockets clearly.

    When the copy fails and either path turns out to be a socket, a
    shutil.SpecialFileError is raised in place of the original error.
    Otherwise the original error propagates.

    See also https://bugs.python.org/issue37700.
    """
    try:
        shutil.copy2(src, dest)
    except (OSError, IOError):
        for candidate in (src, dest):
            try:
                found_socket = is_socket(candidate)
            except OSError:
                # The copy already failed; a second failure while probing
                # the path adds nothing, so move on to the next candidate.
                continue
            if found_socket:
                raise shutil.SpecialFileError(
                    "`%s` is a socket" % candidate
                )

        # Neither path is a socket: let the copy error surface.
        raise
79 | |
80 | |
def is_socket(path):
    # type: (str) -> bool
    """Return True if *path* is a socket.

    Uses os.lstat(), so a symbolic link is examined itself rather than its
    target. Raises OSError if *path* cannot be stat'ed.
    """
    mode_bits = os.lstat(path).st_mode
    return stat.S_ISSOCK(mode_bits)
84 | |
85 | |
@contextmanager
def adjacent_tmp_file(path):
    # type: (str) -> Iterator[NamedTemporaryFileResult]
    """Yield a named temporary file created in the same directory as *path*.

    The file name starts with the base name of *path* and ends in '.tmp'.
    On leaving the context the data is flushed and fsync'ed, then the file is
    closed. It is created with delete=False, so it stays on disk afterwards.
    """
    parent_dir, leaf = os.path.split(path)
    tmp = NamedTemporaryFile(
        delete=False,
        dir=parent_dir,
        prefix=leaf,
        suffix='.tmp',
    )
    with tmp as handle:
        result = cast('NamedTemporaryFileResult', handle)
        try:
            yield result
        finally:
            # Flush Python-level buffers first, then ask the OS to commit
            # the data before the enclosing `with` closes the file.
            raw = result.file
            raw.flush()
            os.fsync(raw.fileno())
104 | |
105 | |
# Retry decorator shared by both variants of replace() below. With the
# vendored `retrying` package the values are in milliseconds: retry every
# 250 ms and stop once 1000 ms have elapsed.
_replace_retry = retry(stop_max_delay=1000, wait_fixed=250)

if PY2:
    @_replace_retry
    def replace(src, dest):
        # type: (str, str) -> None
        # This branch does not use os.replace; it emulates it with
        # rename-then-fallback.
        try:
            os.rename(src, dest)
        except OSError:
            # NOTE(review): presumably covers platforms where os.rename()
            # refuses to overwrite an existing destination - confirm.
            # The remove + rename pair is not atomic.
            os.remove(dest)
            os.rename(src, dest)

else:
    # os.replace is wrapped directly with the same retry policy.
    replace = _replace_retry(os.replace)
120 | |
121 | |
122 # test_writable_dir and _test_writable_dir_win are copied from Flit, | |
123 # with the author's agreement to also place them under pip's license. | |
def test_writable_dir(path):
    # type: (str) -> bool
    """Check if a directory is writable.

    Uses os.access() on POSIX, tries creating files on Windows.

    If *path* is not an existing directory, the closest existing ancestor is
    checked instead. A relative path with no existing component is checked
    against the current directory.
    """
    # If the directory doesn't exist, find the closest parent that does.
    while not os.path.isdir(path):
        # dirname() returns '' once a relative path runs out of components.
        # os.access('') is always False on POSIX, so substitute the current
        # directory, which is what such a path is relative to.
        parent = os.path.dirname(path) or os.curdir
        if parent == path:
            break  # Should never get here, but infinite loops are bad
        path = parent

    if os.name == 'posix':
        return os.access(path, os.W_OK)

    return _test_writable_dir_win(path)
141 | |
142 | |
def _test_writable_dir_win(path):
    # type: (str) -> bool
    """Probe whether *path* is writable by creating a throwaway file in it.

    Tries up to ten randomly named files. Returns True as soon as one is
    created (it is removed again), and False when creation is refused for
    permission reasons. Any other OSError propagates. Raises
    EnvironmentError if every candidate name already existed.
    """
    # os.access doesn't work on Windows: http://bugs.python.org/issue2528
    # and we can't use tempfile: http://bugs.python.org/issue22107
    basename = 'accesstest_deleteme_fishfingers_custard_'
    alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789'
    # Both errno values mean the OS refused the operation; a directory
    # without write permission is normally reported as EACCES.
    denied = (errno.EPERM, errno.EACCES)

    for _attempt in range(10):
        name = basename + ''.join(random.choice(alphabet) for _ in range(6))
        probe = os.path.join(path, name)
        try:
            # O_EXCL makes creation fail rather than reuse an existing file.
            fd = os.open(probe, os.O_RDWR | os.O_CREAT | os.O_EXCL)
        except OSError as e:
            if e.errno == errno.EEXIST:
                # Name collision: try another random name.
                continue
            if e.errno in denied:
                # This could be because there's a directory with the same
                # name. But it's highly unlikely there's a directory called
                # that, so we'll assume it's because the parent dir is not
                # writable.
                return False
            raise
        else:
            os.close(fd)
            os.unlink(probe)
            return True

    # This should never be reached
    raise EnvironmentError(
        'Unexpected condition testing for writable directory'
    )