Mercurial > repos > miller-lab > genome_diversity
diff genome_diversity.py @ 0:2c498d40ecde
Uploaded
author | miller-lab |
---|---|
date | Mon, 09 Apr 2012 12:03:06 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/genome_diversity.py Mon Apr 09 12:03:06 2012 -0400 @@ -0,0 +1,266 @@ +#!/usr/bin/env python + +import sys +import cdblib + +def _openfile( filename=None, mode='r' ): + try: + fh = open( filename, mode ) + except IOError, err: + raise RuntimeError( "can't open file: %s\n" % str( err ) ) + return fh + +def get_filename_from_loc( species=None, filename=None ): + fh = _openfile( filename ) + for line in fh: + if line and not line.startswith( '#' ): + line = line.rstrip( '\r\n' ) + if line: + elems = line.split( '\t' ) + if len( elems ) >= 2 and elems[0] == species: + return elems[1] + + raise RuntimeError( "can't find '%s' in location file: %s\n" % ( species, filename ) ) + + +class SnpFile( object ): + def __init__( self, filename=None, seq_col=1, pos_col=2, ref_seq_col=7, ref_pos_col=8 ): + self.filename = filename + self.fh = _openfile( filename ) + self.seq_col = seq_col + self.pos_col = pos_col + self.ref_seq_col = ref_seq_col + self.ref_pos_col = ref_pos_col + self.elems = None + self.line = None + self.comments = [] + + def next( self ): + while self.fh: + try: + self.line = self.fh.next() + except StopIteration: + self.line = None + self.elems = None + return None + if self.line: + self.line = self.line.rstrip( '\r\n' ) + if self.line: + if self.line.startswith( '#' ): + self.comments.append( self.line ) + else: + self.elems = self.line.split( '\t' ) + return 1 + + def get_seq_pos( self ): + if self.elems: + return self.elems[ self.seq_col - 1 ], self.elems[ self.pos_col - 1 ] + else: + return None, None + + def get_ref_seq_pos( self ): + if self.elems: + return self.elems[ self.ref_seq_seq - 1 ], self.elems[ self.ref_pos_col - 1 ] + else: + return None, None + + +class IndexedFile( object ): + + def __init__( self, data_file=None, index_file=None ): + self.data_file = data_file + self.index_file = index_file + self.data_fh = _openfile( data_file ) + self.index_fh = _openfile( index_file ) + self._reader = cdblib.Reader( self.index_fh.read(), hash ) + + def get_indexed_line( self, key=None ): + line = None + if key in self._reader: + offset = self._reader.getint( key ) + self.data_fh.seek( offset ) + try: + line = self.data_fh.next() + except StopIteration: + raise RuntimeError( 'index file out of sync for %s' % key ) + return line + +class PrimersFile( IndexedFile ): + def get_primer_header( self, sequence=None, position=None ): + key = "%s %s" % ( str( sequence ), str( position ) ) + header = self.get_indexed_line( key ) + if header: + if header.startswith( '>' ): + elems = header.split() + if len( elems ) < 3: + raise RuntimeError( 'short primers header for %s' % key ) + if sequence != elems[1] or str( position ) != elems[2]: + raise RuntimeError( 'primers index for %s finds %s %s' % ( key, elems[1], elems[2] ) ) + else: + raise RuntimeError( 'primers index out of sync for %s' % key ) + return header + + def get_entry( self, sequence=None, position=None ): + entry = self.get_primer_header( sequence, position ) + if entry: + while self.data_fh: + try: + line = self.data_fh.next() + except StopIteration: + break + if line.startswith( '>' ): + break + entry += line + return entry + + def get_enzymes( self, sequence=None, position=None ): + entry = self.get_primer_header( sequence, position ) + enzyme_list = [] + if entry: + try: + line = self.data_fh.next() + except StopIteration: + raise RuntimeError( 'primers entry for %s %s is truncated' % ( str( sequence ), str( position ) ) ) + if line.startswith( '>' ): + raise RuntimeError( 'primers entry for %s %s is truncated' % ( str( sequence ), str( position ) ) ) + line.rstrip( '\r\n' ) + if line: + enzymes = line.split( ',' ) + for enzyme in enzymes: + enzyme = enzyme.strip() + if enzyme: + enzyme_list.append( enzyme ) + return enzyme_list + +class SnpcallsFile( IndexedFile ): + def get_snp_seq( self, sequence=None, position=None ): + key = "%s %s" % ( str( sequence ), str( position ) ) + line = self.get_indexed_line( key ) + if line: + elems = line.split( '\t' ) + if len (elems) < 3: + raise RuntimeError( 'short snpcalls line for %s' % key ) + if sequence != elems[0] or str( position ) != elems[1]: + raise RuntimeError( 'snpcalls index for %s finds %s %s' % ( key, elems[0], elems[1] ) ) + return elems[2] + else: + return None + + def get_flanking_dna( self, sequence=None, position=None, format='fasta' ): + if format != 'fasta' and format != 'primer3': + raise RuntimeError( 'invalid format for flanking dna: %s' % str( format ) ) + seq = self.get_snp_seq( sequence, position ) + if seq: + p = seq.find('[') + if p == -1: + raise RuntimeError( 'snpcalls entry for %s %s missing left bracket: %s' % ( str( sequence ), str( position ), seq ) ) + q = seq.find(']', p + 1) + if q == -1: + raise RuntimeError( 'snpcalls entry for %s %s missing right bracket: %s' % ( str( sequence ), str( position ), seq ) ) + q += 1 + + if format == 'fasta': + flanking_seq = '> ' + else: + flanking_seq = 'SEQUENCE_ID=' + + flanking_seq += "%s %s %s %s\n" % ( str( sequence ), str( position ), seq[p+1], seq[p+3] ) + + if format == 'primer3': + flanking_seq += 'SEQUENCE_TEMPLATE=' + + flanking_seq += "%sn%s\n" % ( seq[0:p], seq[q:] ) + + if format == 'primer3': + flanking_seq += "SEQUENCE_TARGET=%d,11\n=\n" % ( p - 5 ) + + return flanking_seq + else: + return None + + + +class LocationFile( object ): + def __init__(self, filename): + self.build_map(filename) + + def build_map(self, filename): + self.map = {} + self.open_file(filename) + for line in self.read_lines(): + elems = line.split('\t', 1) + if len(elems) == 2: + self.map[ elems[0].strip() ] = elems[1].strip() + self.close_file() + + def read_lines(self): + for line in self.fh: + if not line.startswith('#'): + line = line.rstrip('\r\n') + yield line + + def open_file(self, filename): + self.filename = filename + try: + self.fh = open(filename, 'r') + except IOError, err: + print >> sys.stderr, "Error opening location file '%s': %s" % (filename, str(err)) + sys.exit(1) + + def close_file(self): + self.fh.close() + + def loc_file( self, key ): + if key in self.map: + return self.map[key] + else: + print >> sys.stderr, "'%s' does not appear in location file '%s'" % (key, self.filename) + sys.exit(1) + +class ChrLens( object ): + def __init__( self, chrlen_filename ): + self.chrlen_filename = chrlen_filename + self.build_map() + + def build_map(self): + self.map = {} + self.open_file(self.chrlen_filename) + for line in self.read_lines(): + elems = line.split('\t', 1) + if len(elems) == 2: + chrom = elems[0].strip() + chrom_len_text = elems[1].strip() + try: + chrom_len = int( chrom_len_text ) + except ValueError: + print >> sys.stderr, "Bad length '%s' for chromosome '%s' in '%s'" % (chrom_len_text, chrom, self.chrlen_filename) + self.map[ chrom ] = chrom_len + self.close_file() + + def read_lines(self): + for line in self.fh: + if not line.startswith('#'): + line = line.rstrip('\r\n') + yield line + + def open_file(self, filename): + self.filename = filename + try: + self.fh = open(filename, 'r') + except IOError, err: + print >> sys.stderr, "Error opening chromosome length file '%s': %s" % (filename, str(err)) + sys.exit(1) + + def close_file(self): + self.fh.close() + + def length( self, key ): + if key in self.map: + return self.map[key] + else: + return None + + def __iter__( self ): + for chrom in self.map: + yield chrom +