Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/boltons/ioutils.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/boltons/ioutils.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,505 +0,0 @@ -# -*- coding: utf-8 -*- - -# Coding decl above needed for rendering the emdash properly in the -# documentation. - -""" -Module ``ioutils`` implements a number of helper classes and functions which -are useful when dealing with input, output, and bytestreams in a variety of -ways. -""" -import os -from io import BytesIO -from abc import ( - ABCMeta, - abstractmethod, - abstractproperty, -) -from errno import EINVAL -from codecs import EncodedFile -from tempfile import TemporaryFile - -try: - text_type = unicode # Python 2 - binary_type = str -except NameError: - text_type = str # Python 3 - binary_type = bytes - -READ_CHUNK_SIZE = 21333 -""" -Number of bytes to read at a time. The value is ~ 1/3rd of 64k which means that -the value will easily fit in the L2 cache of most processors even if every -codepoint in a string is three bytes long which makes it a nice fast default -value. -""" - - -class SpooledIOBase(object): - """ - The SpooledTempoaryFile class doesn't support a number of attributes and - methods that a StringIO instance does. This brings the api as close to - compatible as possible with StringIO so that it may be used as a near - drop-in replacement to save memory. - - Another issue with SpooledTemporaryFile is that the spooled file is always - a cStringIO rather than a StringIO which causes issues with some of our - tools. - """ - __metaclass__ = ABCMeta - - def __init__(self, max_size=5000000, dir=None): - self._max_size = max_size - self._dir = dir - - @abstractmethod - def read(self, n=-1): - """Read n characters from the buffer""" - - @abstractmethod - def write(self, s): - """Write into the buffer""" - - @abstractmethod - def seek(self, pos, mode=0): - """Seek to a specific point in a file""" - - @abstractmethod - def readline(self, length=None): - """Returns the next available line""" - - @abstractmethod - def readlines(self, sizehint=0): - """Returns a list of all lines from the current position forward""" - - @abstractmethod - def rollover(self): - """Roll file-like-object over into a real temporary file""" - - @abstractmethod - def tell(self): - """Return the current position""" - - @abstractproperty - def buffer(self): - """Should return a flo instance""" - - @abstractproperty - def _rolled(self): - """Returns whether the file has been rolled to a real file or not""" - - @abstractproperty - def len(self): - """Returns the length of the data""" - - def _get_softspace(self): - return self.buffer.softspace - - def _set_softspace(self, val): - self.buffer.softspace = val - - softspace = property(_get_softspace, _set_softspace) - - @property - def _file(self): - return self.buffer - - def close(self): - return self.buffer.close() - - def flush(self): - return self.buffer.flush() - - def isatty(self): - return self.buffer.isatty() - - def next(self): - line = self.readline() - if not line: - pos = self.buffer.tell() - self.buffer.seek(0, os.SEEK_END) - if pos == self.buffer.tell(): - raise StopIteration - else: - self.buffer.seek(pos) - return line - - @property - def closed(self): - return self.buffer.closed - - @property - def pos(self): - return self.tell() - - @property - def buf(self): - return self.getvalue() - - def fileno(self): - self.rollover() - return self.buffer.fileno() - - def truncate(self, size=None): - """ - Custom version of truncate that takes either no arguments (like the - real SpooledTemporaryFile) or a single argument that truncates the - value to a certain index location. - """ - if size is None: - return self.buffer.truncate() - - if size < 0: - raise IOError(EINVAL, "Negative size not allowed") - - # Emulate truncation to a particular location - pos = self.tell() - self.seek(size) - self.buffer.truncate() - if pos < size: - self.seek(pos) - - def getvalue(self): - """Return the entire files contents""" - pos = self.tell() - self.seek(0) - val = self.read() - self.seek(pos) - return val - - def seekable(self): - return True - - def readable(self): - return True - - def writable(self): - return True - - __next__ = next - - def __len__(self): - return self.len - - def __iter__(self): - return self - - def __enter__(self): - return self - - def __exit__(self, *args): - self._file.close() - - def __eq__(self, other): - if isinstance(other, self.__class__): - return self.getvalue() == other.getvalue() - return False - - def __ne__(self, other): - return not self.__eq__(other) - - def __bool__(self): - return True - - __nonzero__ = __bool__ - - -class SpooledBytesIO(SpooledIOBase): - """ - SpooledBytesIO is a spooled file-like-object that only accepts bytes. On - Python 2.x this means the 'str' type; on Python 3.x this means the 'bytes' - type. Bytes are written in and retrieved exactly as given, but it will - raise TypeErrors if something other than bytes are written. - - Example:: - - >>> from boltons import ioutils - >>> with ioutils.SpooledBytesIO() as f: - ... f.write(b"Happy IO") - ... _ = f.seek(0) - ... isinstance(f.getvalue(), ioutils.binary_type) - True - """ - - def read(self, n=-1): - return self.buffer.read(n) - - def write(self, s): - if not isinstance(s, binary_type): - raise TypeError("{0} expected, got {1}".format( - binary_type.__name__, - type(s).__name__ - )) - - if self.tell() + len(s) >= self._max_size: - self.rollover() - self.buffer.write(s) - - def seek(self, pos, mode=0): - return self.buffer.seek(pos, mode) - - def readline(self, length=None): - if length: - return self.buffer.readline(length) - else: - return self.buffer.readline() - - def readlines(self, sizehint=0): - return self.buffer.readlines(sizehint) - - def rollover(self): - """Roll the StringIO over to a TempFile""" - if not self._rolled: - tmp = TemporaryFile(dir=self._dir) - pos = self.buffer.tell() - tmp.write(self.buffer.getvalue()) - tmp.seek(pos) - self.buffer.close() - self._buffer = tmp - - @property - def _rolled(self): - return not isinstance(self.buffer, BytesIO) - - @property - def buffer(self): - try: - return self._buffer - except AttributeError: - self._buffer = BytesIO() - return self._buffer - - @property - def len(self): - """Determine the length of the file""" - pos = self.tell() - if self._rolled: - self.seek(0) - val = os.fstat(self.fileno()).st_size - else: - self.seek(0, os.SEEK_END) - val = self.tell() - self.seek(pos) - return val - - def tell(self): - return self.buffer.tell() - - -class SpooledStringIO(SpooledIOBase): - """ - SpooledStringIO is a spooled file-like-object that only accepts unicode - values. On Python 2.x this means the 'unicode' type and on Python 3.x this - means the 'str' type. Values are accepted as unicode and then coerced into - utf-8 encoded bytes for storage. On retrieval, the values are returned as - unicode. - - Example:: - - >>> from boltons import ioutils - >>> with ioutils.SpooledStringIO() as f: - ... f.write(u"\u2014 Hey, an emdash!") - ... _ = f.seek(0) - ... isinstance(f.read(), ioutils.text_type) - True - - """ - def __init__(self, *args, **kwargs): - self._tell = 0 - super(SpooledStringIO, self).__init__(*args, **kwargs) - - def read(self, n=-1): - ret = self.buffer.reader.read(n, n) - self._tell = self.tell() + len(ret) - return ret - - def write(self, s): - if not isinstance(s, text_type): - raise TypeError("{0} expected, got {1}".format( - text_type.__name__, - type(s).__name__ - )) - current_pos = self.tell() - if self.buffer.tell() + len(s.encode('utf-8')) >= self._max_size: - self.rollover() - self.buffer.write(s.encode('utf-8')) - self._tell = current_pos + len(s) - - def _traverse_codepoints(self, current_position, n): - """Traverse from current position to the right n codepoints""" - dest = current_position + n - while True: - if current_position == dest: - # By chance we've landed on the right position, break - break - - # If the read would take us past the intended position then - # seek only enough to cover the offset - if current_position + READ_CHUNK_SIZE > dest: - self.read(dest - current_position) - break - else: - ret = self.read(READ_CHUNK_SIZE) - - # Increment our current position - current_position += READ_CHUNK_SIZE - - # If we kept reading but there was nothing here, break - # as we are at the end of the file - if not ret: - break - - return dest - - def seek(self, pos, mode=0): - """Traverse from offset to the specified codepoint""" - # Seek to position from the start of the file - if mode == os.SEEK_SET: - self.buffer.seek(0) - self._traverse_codepoints(0, pos) - self._tell = pos - # Seek to new position relative to current position - elif mode == os.SEEK_CUR: - start_pos = self.tell() - self._traverse_codepoints(self.tell(), pos) - self._tell = start_pos + pos - elif mode == os.SEEK_END: - self.buffer.seek(0) - dest_position = self.len - pos - self._traverse_codepoints(0, dest_position) - self._tell = dest_position - else: - raise ValueError( - "Invalid whence ({0}, should be 0, 1, or 2)".format(mode) - ) - return self.tell() - - def readline(self, length=None): - ret = self.buffer.readline(length).decode('utf-8') - self._tell = self.tell() + len(ret) - return ret - - def readlines(self, sizehint=0): - ret = [x.decode('utf-8') for x in self.buffer.readlines(sizehint)] - self._tell = self.tell() + sum((len(x) for x in ret)) - return ret - - @property - def buffer(self): - try: - return self._buffer - except AttributeError: - self._buffer = EncodedFile(BytesIO(), data_encoding='utf-8') - return self._buffer - - @property - def _rolled(self): - return not isinstance(self.buffer.stream, BytesIO) - - def rollover(self): - """Roll the StringIO over to a TempFile""" - if not self._rolled: - tmp = EncodedFile(TemporaryFile(dir=self._dir), - data_encoding='utf-8') - pos = self.buffer.tell() - tmp.write(self.buffer.getvalue()) - tmp.seek(pos) - self.buffer.close() - self._buffer = tmp - - def tell(self): - """Return the codepoint position""" - return self._tell - - @property - def len(self): - """Determine the number of codepoints in the file""" - pos = self.buffer.tell() - self.buffer.seek(0) - total = 0 - while True: - ret = self.read(READ_CHUNK_SIZE) - if not ret: - break - total += len(ret) - self.buffer.seek(pos) - return total - - -def is_text_fileobj(fileobj): - if getattr(fileobj, 'encoding', False): - # codecs.open and io.TextIOBase - return True - if getattr(fileobj, 'getvalue', False): - # StringIO.StringIO / cStringIO.StringIO / io.StringIO - try: - if isinstance(fileobj.getvalue(), type(u'')): - return True - except Exception: - pass - return False - - -class MultiFileReader(object): - """Takes a list of open files or file-like objects and provides an - interface to read from them all contiguously. Like - :func:`itertools.chain()`, but for reading files. - - >>> mfr = MultiFileReader(BytesIO(b'ab'), BytesIO(b'cd'), BytesIO(b'e')) - >>> mfr.read(3).decode('ascii') - u'abc' - >>> mfr.read(3).decode('ascii') - u'de' - - The constructor takes as many fileobjs as you hand it, and will - raise a TypeError on non-file-like objects. A ValueError is raised - when file-like objects are a mix of bytes- and text-handling - objects (for instance, BytesIO and StringIO). - """ - - def __init__(self, *fileobjs): - if not all([callable(getattr(f, 'read', None)) and - callable(getattr(f, 'seek', None)) for f in fileobjs]): - raise TypeError('MultiFileReader expected file-like objects' - ' with .read() and .seek()') - if all([is_text_fileobj(f) for f in fileobjs]): - # codecs.open and io.TextIOBase - self._joiner = u'' - elif any([is_text_fileobj(f) for f in fileobjs]): - raise ValueError('All arguments to MultiFileReader must handle' - ' bytes OR text, not a mix') - else: - # open/file and io.BytesIO - self._joiner = b'' - self._fileobjs = fileobjs - self._index = 0 - - def read(self, amt=None): - """Read up to the specified *amt*, seamlessly bridging across - files. Returns the appropriate type of string (bytes or text) - for the input, and returns an empty string when the files are - exhausted. - """ - if not amt: - return self._joiner.join(f.read() for f in self._fileobjs) - parts = [] - while amt > 0 and self._index < len(self._fileobjs): - parts.append(self._fileobjs[self._index].read(amt)) - got = len(parts[-1]) - if got < amt: - self._index += 1 - amt -= got - return self._joiner.join(parts) - - def seek(self, offset, whence=os.SEEK_SET): - """Enables setting position of the file cursor to a given - *offset*. Currently only supports ``offset=0``. - """ - if whence != os.SEEK_SET: - raise NotImplementedError( - 'MultiFileReader.seek() only supports os.SEEK_SET') - if offset != 0: - raise NotImplementedError( - 'MultiFileReader only supports seeking to start at this time') - for f in self._fileobjs: - f.seek(0)