diff lib/python3.8/site-packages/pip/_internal/utils/filesystem.py @ 0:9e54283cc701 draft

"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author guerler
date Mon, 27 Jul 2020 03:47:31 -0400 (2020-07-27)
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/python3.8/site-packages/pip/_internal/utils/filesystem.py	Mon Jul 27 03:47:31 2020 -0400
@@ -0,0 +1,171 @@
+import errno
+import os
+import os.path
+import random
+import shutil
+import stat
+import sys
+from contextlib import contextmanager
+from tempfile import NamedTemporaryFile
+
+# NOTE: retrying is not annotated in typeshed as on 2017-07-17, which is
+#       why we ignore the type on this import.
+from pip._vendor.retrying import retry  # type: ignore
+from pip._vendor.six import PY2
+
+from pip._internal.utils.compat import get_path_uid
+from pip._internal.utils.typing import MYPY_CHECK_RUNNING, cast
+
+if MYPY_CHECK_RUNNING:
+    from typing import BinaryIO, Iterator
+
+    class NamedTemporaryFileResult(BinaryIO):
+        @property
+        def file(self):
+            # type: () -> BinaryIO
+            pass
+
+
+def check_path_owner(path):
+    # type: (str) -> bool
+    # If we don't have a way to check the effective uid of this process, then
+    # we'll just assume that we own the directory.
+    if sys.platform == "win32" or not hasattr(os, "geteuid"):
+        return True
+
+    assert os.path.isabs(path)
+
+    previous = None
+    while path != previous:
+        if os.path.lexists(path):
+            # Check if path is writable by current user.
+            if os.geteuid() == 0:
+                # Special handling for root user in order to handle properly
+                # cases where users use sudo without -H flag.
+                try:
+                    path_uid = get_path_uid(path)
+                except OSError:
+                    return False
+                return path_uid == 0
+            else:
+                return os.access(path, os.W_OK)
+        else:
+            previous, path = path, os.path.dirname(path)
+    return False  # assume we don't own the path
+
+
+def copy2_fixed(src, dest):
+    # type: (str, str) -> None
+    """Wrap shutil.copy2() but map errors copying socket files to
+    SpecialFileError as expected.
+
+    See also https://bugs.python.org/issue37700.
+    """
+    try:
+        shutil.copy2(src, dest)
+    except (OSError, IOError):
+        for f in [src, dest]:
+            try:
+                is_socket_file = is_socket(f)
+            except OSError:
+                # An error has already occurred. Another error here is not
+                # a problem and we can ignore it.
+                pass
+            else:
+                if is_socket_file:
+                    raise shutil.SpecialFileError("`%s` is a socket" % f)
+
+        raise
+
+
+def is_socket(path):
+    # type: (str) -> bool
+    return stat.S_ISSOCK(os.lstat(path).st_mode)
+
+
+@contextmanager
+def adjacent_tmp_file(path):
+    # type: (str) -> Iterator[NamedTemporaryFileResult]
+    """Given a path to a file, open a temp file next to it securely and ensure
+    it is written to disk after the context reaches its end.
+    """
+    with NamedTemporaryFile(
+        delete=False,
+        dir=os.path.dirname(path),
+        prefix=os.path.basename(path),
+        suffix='.tmp',
+    ) as f:
+        result = cast('NamedTemporaryFileResult', f)
+        try:
+            yield result
+        finally:
+            result.file.flush()
+            os.fsync(result.file.fileno())
+
+
+_replace_retry = retry(stop_max_delay=1000, wait_fixed=250)
+
+if PY2:
+    @_replace_retry
+    def replace(src, dest):
+        # type: (str, str) -> None
+        try:
+            os.rename(src, dest)
+        except OSError:
+            os.remove(dest)
+            os.rename(src, dest)
+
+else:
+    replace = _replace_retry(os.replace)
+
+
+# test_writable_dir and _test_writable_dir_win are copied from Flit,
+# with the author's agreement to also place them under pip's license.
+def test_writable_dir(path):
+    # type: (str) -> bool
+    """Check if a directory is writable.
+
+    Uses os.access() on POSIX, tries creating files on Windows.
+    """
+    # If the directory doesn't exist, find the closest parent that does.
+    while not os.path.isdir(path):
+        parent = os.path.dirname(path)
+        if parent == path:
+            break  # Should never get here, but infinite loops are bad
+        path = parent
+
+    if os.name == 'posix':
+        return os.access(path, os.W_OK)
+
+    return _test_writable_dir_win(path)
+
+
+def _test_writable_dir_win(path):
+    # type: (str) -> bool
+    # os.access doesn't work on Windows: http://bugs.python.org/issue2528
+    # and we can't use tempfile: http://bugs.python.org/issue22107
+    basename = 'accesstest_deleteme_fishfingers_custard_'
+    alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789'
+    for i in range(10):
+        name = basename + ''.join(random.choice(alphabet) for _ in range(6))
+        file = os.path.join(path, name)
+        try:
+            fd = os.open(file, os.O_RDWR | os.O_CREAT | os.O_EXCL)
+        except OSError as e:
+            if e.errno == errno.EEXIST:
+                continue
+            if e.errno == errno.EPERM:
+                # This could be because there's a directory with the same name.
+                # But it's highly unlikely there's a directory called that,
+                # so we'll assume it's because the parent dir is not writable.
+                return False
+            raise
+        else:
+            os.close(fd)
+            os.unlink(file)
+            return True
+
+    # This should never be reached
+    raise EnvironmentError(
+        'Unexpected condition testing for writable directory'
+    )