comparison lib/python3.8/site-packages/pip/_internal/utils/filesystem.py @ 0:9e54283cc701 draft

"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author guerler
date Mon, 27 Jul 2020 03:47:31 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:9e54283cc701
1 import errno
2 import os
3 import os.path
4 import random
5 import shutil
6 import stat
7 import sys
8 from contextlib import contextmanager
9 from tempfile import NamedTemporaryFile
10
11 # NOTE: retrying is not annotated in typeshed as on 2017-07-17, which is
12 # why we ignore the type on this import.
13 from pip._vendor.retrying import retry # type: ignore
14 from pip._vendor.six import PY2
15
16 from pip._internal.utils.compat import get_path_uid
17 from pip._internal.utils.typing import MYPY_CHECK_RUNNING, cast
18
19 if MYPY_CHECK_RUNNING:
20 from typing import BinaryIO, Iterator
21
22 class NamedTemporaryFileResult(BinaryIO):
23 @property
24 def file(self):
25 # type: () -> BinaryIO
26 pass
27
28
29 def check_path_owner(path):
30 # type: (str) -> bool
31 # If we don't have a way to check the effective uid of this process, then
32 # we'll just assume that we own the directory.
33 if sys.platform == "win32" or not hasattr(os, "geteuid"):
34 return True
35
36 assert os.path.isabs(path)
37
38 previous = None
39 while path != previous:
40 if os.path.lexists(path):
41 # Check if path is writable by current user.
42 if os.geteuid() == 0:
43 # Special handling for root user in order to handle properly
44 # cases where users use sudo without -H flag.
45 try:
46 path_uid = get_path_uid(path)
47 except OSError:
48 return False
49 return path_uid == 0
50 else:
51 return os.access(path, os.W_OK)
52 else:
53 previous, path = path, os.path.dirname(path)
54 return False # assume we don't own the path
55
56
57 def copy2_fixed(src, dest):
58 # type: (str, str) -> None
59 """Wrap shutil.copy2() but map errors copying socket files to
60 SpecialFileError as expected.
61
62 See also https://bugs.python.org/issue37700.
63 """
64 try:
65 shutil.copy2(src, dest)
66 except (OSError, IOError):
67 for f in [src, dest]:
68 try:
69 is_socket_file = is_socket(f)
70 except OSError:
71 # An error has already occurred. Another error here is not
72 # a problem and we can ignore it.
73 pass
74 else:
75 if is_socket_file:
76 raise shutil.SpecialFileError("`%s` is a socket" % f)
77
78 raise
79
80
81 def is_socket(path):
82 # type: (str) -> bool
83 return stat.S_ISSOCK(os.lstat(path).st_mode)
84
85
86 @contextmanager
87 def adjacent_tmp_file(path):
88 # type: (str) -> Iterator[NamedTemporaryFileResult]
89 """Given a path to a file, open a temp file next to it securely and ensure
90 it is written to disk after the context reaches its end.
91 """
92 with NamedTemporaryFile(
93 delete=False,
94 dir=os.path.dirname(path),
95 prefix=os.path.basename(path),
96 suffix='.tmp',
97 ) as f:
98 result = cast('NamedTemporaryFileResult', f)
99 try:
100 yield result
101 finally:
102 result.file.flush()
103 os.fsync(result.file.fileno())
104
105
106 _replace_retry = retry(stop_max_delay=1000, wait_fixed=250)
107
108 if PY2:
109 @_replace_retry
110 def replace(src, dest):
111 # type: (str, str) -> None
112 try:
113 os.rename(src, dest)
114 except OSError:
115 os.remove(dest)
116 os.rename(src, dest)
117
118 else:
119 replace = _replace_retry(os.replace)
120
121
122 # test_writable_dir and _test_writable_dir_win are copied from Flit,
123 # with the author's agreement to also place them under pip's license.
124 def test_writable_dir(path):
125 # type: (str) -> bool
126 """Check if a directory is writable.
127
128 Uses os.access() on POSIX, tries creating files on Windows.
129 """
130 # If the directory doesn't exist, find the closest parent that does.
131 while not os.path.isdir(path):
132 parent = os.path.dirname(path)
133 if parent == path:
134 break # Should never get here, but infinite loops are bad
135 path = parent
136
137 if os.name == 'posix':
138 return os.access(path, os.W_OK)
139
140 return _test_writable_dir_win(path)
141
142
143 def _test_writable_dir_win(path):
144 # type: (str) -> bool
145 # os.access doesn't work on Windows: http://bugs.python.org/issue2528
146 # and we can't use tempfile: http://bugs.python.org/issue22107
147 basename = 'accesstest_deleteme_fishfingers_custard_'
148 alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789'
149 for i in range(10):
150 name = basename + ''.join(random.choice(alphabet) for _ in range(6))
151 file = os.path.join(path, name)
152 try:
153 fd = os.open(file, os.O_RDWR | os.O_CREAT | os.O_EXCL)
154 except OSError as e:
155 if e.errno == errno.EEXIST:
156 continue
157 if e.errno == errno.EPERM:
158 # This could be because there's a directory with the same name.
159 # But it's highly unlikely there's a directory called that,
160 # so we'll assume it's because the parent dir is not writable.
161 return False
162 raise
163 else:
164 os.close(fd)
165 os.unlink(file)
166 return True
167
168 # This should never be reached
169 raise EnvironmentError(
170 'Unexpected condition testing for writable directory'
171 )