Mercurial > repos > nick > sequence_content_trimmer
view getreads.py @ 1:464aee13e2df draft default tip
"planemo upload commit 8e52aac4afce4ab7c4d244e2b70f205f70c16749-dirty"
author | nick |
---|---|
date | Fri, 27 May 2022 23:29:45 +0000 |
parents | 7f170cb06e2e |
children |
line wrap: on
line source
#!/usr/bin/env python3 import argparse import logging import os import sys import types """A simple parser for FASTA, FASTQ, SAM, etc. Create generators that just return the read name and sequence. All format parsers follow this API: with open('sequence.fasta') as fasta: for read in getreads.getparser(fasta, filetype='fasta'): print "There is a sequence with this FASTA identifier: "+read.id print "Its sequence is "+read.seq The properties of Read are: name: The entire FASTA header line, SAM column 1, etc. id: The first whitespace-delimited part of the name. seq: The sequence. qual: The quality scores (unless the format is FASTA). """ # Available formats. FORMATS = ('fasta', 'fastq', 'sam', 'tsv', 'lines') QUAL_OFFSETS = {'sanger':33, 'solexa':64} def getparser(input, filetype, qual_format='sanger', name_col=1, seq_col=2, qual_col=3): # Detect whether the input is an open file or a path. # Return the appropriate reader. if filetype == 'fasta': return FastaReader(input) elif filetype == 'fastq': return FastqReader(input, qual_format=qual_format) elif filetype == 'sam': return SamReader(input, qual_format=qual_format) elif filetype == 'tsv': return TsvReader(input, qual_format=qual_format, name_col=name_col, seq_col=seq_col, qual_col=qual_col) elif filetype == 'lines': return LineReader(input) else: raise ValueError('Unrecognized format: {!r}'.format(filetype)) def detect_input_type(obj): """Is this an open filehandle, or is it a file path (string)?""" try: os.path.isfile(obj) return 'path' except TypeError: if isinstance(obj, types.GeneratorType): return 'generator' elif hasattr(obj, 'read') and hasattr(obj, 'close'): return 'file' else: return None class FormatError(Exception): def __init__(self, message=None): if message: Exception.__init__(self, message) class Read(object): def __init__(self, name='', seq='', id_='', qual='', qual_format='sanger'): self.name = name self.seq = seq self.qual = qual if id_ or not self.name: self.id = id_ elif self.name: self.id = self.name.split()[0] self.offset = QUAL_OFFSETS[qual_format] @property def scores(self): if self.qual is None: return None scores = [] for qual_char in self.qual: scores.append(ord(qual_char) - self.offset) return scores def to_fasta(self): return f'>{self.name}\n{self.seq}' def to_fastq(self): return f'@{self.name}\n{self.seq}\n+\n{self.qual}' def __str__(self): if self.qual: return self.to_fastq() else: return self.to_fasta() def __repr__(self): kwarg_strs = [] for kwarg in 'name', 'seq', 'id_', 'qual': attr = kwarg.rstrip('_') raw_value = getattr(self, attr) if raw_value is not None and len(raw_value) >= 200: value = raw_value[:199]+'…' else: value = raw_value if value: kwarg_strs.append(f'{kwarg}={value!r}') return type(self).__name__+'('+', '.join(kwarg_strs)+')' class Reader(object): """Base class for all other parsers.""" def __init__(self, input, **kwargs): self.input = input self.input_type = detect_input_type(input) if self.input_type not in ('path', 'file', 'generator'): raise ValueError('Input object {!r} not a file, string, or generator.'.format(input)) for key, value in kwargs.items(): setattr(self, key, value) def __iter__(self): return self.parser() def bases(self): for read in self.parser(): for base in read.seq: yield base def get_input_iterator(self): if self.input_type == 'path': return open(self.input) else: return self.input class LineReader(Reader): """A parser for the simplest format: Only the sequence, one line per read.""" def parser(self): input_iterator = self.get_input_iterator() try: for line in input_iterator: read = Read(seq=line.rstrip('\r\n')) yield read finally: if self.input_type == 'path': input_iterator.close() class TsvReader(Reader): """A parser for a simple tab-delimited format. Column 1: name Column 2: sequence Column 3: quality scores (optional)""" def parser(self): min_fields = max(self.name_col, self.seq_col) input_iterator = self.get_input_iterator() try: for line in input_iterator: fields = line.rstrip('\r\n').split('\t') if len(fields) < min_fields: continue read = Read(qual_format=self.qual_format) read.name = fields[self.name_col-1] if read.name: read.id = read.name.split()[0] read.seq = fields[self.seq_col-1] try: read.qual = fields[self.qual_col-1] except (TypeError, IndexError): pass yield read finally: if self.input_type == 'path': input_iterator.close() class SamReader(Reader): """A simple SAM parser. Assumptions: Lines starting with "@" with 3 fields are headers. All others are alignments. All alignment lines have 11 or more fields. Other lines will be skipped. """ def parser(self): input_iterator = self.get_input_iterator() try: for line in input_iterator: fields = line.split('\t') if len(fields) < 11: continue # Skip headers. if fields[0].startswith('@') and len(fields[0]) == 3: continue read = Read(qual_format=self.qual_format) read.name = fields[0] if read.name: read.id = read.name.split()[0] read.seq = fields[9] read.qual = fields[10].rstrip('\r\n') yield read finally: if self.input_type == 'path': input_iterator.close() class FastaReader(Reader): """A simple FASTA parser that reads one sequence at a time into memory.""" def parser(self): input_iterator = self.get_input_iterator() try: read = None while True: try: line_raw = next(input_iterator) except StopIteration: if read is not None: yield read return line = line_raw.rstrip('\r\n') if line.startswith('>'): if read is not None: yield read read = Read() read.name = line[1:] # remove ">" if read.name: read.id = read.name.split()[0] continue else: read.seq += line finally: if self.input_type == 'path': input_iterator.close() class FastqReader(Reader): """A simple FASTQ parser. Can handle multi-line sequences, though.""" def parser(self): input_iterator = self.get_input_iterator() try: read = None line_num = 0 state = 'header' while True: try: line_raw = next(input_iterator) except StopIteration: if read is not None: yield read return line_num += 1 line = line_raw.rstrip('\r\n') if state == 'header': if not line.startswith('@'): if line: raise FormatError('line state = "header" but line does not start with "@":\n'+line) else: # Allow empty lines. continue if read is not None: yield read read = Read(qual_format=self.qual_format) read.name = line[1:] # remove '@' if read.name: read.id = read.name.split()[0] state = 'sequence' elif state == 'sequence': if line.startswith('+'): state = 'plus' else: read.seq += line elif state == 'plus' or state == 'quality': if line.startswith('@') and state == 'quality': logging.warning('Looking for more quality scores but line starts with "@". This might ' 'be a header line and there were fewer quality scores than bases: {}' .format(line[:69])) state = 'quality' togo = len(read.seq) - len(read.qual) read.qual += line[:togo] # The end of the quality lines is when we have a quality string as long as the sequence. if len(read.qual) >= len(read.seq): state = 'header' finally: if self.input_type == 'path': input_iterator.close() DESCRIPTION = 'Test parser by parsing an input file and printing its contents.' def make_argparser(): parser = argparse.ArgumentParser(description=DESCRIPTION) parser.add_argument('infile', nargs='?', type=argparse.FileType('r'), default=sys.stdin, help='Input reads.') parser.add_argument('-f', '--format', choices=('fasta', 'fastq', 'sam', 'tsv', 'lines'), help='Input read format. Will be detected from the filename, if given.') return parser def main(argv): parser = make_argparser() args = parser.parse_args(argv[1:]) if args.format: format = args.format elif args.infile is sys.stdin: fail('Error: Must give a --format if reading from stdin.') else: ext = os.path.splitext(args.infile.name)[1] if ext == '.fq': format = 'fastq' elif ext == '.fa': format = 'fasta' elif ext == '.txt': format = 'lines' else: format = ext[1:] print('Reading input as format {!r}.'.format(format)) for i, read in enumerate(getparser(args.infile, filetype=format)): print('Read {} id/name: {!r}/{!r}'.format(i+1, read.id, read.name)) print('Read {} seq: {!r}'.format(i+1, read.seq)) print('Read {} qual: {!r}'.format(i+1, read.qual)) def fail(message): sys.stderr.write(message+"\n") sys.exit(1) if __name__ == '__main__': sys.exit(main(sys.argv))