diff getreads.py @ 1:464aee13e2df draft default tip

"planemo upload commit 8e52aac4afce4ab7c4d244e2b70f205f70c16749-dirty"
author nick
date Fri, 27 May 2022 23:29:45 +0000
parents 7f170cb06e2e
children
line wrap: on
line diff
--- a/getreads.py	Tue Dec 01 21:33:27 2015 -0500
+++ b/getreads.py	Fri May 27 23:29:45 2022 +0000
@@ -1,3 +1,9 @@
+#!/usr/bin/env python3
+import argparse
+import logging
+import os
+import sys
+import types
 """A simple parser for FASTA, FASTQ, SAM, etc. Create generators that just return the read name and
 sequence.
 All format parsers follow this API:
@@ -12,18 +18,42 @@
   qual: The quality scores (unless the format is FASTA).
 """
 
+# Available formats.
+FORMATS = ('fasta', 'fastq', 'sam', 'tsv', 'lines')
 
-def getparser(filehandle, filetype='fasta'):
+QUAL_OFFSETS = {'sanger':33, 'solexa':64}
+
+
+def getparser(input, filetype, qual_format='sanger', name_col=1, seq_col=2, qual_col=3):
+  # Detect whether the input is an open file or a path.
+  # Return the appropriate reader.
   if filetype == 'fasta':
-    return FastaReader(filehandle)
+    return FastaReader(input)
   elif filetype == 'fastq':
-    return FastqReader(filehandle)
+    return FastqReader(input, qual_format=qual_format)
   elif filetype == 'sam':
-    return SamReader(filehandle)
+    return SamReader(input, qual_format=qual_format)
   elif filetype == 'tsv':
-    return TsvReader(filehandle)
+    return TsvReader(input, qual_format=qual_format,
+                     name_col=name_col, seq_col=seq_col, qual_col=qual_col)
+  elif filetype == 'lines':
+    return LineReader(input)
   else:
-    raise ValueError('Illegal argument: filetype=\''+filetype+'\'')
+    raise ValueError('Unrecognized format: {!r}'.format(filetype))
+
+
+def detect_input_type(obj):
+  """Is this an open filehandle, or is it a file path (string)?"""
+  try:
+    os.path.isfile(obj)
+    return 'path'
+  except TypeError:
+    if isinstance(obj, types.GeneratorType):
+      return 'generator'
+    elif hasattr(obj, 'read') and hasattr(obj, 'close'):
+      return 'file'
+    else:
+      return None
 
 
 class FormatError(Exception):
@@ -33,19 +63,79 @@
 
 
 class Read(object):
-  def __init__(self, name='', seq='', id_='', qual=''):
+  def __init__(self, name='', seq='', id_='', qual='', qual_format='sanger'):
     self.name = name
     self.seq = seq
-    self.id = id_
     self.qual = qual
+    if id_ or not self.name:
+      self.id = id_
+    elif self.name:
+      self.id = self.name.split()[0]
+    self.offset = QUAL_OFFSETS[qual_format]
+  @property
+  def scores(self):
+    if self.qual is None:
+      return None
+    scores = []
+    for qual_char in self.qual:
+      scores.append(ord(qual_char) - self.offset)
+    return scores
+  def to_fasta(self):
+    return f'>{self.name}\n{self.seq}'
+  def to_fastq(self):
+    return f'@{self.name}\n{self.seq}\n+\n{self.qual}'
+  def __str__(self):
+    if self.qual:
+      return self.to_fastq()
+    else:
+      return self.to_fasta()
+  def __repr__(self):
+    kwarg_strs = []
+    for kwarg in 'name', 'seq', 'id_', 'qual':
+      attr = kwarg.rstrip('_')
+      raw_value = getattr(self, attr)
+      if raw_value is not None and len(raw_value) >= 200:
+        value = raw_value[:199]+'…'
+      else:
+        value = raw_value
+      if value:
+        kwarg_strs.append(f'{kwarg}={value!r}')
+    return type(self).__name__+'('+', '.join(kwarg_strs)+')'
 
 
 class Reader(object):
   """Base class for all other parsers."""
-  def __init__(self, filehandle):
-    self.filehandle = filehandle
+  def __init__(self, input, **kwargs):
+    self.input = input
+    self.input_type = detect_input_type(input)
+    if self.input_type not in ('path', 'file', 'generator'):
+      raise ValueError('Input object {!r} not a file, string, or generator.'.format(input))
+    for key, value in kwargs.items():
+      setattr(self, key, value)
   def __iter__(self):
     return self.parser()
+  def bases(self):
+    for read in self.parser():
+      for base in read.seq:
+        yield base
+  def get_input_iterator(self):
+    if self.input_type == 'path':
+      return open(self.input)
+    else:
+      return self.input
+
+
+class LineReader(Reader):
+  """A parser for the simplest format: Only the sequence, one line per read."""
+  def parser(self):
+    input_iterator = self.get_input_iterator()
+    try:
+      for line in input_iterator:
+        read = Read(seq=line.rstrip('\r\n'))
+        yield read
+    finally:
+      if self.input_type == 'path':
+        input_iterator.close()
 
 
 class TsvReader(Reader):
@@ -54,18 +144,26 @@
   Column 2: sequence
   Column 3: quality scores (optional)"""
   def parser(self):
-    for line in self.filehandle:
-      fields = line.rstrip('\r\n').split('\t')
-      if len(fields) < 2:
-        continue
-      read = Read()
-      read.name = fields[0]
-      if read.name:
-        read.id = read.name.split()[0]
-      read.seq = fields[1]
-      if len(fields) >= 3:
-        read.qual = fields[2]
-      yield read
+    min_fields = max(self.name_col, self.seq_col)
+    input_iterator = self.get_input_iterator()
+    try:
+      for line in input_iterator:
+        fields = line.rstrip('\r\n').split('\t')
+        if len(fields) < min_fields:
+          continue
+        read = Read(qual_format=self.qual_format)
+        read.name = fields[self.name_col-1]
+        if read.name:
+          read.id = read.name.split()[0]
+        read.seq = fields[self.seq_col-1]
+        try:
+          read.qual = fields[self.qual_col-1]
+        except (TypeError, IndexError):
+          pass
+        yield read
+    finally:
+      if self.input_type == 'path':
+        input_iterator.close()
 
 
 class SamReader(Reader):
@@ -75,82 +173,148 @@
   All alignment lines have 11 or more fields. Other lines will be skipped.
   """
   def parser(self):
-    for line in self.filehandle:
-      fields = line.split('\t')
-      if len(fields) < 11:
-        continue
-      # Skip headers.
-      if fields[0].startswith('@') and len(fields[0]) == 3:
-        continue
-      read = Read()
-      read.name = fields[0]
-      if read.name:
-        read.id = read.name.split()[0]
-      read.seq = fields[9]
-      read.qual = fields[10].rstrip('\r\n')
-      yield read
+    input_iterator = self.get_input_iterator()
+    try:
+      for line in input_iterator:
+        fields = line.split('\t')
+        if len(fields) < 11:
+          continue
+        # Skip headers.
+        if fields[0].startswith('@') and len(fields[0]) == 3:
+          continue
+        read = Read(qual_format=self.qual_format)
+        read.name = fields[0]
+        if read.name:
+          read.id = read.name.split()[0]
+        read.seq = fields[9]
+        read.qual = fields[10].rstrip('\r\n')
+        yield read
+    finally:
+      if self.input_type == 'path':
+        input_iterator.close()
 
 
 class FastaReader(Reader):
   """A simple FASTA parser that reads one sequence at a time into memory."""
   def parser(self):
-    read = Read()
-    while True:
-      line_raw = self.filehandle.readline()
-      if not line_raw:
-        if read.seq:
-          yield read
-        raise StopIteration
-      line = line_raw.strip()
-      # Allow empty lines.
-      if not line:
-        continue
-      if line.startswith('>'):
-        if read.seq:
-          yield read
-        read = Read()
-        read.name = line[1:]  # remove ">"
-        if read.name:
-          read.id = read.name.split()[0]
-        continue
-      else:
-        read.seq += line
+    input_iterator = self.get_input_iterator()
+    try:
+      read = None
+      while True:
+        try:
+          line_raw = next(input_iterator)
+        except StopIteration:
+          if read is not None:
+            yield read
+          return
+        line = line_raw.rstrip('\r\n')
+        if line.startswith('>'):
+          if read is not None:
+            yield read
+          read = Read()
+          read.name = line[1:]  # remove ">"
+          if read.name:
+            read.id = read.name.split()[0]
+          continue
+        else:
+          read.seq += line
+    finally:
+      if self.input_type == 'path':
+        input_iterator.close()
 
 
 class FastqReader(Reader):
   """A simple FASTQ parser. Can handle multi-line sequences, though."""
   def parser(self):
-    read = Read()
-    state = 'header'
-    while True:
-      line_raw = self.filehandle.readline()
-      if not line_raw:
-        if read.seq:
-          yield read
-        raise StopIteration
-      line = line_raw.strip()
-      # Allow empty lines.
-      if not line:
-        continue
-      if state == 'header':
-        if not line.startswith('@'):
-          raise FormatError('line state = "header" but line does not start with "@"')
-        if read.seq:
-          yield read
-        read = Read()
-        read.name = line[1:]  # remove '@'
-        if read.name:
-          read.id = read.name.split()[0]
-        state = 'sequence'
-      elif state == 'sequence':
-        if line.startswith('+'):
-          state = 'plus'
-        else:
-          read.seq += line
-      elif state == 'plus' or state == 'quality':
-        state = 'quality'
-        togo = len(read.seq) - len(read.qual)
-        read.qual += line[:togo]
-        # The end of the quality lines is when we have a quality string as long as the sequence.
-        if len(read.qual) >= len(read.seq):
-          state = 'header'
+    input_iterator = self.get_input_iterator()
+    try:
+      read = None
+      line_num = 0
+      state = 'header'
+      while True:
+        try:
+          line_raw = next(input_iterator)
+        except StopIteration:
+          if read is not None:
+            yield read
+          return
+        line_num += 1
+        line = line_raw.rstrip('\r\n')
+        if state == 'header':
+          if not line.startswith('@'):
+            if line:
+              raise FormatError('line state = "header" but line does not start with "@":\n'+line)
+            else:
+              # Allow empty lines.
+              continue
+          if read is not None:
+            yield read
+          read = Read(qual_format=self.qual_format)
+          read.name = line[1:]  # remove '@'
+          if read.name:
+            read.id = read.name.split()[0]
+          state = 'sequence'
+        elif state == 'sequence':
+          if line.startswith('+'):
+            state = 'plus'
+          else:
+            read.seq += line
+        elif state == 'plus' or state == 'quality':
+          if line.startswith('@') and state == 'quality':
+            logging.warning('Looking for more quality scores but line starts with "@". This might '
+                            'be a header line and there were fewer quality scores than bases: {}'
+                            .format(line[:69]))
+          state = 'quality'
+          togo = len(read.seq) - len(read.qual)
+          read.qual += line[:togo]
+          # The end of the quality lines is when we have a quality string as long as the sequence.
+          if len(read.qual) >= len(read.seq):
+            state = 'header'
+    finally:
+      if self.input_type == 'path':
+        input_iterator.close()
+
+
+DESCRIPTION = 'Test parser by parsing an input file and printing its contents.'
+
+
+def make_argparser():
+  parser = argparse.ArgumentParser(description=DESCRIPTION)
+  parser.add_argument('infile', nargs='?', type=argparse.FileType('r'), default=sys.stdin,
+    help='Input reads.')
+  parser.add_argument('-f', '--format', choices=('fasta', 'fastq', 'sam', 'tsv', 'lines'),
+    help='Input read format. Will be detected from the filename, if given.')
+  return parser
+
+
+def main(argv):
+  parser = make_argparser()
+  args = parser.parse_args(argv[1:])
+  if args.format:
+    format = args.format
+  elif args.infile is sys.stdin:
+    fail('Error: Must give a --format if reading from stdin.')
+  else:
+    ext = os.path.splitext(args.infile.name)[1]
+    if ext == '.fq':
+      format = 'fastq'
+    elif ext == '.fa':
+      format = 'fasta'
+    elif ext == '.txt':
+      format = 'lines'
+    else:
+      format = ext[1:]
+  print('Reading input as format {!r}.'.format(format))
+  for i, read in enumerate(getparser(args.infile, filetype=format)):
+    print('Read {} id/name: {!r}/{!r}'.format(i+1, read.id, read.name))
+    print('Read {} seq:  {!r}'.format(i+1, read.seq))
+    print('Read {} qual: {!r}'.format(i+1, read.qual))
+
+
+def fail(message):
+  sys.stderr.write(message+"\n")
+  sys.exit(1)
+
+
+if __name__ == '__main__':
+  sys.exit(main(sys.argv))