diff env/lib/python3.7/site-packages/boltons/fileutils.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/boltons/fileutils.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,660 @@
+# -*- coding: utf-8 -*-
+"""Virtually every Python programmer has used Python for wrangling
+disk contents, and ``fileutils`` collects solutions to some of the
+most commonly-found gaps in the standard library.
+"""
+
+from __future__ import print_function
+
+import os
+import re
+import sys
+import stat
+import errno
+import fnmatch
+from shutil import copy2, copystat, Error
+
+
+__all__ = ['mkdir_p', 'atomic_save', 'AtomicSaver', 'FilePerms',
+           'iter_find_files', 'copytree']
+
+
+FULL_PERMS = 511  # 0777 that both Python 2 and 3 can digest
+RW_PERMS = 438
+_SINGLE_FULL_PERM = 7  # or 07 in Python 2
+try:
+    basestring
+except NameError:
+    unicode = str  # Python 3 compat
+    basestring = (str, bytes)
+
+
+def mkdir_p(path):
+    """Creates a directory and any parent directories that may need to
+    be created along the way, without raising errors for any existing
+    directories. This function mimics the behavior of the ``mkdir -p``
+    command available in Linux/BSD environments, but also works on
+    Windows.
+    """
+    try:
+        os.makedirs(path)
+    except OSError as exc:
+        if exc.errno == errno.EEXIST and os.path.isdir(path):
+            return
+        raise
+    return
+
+
+class FilePerms(object):
+    """The :class:`FilePerms` type is used to represent standard POSIX
+    filesystem permissions:
+
+      * Read
+      * Write
+      * Execute
+
+    Across three classes of user:
+
+      * Owning (u)ser
+      * Owner's (g)roup
+      * Any (o)ther user
+
+    This class assists with computing new permissions, as well as
+    working with numeric octal ``777``-style and ``rwx``-style
+    permissions. Currently it only considers the bottom 9 permission
+    bits; it does not support sticky bits or more advanced permission
+    systems.
+
+    Args:
+        user (str): A string in the 'rwx' format, omitting characters
+            for which owning user's permissions are not provided.
+        group (str): A string in the 'rwx' format, omitting characters
+            for which owning group permissions are not provided.
+        other (str): A string in the 'rwx' format, omitting characters
+            for which owning other/world permissions are not provided.
+
+    There are many ways to use :class:`FilePerms`:
+
+    >>> FilePerms(user='rwx', group='xrw', other='wxr')  # note character order
+    FilePerms(user='rwx', group='rwx', other='rwx')
+    >>> int(FilePerms('r', 'r', ''))
+    288
+    >>> oct(288)[-3:]  # XXX Py3k
+    '440'
+
+    See also the :meth:`FilePerms.from_int` and
+    :meth:`FilePerms.from_path` classmethods for useful alternative
+    ways to construct :class:`FilePerms` objects.
+    """
+    # TODO: consider more than the lower 9 bits
+    class _FilePermProperty(object):
+        _perm_chars = 'rwx'
+        _perm_set = frozenset('rwx')
+        _perm_val = {'r': 4, 'w': 2, 'x': 1}  # for sorting
+
+        def __init__(self, attribute, offset):
+            self.attribute = attribute
+            self.offset = offset
+
+        def __get__(self, fp_obj, type_=None):
+            if fp_obj is None:
+                return self
+            return getattr(fp_obj, self.attribute)
+
+        def __set__(self, fp_obj, value):
+            cur = getattr(fp_obj, self.attribute)
+            if cur == value:
+                return
+            try:
+                invalid_chars = set(str(value)) - self._perm_set
+            except TypeError:
+                raise TypeError('expected string, not %r' % value)
+            if invalid_chars:
+                raise ValueError('got invalid chars %r in permission'
+                                 ' specification %r, expected empty string'
+                                 ' or one or more of %r'
+                                 % (invalid_chars, value, self._perm_chars))
+
+            sort_key = lambda c: self._perm_val[c]
+            new_value = ''.join(sorted(set(value),
+                                       key=sort_key, reverse=True))
+            setattr(fp_obj, self.attribute, new_value)
+            self._update_integer(fp_obj, new_value)
+
+        def _update_integer(self, fp_obj, value):
+            mode = 0
+            key = 'xwr'
+            for symbol in value:
+                bit = 2 ** key.index(symbol)
+                mode |= (bit << (self.offset * 3))
+            fp_obj._integer |= mode
+
+    def __init__(self, user='', group='', other=''):
+        self._user, self._group, self._other = '', '', ''
+        self._integer = 0
+        self.user = user
+        self.group = group
+        self.other = other
+
+    @classmethod
+    def from_int(cls, i):
+        """Create a :class:`FilePerms` object from an integer.
+
+        >>> FilePerms.from_int(0o644)  # note the leading zero-oh for octal
+        FilePerms(user='rw', group='r', other='r')
+        """
+        i &= FULL_PERMS
+        key = ('', 'x', 'w', 'xw', 'r', 'rx', 'rw', 'rwx')
+        parts = []
+        while i:
+            parts.append(key[i & _SINGLE_FULL_PERM])
+            i >>= 3
+        parts.reverse()
+        return cls(*parts)
+
+    @classmethod
+    def from_path(cls, path):
+        """Make a new :class:`FilePerms` object based on the permissions
+        assigned to the file or directory at *path*.
+
+        Args:
+            path (str): Filesystem path of the target file.
+
+        Here's an example that holds true on most systems:
+
+        >>> import tempfile
+        >>> 'r' in FilePerms.from_path(tempfile.gettempdir()).user
+        True
+        """
+        stat_res = os.stat(path)
+        return cls.from_int(stat.S_IMODE(stat_res.st_mode))
+
+    def __int__(self):
+        return self._integer
+
+    # Sphinx tip: attribute docstrings come after the attribute
+    user = _FilePermProperty('_user', 2)
+    "Stores the ``rwx``-formatted *user* permission."
+    group = _FilePermProperty('_group', 1)
+    "Stores the ``rwx``-formatted *group* permission."
+    other = _FilePermProperty('_other', 0)
+    "Stores the ``rwx``-formatted *other* permission."
+
+    def __repr__(self):
+        cn = self.__class__.__name__
+        return ('%s(user=%r, group=%r, other=%r)'
+                % (cn, self.user, self.group, self.other))
+
+####
+
+
+_TEXT_OPENFLAGS = os.O_RDWR | os.O_CREAT | os.O_EXCL
+if hasattr(os, 'O_NOINHERIT'):
+    _TEXT_OPENFLAGS |= os.O_NOINHERIT
+if hasattr(os, 'O_NOFOLLOW'):
+    _TEXT_OPENFLAGS |= os.O_NOFOLLOW
+_BIN_OPENFLAGS = _TEXT_OPENFLAGS
+if hasattr(os, 'O_BINARY'):
+    _BIN_OPENFLAGS |= os.O_BINARY
+
+
+try:
+    import fcntl as fcntl
+except ImportError:
+    def set_cloexec(fd):
+        "Dummy set_cloexec for platforms without fcntl support"
+        pass
+else:
+    def set_cloexec(fd):
+        """Does a best-effort :func:`fcntl.fcntl` call to set a fd to be
+        automatically closed by any future child processes.
+
+        Implementation from the :mod:`tempfile` module.
+        """
+        try:
+            flags = fcntl.fcntl(fd, fcntl.F_GETFD, 0)
+        except IOError:
+            pass
+        else:
+            # flags read successfully, modify
+            flags |= fcntl.FD_CLOEXEC
+            fcntl.fcntl(fd, fcntl.F_SETFD, flags)
+        return
+
+
+def atomic_save(dest_path, **kwargs):
+    """A convenient interface to the :class:`AtomicSaver` type. See the
+    :class:`AtomicSaver` documentation for details.
+    """
+    return AtomicSaver(dest_path, **kwargs)
+
+
+def path_to_unicode(path):
+    if isinstance(path, unicode):
+        return path
+    encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
+    return path.decode(encoding)
+
+
+if os.name == 'nt':
+    import ctypes
+    from ctypes import c_wchar_p
+    from ctypes.wintypes import DWORD, LPVOID
+
+    _ReplaceFile = ctypes.windll.kernel32.ReplaceFile
+    _ReplaceFile.argtypes = [c_wchar_p, c_wchar_p, c_wchar_p,
+                             DWORD, LPVOID, LPVOID]
+
+    def replace(src, dst):
+        # argument names match stdlib docs, docstring below
+        try:
+            # ReplaceFile fails if the dest file does not exist, so
+            # first try to rename it into position
+            os.rename(src, dst)
+            return
+        except WindowsError as we:
+            if we.errno == errno.EEXIST:
+                pass  # continue with the ReplaceFile logic below
+            else:
+                raise
+
+        src = path_to_unicode(src)
+        dst = path_to_unicode(dst)
+        res = _ReplaceFile(c_wchar_p(dst), c_wchar_p(src),
+                           None, 0, None, None)
+        if not res:
+            raise OSError('failed to replace %r with %r' % (dst, src))
+        return
+
+    def atomic_rename(src, dst, overwrite=False):
+        "Rename *src* to *dst*, replacing *dst* if *overwrite is True"
+        if overwrite:
+            replace(src, dst)
+        else:
+            os.rename(src, dst)
+        return
+else:
+    # wrapper func for cross compat + docs
+    def replace(src, dst):
+        # os.replace does the same thing on unix
+        return os.rename(src, dst)
+
+    def atomic_rename(src, dst, overwrite=False):
+        "Rename *src* to *dst*, replacing *dst* if *overwrite is True"
+        if overwrite:
+            os.rename(src, dst)
+        else:
+            os.link(src, dst)
+            os.unlink(src)
+        return
+
+
+_atomic_rename = atomic_rename  # backwards compat
+
+replace.__doc__ = """Similar to :func:`os.replace` in Python 3.3+,
+this function will atomically create or replace the file at path
+*dst* with the file at path *src*.
+
+On Windows, this function uses the ReplaceFile API for maximum
+possible atomicity on a range of filesystems.
+"""
+
+
+class AtomicSaver(object):
+    """``AtomicSaver`` is a configurable `context manager`_ that provides
+    a writable :class:`file` which will be moved into place as long as
+    no exceptions are raised within the context manager's block. These
+    "part files" are created in the same directory as the destination
+    path to ensure atomic move operations (i.e., no cross-filesystem
+    moves occur).
+
+    Args:
+        dest_path (str): The path where the completed file will be
+            written.
+        overwrite (bool): Whether to overwrite the destination file if
+            it exists at completion time. Defaults to ``True``.
+        file_perms (int): Integer representation of file permissions
+            for the newly-created file. Defaults are, when the
+            destination path already exists, to copy the permissions
+            from the previous file, or if the file did not exist, to
+            respect the user's configured `umask`_, usually resulting
+            in octal 0644 or 0664.
+        part_file (str): Name of the temporary *part_file*. Defaults
+            to *dest_path* + ``.part``. Note that this argument is
+            just the filename, and not the full path of the part
+            file. To guarantee atomic saves, part files are always
+            created in the same directory as the destination path.
+        overwrite_part (bool): Whether to overwrite the *part_file*,
+            should it exist at setup time. Defaults to ``False``,
+            which results in an :exc:`OSError` being raised on
+            pre-existing part files. Be careful of setting this to
+            ``True`` in situations when multiple threads or processes
+            could be writing to the same part file.
+        rm_part_on_exc (bool): Remove *part_file* on exception cases.
+            Defaults to ``True``, but ``False`` can be useful for
+            recovery in some cases. Note that resumption is not
+            automatic and by default an :exc:`OSError` is raised if
+            the *part_file* exists.
+
+    Practically, the AtomicSaver serves a few purposes:
+
+      * Avoiding overwriting an existing, valid file with a partially
+        written one.
+      * Providing a reasonable guarantee that a part file only has one
+        writer at a time.
+      * Optional recovery of partial data in failure cases.
+
+    .. _context manager: https://docs.python.org/2/reference/compound_stmts.html#with
+    .. _umask: https://en.wikipedia.org/wiki/Umask
+
+    """
+    _default_file_perms = RW_PERMS
+
+    # TODO: option to abort if target file modify date has changed since start?
+    def __init__(self, dest_path, **kwargs):
+        self.dest_path = dest_path
+        self.overwrite = kwargs.pop('overwrite', True)
+        self.file_perms = kwargs.pop('file_perms', None)
+        self.overwrite_part = kwargs.pop('overwrite_part', False)
+        self.part_filename = kwargs.pop('part_file', None)
+        self.rm_part_on_exc = kwargs.pop('rm_part_on_exc', True)
+        self.text_mode = kwargs.pop('text_mode', False)  # for windows
+        self.buffering = kwargs.pop('buffering', -1)
+        if kwargs:
+            raise TypeError('unexpected kwargs: %r' % (kwargs.keys(),))
+
+        self.dest_path = os.path.abspath(self.dest_path)
+        self.dest_dir = os.path.dirname(self.dest_path)
+        if not self.part_filename:
+            self.part_path = dest_path + '.part'
+        else:
+            self.part_path = os.path.join(self.dest_dir, self.part_filename)
+        self.mode = 'w+' if self.text_mode else 'w+b'
+        self.open_flags = _TEXT_OPENFLAGS if self.text_mode else _BIN_OPENFLAGS
+
+        self.part_file = None
+
+    def _open_part_file(self):
+        do_chmod = True
+        file_perms = self.file_perms
+        if file_perms is None:
+            try:
+                # try to copy from file being replaced
+                stat_res = os.stat(self.dest_path)
+                file_perms = stat.S_IMODE(stat_res.st_mode)
+            except (OSError, IOError):
+                # default if no destination file exists
+                file_perms = self._default_file_perms
+                do_chmod = False  # respect the umask
+
+        fd = os.open(self.part_path, self.open_flags, file_perms)
+        set_cloexec(fd)
+        self.part_file = os.fdopen(fd, self.mode, self.buffering)
+
+        # if default perms are overridden by the user or previous dest_path
+        # chmod away the effects of the umask
+        if do_chmod:
+            try:
+                os.chmod(self.part_path, file_perms)
+            except (OSError, IOError):
+                self.part_file.close()
+                raise
+        return
+
+    def setup(self):
+        """Called on context manager entry (the :keyword:`with` statement),
+        the ``setup()`` method creates the temporary file in the same
+        directory as the destination file.
+
+        ``setup()`` tests for a writable directory with rename permissions
+        early, as the part file may not be written to immediately (not
+        using :func:`os.access` because of the potential issues of
+        effective vs. real privileges).
+
+        If the caller is not using the :class:`AtomicSaver` as a
+        context manager, this method should be called explicitly
+        before writing.
+        """
+        if os.path.lexists(self.dest_path):
+            if not self.overwrite:
+                raise OSError(errno.EEXIST,
+                              'Overwrite disabled and file already exists',
+                              self.dest_path)
+        if self.overwrite_part and os.path.lexists(self.part_path):
+            os.unlink(self.part_path)
+        self._open_part_file()
+        return
+
+    def __enter__(self):
+        self.setup()
+        return self.part_file
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.part_file.close()
+        if exc_type:
+            if self.rm_part_on_exc:
+                try:
+                    os.unlink(self.part_path)
+                except Exception:
+                    pass  # avoid masking original error
+            return
+        try:
+            atomic_rename(self.part_path, self.dest_path,
+                          overwrite=self.overwrite)
+        except OSError:
+            if self.rm_part_on_exc:
+                try:
+                    os.unlink(self.part_path)
+                except Exception:
+                    pass  # avoid masking original error
+            raise  # could not save destination file
+        return
+
+
+def iter_find_files(directory, patterns, ignored=None, include_dirs=False):
+    """Returns a generator that yields file paths under a *directory*,
+    matching *patterns* using `glob`_ syntax (e.g., ``*.txt``). Also
+    supports *ignored* patterns.
+
+    Args:
+        directory (str): Path that serves as the root of the
+            search. Yielded paths will include this as a prefix.
+        patterns (str or list): A single pattern or list of
+            glob-formatted patterns to find under *directory*.
+        ignored (str or list): A single pattern or list of
+            glob-formatted patterns to ignore.
+        include_dirs (bool): Whether to include directories that match
+           patterns, as well. Defaults to ``False``.
+
+    For example, finding Python files in the current directory:
+
+    >>> _CUR_DIR = os.path.dirname(os.path.abspath(__file__))
+    >>> filenames = sorted(iter_find_files(_CUR_DIR, '*.py'))
+    >>> os.path.basename(filenames[-1])
+    'urlutils.py'
+
+    Or, Python files while ignoring emacs lockfiles:
+
+    >>> filenames = iter_find_files(_CUR_DIR, '*.py', ignored='.#*')
+
+    .. _glob: https://en.wikipedia.org/wiki/Glob_%28programming%29
+
+    """
+    if isinstance(patterns, basestring):
+        patterns = [patterns]
+    pats_re = re.compile('|'.join([fnmatch.translate(p) for p in patterns]))
+
+    if not ignored:
+        ignored = []
+    elif isinstance(ignored, basestring):
+        ignored = [ignored]
+    ign_re = re.compile('|'.join([fnmatch.translate(p) for p in ignored]))
+    for root, dirs, files in os.walk(directory):
+        if include_dirs:
+            for basename in dirs:
+                if pats_re.match(basename):
+                    if ignored and ign_re.match(basename):
+                        continue
+                    filename = os.path.join(root, basename)
+                    yield filename
+
+        for basename in files:
+            if pats_re.match(basename):
+                if ignored and ign_re.match(basename):
+                    continue
+                filename = os.path.join(root, basename)
+                yield filename
+    return
+
+
+def copy_tree(src, dst, symlinks=False, ignore=None):
+    """The ``copy_tree`` function is an exact copy of the built-in
+    :func:`shutil.copytree`, with one key difference: it will not
+    raise an exception if part of the tree already exists. It achieves
+    this by using :func:`mkdir_p`.
+
+    Args:
+        src (str): Path of the source directory to copy.
+        dst (str): Destination path. Existing directories accepted.
+        symlinks (bool): If ``True``, copy symlinks rather than their
+            contents.
+        ignore (callable): A callable that takes a path and directory
+            listing, returning the files within the listing to be ignored.
+
+    For more details, check out :func:`shutil.copytree` and
+    :func:`shutil.copy2`.
+
+    """
+    names = os.listdir(src)
+    if ignore is not None:
+        ignored_names = ignore(src, names)
+    else:
+        ignored_names = set()
+
+    mkdir_p(dst)
+    errors = []
+    for name in names:
+        if name in ignored_names:
+            continue
+        srcname = os.path.join(src, name)
+        dstname = os.path.join(dst, name)
+        try:
+            if symlinks and os.path.islink(srcname):
+                linkto = os.readlink(srcname)
+                os.symlink(linkto, dstname)
+            elif os.path.isdir(srcname):
+                copytree(srcname, dstname, symlinks, ignore)
+            else:
+                # Will raise a SpecialFileError for unsupported file types
+                copy2(srcname, dstname)
+        # catch the Error from the recursive copytree so that we can
+        # continue with other files
+        except Error as e:
+            errors.extend(e.args[0])
+        except EnvironmentError as why:
+            errors.append((srcname, dstname, str(why)))
+    try:
+        copystat(src, dst)
+    except OSError as why:
+        if WindowsError is not None and isinstance(why, WindowsError):
+            # Copying file access times may fail on Windows
+            pass
+        else:
+            errors.append((src, dst, str(why)))
+    if errors:
+        raise Error(errors)
+
+
+copytree = copy_tree  # alias for drop-in replacement of shutil
+
+
+try:
+    file
+except NameError:
+    file = object
+
+
+# like open(os.devnull) but with even fewer side effects
+class DummyFile(file):
+    # TODO: raise ValueErrors on closed for all methods?
+    # TODO: enforce read/write
+    def __init__(self, path, mode='r', buffering=None):
+        self.name = path
+        self.mode = mode
+        self.closed = False
+        self.errors = None
+        self.isatty = False
+        self.encoding = None
+        self.newlines = None
+        self.softspace = 0
+
+    def close(self):
+        self.closed = True
+
+    def fileno(self):
+        return -1
+
+    def flush(self):
+        if self.closed:
+            raise ValueError('I/O operation on a closed file')
+        return
+
+    def next(self):
+        raise StopIteration()
+
+    def read(self, size=0):
+        if self.closed:
+            raise ValueError('I/O operation on a closed file')
+        return ''
+
+    def readline(self, size=0):
+        if self.closed:
+            raise ValueError('I/O operation on a closed file')
+        return ''
+
+    def readlines(self, size=0):
+        if self.closed:
+            raise ValueError('I/O operation on a closed file')
+        return []
+
+    def seek(self):
+        if self.closed:
+            raise ValueError('I/O operation on a closed file')
+        return
+
+    def tell(self):
+        if self.closed:
+            raise ValueError('I/O operation on a closed file')
+        return 0
+
+    def truncate(self):
+        if self.closed:
+            raise ValueError('I/O operation on a closed file')
+        return
+
+    def write(self, string):
+        if self.closed:
+            raise ValueError('I/O operation on a closed file')
+        return
+
+    def writelines(self, list_of_strings):
+        if self.closed:
+            raise ValueError('I/O operation on a closed file')
+        return
+
+    def __next__(self):
+        raise StopIteration()
+
+    def __enter__(self):
+        if self.closed:
+            raise ValueError('I/O operation on a closed file')
+        return
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        return
+
+
+if __name__ == '__main__':
+    with atomic_save('/tmp/final.txt') as f:
+        f.write('rofl')
+        f.write('\n')