comparison genome_diversity.py @ 0:2c498d40ecde

Uploaded
author miller-lab
date Mon, 09 Apr 2012 12:03:06 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:2c498d40ecde
1 #!/usr/bin/env python
2
3 import sys
4 import cdblib
5
6 def _openfile( filename=None, mode='r' ):
7 try:
8 fh = open( filename, mode )
9 except IOError, err:
10 raise RuntimeError( "can't open file: %s\n" % str( err ) )
11 return fh
12
13 def get_filename_from_loc( species=None, filename=None ):
14 fh = _openfile( filename )
15 for line in fh:
16 if line and not line.startswith( '#' ):
17 line = line.rstrip( '\r\n' )
18 if line:
19 elems = line.split( '\t' )
20 if len( elems ) >= 2 and elems[0] == species:
21 return elems[1]
22
23 raise RuntimeError( "can't find '%s' in location file: %s\n" % ( species, filename ) )
24
25
26 class SnpFile( object ):
27 def __init__( self, filename=None, seq_col=1, pos_col=2, ref_seq_col=7, ref_pos_col=8 ):
28 self.filename = filename
29 self.fh = _openfile( filename )
30 self.seq_col = seq_col
31 self.pos_col = pos_col
32 self.ref_seq_col = ref_seq_col
33 self.ref_pos_col = ref_pos_col
34 self.elems = None
35 self.line = None
36 self.comments = []
37
38 def next( self ):
39 while self.fh:
40 try:
41 self.line = self.fh.next()
42 except StopIteration:
43 self.line = None
44 self.elems = None
45 return None
46 if self.line:
47 self.line = self.line.rstrip( '\r\n' )
48 if self.line:
49 if self.line.startswith( '#' ):
50 self.comments.append( self.line )
51 else:
52 self.elems = self.line.split( '\t' )
53 return 1
54
55 def get_seq_pos( self ):
56 if self.elems:
57 return self.elems[ self.seq_col - 1 ], self.elems[ self.pos_col - 1 ]
58 else:
59 return None, None
60
61 def get_ref_seq_pos( self ):
62 if self.elems:
63 return self.elems[ self.ref_seq_seq - 1 ], self.elems[ self.ref_pos_col - 1 ]
64 else:
65 return None, None
66
67
68 class IndexedFile( object ):
69
70 def __init__( self, data_file=None, index_file=None ):
71 self.data_file = data_file
72 self.index_file = index_file
73 self.data_fh = _openfile( data_file )
74 self.index_fh = _openfile( index_file )
75 self._reader = cdblib.Reader( self.index_fh.read(), hash )
76
77 def get_indexed_line( self, key=None ):
78 line = None
79 if key in self._reader:
80 offset = self._reader.getint( key )
81 self.data_fh.seek( offset )
82 try:
83 line = self.data_fh.next()
84 except StopIteration:
85 raise RuntimeError( 'index file out of sync for %s' % key )
86 return line
87
88 class PrimersFile( IndexedFile ):
89 def get_primer_header( self, sequence=None, position=None ):
90 key = "%s %s" % ( str( sequence ), str( position ) )
91 header = self.get_indexed_line( key )
92 if header:
93 if header.startswith( '>' ):
94 elems = header.split()
95 if len( elems ) < 3:
96 raise RuntimeError( 'short primers header for %s' % key )
97 if sequence != elems[1] or str( position ) != elems[2]:
98 raise RuntimeError( 'primers index for %s finds %s %s' % ( key, elems[1], elems[2] ) )
99 else:
100 raise RuntimeError( 'primers index out of sync for %s' % key )
101 return header
102
103 def get_entry( self, sequence=None, position=None ):
104 entry = self.get_primer_header( sequence, position )
105 if entry:
106 while self.data_fh:
107 try:
108 line = self.data_fh.next()
109 except StopIteration:
110 break
111 if line.startswith( '>' ):
112 break
113 entry += line
114 return entry
115
116 def get_enzymes( self, sequence=None, position=None ):
117 entry = self.get_primer_header( sequence, position )
118 enzyme_list = []
119 if entry:
120 try:
121 line = self.data_fh.next()
122 except StopIteration:
123 raise RuntimeError( 'primers entry for %s %s is truncated' % ( str( sequence ), str( position ) ) )
124 if line.startswith( '>' ):
125 raise RuntimeError( 'primers entry for %s %s is truncated' % ( str( sequence ), str( position ) ) )
126 line.rstrip( '\r\n' )
127 if line:
128 enzymes = line.split( ',' )
129 for enzyme in enzymes:
130 enzyme = enzyme.strip()
131 if enzyme:
132 enzyme_list.append( enzyme )
133 return enzyme_list
134
135 class SnpcallsFile( IndexedFile ):
136 def get_snp_seq( self, sequence=None, position=None ):
137 key = "%s %s" % ( str( sequence ), str( position ) )
138 line = self.get_indexed_line( key )
139 if line:
140 elems = line.split( '\t' )
141 if len (elems) < 3:
142 raise RuntimeError( 'short snpcalls line for %s' % key )
143 if sequence != elems[0] or str( position ) != elems[1]:
144 raise RuntimeError( 'snpcalls index for %s finds %s %s' % ( key, elems[0], elems[1] ) )
145 return elems[2]
146 else:
147 return None
148
149 def get_flanking_dna( self, sequence=None, position=None, format='fasta' ):
150 if format != 'fasta' and format != 'primer3':
151 raise RuntimeError( 'invalid format for flanking dna: %s' % str( format ) )
152 seq = self.get_snp_seq( sequence, position )
153 if seq:
154 p = seq.find('[')
155 if p == -1:
156 raise RuntimeError( 'snpcalls entry for %s %s missing left bracket: %s' % ( str( sequence ), str( position ), seq ) )
157 q = seq.find(']', p + 1)
158 if q == -1:
159 raise RuntimeError( 'snpcalls entry for %s %s missing right bracket: %s' % ( str( sequence ), str( position ), seq ) )
160 q += 1
161
162 if format == 'fasta':
163 flanking_seq = '> '
164 else:
165 flanking_seq = 'SEQUENCE_ID='
166
167 flanking_seq += "%s %s %s %s\n" % ( str( sequence ), str( position ), seq[p+1], seq[p+3] )
168
169 if format == 'primer3':
170 flanking_seq += 'SEQUENCE_TEMPLATE='
171
172 flanking_seq += "%sn%s\n" % ( seq[0:p], seq[q:] )
173
174 if format == 'primer3':
175 flanking_seq += "SEQUENCE_TARGET=%d,11\n=\n" % ( p - 5 )
176
177 return flanking_seq
178 else:
179 return None
180
181
182
183 class LocationFile( object ):
184 def __init__(self, filename):
185 self.build_map(filename)
186
187 def build_map(self, filename):
188 self.map = {}
189 self.open_file(filename)
190 for line in self.read_lines():
191 elems = line.split('\t', 1)
192 if len(elems) == 2:
193 self.map[ elems[0].strip() ] = elems[1].strip()
194 self.close_file()
195
196 def read_lines(self):
197 for line in self.fh:
198 if not line.startswith('#'):
199 line = line.rstrip('\r\n')
200 yield line
201
202 def open_file(self, filename):
203 self.filename = filename
204 try:
205 self.fh = open(filename, 'r')
206 except IOError, err:
207 print >> sys.stderr, "Error opening location file '%s': %s" % (filename, str(err))
208 sys.exit(1)
209
210 def close_file(self):
211 self.fh.close()
212
213 def loc_file( self, key ):
214 if key in self.map:
215 return self.map[key]
216 else:
217 print >> sys.stderr, "'%s' does not appear in location file '%s'" % (key, self.filename)
218 sys.exit(1)
219
220 class ChrLens( object ):
221 def __init__( self, chrlen_filename ):
222 self.chrlen_filename = chrlen_filename
223 self.build_map()
224
225 def build_map(self):
226 self.map = {}
227 self.open_file(self.chrlen_filename)
228 for line in self.read_lines():
229 elems = line.split('\t', 1)
230 if len(elems) == 2:
231 chrom = elems[0].strip()
232 chrom_len_text = elems[1].strip()
233 try:
234 chrom_len = int( chrom_len_text )
235 except ValueError:
236 print >> sys.stderr, "Bad length '%s' for chromosome '%s' in '%s'" % (chrom_len_text, chrom, self.chrlen_filename)
237 self.map[ chrom ] = chrom_len
238 self.close_file()
239
240 def read_lines(self):
241 for line in self.fh:
242 if not line.startswith('#'):
243 line = line.rstrip('\r\n')
244 yield line
245
246 def open_file(self, filename):
247 self.filename = filename
248 try:
249 self.fh = open(filename, 'r')
250 except IOError, err:
251 print >> sys.stderr, "Error opening chromosome length file '%s': %s" % (filename, str(err))
252 sys.exit(1)
253
254 def close_file(self):
255 self.fh.close()
256
257 def length( self, key ):
258 if key in self.map:
259 return self.map[key]
260 else:
261 return None
262
263 def __iter__( self ):
264 for chrom in self.map:
265 yield chrom
266