Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/boltons/fileutils.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/boltons/fileutils.py Sat May 02 07:14:21 2020 -0400 @@ -0,0 +1,660 @@ +# -*- coding: utf-8 -*- +"""Virtually every Python programmer has used Python for wrangling +disk contents, and ``fileutils`` collects solutions to some of the +most commonly-found gaps in the standard library. +""" + +from __future__ import print_function + +import os +import re +import sys +import stat +import errno +import fnmatch +from shutil import copy2, copystat, Error + + +__all__ = ['mkdir_p', 'atomic_save', 'AtomicSaver', 'FilePerms', + 'iter_find_files', 'copytree'] + + +FULL_PERMS = 511 # 0777 that both Python 2 and 3 can digest +RW_PERMS = 438 +_SINGLE_FULL_PERM = 7 # or 07 in Python 2 +try: + basestring +except NameError: + unicode = str # Python 3 compat + basestring = (str, bytes) + + +def mkdir_p(path): + """Creates a directory and any parent directories that may need to + be created along the way, without raising errors for any existing + directories. This function mimics the behavior of the ``mkdir -p`` + command available in Linux/BSD environments, but also works on + Windows. + """ + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST and os.path.isdir(path): + return + raise + return + + +class FilePerms(object): + """The :class:`FilePerms` type is used to represent standard POSIX + filesystem permissions: + + * Read + * Write + * Execute + + Across three classes of user: + + * Owning (u)ser + * Owner's (g)roup + * Any (o)ther user + + This class assists with computing new permissions, as well as + working with numeric octal ``777``-style and ``rwx``-style + permissions. Currently it only considers the bottom 9 permission + bits; it does not support sticky bits or more advanced permission + systems. + + Args: + user (str): A string in the 'rwx' format, omitting characters + for which owning user's permissions are not provided. + group (str): A string in the 'rwx' format, omitting characters + for which owning group permissions are not provided. + other (str): A string in the 'rwx' format, omitting characters + for which owning other/world permissions are not provided. + + There are many ways to use :class:`FilePerms`: + + >>> FilePerms(user='rwx', group='xrw', other='wxr') # note character order + FilePerms(user='rwx', group='rwx', other='rwx') + >>> int(FilePerms('r', 'r', '')) + 288 + >>> oct(288)[-3:] # XXX Py3k + '440' + + See also the :meth:`FilePerms.from_int` and + :meth:`FilePerms.from_path` classmethods for useful alternative + ways to construct :class:`FilePerms` objects. + """ + # TODO: consider more than the lower 9 bits + class _FilePermProperty(object): + _perm_chars = 'rwx' + _perm_set = frozenset('rwx') + _perm_val = {'r': 4, 'w': 2, 'x': 1} # for sorting + + def __init__(self, attribute, offset): + self.attribute = attribute + self.offset = offset + + def __get__(self, fp_obj, type_=None): + if fp_obj is None: + return self + return getattr(fp_obj, self.attribute) + + def __set__(self, fp_obj, value): + cur = getattr(fp_obj, self.attribute) + if cur == value: + return + try: + invalid_chars = set(str(value)) - self._perm_set + except TypeError: + raise TypeError('expected string, not %r' % value) + if invalid_chars: + raise ValueError('got invalid chars %r in permission' + ' specification %r, expected empty string' + ' or one or more of %r' + % (invalid_chars, value, self._perm_chars)) + + sort_key = lambda c: self._perm_val[c] + new_value = ''.join(sorted(set(value), + key=sort_key, reverse=True)) + setattr(fp_obj, self.attribute, new_value) + self._update_integer(fp_obj, new_value) + + def _update_integer(self, fp_obj, value): + mode = 0 + key = 'xwr' + for symbol in value: + bit = 2 ** key.index(symbol) + mode |= (bit << (self.offset * 3)) + fp_obj._integer |= mode + + def __init__(self, user='', group='', other=''): + self._user, self._group, self._other = '', '', '' + self._integer = 0 + self.user = user + self.group = group + self.other = other + + @classmethod + def from_int(cls, i): + """Create a :class:`FilePerms` object from an integer. + + >>> FilePerms.from_int(0o644) # note the leading zero-oh for octal + FilePerms(user='rw', group='r', other='r') + """ + i &= FULL_PERMS + key = ('', 'x', 'w', 'xw', 'r', 'rx', 'rw', 'rwx') + parts = [] + while i: + parts.append(key[i & _SINGLE_FULL_PERM]) + i >>= 3 + parts.reverse() + return cls(*parts) + + @classmethod + def from_path(cls, path): + """Make a new :class:`FilePerms` object based on the permissions + assigned to the file or directory at *path*. + + Args: + path (str): Filesystem path of the target file. + + Here's an example that holds true on most systems: + + >>> import tempfile + >>> 'r' in FilePerms.from_path(tempfile.gettempdir()).user + True + """ + stat_res = os.stat(path) + return cls.from_int(stat.S_IMODE(stat_res.st_mode)) + + def __int__(self): + return self._integer + + # Sphinx tip: attribute docstrings come after the attribute + user = _FilePermProperty('_user', 2) + "Stores the ``rwx``-formatted *user* permission." + group = _FilePermProperty('_group', 1) + "Stores the ``rwx``-formatted *group* permission." + other = _FilePermProperty('_other', 0) + "Stores the ``rwx``-formatted *other* permission." + + def __repr__(self): + cn = self.__class__.__name__ + return ('%s(user=%r, group=%r, other=%r)' + % (cn, self.user, self.group, self.other)) + +#### + + +_TEXT_OPENFLAGS = os.O_RDWR | os.O_CREAT | os.O_EXCL +if hasattr(os, 'O_NOINHERIT'): + _TEXT_OPENFLAGS |= os.O_NOINHERIT +if hasattr(os, 'O_NOFOLLOW'): + _TEXT_OPENFLAGS |= os.O_NOFOLLOW +_BIN_OPENFLAGS = _TEXT_OPENFLAGS +if hasattr(os, 'O_BINARY'): + _BIN_OPENFLAGS |= os.O_BINARY + + +try: + import fcntl as fcntl +except ImportError: + def set_cloexec(fd): + "Dummy set_cloexec for platforms without fcntl support" + pass +else: + def set_cloexec(fd): + """Does a best-effort :func:`fcntl.fcntl` call to set a fd to be + automatically closed by any future child processes. + + Implementation from the :mod:`tempfile` module. + """ + try: + flags = fcntl.fcntl(fd, fcntl.F_GETFD, 0) + except IOError: + pass + else: + # flags read successfully, modify + flags |= fcntl.FD_CLOEXEC + fcntl.fcntl(fd, fcntl.F_SETFD, flags) + return + + +def atomic_save(dest_path, **kwargs): + """A convenient interface to the :class:`AtomicSaver` type. See the + :class:`AtomicSaver` documentation for details. + """ + return AtomicSaver(dest_path, **kwargs) + + +def path_to_unicode(path): + if isinstance(path, unicode): + return path + encoding = sys.getfilesystemencoding() or sys.getdefaultencoding() + return path.decode(encoding) + + +if os.name == 'nt': + import ctypes + from ctypes import c_wchar_p + from ctypes.wintypes import DWORD, LPVOID + + _ReplaceFile = ctypes.windll.kernel32.ReplaceFile + _ReplaceFile.argtypes = [c_wchar_p, c_wchar_p, c_wchar_p, + DWORD, LPVOID, LPVOID] + + def replace(src, dst): + # argument names match stdlib docs, docstring below + try: + # ReplaceFile fails if the dest file does not exist, so + # first try to rename it into position + os.rename(src, dst) + return + except WindowsError as we: + if we.errno == errno.EEXIST: + pass # continue with the ReplaceFile logic below + else: + raise + + src = path_to_unicode(src) + dst = path_to_unicode(dst) + res = _ReplaceFile(c_wchar_p(dst), c_wchar_p(src), + None, 0, None, None) + if not res: + raise OSError('failed to replace %r with %r' % (dst, src)) + return + + def atomic_rename(src, dst, overwrite=False): + "Rename *src* to *dst*, replacing *dst* if *overwrite is True" + if overwrite: + replace(src, dst) + else: + os.rename(src, dst) + return +else: + # wrapper func for cross compat + docs + def replace(src, dst): + # os.replace does the same thing on unix + return os.rename(src, dst) + + def atomic_rename(src, dst, overwrite=False): + "Rename *src* to *dst*, replacing *dst* if *overwrite is True" + if overwrite: + os.rename(src, dst) + else: + os.link(src, dst) + os.unlink(src) + return + + +_atomic_rename = atomic_rename # backwards compat + +replace.__doc__ = """Similar to :func:`os.replace` in Python 3.3+, +this function will atomically create or replace the file at path +*dst* with the file at path *src*. + +On Windows, this function uses the ReplaceFile API for maximum +possible atomicity on a range of filesystems. +""" + + +class AtomicSaver(object): + """``AtomicSaver`` is a configurable `context manager`_ that provides + a writable :class:`file` which will be moved into place as long as + no exceptions are raised within the context manager's block. These + "part files" are created in the same directory as the destination + path to ensure atomic move operations (i.e., no cross-filesystem + moves occur). + + Args: + dest_path (str): The path where the completed file will be + written. + overwrite (bool): Whether to overwrite the destination file if + it exists at completion time. Defaults to ``True``. + file_perms (int): Integer representation of file permissions + for the newly-created file. Defaults are, when the + destination path already exists, to copy the permissions + from the previous file, or if the file did not exist, to + respect the user's configured `umask`_, usually resulting + in octal 0644 or 0664. + part_file (str): Name of the temporary *part_file*. Defaults + to *dest_path* + ``.part``. Note that this argument is + just the filename, and not the full path of the part + file. To guarantee atomic saves, part files are always + created in the same directory as the destination path. + overwrite_part (bool): Whether to overwrite the *part_file*, + should it exist at setup time. Defaults to ``False``, + which results in an :exc:`OSError` being raised on + pre-existing part files. Be careful of setting this to + ``True`` in situations when multiple threads or processes + could be writing to the same part file. + rm_part_on_exc (bool): Remove *part_file* on exception cases. + Defaults to ``True``, but ``False`` can be useful for + recovery in some cases. Note that resumption is not + automatic and by default an :exc:`OSError` is raised if + the *part_file* exists. + + Practically, the AtomicSaver serves a few purposes: + + * Avoiding overwriting an existing, valid file with a partially + written one. + * Providing a reasonable guarantee that a part file only has one + writer at a time. + * Optional recovery of partial data in failure cases. + + .. _context manager: https://docs.python.org/2/reference/compound_stmts.html#with + .. _umask: https://en.wikipedia.org/wiki/Umask + + """ + _default_file_perms = RW_PERMS + + # TODO: option to abort if target file modify date has changed since start? + def __init__(self, dest_path, **kwargs): + self.dest_path = dest_path + self.overwrite = kwargs.pop('overwrite', True) + self.file_perms = kwargs.pop('file_perms', None) + self.overwrite_part = kwargs.pop('overwrite_part', False) + self.part_filename = kwargs.pop('part_file', None) + self.rm_part_on_exc = kwargs.pop('rm_part_on_exc', True) + self.text_mode = kwargs.pop('text_mode', False) # for windows + self.buffering = kwargs.pop('buffering', -1) + if kwargs: + raise TypeError('unexpected kwargs: %r' % (kwargs.keys(),)) + + self.dest_path = os.path.abspath(self.dest_path) + self.dest_dir = os.path.dirname(self.dest_path) + if not self.part_filename: + self.part_path = dest_path + '.part' + else: + self.part_path = os.path.join(self.dest_dir, self.part_filename) + self.mode = 'w+' if self.text_mode else 'w+b' + self.open_flags = _TEXT_OPENFLAGS if self.text_mode else _BIN_OPENFLAGS + + self.part_file = None + + def _open_part_file(self): + do_chmod = True + file_perms = self.file_perms + if file_perms is None: + try: + # try to copy from file being replaced + stat_res = os.stat(self.dest_path) + file_perms = stat.S_IMODE(stat_res.st_mode) + except (OSError, IOError): + # default if no destination file exists + file_perms = self._default_file_perms + do_chmod = False # respect the umask + + fd = os.open(self.part_path, self.open_flags, file_perms) + set_cloexec(fd) + self.part_file = os.fdopen(fd, self.mode, self.buffering) + + # if default perms are overridden by the user or previous dest_path + # chmod away the effects of the umask + if do_chmod: + try: + os.chmod(self.part_path, file_perms) + except (OSError, IOError): + self.part_file.close() + raise + return + + def setup(self): + """Called on context manager entry (the :keyword:`with` statement), + the ``setup()`` method creates the temporary file in the same + directory as the destination file. + + ``setup()`` tests for a writable directory with rename permissions + early, as the part file may not be written to immediately (not + using :func:`os.access` because of the potential issues of + effective vs. real privileges). + + If the caller is not using the :class:`AtomicSaver` as a + context manager, this method should be called explicitly + before writing. + """ + if os.path.lexists(self.dest_path): + if not self.overwrite: + raise OSError(errno.EEXIST, + 'Overwrite disabled and file already exists', + self.dest_path) + if self.overwrite_part and os.path.lexists(self.part_path): + os.unlink(self.part_path) + self._open_part_file() + return + + def __enter__(self): + self.setup() + return self.part_file + + def __exit__(self, exc_type, exc_val, exc_tb): + self.part_file.close() + if exc_type: + if self.rm_part_on_exc: + try: + os.unlink(self.part_path) + except Exception: + pass # avoid masking original error + return + try: + atomic_rename(self.part_path, self.dest_path, + overwrite=self.overwrite) + except OSError: + if self.rm_part_on_exc: + try: + os.unlink(self.part_path) + except Exception: + pass # avoid masking original error + raise # could not save destination file + return + + +def iter_find_files(directory, patterns, ignored=None, include_dirs=False): + """Returns a generator that yields file paths under a *directory*, + matching *patterns* using `glob`_ syntax (e.g., ``*.txt``). Also + supports *ignored* patterns. + + Args: + directory (str): Path that serves as the root of the + search. Yielded paths will include this as a prefix. + patterns (str or list): A single pattern or list of + glob-formatted patterns to find under *directory*. + ignored (str or list): A single pattern or list of + glob-formatted patterns to ignore. + include_dirs (bool): Whether to include directories that match + patterns, as well. Defaults to ``False``. + + For example, finding Python files in the current directory: + + >>> _CUR_DIR = os.path.dirname(os.path.abspath(__file__)) + >>> filenames = sorted(iter_find_files(_CUR_DIR, '*.py')) + >>> os.path.basename(filenames[-1]) + 'urlutils.py' + + Or, Python files while ignoring emacs lockfiles: + + >>> filenames = iter_find_files(_CUR_DIR, '*.py', ignored='.#*') + + .. _glob: https://en.wikipedia.org/wiki/Glob_%28programming%29 + + """ + if isinstance(patterns, basestring): + patterns = [patterns] + pats_re = re.compile('|'.join([fnmatch.translate(p) for p in patterns])) + + if not ignored: + ignored = [] + elif isinstance(ignored, basestring): + ignored = [ignored] + ign_re = re.compile('|'.join([fnmatch.translate(p) for p in ignored])) + for root, dirs, files in os.walk(directory): + if include_dirs: + for basename in dirs: + if pats_re.match(basename): + if ignored and ign_re.match(basename): + continue + filename = os.path.join(root, basename) + yield filename + + for basename in files: + if pats_re.match(basename): + if ignored and ign_re.match(basename): + continue + filename = os.path.join(root, basename) + yield filename + return + + +def copy_tree(src, dst, symlinks=False, ignore=None): + """The ``copy_tree`` function is an exact copy of the built-in + :func:`shutil.copytree`, with one key difference: it will not + raise an exception if part of the tree already exists. It achieves + this by using :func:`mkdir_p`. + + Args: + src (str): Path of the source directory to copy. + dst (str): Destination path. Existing directories accepted. + symlinks (bool): If ``True``, copy symlinks rather than their + contents. + ignore (callable): A callable that takes a path and directory + listing, returning the files within the listing to be ignored. + + For more details, check out :func:`shutil.copytree` and + :func:`shutil.copy2`. + + """ + names = os.listdir(src) + if ignore is not None: + ignored_names = ignore(src, names) + else: + ignored_names = set() + + mkdir_p(dst) + errors = [] + for name in names: + if name in ignored_names: + continue + srcname = os.path.join(src, name) + dstname = os.path.join(dst, name) + try: + if symlinks and os.path.islink(srcname): + linkto = os.readlink(srcname) + os.symlink(linkto, dstname) + elif os.path.isdir(srcname): + copytree(srcname, dstname, symlinks, ignore) + else: + # Will raise a SpecialFileError for unsupported file types + copy2(srcname, dstname) + # catch the Error from the recursive copytree so that we can + # continue with other files + except Error as e: + errors.extend(e.args[0]) + except EnvironmentError as why: + errors.append((srcname, dstname, str(why))) + try: + copystat(src, dst) + except OSError as why: + if WindowsError is not None and isinstance(why, WindowsError): + # Copying file access times may fail on Windows + pass + else: + errors.append((src, dst, str(why))) + if errors: + raise Error(errors) + + +copytree = copy_tree # alias for drop-in replacement of shutil + + +try: + file +except NameError: + file = object + + +# like open(os.devnull) but with even fewer side effects +class DummyFile(file): + # TODO: raise ValueErrors on closed for all methods? + # TODO: enforce read/write + def __init__(self, path, mode='r', buffering=None): + self.name = path + self.mode = mode + self.closed = False + self.errors = None + self.isatty = False + self.encoding = None + self.newlines = None + self.softspace = 0 + + def close(self): + self.closed = True + + def fileno(self): + return -1 + + def flush(self): + if self.closed: + raise ValueError('I/O operation on a closed file') + return + + def next(self): + raise StopIteration() + + def read(self, size=0): + if self.closed: + raise ValueError('I/O operation on a closed file') + return '' + + def readline(self, size=0): + if self.closed: + raise ValueError('I/O operation on a closed file') + return '' + + def readlines(self, size=0): + if self.closed: + raise ValueError('I/O operation on a closed file') + return [] + + def seek(self): + if self.closed: + raise ValueError('I/O operation on a closed file') + return + + def tell(self): + if self.closed: + raise ValueError('I/O operation on a closed file') + return 0 + + def truncate(self): + if self.closed: + raise ValueError('I/O operation on a closed file') + return + + def write(self, string): + if self.closed: + raise ValueError('I/O operation on a closed file') + return + + def writelines(self, list_of_strings): + if self.closed: + raise ValueError('I/O operation on a closed file') + return + + def __next__(self): + raise StopIteration() + + def __enter__(self): + if self.closed: + raise ValueError('I/O operation on a closed file') + return + + def __exit__(self, exc_type, exc_val, exc_tb): + return + + +if __name__ == '__main__': + with atomic_save('/tmp/final.txt') as f: + f.write('rofl') + f.write('\n')