Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/boltons/ioutils.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line diff
# -*- coding: utf-8 -*-

# Coding decl above needed for rendering the emdash properly in the
# documentation.

"""
Module ``ioutils`` implements a number of helper classes and functions which
are useful when dealing with input, output, and bytestreams in a variety of
ways.
"""
import os
from io import BytesIO
from abc import (
    ABCMeta,
    abstractmethod,
    abstractproperty,
)
from errno import EINVAL
from codecs import EncodedFile
from tempfile import TemporaryFile

try:
    text_type = unicode  # Python 2
    binary_type = str
except NameError:
    text_type = str      # Python 3
    binary_type = bytes

READ_CHUNK_SIZE = 21333
"""
Number of bytes to read at a time. The value is ~ 1/3rd of 64k which means that
the value will easily fit in the L2 cache of most processors even if every
codepoint in a string is three bytes long which makes it a nice fast default
value.
"""


class SpooledIOBase(object):
    """
    The SpooledTemporaryFile class doesn't support a number of attributes and
    methods that a StringIO instance does. This brings the api as close to
    compatible as possible with StringIO so that it may be used as a near
    drop-in replacement to save memory.

    Another issue with SpooledTemporaryFile is that the spooled file is always
    a cStringIO rather than a StringIO which causes issues with some of our
    tools.

    Subclasses provide the storage (``buffer``), the rollover policy and the
    position bookkeeping; this base class layers the shared file-like API on
    top of those primitives.
    """
    # Python 2 style metaclass declaration. Python 3 ignores this attribute,
    # so the abstract members below are not enforced there.
    __metaclass__ = ABCMeta

    def __init__(self, max_size=5000000, dir=None):
        """Store the rollover threshold and the temporary file directory.

        :param max_size: size at which subclasses roll the in-memory buffer
            over to a temporary file.
        :param dir: directory handed to ``TemporaryFile`` on rollover.
        """
        self._max_size = max_size
        self._dir = dir

    @abstractmethod
    def read(self, n=-1):
        """Read n characters from the buffer"""

    @abstractmethod
    def write(self, s):
        """Write into the buffer"""

    @abstractmethod
    def seek(self, pos, mode=0):
        """Seek to a specific point in a file"""

    @abstractmethod
    def readline(self, length=None):
        """Returns the next available line"""

    @abstractmethod
    def readlines(self, sizehint=0):
        """Returns a list of all lines from the current position forward"""

    @abstractmethod
    def rollover(self):
        """Roll file-like-object over into a real temporary file"""

    @abstractmethod
    def tell(self):
        """Return the current position"""

    @abstractproperty
    def buffer(self):
        """Should return a flo instance"""

    @abstractproperty
    def _rolled(self):
        """Returns whether the file has been rolled to a real file or not"""

    @abstractproperty
    def len(self):
        """Returns the length of the data"""

    def _get_softspace(self):
        return self.buffer.softspace

    def _set_softspace(self, val):
        self.buffer.softspace = val

    # Proxy of the underlying buffer's ``softspace`` attribute.
    softspace = property(_get_softspace, _set_softspace)

    @property
    def _file(self):
        """Alias of :attr:`buffer`."""
        return self.buffer

    def close(self):
        """Close the underlying buffer."""
        return self.buffer.close()

    def flush(self):
        """Flush the underlying buffer."""
        return self.buffer.flush()

    def isatty(self):
        """Return the underlying buffer's ``isatty()`` result."""
        return self.buffer.isatty()

    def next(self):
        """Return the next line, raising ``StopIteration`` at end of data."""
        line = self.readline()
        if not line:
            # An empty line only means "exhausted" when the underlying
            # buffer really is positioned at its end; otherwise restore the
            # position and hand the empty line back.
            pos = self.buffer.tell()
            self.buffer.seek(0, os.SEEK_END)
            if pos == self.buffer.tell():
                raise StopIteration
            else:
                self.buffer.seek(pos)
        return line

    @property
    def closed(self):
        """Whether the underlying buffer is closed."""
        return self.buffer.closed

    @property
    def pos(self):
        """Current position, as reported by :meth:`tell`."""
        return self.tell()

    @property
    def buf(self):
        """Entire contents, as returned by :meth:`getvalue`."""
        return self.getvalue()

    def fileno(self):
        """Roll over to a real temporary file and return its descriptor."""
        self.rollover()
        return self.buffer.fileno()

    def truncate(self, size=None):
        """
        Custom version of truncate that takes either no arguments (like the
        real SpooledTemporaryFile) or a single argument that truncates the
        value to a certain index location.

        :raises IOError: (``EINVAL``) if *size* is negative.
        """
        if size is None:
            return self.buffer.truncate()

        if size < 0:
            raise IOError(EINVAL, "Negative size not allowed")

        # Emulate truncation to a particular location
        pos = self.tell()
        self.seek(size)
        self.buffer.truncate()
        # Only return to the previous position if it lies before the cut.
        if pos < size:
            self.seek(pos)

    def getvalue(self):
        """Return the entire file's contents, preserving the position"""
        pos = self.tell()
        self.seek(0)
        val = self.read()
        self.seek(pos)
        return val

    def seekable(self):
        return True

    def readable(self):
        return True

    def writable(self):
        return True

    __next__ = next

    def __len__(self):
        return self.len

    def __iter__(self):
        return self

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self._file.close()

    def __eq__(self, other):
        """Instances of the same class are equal when their contents are."""
        if isinstance(other, self.__class__):
            return self.getvalue() == other.getvalue()
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __bool__(self):
        # Always truthy, so an empty file (``__len__`` == 0) is not falsy.
        return True

    __nonzero__ = __bool__


class SpooledBytesIO(SpooledIOBase):
    """
    SpooledBytesIO is a spooled file-like-object that only accepts bytes. On
    Python 2.x this means the 'str' type; on Python 3.x this means the 'bytes'
    type. Bytes are written in and retrieved exactly as given, but it will
    raise TypeErrors if something other than bytes are written.

    Example::

        >>> from boltons import ioutils
        >>> with ioutils.SpooledBytesIO() as f:
        ...     f.write(b"Happy IO")
        ...     _ = f.seek(0)
        ...     isinstance(f.getvalue(), ioutils.binary_type)
        True
    """

    def read(self, n=-1):
        """Read up to *n* bytes from the buffer (everything by default)."""
        return self.buffer.read(n)

    def write(self, s):
        """Write bytes *s*, rolling over first if ``max_size`` is reached.

        :raises TypeError: if *s* is not of the binary type.
        """
        if not isinstance(s, binary_type):
            raise TypeError("{0} expected, got {1}".format(
                binary_type.__name__,
                type(s).__name__
            ))

        if self.tell() + len(s) >= self._max_size:
            self.rollover()
        self.buffer.write(s)

    def seek(self, pos, mode=0):
        """Seek the underlying buffer and return its result."""
        return self.buffer.seek(pos, mode)

    def readline(self, length=None):
        """Return the next line, limited to *length* bytes when given."""
        if length:
            return self.buffer.readline(length)
        else:
            return self.buffer.readline()

    def readlines(self, sizehint=0):
        """Return the remaining lines as a list."""
        return self.buffer.readlines(sizehint)

    def rollover(self):
        """Roll the StringIO over to a TempFile"""
        if not self._rolled:
            tmp = TemporaryFile(dir=self._dir)
            pos = self.buffer.tell()
            tmp.write(self.buffer.getvalue())
            tmp.seek(pos)
            self.buffer.close()
            self._buffer = tmp

    @property
    def _rolled(self):
        """True once the buffer is no longer an in-memory ``BytesIO``."""
        return not isinstance(self.buffer, BytesIO)

    @property
    def buffer(self):
        """The backing file object, created lazily as a ``BytesIO``."""
        try:
            return self._buffer
        except AttributeError:
            self._buffer = BytesIO()
        return self._buffer

    @property
    def len(self):
        """Determine the length of the file"""
        pos = self.tell()
        if self._rolled:
            self.seek(0)
            val = os.fstat(self.fileno()).st_size
        else:
            self.seek(0, os.SEEK_END)
            val = self.tell()
        self.seek(pos)
        return val

    def tell(self):
        """Return the byte position of the underlying buffer."""
        return self.buffer.tell()


class SpooledStringIO(SpooledIOBase):
    """
    SpooledStringIO is a spooled file-like-object that only accepts unicode
    values. On Python 2.x this means the 'unicode' type and on Python 3.x this
    means the 'str' type. Values are accepted as unicode and then coerced into
    utf-8 encoded bytes for storage. On retrieval, the values are returned as
    unicode.

    Example::

        >>> from boltons import ioutils
        >>> with ioutils.SpooledStringIO() as f:
        ...     f.write(u"\u2014 Hey, an emdash!")
        ...     _ = f.seek(0)
        ...     isinstance(f.read(), ioutils.text_type)
        True

    """
    def __init__(self, *args, **kwargs):
        # Position in codepoints; the underlying buffer tracks bytes.
        self._tell = 0
        super(SpooledStringIO, self).__init__(*args, **kwargs)

    def read(self, n=-1):
        """Read up to *n* codepoints and advance the codepoint position."""
        ret = self.buffer.reader.read(n, n)
        self._tell = self.tell() + len(ret)
        return ret

    def write(self, s):
        """Write text *s*, stored as utf-8 bytes.

        Rolls over to a temporary file first if the encoded data would reach
        ``max_size``.

        :raises TypeError: if *s* is not of the text type.
        """
        if not isinstance(s, text_type):
            raise TypeError("{0} expected, got {1}".format(
                text_type.__name__,
                type(s).__name__
            ))
        current_pos = self.tell()
        # Encode once; the result is needed for the size check and the write.
        encoded = s.encode('utf-8')
        if self.buffer.tell() + len(encoded) >= self._max_size:
            self.rollover()
        self.buffer.write(encoded)
        self._tell = current_pos + len(s)

    def _traverse_codepoints(self, current_position, n):
        """Traverse from current position to the right n codepoints"""
        dest = current_position + n
        while True:
            if current_position == dest:
                # By chance we've landed on the right position, break
                break

            # If the read would take us past the intended position then
            # seek only enough to cover the offset
            if current_position + READ_CHUNK_SIZE > dest:
                self.read(dest - current_position)
                break
            else:
                ret = self.read(READ_CHUNK_SIZE)

            # Increment our current position
            current_position += READ_CHUNK_SIZE

            # If we kept reading but there was nothing here, break
            # as we are at the end of the file
            if not ret:
                break

        return dest

    def seek(self, pos, mode=0):
        """Traverse from offset to the specified codepoint

        With ``os.SEEK_END`` the destination is computed as ``len - pos``,
        i.e. *pos* is counted backwards from the end of the data.

        :raises ValueError: if *mode* is not 0, 1 or 2.
        """
        # Seek to position from the start of the file
        if mode == os.SEEK_SET:
            self.buffer.seek(0)
            self._traverse_codepoints(0, pos)
            self._tell = pos
        # Seek to new position relative to current position
        elif mode == os.SEEK_CUR:
            start_pos = self.tell()
            self._traverse_codepoints(self.tell(), pos)
            self._tell = start_pos + pos
        elif mode == os.SEEK_END:
            self.buffer.seek(0)
            dest_position = self.len - pos
            self._traverse_codepoints(0, dest_position)
            self._tell = dest_position
        else:
            raise ValueError(
                "Invalid whence ({0}, should be 0, 1, or 2)".format(mode)
            )
        return self.tell()

    def readline(self, length=None):
        """Return the next line as text and advance the codepoint position."""
        ret = self.buffer.readline(length).decode('utf-8')
        self._tell = self.tell() + len(ret)
        return ret

    def readlines(self, sizehint=0):
        """Return the remaining lines as text, advancing the position."""
        ret = [x.decode('utf-8') for x in self.buffer.readlines(sizehint)]
        self._tell = self.tell() + sum((len(x) for x in ret))
        return ret

    @property
    def buffer(self):
        """The backing file object, created lazily.

        It is a utf-8 ``EncodedFile`` wrapped around a ``BytesIO``.
        """
        try:
            return self._buffer
        except AttributeError:
            self._buffer = EncodedFile(BytesIO(), data_encoding='utf-8')
        return self._buffer

    @property
    def _rolled(self):
        """True once the wrapped stream is no longer a ``BytesIO``."""
        return not isinstance(self.buffer.stream, BytesIO)

    def rollover(self):
        """Roll the StringIO over to a TempFile"""
        if not self._rolled:
            tmp = EncodedFile(TemporaryFile(dir=self._dir),
                              data_encoding='utf-8')
            pos = self.buffer.tell()
            tmp.write(self.buffer.getvalue())
            tmp.seek(pos)
            self.buffer.close()
            self._buffer = tmp

    def tell(self):
        """Return the codepoint position"""
        return self._tell

    @property
    def len(self):
        """Determine the number of codepoints in the file"""
        pos = self.buffer.tell()
        # ``read`` advances ``_tell`` as a side effect; remember it so that
        # measuring the length leaves the reported position untouched.
        saved_tell = self._tell
        self.buffer.seek(0)
        total = 0
        try:
            while True:
                ret = self.read(READ_CHUNK_SIZE)
                if not ret:
                    break
                total += len(ret)
        finally:
            self.buffer.seek(pos)
            self._tell = saved_tell
        return total


def is_text_fileobj(fileobj):
    """Return True if *fileobj* looks like a text-handling file object.

    An object counts as text when it has a truthy ``encoding`` attribute, or
    when it has a ``getvalue()`` whose result is a unicode string.
    """
    if getattr(fileobj, 'encoding', False):
        # codecs.open and io.TextIOBase
        return True
    if getattr(fileobj, 'getvalue', False):
        # StringIO.StringIO / cStringIO.StringIO / io.StringIO
        try:
            if isinstance(fileobj.getvalue(), type(u'')):
                return True
        except Exception:
            # Best-effort probe: a failing getvalue() just means "not text".
            pass
    return False


class MultiFileReader(object):
    """Takes a list of open files or file-like objects and provides an
    interface to read from them all contiguously. Like
    :func:`itertools.chain()`, but for reading files.

    >>> mfr = MultiFileReader(BytesIO(b'ab'), BytesIO(b'cd'), BytesIO(b'e'))
    >>> mfr.read(3).decode('ascii')
    u'abc'
    >>> mfr.read(3).decode('ascii')
    u'de'

    The constructor takes as many fileobjs as you hand it, and will
    raise a TypeError on non-file-like objects. A ValueError is raised
    when file-like objects are a mix of bytes- and text-handling
    objects (for instance, BytesIO and StringIO).
    """

    def __init__(self, *fileobjs):
        if not all(callable(getattr(f, 'read', None)) and
                   callable(getattr(f, 'seek', None)) for f in fileobjs):
            raise TypeError('MultiFileReader expected file-like objects'
                            ' with .read() and .seek()')
        # Classify each file once instead of probing it for all() and any().
        text_flags = [is_text_fileobj(f) for f in fileobjs]
        if all(text_flags):
            # codecs.open and io.TextIOBase
            self._joiner = u''
        elif any(text_flags):
            raise ValueError('All arguments to MultiFileReader must handle'
                             ' bytes OR text, not a mix')
        else:
            # open/file and io.BytesIO
            self._joiner = b''
        self._fileobjs = fileobjs
        # Index of the file that sized reads are currently consuming.
        self._index = 0

    def read(self, amt=None):
        """Read up to the specified *amt*, seamlessly bridging across
        files. Returns the appropriate type of string (bytes or text)
        for the input, and returns an empty string when the files are
        exhausted.

        A falsy *amt* (``None`` or ``0``) reads everything that remains in
        all of the files.
        """
        if not amt:
            return self._joiner.join(f.read() for f in self._fileobjs)
        parts = []
        while amt > 0 and self._index < len(self._fileobjs):
            parts.append(self._fileobjs[self._index].read(amt))
            got = len(parts[-1])
            # A short read means the current file is exhausted; move on.
            if got < amt:
                self._index += 1
            amt -= got
        return self._joiner.join(parts)

    def seek(self, offset, whence=os.SEEK_SET):
        """Enables setting position of the file cursor to a given
        *offset*. Currently only supports ``offset=0``.

        :raises NotImplementedError: for any *whence* other than
            ``os.SEEK_SET`` or any non-zero *offset*.
        """
        if whence != os.SEEK_SET:
            raise NotImplementedError(
                'MultiFileReader.seek() only supports os.SEEK_SET')
        if offset != 0:
            raise NotImplementedError(
                'MultiFileReader only supports seeking to start at this time')
        for f in self._fileobjs:
            f.seek(0)
        # Restart sized reads from the first file; without this a read(amt)
        # after exhausting the files would keep returning an empty string.
        self._index = 0