Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/boltons/fileutils.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/boltons/fileutils.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,660 +0,0 @@ -# -*- coding: utf-8 -*- -"""Virtually every Python programmer has used Python for wrangling -disk contents, and ``fileutils`` collects solutions to some of the -most commonly-found gaps in the standard library. -""" - -from __future__ import print_function - -import os -import re -import sys -import stat -import errno -import fnmatch -from shutil import copy2, copystat, Error - - -__all__ = ['mkdir_p', 'atomic_save', 'AtomicSaver', 'FilePerms', - 'iter_find_files', 'copytree'] - - -FULL_PERMS = 511 # 0777 that both Python 2 and 3 can digest -RW_PERMS = 438 -_SINGLE_FULL_PERM = 7 # or 07 in Python 2 -try: - basestring -except NameError: - unicode = str # Python 3 compat - basestring = (str, bytes) - - -def mkdir_p(path): - """Creates a directory and any parent directories that may need to - be created along the way, without raising errors for any existing - directories. This function mimics the behavior of the ``mkdir -p`` - command available in Linux/BSD environments, but also works on - Windows. - """ - try: - os.makedirs(path) - except OSError as exc: - if exc.errno == errno.EEXIST and os.path.isdir(path): - return - raise - return - - -class FilePerms(object): - """The :class:`FilePerms` type is used to represent standard POSIX - filesystem permissions: - - * Read - * Write - * Execute - - Across three classes of user: - - * Owning (u)ser - * Owner's (g)roup - * Any (o)ther user - - This class assists with computing new permissions, as well as - working with numeric octal ``777``-style and ``rwx``-style - permissions. Currently it only considers the bottom 9 permission - bits; it does not support sticky bits or more advanced permission - systems. - - Args: - user (str): A string in the 'rwx' format, omitting characters - for which owning user's permissions are not provided. - group (str): A string in the 'rwx' format, omitting characters - for which owning group permissions are not provided. - other (str): A string in the 'rwx' format, omitting characters - for which owning other/world permissions are not provided. - - There are many ways to use :class:`FilePerms`: - - >>> FilePerms(user='rwx', group='xrw', other='wxr') # note character order - FilePerms(user='rwx', group='rwx', other='rwx') - >>> int(FilePerms('r', 'r', '')) - 288 - >>> oct(288)[-3:] # XXX Py3k - '440' - - See also the :meth:`FilePerms.from_int` and - :meth:`FilePerms.from_path` classmethods for useful alternative - ways to construct :class:`FilePerms` objects. - """ - # TODO: consider more than the lower 9 bits - class _FilePermProperty(object): - _perm_chars = 'rwx' - _perm_set = frozenset('rwx') - _perm_val = {'r': 4, 'w': 2, 'x': 1} # for sorting - - def __init__(self, attribute, offset): - self.attribute = attribute - self.offset = offset - - def __get__(self, fp_obj, type_=None): - if fp_obj is None: - return self - return getattr(fp_obj, self.attribute) - - def __set__(self, fp_obj, value): - cur = getattr(fp_obj, self.attribute) - if cur == value: - return - try: - invalid_chars = set(str(value)) - self._perm_set - except TypeError: - raise TypeError('expected string, not %r' % value) - if invalid_chars: - raise ValueError('got invalid chars %r in permission' - ' specification %r, expected empty string' - ' or one or more of %r' - % (invalid_chars, value, self._perm_chars)) - - sort_key = lambda c: self._perm_val[c] - new_value = ''.join(sorted(set(value), - key=sort_key, reverse=True)) - setattr(fp_obj, self.attribute, new_value) - self._update_integer(fp_obj, new_value) - - def _update_integer(self, fp_obj, value): - mode = 0 - key = 'xwr' - for symbol in value: - bit = 2 ** key.index(symbol) - mode |= (bit << (self.offset * 3)) - fp_obj._integer |= mode - - def __init__(self, user='', group='', other=''): - self._user, self._group, self._other = '', '', '' - self._integer = 0 - self.user = user - self.group = group - self.other = other - - @classmethod - def from_int(cls, i): - """Create a :class:`FilePerms` object from an integer. - - >>> FilePerms.from_int(0o644) # note the leading zero-oh for octal - FilePerms(user='rw', group='r', other='r') - """ - i &= FULL_PERMS - key = ('', 'x', 'w', 'xw', 'r', 'rx', 'rw', 'rwx') - parts = [] - while i: - parts.append(key[i & _SINGLE_FULL_PERM]) - i >>= 3 - parts.reverse() - return cls(*parts) - - @classmethod - def from_path(cls, path): - """Make a new :class:`FilePerms` object based on the permissions - assigned to the file or directory at *path*. - - Args: - path (str): Filesystem path of the target file. - - Here's an example that holds true on most systems: - - >>> import tempfile - >>> 'r' in FilePerms.from_path(tempfile.gettempdir()).user - True - """ - stat_res = os.stat(path) - return cls.from_int(stat.S_IMODE(stat_res.st_mode)) - - def __int__(self): - return self._integer - - # Sphinx tip: attribute docstrings come after the attribute - user = _FilePermProperty('_user', 2) - "Stores the ``rwx``-formatted *user* permission." - group = _FilePermProperty('_group', 1) - "Stores the ``rwx``-formatted *group* permission." - other = _FilePermProperty('_other', 0) - "Stores the ``rwx``-formatted *other* permission." - - def __repr__(self): - cn = self.__class__.__name__ - return ('%s(user=%r, group=%r, other=%r)' - % (cn, self.user, self.group, self.other)) - -#### - - -_TEXT_OPENFLAGS = os.O_RDWR | os.O_CREAT | os.O_EXCL -if hasattr(os, 'O_NOINHERIT'): - _TEXT_OPENFLAGS |= os.O_NOINHERIT -if hasattr(os, 'O_NOFOLLOW'): - _TEXT_OPENFLAGS |= os.O_NOFOLLOW -_BIN_OPENFLAGS = _TEXT_OPENFLAGS -if hasattr(os, 'O_BINARY'): - _BIN_OPENFLAGS |= os.O_BINARY - - -try: - import fcntl as fcntl -except ImportError: - def set_cloexec(fd): - "Dummy set_cloexec for platforms without fcntl support" - pass -else: - def set_cloexec(fd): - """Does a best-effort :func:`fcntl.fcntl` call to set a fd to be - automatically closed by any future child processes. - - Implementation from the :mod:`tempfile` module. - """ - try: - flags = fcntl.fcntl(fd, fcntl.F_GETFD, 0) - except IOError: - pass - else: - # flags read successfully, modify - flags |= fcntl.FD_CLOEXEC - fcntl.fcntl(fd, fcntl.F_SETFD, flags) - return - - -def atomic_save(dest_path, **kwargs): - """A convenient interface to the :class:`AtomicSaver` type. See the - :class:`AtomicSaver` documentation for details. - """ - return AtomicSaver(dest_path, **kwargs) - - -def path_to_unicode(path): - if isinstance(path, unicode): - return path - encoding = sys.getfilesystemencoding() or sys.getdefaultencoding() - return path.decode(encoding) - - -if os.name == 'nt': - import ctypes - from ctypes import c_wchar_p - from ctypes.wintypes import DWORD, LPVOID - - _ReplaceFile = ctypes.windll.kernel32.ReplaceFile - _ReplaceFile.argtypes = [c_wchar_p, c_wchar_p, c_wchar_p, - DWORD, LPVOID, LPVOID] - - def replace(src, dst): - # argument names match stdlib docs, docstring below - try: - # ReplaceFile fails if the dest file does not exist, so - # first try to rename it into position - os.rename(src, dst) - return - except WindowsError as we: - if we.errno == errno.EEXIST: - pass # continue with the ReplaceFile logic below - else: - raise - - src = path_to_unicode(src) - dst = path_to_unicode(dst) - res = _ReplaceFile(c_wchar_p(dst), c_wchar_p(src), - None, 0, None, None) - if not res: - raise OSError('failed to replace %r with %r' % (dst, src)) - return - - def atomic_rename(src, dst, overwrite=False): - "Rename *src* to *dst*, replacing *dst* if *overwrite is True" - if overwrite: - replace(src, dst) - else: - os.rename(src, dst) - return -else: - # wrapper func for cross compat + docs - def replace(src, dst): - # os.replace does the same thing on unix - return os.rename(src, dst) - - def atomic_rename(src, dst, overwrite=False): - "Rename *src* to *dst*, replacing *dst* if *overwrite is True" - if overwrite: - os.rename(src, dst) - else: - os.link(src, dst) - os.unlink(src) - return - - -_atomic_rename = atomic_rename # backwards compat - -replace.__doc__ = """Similar to :func:`os.replace` in Python 3.3+, -this function will atomically create or replace the file at path -*dst* with the file at path *src*. - -On Windows, this function uses the ReplaceFile API for maximum -possible atomicity on a range of filesystems. -""" - - -class AtomicSaver(object): - """``AtomicSaver`` is a configurable `context manager`_ that provides - a writable :class:`file` which will be moved into place as long as - no exceptions are raised within the context manager's block. These - "part files" are created in the same directory as the destination - path to ensure atomic move operations (i.e., no cross-filesystem - moves occur). - - Args: - dest_path (str): The path where the completed file will be - written. - overwrite (bool): Whether to overwrite the destination file if - it exists at completion time. Defaults to ``True``. - file_perms (int): Integer representation of file permissions - for the newly-created file. Defaults are, when the - destination path already exists, to copy the permissions - from the previous file, or if the file did not exist, to - respect the user's configured `umask`_, usually resulting - in octal 0644 or 0664. - part_file (str): Name of the temporary *part_file*. Defaults - to *dest_path* + ``.part``. Note that this argument is - just the filename, and not the full path of the part - file. To guarantee atomic saves, part files are always - created in the same directory as the destination path. - overwrite_part (bool): Whether to overwrite the *part_file*, - should it exist at setup time. Defaults to ``False``, - which results in an :exc:`OSError` being raised on - pre-existing part files. Be careful of setting this to - ``True`` in situations when multiple threads or processes - could be writing to the same part file. - rm_part_on_exc (bool): Remove *part_file* on exception cases. - Defaults to ``True``, but ``False`` can be useful for - recovery in some cases. Note that resumption is not - automatic and by default an :exc:`OSError` is raised if - the *part_file* exists. - - Practically, the AtomicSaver serves a few purposes: - - * Avoiding overwriting an existing, valid file with a partially - written one. - * Providing a reasonable guarantee that a part file only has one - writer at a time. - * Optional recovery of partial data in failure cases. - - .. _context manager: https://docs.python.org/2/reference/compound_stmts.html#with - .. _umask: https://en.wikipedia.org/wiki/Umask - - """ - _default_file_perms = RW_PERMS - - # TODO: option to abort if target file modify date has changed since start? - def __init__(self, dest_path, **kwargs): - self.dest_path = dest_path - self.overwrite = kwargs.pop('overwrite', True) - self.file_perms = kwargs.pop('file_perms', None) - self.overwrite_part = kwargs.pop('overwrite_part', False) - self.part_filename = kwargs.pop('part_file', None) - self.rm_part_on_exc = kwargs.pop('rm_part_on_exc', True) - self.text_mode = kwargs.pop('text_mode', False) # for windows - self.buffering = kwargs.pop('buffering', -1) - if kwargs: - raise TypeError('unexpected kwargs: %r' % (kwargs.keys(),)) - - self.dest_path = os.path.abspath(self.dest_path) - self.dest_dir = os.path.dirname(self.dest_path) - if not self.part_filename: - self.part_path = dest_path + '.part' - else: - self.part_path = os.path.join(self.dest_dir, self.part_filename) - self.mode = 'w+' if self.text_mode else 'w+b' - self.open_flags = _TEXT_OPENFLAGS if self.text_mode else _BIN_OPENFLAGS - - self.part_file = None - - def _open_part_file(self): - do_chmod = True - file_perms = self.file_perms - if file_perms is None: - try: - # try to copy from file being replaced - stat_res = os.stat(self.dest_path) - file_perms = stat.S_IMODE(stat_res.st_mode) - except (OSError, IOError): - # default if no destination file exists - file_perms = self._default_file_perms - do_chmod = False # respect the umask - - fd = os.open(self.part_path, self.open_flags, file_perms) - set_cloexec(fd) - self.part_file = os.fdopen(fd, self.mode, self.buffering) - - # if default perms are overridden by the user or previous dest_path - # chmod away the effects of the umask - if do_chmod: - try: - os.chmod(self.part_path, file_perms) - except (OSError, IOError): - self.part_file.close() - raise - return - - def setup(self): - """Called on context manager entry (the :keyword:`with` statement), - the ``setup()`` method creates the temporary file in the same - directory as the destination file. - - ``setup()`` tests for a writable directory with rename permissions - early, as the part file may not be written to immediately (not - using :func:`os.access` because of the potential issues of - effective vs. real privileges). - - If the caller is not using the :class:`AtomicSaver` as a - context manager, this method should be called explicitly - before writing. - """ - if os.path.lexists(self.dest_path): - if not self.overwrite: - raise OSError(errno.EEXIST, - 'Overwrite disabled and file already exists', - self.dest_path) - if self.overwrite_part and os.path.lexists(self.part_path): - os.unlink(self.part_path) - self._open_part_file() - return - - def __enter__(self): - self.setup() - return self.part_file - - def __exit__(self, exc_type, exc_val, exc_tb): - self.part_file.close() - if exc_type: - if self.rm_part_on_exc: - try: - os.unlink(self.part_path) - except Exception: - pass # avoid masking original error - return - try: - atomic_rename(self.part_path, self.dest_path, - overwrite=self.overwrite) - except OSError: - if self.rm_part_on_exc: - try: - os.unlink(self.part_path) - except Exception: - pass # avoid masking original error - raise # could not save destination file - return - - -def iter_find_files(directory, patterns, ignored=None, include_dirs=False): - """Returns a generator that yields file paths under a *directory*, - matching *patterns* using `glob`_ syntax (e.g., ``*.txt``). Also - supports *ignored* patterns. - - Args: - directory (str): Path that serves as the root of the - search. Yielded paths will include this as a prefix. - patterns (str or list): A single pattern or list of - glob-formatted patterns to find under *directory*. - ignored (str or list): A single pattern or list of - glob-formatted patterns to ignore. - include_dirs (bool): Whether to include directories that match - patterns, as well. Defaults to ``False``. - - For example, finding Python files in the current directory: - - >>> _CUR_DIR = os.path.dirname(os.path.abspath(__file__)) - >>> filenames = sorted(iter_find_files(_CUR_DIR, '*.py')) - >>> os.path.basename(filenames[-1]) - 'urlutils.py' - - Or, Python files while ignoring emacs lockfiles: - - >>> filenames = iter_find_files(_CUR_DIR, '*.py', ignored='.#*') - - .. _glob: https://en.wikipedia.org/wiki/Glob_%28programming%29 - - """ - if isinstance(patterns, basestring): - patterns = [patterns] - pats_re = re.compile('|'.join([fnmatch.translate(p) for p in patterns])) - - if not ignored: - ignored = [] - elif isinstance(ignored, basestring): - ignored = [ignored] - ign_re = re.compile('|'.join([fnmatch.translate(p) for p in ignored])) - for root, dirs, files in os.walk(directory): - if include_dirs: - for basename in dirs: - if pats_re.match(basename): - if ignored and ign_re.match(basename): - continue - filename = os.path.join(root, basename) - yield filename - - for basename in files: - if pats_re.match(basename): - if ignored and ign_re.match(basename): - continue - filename = os.path.join(root, basename) - yield filename - return - - -def copy_tree(src, dst, symlinks=False, ignore=None): - """The ``copy_tree`` function is an exact copy of the built-in - :func:`shutil.copytree`, with one key difference: it will not - raise an exception if part of the tree already exists. It achieves - this by using :func:`mkdir_p`. - - Args: - src (str): Path of the source directory to copy. - dst (str): Destination path. Existing directories accepted. - symlinks (bool): If ``True``, copy symlinks rather than their - contents. - ignore (callable): A callable that takes a path and directory - listing, returning the files within the listing to be ignored. - - For more details, check out :func:`shutil.copytree` and - :func:`shutil.copy2`. - - """ - names = os.listdir(src) - if ignore is not None: - ignored_names = ignore(src, names) - else: - ignored_names = set() - - mkdir_p(dst) - errors = [] - for name in names: - if name in ignored_names: - continue - srcname = os.path.join(src, name) - dstname = os.path.join(dst, name) - try: - if symlinks and os.path.islink(srcname): - linkto = os.readlink(srcname) - os.symlink(linkto, dstname) - elif os.path.isdir(srcname): - copytree(srcname, dstname, symlinks, ignore) - else: - # Will raise a SpecialFileError for unsupported file types - copy2(srcname, dstname) - # catch the Error from the recursive copytree so that we can - # continue with other files - except Error as e: - errors.extend(e.args[0]) - except EnvironmentError as why: - errors.append((srcname, dstname, str(why))) - try: - copystat(src, dst) - except OSError as why: - if WindowsError is not None and isinstance(why, WindowsError): - # Copying file access times may fail on Windows - pass - else: - errors.append((src, dst, str(why))) - if errors: - raise Error(errors) - - -copytree = copy_tree # alias for drop-in replacement of shutil - - -try: - file -except NameError: - file = object - - -# like open(os.devnull) but with even fewer side effects -class DummyFile(file): - # TODO: raise ValueErrors on closed for all methods? - # TODO: enforce read/write - def __init__(self, path, mode='r', buffering=None): - self.name = path - self.mode = mode - self.closed = False - self.errors = None - self.isatty = False - self.encoding = None - self.newlines = None - self.softspace = 0 - - def close(self): - self.closed = True - - def fileno(self): - return -1 - - def flush(self): - if self.closed: - raise ValueError('I/O operation on a closed file') - return - - def next(self): - raise StopIteration() - - def read(self, size=0): - if self.closed: - raise ValueError('I/O operation on a closed file') - return '' - - def readline(self, size=0): - if self.closed: - raise ValueError('I/O operation on a closed file') - return '' - - def readlines(self, size=0): - if self.closed: - raise ValueError('I/O operation on a closed file') - return [] - - def seek(self): - if self.closed: - raise ValueError('I/O operation on a closed file') - return - - def tell(self): - if self.closed: - raise ValueError('I/O operation on a closed file') - return 0 - - def truncate(self): - if self.closed: - raise ValueError('I/O operation on a closed file') - return - - def write(self, string): - if self.closed: - raise ValueError('I/O operation on a closed file') - return - - def writelines(self, list_of_strings): - if self.closed: - raise ValueError('I/O operation on a closed file') - return - - def __next__(self): - raise StopIteration() - - def __enter__(self): - if self.closed: - raise ValueError('I/O operation on a closed file') - return - - def __exit__(self, exc_type, exc_val, exc_tb): - return - - -if __name__ == '__main__': - with atomic_save('/tmp/final.txt') as f: - f.write('rofl') - f.write('\n')