Mercurial > repos > xuebing > sharplabtool
comparison tools/mytools/sequence.py @ 0:9071e359b9a3
Uploaded
| author | xuebing | 
|---|---|
| date | Fri, 09 Mar 2012 19:37:19 -0500 | 
| parents | |
| children | 
   comparison
  equal
  deleted
  inserted
  replaced
| -1:000000000000 | 0:9071e359b9a3 | 
|---|---|
| 1 #!@WHICHPYTHON@ | |
| 2 | |
| 3 import copy, string, sys | |
| 4 | |
| 5 #------------------ Alphabet ------------------- | |
| 6 | |
| 7 class Alphabet(object): | |
| 8 """Biological alphabet class. | |
| 9 This defines the set of symbols from which various objects can be built, e.g. sequences and motifs. | |
| 10 The symbol set is immutable and accessed as a tuple. | |
| 11 symstr: symbols in alphabet as either a tuple or string | |
| 12 complement: dictionary defining letters and their complement | |
| 13 """ | |
| 14 def __init__(self, symstr, complement = None): | |
| 15 """Construct an alphabet from a string or tuple of characters. | |
| 16 Lower case characters will be converted to upper case. | |
| 17 An optional mapping for complements may be provided. | |
| 18 Example: | |
| 19 >>> alpha = sequence.Alphabet('ACGTttga', {'A':'C', 'G':'T'}) | |
| 20 >>> alpha.getSymbols() | |
| 21 will construct the DNA alphabet and output: | |
| 22 ('A', 'C', 'G', 'T') | |
| 23 """ | |
| 24 symlst = [] | |
| 25 for s in [str(sym).upper()[0] for sym in symstr]: | |
| 26 if not s in symlst: | |
| 27 symlst.append(s) | |
| 28 self.symbols = tuple(symlst) | |
| 29 if complement != None: | |
| 30 # expand the mapping and check for contradictions | |
| 31 cmap = {} | |
| 32 for s in self.symbols: | |
| 33 c = complement.get(s, None) | |
| 34 if c != None: | |
| 35 if s in cmap and cmap[s] != c: | |
| 36 raise RuntimeError("Alphabet complement map " | |
| 37 "contains contradictory mapping") | |
| 38 cmap[s] = c | |
| 39 cmap[c] = s | |
| 40 # replace mapping with indicies | |
| 41 cimap = {} | |
| 42 for idx in range (len(self.symbols)): | |
| 43 s = self.symbols[idx] | |
| 44 if s in cmap: | |
| 45 cimap[cmap[s]] = idx | |
| 46 # create tuple | |
| 47 cidxlst = [] | |
| 48 for idx in range (len(self.symbols)): | |
| 49 cidxlst.append(cimap.get(self.symbols[idx], None)) | |
| 50 self.complements = tuple(cidxlst) | |
| 51 else: | |
| 52 self.complements = None | |
| 53 | |
| 54 def getSymbols(self): | |
| 55 """Retrieve a tuple with all symbols, immutable membership and order""" | |
| 56 return self.symbols | |
| 57 | |
| 58 def getComplements(self): | |
| 59 """Retrieve a tuple with all complement indicies, immutable""" | |
| 60 return self.complements | |
| 61 | |
| 62 def isValidSymbol(self, sym): | |
| 63 """Check if the symbol is a member of alphabet""" | |
| 64 return any([s==sym for s in self.symbols]) | |
| 65 | |
| 66 def getIndex(self, sym): | |
| 67 """Retrieve the index of the symbol (immutable)""" | |
| 68 for idx in range (len(self.symbols)): | |
| 69 if self.symbols[idx] == sym: | |
| 70 return idx | |
| 71 raise RuntimeError("Symbol " + sym + " does not exist in alphabet") | |
| 72 | |
| 73 def isComplementable(self): | |
| 74 return self.complements != None | |
| 75 | |
| 76 def getComplement(self, sym): | |
| 77 """Retrieve the complement of the symbol (immutable)""" | |
| 78 return self.symbols[self.complements[self.getIndex(sym)]]; | |
| 79 | |
| 80 def isValidString(self, symstr): | |
| 81 """Check if the string contains only symbols that belong to the alphabet""" | |
| 82 found = True | |
| 83 for sym in symstr: | |
| 84 if self.isValidSymbol(sym) == False: | |
| 85 return False | |
| 86 return True | |
| 87 | |
| 88 def getLen(self): | |
| 89 """Retrieve the number of symbols in (the length of) the alphabet""" | |
| 90 return len(self.symbols) | |
| 91 | |
| 92 # pre-defined alphabets that can be specified by their name | |
| 93 predefAlphabets = [ | |
| 94 ("DNA" , Alphabet('ACGT', {'A':'T', 'G':'C'})), | |
| 95 ("RNA" , Alphabet('ACGU')), | |
| 96 ("Extended DNA" , Alphabet('ACGTYRN')), | |
| 97 ("Protein" , Alphabet('ACDEFGHIKLMNPQRSTVWY')), | |
| 98 ("Extended Protein" , Alphabet('ACDEFGHIKLMNPQRSTVWYX')), | |
| 99 ("TM Labels" , Alphabet('MIO')) | |
| 100 ] | |
| 101 | |
| 102 def getAlphabet(name): | |
| 103 """Retrieve a pre-defined alphabet by name. | |
| 104 Currently, "Protein", "DNA", "RNA", "Extended DNA", "Extended Protein" and "TM Labels" are available. | |
| 105 Example: | |
| 106 >>> alpha = sequence.getAlphabet('Protein') | |
| 107 >>> alpha.getSymbols() | |
| 108 will retrieve the 20 amino acid alphabet and output the tuple: | |
| 109 ('A', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'V', 'W', 'Y') | |
| 110 """ | |
| 111 for (xname, xalpha) in predefAlphabets: | |
| 112 if xname == name: | |
| 113 return xalpha | |
| 114 return None | |
| 115 | |
| 116 #------------------ Sequence ------------------- | |
| 117 | |
| 118 class Sequence(object): | |
| 119 """Biological sequence class. Sequence data is immutable. | |
| 120 | |
| 121 data: the sequence data as a tuple or string | |
| 122 alpha: the alphabet from which symbols are taken | |
| 123 name: the sequence name, if any | |
| 124 info: can contain additional sequence information apart from the name | |
| 125 """ | |
| 126 def __init__(self, sequence, alpha = None, name = "", seqinfo = ""): | |
| 127 """Create a sequence with sequence data. | |
| 128 Specifying the alphabet is optional, so is the name and info. | |
| 129 Example: | |
| 130 >>> myseq = sequence.Sequence('MVSAKKVPAIAMSFGVSF') | |
| 131 will create a sequence with name "", and assign one of the predefined alphabets on basis of what symbols were used. | |
| 132 >>> myseq.getAlphabet().getSymbols() | |
| 133 will most likely output the standard protein alphabet: | |
| 134 ('A', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'V', 'W', 'Y') | |
| 135 """ | |
| 136 if type(sequence) is str: | |
| 137 self.data = tuple(sequence.upper()) | |
| 138 elif type(sequence) is tuple: | |
| 139 self.data = sequence | |
| 140 elif type(sequence) is list: | |
| 141 self.data = tuple([s.upper() for s in sequence]) | |
| 142 else: | |
| 143 raise RuntimeError("Sequence data is not specified correctly: must be string or tuple") | |
| 144 # Resolve choice of alphabet | |
| 145 validAlphabet = False | |
| 146 if alpha == None: # Alphabet is not set, attempt to set it automatically... | |
| 147 for (xname, xalpha) in predefAlphabets: # Iterate through each predefined alphabet, in order | |
| 148 if xalpha.isValidString( self.data ): # This alphabet works, go with it | |
| 149 self.alpha = alpha = xalpha | |
| 150 validAlphabet = True | |
| 151 break | |
| 152 self.name = name | |
| 153 self.info = seqinfo | |
| 154 if validAlphabet == False: # we were either unsuccessful above or the alphabet was specified so test it | |
| 155 if type(alpha) is str: # check if name is a predefined alphabet | |
| 156 for (xname, xalpha) in predefAlphabets: # Iterate through each predefined alphabet, check for name | |
| 157 if (xname == alpha): | |
| 158 alpha = xalpha | |
| 159 break | |
| 160 if type(alpha) is Alphabet: # the alphabet is specified | |
| 161 if alpha.isValidString(self.data) == False: | |
| 162 raise RuntimeError("Invalid alphabet specified: "+"".join(alpha.getSymbols())+" is not compatible with sequence '"+"".join(self.data)+"'") | |
| 163 else: | |
| 164 self.alpha = alpha | |
| 165 else: | |
| 166 raise RuntimeError("Could not identify alphabet from sequence") | |
| 167 | |
| 168 #basic getters and setters for the class | |
| 169 def getName(self): | |
| 170 """Get the name of the sequence""" | |
| 171 return self.name | |
| 172 def getInfo(self): | |
| 173 """Get additional info of the sequence (e.g. from the defline in a FASTA file)""" | |
| 174 return self.info | |
| 175 def getAlphabet(self): | |
| 176 """Retrieve the alphabet that is assigned to this sequence""" | |
| 177 return self.alpha | |
| 178 def setName(self, name): | |
| 179 """Change the name of the sequence""" | |
| 180 self.name = name | |
| 181 def setAlphabet(self, alpha): | |
| 182 """Set the alphabet, throws an exception if it is not compatible with the sequence data""" | |
| 183 if type(alpha) is Alphabet: | |
| 184 if alpha.isValid( sequence ) == False: | |
| 185 raise RuntimeError( "Invalid alphabet specified" ) | |
| 186 #sequence functions | |
| 187 def getSequence(self): | |
| 188 """Retrieve the sequence data (a tuple of symbols)""" | |
| 189 return self.data | |
| 190 def getString(self): | |
| 191 """Retrieve the sequence data as a string (copy of actual data)""" | |
| 192 return "".join(self.data) | |
| 193 def getLen(self): | |
| 194 """Get the length of the sequence (number of symbols)""" | |
| 195 return len(self.data) | |
| 196 def getSite(self, position, length = 1): | |
| 197 """Retrieve a site in the sequence of desired length. | |
| 198 Note that positions go from 0 to length-1, and that if the requested site | |
| 199 extends beyond those the method throws an exception. | |
| 200 """ | |
| 201 if position >= 0 and position <= self.getLen() - length: | |
| 202 if length == 1: | |
| 203 return self.data[position] | |
| 204 else: | |
| 205 return self.data[position:position+length] | |
| 206 else: | |
| 207 raise RuntimeError( "Attempt to access invalid position in sequence "+self.getName() ) | |
| 208 | |
| 209 def nice(self): | |
| 210 """ A short description of the sequence """ | |
| 211 print self.getName(), ":", self.getLen() | |
| 212 | |
| 213 def readStrings(filename): | |
| 214 """ Read one or more lines of text from a file--for example an alignment. | |
| 215 Return as a list of strings. | |
| 216 filename: name of file | |
| 217 """ | |
| 218 txtlist = [] | |
| 219 f = open(filename) | |
| 220 for line in f.readlines(): | |
| 221 txtlist.extend(line.split()) | |
| 222 return txtlist | |
| 223 | |
| 224 def readFASTA(filename, alpha = None): | |
| 225 """ Read one or more sequences from a file in FASTA format. | |
| 226 filename: name of file to load sequences from | |
| 227 alpha: alphabet that is used (if left unspecified, an attempt is made to identify the alphabet for each individual sequence) | |
| 228 """ | |
| 229 seqlist = [] | |
| 230 seqname = None | |
| 231 seqinfo = None | |
| 232 seqdata = [] | |
| 233 fh = open(filename) | |
| 234 thisline = fh.readline() | |
| 235 while (thisline): | |
| 236 if (thisline[0] == '>'): # new sequence | |
| 237 if (seqname): # take care of the data that is already in the buffer before processing the new sequence | |
| 238 try: | |
| 239 seqnew = Sequence(seqdata, alpha, seqname, seqinfo) | |
| 240 seqlist.append(seqnew) | |
| 241 except RuntimeError, e: | |
| 242 print >> sys.stderr, "Warning: "+seqname+" is invalid (ignored): ", e | |
| 243 seqinfo = thisline[1:-1] # everything on the defline is "info" | |
| 244 seqname = seqinfo.split()[0] # up to first space | |
| 245 seqdata = [] | |
| 246 else: # pull out the sequence data | |
| 247 cleanline = thisline.split() | |
| 248 for line in cleanline: | |
| 249 seqdata.extend(tuple(line.strip('*'))) # sometimes a line ends with an asterisk in FASTA files | |
| 250 thisline = fh.readline() | |
| 251 | |
| 252 if (seqname): | |
| 253 try: | |
| 254 seqnew = Sequence(seqdata, alpha, seqname, seqinfo) | |
| 255 seqlist.append(seqnew) | |
| 256 except RuntimeError, e: | |
| 257 print >> sys.stderr, "Warning: " + seqname + " is invalid (ignored): ", e | |
| 258 else: | |
| 259 raise RuntimeError("No sequences on FASTA format found in this file") | |
| 260 fh.close() | |
| 261 return seqlist | |
| 262 | |
| 263 def _writeOneFASTA(sequence, filehandle): | |
| 264 """Write one sequence in FASTA format to an already open file""" | |
| 265 filehandle.write(">" + sequence.getName()+"\n") | |
| 266 data = sequence.getSequence() | |
| 267 lines = ( sequence.getLen() - 1) / 60 + 1 | |
| 268 for i in range(lines): | |
| 269 #note: python lets us get the last line (var length) free | |
| 270 #lineofseq = data[i*60 : (i+1)*60] + "\n" | |
| 271 lineofseq = "".join(data[i*60 : (i+1)*60]) + "\n" | |
| 272 filehandle.write(lineofseq) | |
| 273 | |
| 274 def writeFASTA(sequence, filename): | |
| 275 """Write a list (or a single) of sequences to a file in the FASTA format""" | |
| 276 fh = open(filename, "w") | |
| 277 if isinstance(sequence, Sequence): | |
| 278 _writeOneFASTA(sequence, fh) | |
| 279 else: | |
| 280 for seq in sequence: | |
| 281 if isinstance(seq, Sequence): | |
| 282 _writeOneFASTA(seq, fh) | |
| 283 else: | |
| 284 print >> sys.stderr, "Warning: could not write " + seq.getName() + " (ignored)." | |
| 285 fh.flush() | |
| 286 fh.close() | |
| 287 | |
| 288 #------------------ Distrib ------------------- | |
| 289 | |
| 290 class Distrib(object): | |
| 291 """Class for storing a multinomial probability distribution over the symbols in an alphabet""" | |
| 292 def __init__(self, alpha, pseudo_count = 0.0): | |
| 293 self.alpha = alpha | |
| 294 self.tot = pseudo_count * self.alpha.getLen() | |
| 295 self.cnt = [pseudo_count for _ in range( self.alpha.getLen() )] | |
| 296 | |
| 297 def __deepcopy__(self, memo): | |
| 298 dup = Distrib(self.alpha) | |
| 299 dup.tot = copy.deepcopy(self.tot, memo) | |
| 300 dup.cnt = copy.deepcopy(self.cnt, memo) | |
| 301 return dup | |
| 302 | |
| 303 def count(self, syms = None ): | |
| 304 """Count an observation of a symbol""" | |
| 305 if syms == None: | |
| 306 syms = self.alpha.getSymbols() | |
| 307 for sym in syms: | |
| 308 idx = self.alpha.getIndex( sym ) | |
| 309 self.cnt[idx] += 1.0 | |
| 310 self.tot += 1 | |
| 311 | |
| 312 def complement(self): | |
| 313 """Complement the counts, throw an error if this is impossible""" | |
| 314 if not self.alpha.isComplementable(): | |
| 315 raise RuntimeError("Attempt to complement a Distrib " | |
| 316 "based on a non-complementable alphabet.") | |
| 317 coms = self.alpha.getComplements() | |
| 318 new_count = [] | |
| 319 for idx in range(len(coms)): | |
| 320 cidx = coms[idx] | |
| 321 if cidx == None: | |
| 322 cidx = idx | |
| 323 new_count.append(self.cnt[cidx]) | |
| 324 self.cnt = new_count | |
| 325 return self | |
| 326 | |
| 327 def reset(self): | |
| 328 """Reset the distribution, that is, restart counting.""" | |
| 329 self.tot = 0 | |
| 330 self.cnt = [0.0 for _ in range( self.alpha.getLen() )] | |
| 331 | |
| 332 def getFreq(self, sym = None): | |
| 333 """Determine the probability distribution from the current counts. | |
| 334 The order in which probabilities are given follow the order of the symbols in the alphabet.""" | |
| 335 if self.tot > 0: | |
| 336 if sym == None: | |
| 337 freq = tuple([ y / self.tot for y in self.cnt ]) | |
| 338 return freq | |
| 339 else: | |
| 340 idx = self.alpha.getIndex( sym ) | |
| 341 return self.cnt[idx] / self.tot | |
| 342 return None | |
| 343 | |
| 344 def pretty(self): | |
| 345 """Retrieve the probabilites for all symbols and return as a pretty table (a list of text strings)""" | |
| 346 table = ["".join(["%4s " % s for s in self.alpha.getSymbols()])] | |
| 347 table.append("".join(["%3.2f " % y for y in Distrib.getFreq(self)])) | |
| 348 return table | |
| 349 | |
| 350 def getSymbols(self): | |
| 351 """Get the symbols in the alphabet in the same order as probabilities are given.""" | |
| 352 return self.alpha.getSymbols() | |
| 353 | |
| 354 def getAlphabet(self): | |
| 355 """Get the alphabet over which the distribution is defined.""" | |
| 356 return self.alpha | |
| 357 | |
| 358 #------------------ Motif (and subclasses) ------------------- | |
| 359 | |
| 360 class Motif(object): | |
| 361 """ Sequence motif class--defining a pattern that can be searched in sequences. | |
| 362 This class is not intended for direct use. Instead use and develop sub-classes (see below). | |
| 363 """ | |
| 364 def __init__(self, alpha): | |
| 365 self.len = 0 | |
| 366 self.alpha = alpha | |
| 367 | |
| 368 def getLen(self): | |
| 369 """Get the length of the motif""" | |
| 370 return self.len | |
| 371 | |
| 372 def getAlphabet(self): | |
| 373 """Get the alphabet that is used in the motif""" | |
| 374 return self.alpha | |
| 375 | |
| 376 def isAlphabet(self, seqstr): | |
| 377 """Check if the sequence can be processed by this motif""" | |
| 378 mystr = seqstr | |
| 379 if type(seqstr) is Sequence: | |
| 380 mystr = seqstr.getString() | |
| 381 return self.getAlphabet().isValidString(mystr) | |
| 382 | |
| 383 import re | |
| 384 | |
| 385 class RegExp(Motif): | |
| 386 """A motif class that defines the pattern in terms of a regular expression""" | |
| 387 def __init__(self, alpha, re_string): | |
| 388 Motif.__init__(self, alpha) | |
| 389 self.pattern = re.compile(re_string) | |
| 390 | |
| 391 def match(self, seq): | |
| 392 """Find matches to the motif in a specified sequence. | |
| 393 The method is a generator, hence subsequent hits can be retrieved using next(). | |
| 394 The returned result is a tuple (position, match-sequence, score), where score is | |
| 395 always 1.0 since a regular expression is either true or false (not returned). | |
| 396 """ | |
| 397 myseq = seq | |
| 398 if not type(seq) is Sequence: | |
| 399 myseq = Sequence(seq, self.alpha) | |
| 400 mystr = myseq.getString() | |
| 401 if not Motif.isAlphabet(self, mystr): | |
| 402 raise RuntimeError("Motif alphabet is not valid for sequence " + myseq.getName()) | |
| 403 for m in re.finditer(self.pattern, mystr): | |
| 404 yield (m.start(), m.group(), 1.0) | |
| 405 | |
| 406 import math, time | |
| 407 | |
| 408 # Variables used by the PWM for creating an EPS file | |
| 409 _colour_def = ( | |
| 410 "/black [0 0 0] def\n" | |
| 411 "/red [0.8 0 0] def\n" | |
| 412 "/green [0 0.5 0] def\n" | |
| 413 "/blue [0 0 0.8] def\n" | |
| 414 "/yellow [1 1 0] def\n" | |
| 415 "/purple [0.8 0 0.8] def\n" | |
| 416 "/magenta [1.0 0 1.0] def\n" | |
| 417 "/cyan [0 1.0 1.0] def\n" | |
| 418 "/pink [1.0 0.8 0.8] def\n" | |
| 419 "/turquoise [0.2 0.9 0.8] def\n" | |
| 420 "/orange [1 0.7 0] def\n" | |
| 421 "/lightred [0.8 0.56 0.56] def\n" | |
| 422 "/lightgreen [0.35 0.5 0.35] def\n" | |
| 423 "/lightblue [0.56 0.56 0.8] def\n" | |
| 424 "/lightyellow [1 1 0.71] def\n" | |
| 425 "/lightpurple [0.8 0.56 0.8] def\n" | |
| 426 "/lightmagenta [1.0 0.7 1.0] def\n" | |
| 427 "/lightcyan [0.7 1.0 1.0] def\n" | |
| 428 "/lightpink [1.0 0.9 0.9] def\n" | |
| 429 "/lightturquoise [0.81 0.9 0.89] def\n" | |
| 430 "/lightorange [1 0.91 0.7] def\n") | |
| 431 _colour_dict = ( | |
| 432 "/fullColourDict <<\n" | |
| 433 " (G) orange\n" | |
| 434 " (T) green\n" | |
| 435 " (C) blue\n" | |
| 436 " (A) red\n" | |
| 437 " (U) green\n" | |
| 438 ">> def\n" | |
| 439 "/mutedColourDict <<\n" | |
| 440 " (G) lightorange\n" | |
| 441 " (T) lightgreen\n" | |
| 442 " (C) lightblue\n" | |
| 443 " (A) lightred\n" | |
| 444 " (U) lightgreen\n" | |
| 445 ">> def\n" | |
| 446 "/colorDict fullColourDict def\n") | |
| 447 | |
| 448 _eps_defaults = { | |
| 449 'LOGOTYPE': 'NA', | |
| 450 'FONTSIZE': '12', | |
| 451 'TITLEFONTSIZE': '12', | |
| 452 'SMALLFONTSIZE': '6', | |
| 453 'TOPMARGIN': '0.9', | |
| 454 'BOTTOMMARGIN': '0.9', | |
| 455 'YAXIS': 'true', | |
| 456 'YAXISLABEL': 'bits', | |
| 457 'XAXISLABEL': '', | |
| 458 'TITLE': '', | |
| 459 'ERRORBARFRACTION': '1.0', | |
| 460 'SHOWINGBOX': 'false', | |
| 461 'BARBITS': '2.0', | |
| 462 'TICBITS': '1', | |
| 463 'COLORDEF': _colour_def, | |
| 464 'COLORDICT': _colour_dict, | |
| 465 'SHOWENDS': 'false', | |
| 466 'NUMBERING': 'true', | |
| 467 'OUTLINE': 'false', | |
| 468 } | |
| 469 class PWM(Motif): | |
| 470 """This motif subclass defines a pattern in terms of a position weight matrix. | |
| 471 An alphabet must be provided. A pseudo-count to be added to each count is | |
| 472 optional. A uniform background distribution is used by default. | |
| 473 """ | |
| 474 def __init__(self, alpha): | |
| 475 Motif.__init__(self, alpha) # set alphabet of this multinomial distribution | |
| 476 self.background = Distrib(alpha) # the default background ... | |
| 477 self.background.count(alpha.getSymbols()) # ... is uniform | |
| 478 self.nsites = 0 | |
| 479 | |
| 480 def setFromAlignment(self, aligned, pseudo_count = 0.0): | |
| 481 """Set the probabilities in the PWM from an alignment. | |
| 482 The alignment is a list of equal-length strings (see readStrings), OR | |
| 483 a list of Sequence. | |
| 484 """ | |
| 485 self.cols = -1 | |
| 486 self.nsites = len(aligned) | |
| 487 seqs = [] | |
| 488 # Below we create a list of Sequence from the alignment, | |
| 489 # while doing some error checking, and figure out the number of columns | |
| 490 for s in aligned: | |
| 491 # probably a text string, so we make a nameless sequence from it | |
| 492 if not type(s) is Sequence: | |
| 493 s=Sequence(s, Motif.getAlphabet(self)) | |
| 494 else: | |
| 495 # it was a sequence, so we check that the alphabet in | |
| 496 # this motif will be able to process it | |
| 497 if not Motif.isAlphabet(self, s): | |
| 498 raise RuntimeError("Motif alphabet is not valid for sequence " + s.getName()) | |
| 499 if self.cols == -1: | |
| 500 self.cols = s.getLen() | |
| 501 elif self.cols != s.getLen(): | |
| 502 raise RuntimeError("Sequences in alignment are not of equal length") | |
| 503 seqs.append(s) | |
| 504 # The line below initializes the list of Distrib (one for each column of the alignment) | |
| 505 self.counts = [Distrib(Motif.getAlphabet(self), pseudo_count) for _ in range(self.cols)] | |
| 506 # Next, we do the counting, column by column | |
| 507 for c in range( self.cols ): # iterate through columns | |
| 508 for s in seqs: # iterate through rows | |
| 509 # determine the index of the symbol we find at this position (row, column c) | |
| 510 self.counts[c].count(s.getSite(c)) | |
| 511 # Update the length | |
| 512 self.len = self.cols | |
| 513 | |
| 514 def reverseComplement(self): | |
| 515 """Reverse complement the PWM""" | |
| 516 i = 0 | |
| 517 j = len(self.counts)-1 | |
| 518 while (i < j): | |
| 519 temp = self.counts[i]; | |
| 520 self.counts[i] = self.counts[j] | |
| 521 self.counts[j] = temp | |
| 522 self.counts[i].complement() | |
| 523 self.counts[j].complement() | |
| 524 i += 1; | |
| 525 j -= 1; | |
| 526 if i == j: | |
| 527 self.counts[i].complement() | |
| 528 return self | |
| 529 | |
| 530 def getNSites(self): | |
| 531 """Get the number of sites that made the PWM""" | |
| 532 return self.nsites | |
| 533 | |
| 534 def setBackground(self, distrib): | |
| 535 """Set the background distribution""" | |
| 536 if not distrib.getAlphabet() == Motif.getAlphabet(self): | |
| 537 raise RuntimeError("Incompatible alphabets") | |
| 538 self.background = distrib | |
| 539 | |
| 540 def getFreq(self, col = None, sym = None): | |
| 541 """Get the probabilities for all positions in the PWM (a list of Distribs)""" | |
| 542 if (col == None): | |
| 543 return [y.getFreq() for y in self.counts] | |
| 544 else: | |
| 545 return self.counts[col].getFreq(sym) | |
| 546 | |
| 547 def pretty(self): | |
| 548 """Retrieve the probabilites for all positions in the PWM as a pretty table (a list of text strings)""" | |
| 549 #table = ["".join(["%8s " % s for s in self.alpha.getSymbols()])] | |
| 550 table = [] | |
| 551 for row in PWM.getFreq(self): | |
| 552 table.append("".join(["%8.6f " % y for y in row])) | |
| 553 return table | |
| 554 | |
| 555 def logoddsPretty(self, bkg): | |
| 556 """Retrieve the (base-2) log-odds for all positions in the PWM as a pretty table (a list of text strings)""" | |
| 557 table = [] | |
| 558 for row in PWM.getFreq(self): | |
| 559 #table.append("".join(["%8.6f " % (math.log((row[i]+1e-6)/bkg[i])/math.log(2)) for i in range(len(row))])) | |
| 560 table.append("".join(["%8.6f " % (math.log((row[i])/bkg[i])/math.log(2)) for i in range(len(row))])) | |
| 561 #table.append("".join(["%8.6f " % row[i] for i in range(len(row))])) | |
| 562 return table | |
| 563 | |
| 564 | |
| 565 def consensus_sequence(self): | |
| 566 """ | |
| 567 Get the consensus sequence corresponding to a PWM. | |
| 568 Consensus sequence is the letter in each column | |
| 569 with the highest probability. | |
| 570 """ | |
| 571 consensus = "" | |
| 572 alphabet = Motif.getAlphabet(self).getSymbols() | |
| 573 for pos in range(self.cols): | |
| 574 best_letter = alphabet[0] | |
| 575 best_p = self.counts[pos].getFreq(best_letter) | |
| 576 for letter in alphabet[1:]: | |
| 577 p = self.counts[pos].getFreq(letter) | |
| 578 if p > best_p: | |
| 579 best_p = p | |
| 580 best_letter = letter | |
| 581 consensus += best_letter | |
| 582 return consensus | |
| 583 | |
| 584 | |
| 585 def consensus(self): | |
| 586 """ | |
| 587 Get the consensus corresponding to a PWM. | |
| 588 Consensus at each column of motif is a list of | |
| 589 characters with non-zero probabilities. | |
| 590 """ | |
| 591 consensus = [] | |
| 592 for pos in range(self.cols): | |
| 593 matches = [] | |
| 594 for letter in Motif.getAlphabet(self).getSymbols(): | |
| 595 p = self.counts[pos].getFreq(letter) | |
| 596 if p > 0: | |
| 597 matches += letter | |
| 598 consensus.append(matches) | |
| 599 return consensus | |
| 600 | |
| 601 | |
| 602 def getScore(self, seq, start): | |
| 603 """Score this particular list of symbols using the PFM (background needs to be set separately)""" | |
| 604 sum = 0.0 | |
| 605 seqdata = seq.getSequence()[start : start+self.cols] | |
| 606 for pos in range(len(seqdata)): | |
| 607 q = self.counts[pos].getFreq(seqdata[pos]) | |
| 608 if q == 0: | |
| 609 q = 0.0001 # to avoid log(0) == -Infinity | |
| 610 logodds = math.log(q / self.background.getFreq(seqdata[pos])) | |
| 611 sum += logodds | |
| 612 return sum | |
| 613 | |
| 614 def match(self, seq, _LOG0 = -10): | |
| 615 """Find matches to the motif in a specified sequence. | |
| 616 The method is a generator, hence subsequent hits can be retrieved using next(). | |
| 617 The returned result is a tuple (position, match-sequence, score). | |
| 618 The optional parameter _LOG0 specifies a lower bound on reported logodds scores. | |
| 619 """ | |
| 620 myseq = seq | |
| 621 if not type(seq) is Sequence: | |
| 622 myseq = Sequence(seq, self.alpha) | |
| 623 if not Motif.isAlphabet(self, myseq): | |
| 624 raise RuntimeError("Motif alphabet is not valid for sequence " + myseq.getName()) | |
| 625 for pos in range(myseq.getLen() - self.cols): | |
| 626 score = PWM.getScore(self, myseq, pos) | |
| 627 if score > _LOG0: | |
| 628 yield (pos, "".join(myseq.getSite(pos, self.cols)), score) | |
| 629 | |
| 630 def writeEPS(self, program, template_file, eps_fh, | |
| 631 timestamp = time.localtime()): | |
| 632 """Write out a DNA motif to EPS format.""" | |
| 633 small_dfmt = "%d.%m.%Y %H:%M" | |
| 634 full_dfmt = "%d.%m.%Y %H:%M:%S %Z" | |
| 635 small_date = time.strftime(small_dfmt, timestamp) | |
| 636 full_date = time.strftime(full_dfmt, timestamp) | |
| 637 points_per_cm = 72.0 / 2.54 | |
| 638 height = 4.5 | |
| 639 width = self.getLen() * 0.8 + 2 | |
| 640 width = min(30, width) | |
| 641 points_height = int(height * points_per_cm) | |
| 642 points_width = int(width * points_per_cm) | |
| 643 defaults = _eps_defaults.copy() | |
| 644 defaults['CREATOR'] = program | |
| 645 defaults['CREATIONDATE'] = full_date | |
| 646 defaults['LOGOHEIGHT'] = str(height) | |
| 647 defaults['LOGOWIDTH'] = str(width) | |
| 648 defaults['FINEPRINT'] = program + ' ' + small_date | |
| 649 defaults['CHARSPERLINE'] = str(self.getLen()) | |
| 650 defaults['BOUNDINGHEIGHT'] = str(points_height) | |
| 651 defaults['BOUNDINGWIDTH'] = str(points_width) | |
| 652 defaults['LOGOLINEHEIGHT'] = str(height) | |
| 653 with open(template_file, 'r') as template_fh: | |
| 654 m_var = re.compile("\{\$([A-Z]+)\}") | |
| 655 for line in template_fh: | |
| 656 last = 0 | |
| 657 match = m_var.search(line) | |
| 658 while (match): | |
| 659 if (last < match.start()): | |
| 660 prev = line[last:match.start()] | |
| 661 eps_fh.write(prev) | |
| 662 key = match.group(1) | |
| 663 if (key == "DATA"): | |
| 664 eps_fh.write("\nStartLine\n") | |
| 665 for pos in range(self.getLen()): | |
| 666 eps_fh.write("({0:d}) startstack\n".format(pos+1)) | |
| 667 stack = [] | |
| 668 # calculate the stack information content | |
| 669 alpha_ic = 2 | |
| 670 h = 0 | |
| 671 for sym in self.getAlphabet().getSymbols(): | |
| 672 freq = self.getFreq(pos, sym) | |
| 673 if (freq == 0): | |
| 674 continue | |
| 675 h -= (freq * math.log(freq, 2)) | |
| 676 stack_ic = alpha_ic - h | |
| 677 # calculate the heights of each symbol | |
| 678 for sym in self.getAlphabet().getSymbols(): | |
| 679 freq = self.getFreq(pos, sym) | |
| 680 if (freq == 0): | |
| 681 continue | |
| 682 stack.append((freq * stack_ic, sym)) | |
| 683 stack.sort(); | |
| 684 # output the symbols | |
| 685 for symh, sym in stack: | |
| 686 eps_fh.write(" {0:f} ({1:s}) numchar\n".format( | |
| 687 symh, sym)) | |
| 688 eps_fh.write("endstack\n\n") | |
| 689 eps_fh.write("EndLine\n") | |
| 690 elif (key in defaults): | |
| 691 eps_fh.write(defaults[key]) | |
| 692 else: | |
| 693 raise RuntimeError('Unknown variable "' + key + | |
| 694 '" in EPS template') | |
| 695 last = match.end(); | |
| 696 match = m_var.search(line, last) | |
| 697 if (last < len(line)): | |
| 698 eps_fh.write(line[last:]) | |
| 699 | |
| 700 | |
| 701 #------------------ Main method ------------------- | |
| 702 # Executed if you run this file from the operating system prompt, e.g. | |
| 703 # > python sequence.py | |
| 704 | |
| 705 if __name__=='__main__': | |
| 706 alpha = getAlphabet('Extended DNA') | |
| 707 #seqs = readFASTA('pos.fasta') | |
| 708 seqs = [] | |
| 709 aln = readStrings('tmp0') | |
| 710 #regexp = RegExp(alpha, '[AG]G.[DE]TT[AS].') | |
| 711 pwm = PWM(alpha) | |
| 712 pwm.setFromAlignment(aln) | |
| 713 for row in pwm.pretty(): | |
| 714 print row | |
| 715 for s in seqs: | |
| 716 print s.getName(), s.getLen(), s.getAlphabet().getSymbols() | |
| 717 for m in regexp.match( s ): | |
| 718 print "pos: %d pat: %s %4.2f" % (m[0], m[1], m[2]) | |
| 719 for m in pwm.match( s ): | |
| 720 print "pos: %d pat: %s %4.2f" % (m[0], m[1], m[2]) | 
