comparison commons/core/seq/Bioseq.py @ 38:2c0c0a89fad7

Uploaded
author m-zytnicki
date Thu, 02 May 2013 09:56:47 -0400
parents 44d5973c188c
children
comparison
equal deleted inserted replaced
37:d22fadc825e3 38:2c0c0a89fad7
1 # Copyright INRA (Institut National de la Recherche Agronomique)
2 # http://www.inra.fr
3 # http://urgi.versailles.inra.fr
4 #
5 # This software is governed by the CeCILL license under French law and
6 # abiding by the rules of distribution of free software. You can use,
7 # modify and/ or redistribute the software under the terms of the CeCILL
8 # license as circulated by CEA, CNRS and INRIA at the following URL
9 # "http://www.cecill.info".
10 #
11 # As a counterpart to the access to the source code and rights to copy,
12 # modify and redistribute granted by the license, users are provided only
13 # with a limited warranty and the software's author, the holder of the
14 # economic rights, and the successive licensors have only limited
15 # liability.
16 #
17 # In this respect, the user's attention is drawn to the risks associated
18 # with loading, using, modifying and/or developing or reproducing the
19 # software by the user in light of its specific status of free software,
20 # that may mean that it is complicated to manipulate, and that also
21 # therefore means that it is reserved for developers and experienced
22 # professionals having in-depth computer knowledge. Users are therefore
23 # encouraged to load and test the software's suitability as regards their
24 # requirements in conditions enabling the security of their systems and/or
25 # data to be ensured and, more generally, to use and operate it in the
26 # same conditions as regards security.
27 #
28 # The fact that you are presently reading this means that you have had
29 # knowledge of the CeCILL license and that you accept its terms.
30
31
32 import sys
33 import string
34 import re
35 import random
36 import cStringIO
37 from commons.core.coord.Map import Map
38 from commons.core.checker.RepetException import RepetException
39
40 DNA_ALPHABET_WITH_N = set( ['A','T','G','C','N'] )
41 IUPAC = set(['A','T','G','C','U','R','Y','M','K','W','S','B','D','H','V','N'])
42
43
44 ## Record a sequence with its header
45 #
46 class Bioseq( object ):
47
48 header = ""
49 sequence = ""
50
51 ## constructor
52 #
53 # @param name the header of sequence
54 # @param seq sequence (DNA, RNA, protein)
55 #
56 def __init__( self, name="", seq="" ):
57 self.header = name
58 self.sequence = seq
59
60
61 ## Equal operator
62 #
63 def __eq__( self, o ):
64 if self.header==o.header and self.sequence==o.sequence:
65 return True
66 return False
67
68
69 ## overload __repr__
70 #
71 def __repr__( self ):
72 return "%s;%s" % ( self.header, self.sequence )
73
74
75 ## set attribute header
76 #
77 # @param header a string
78 #
79 def setHeader( self, header ):
80 self.header = header
81
82
83 ## get attribute header
84 #
85 # @return header
86 def getHeader(self):
87 return self.header
88
89
90 ## set attribute sequence
91 #
92 # @param sequence a string
93 #
94 def setSequence( self, sequence ):
95 self.sequence = sequence
96
97
98 def getSequence(self):
99 return self.sequence
100
101 ## reset
102 #
103 def reset( self ):
104 self.setHeader( "" )
105 self.setSequence( "" )
106
107
108 ## Test if bioseq is empty
109 #
110 def isEmpty( self ):
111 return self.header == "" and self.sequence == ""
112
113
114 ## Reverse the sequence
115 #
116 def reverse( self ):
117 tmp = self.sequence
118 self.sequence = tmp[::-1]
119
120
121 ## Turn the sequence into its complement
122 # Force upper case letters
123 # @warning: old name in pyRepet.Bioseq realComplement
124 #
125 def complement( self ):
126 complement = ""
127 self.upCase()
128 for i in xrange(0,len(self.sequence),1):
129 if self.sequence[i] == "A":
130 complement += "T"
131 elif self.sequence[i] == "T":
132 complement += "A"
133 elif self.sequence[i] == "C":
134 complement += "G"
135 elif self.sequence[i] == "G":
136 complement += "C"
137 elif self.sequence[i] == "M":
138 complement += "K"
139 elif self.sequence[i] == "R":
140 complement += "Y"
141 elif self.sequence[i] == "W":
142 complement += "W"
143 elif self.sequence[i] == "S":
144 complement += "S"
145 elif self.sequence[i] == "Y":
146 complement += "R"
147 elif self.sequence[i] == "K":
148 complement += "M"
149 elif self.sequence[i] == "V":
150 complement += "B"
151 elif self.sequence[i] == "H":
152 complement += "D"
153 elif self.sequence[i] == "D":
154 complement += "H"
155 elif self.sequence[i] == "B":
156 complement += "V"
157 elif self.sequence[i] == "N":
158 complement += "N"
159 elif self.sequence[i] == "-":
160 complement += "-"
161 else:
162 print "WARNING: unknown symbol '%s', replacing it by N" % ( self.sequence[i] )
163 complement += "N"
164 self.sequence = complement
165
166
167 ## Reverse and complement the sequence
168 #
169 # Force upper case letters
170 # @warning: old name in pyRepet.Bioseq : complement
171 #
172 def reverseComplement( self ):
173 self.reverse()
174 self.complement()
175
176
177 ## Remove gap in the sequence
178 #
179 def cleanGap(self):
180 self.sequence = self.sequence.replace("-","")
181
182
183 ## Copy current Bioseq Instance
184 #
185 # @return: a Bioseq instance, a copy of current sequence.
186 #
187 def copyBioseqInstance(self):
188 seq = Bioseq()
189 seq.sequence = self.sequence
190 seq.header = self.header
191 return seq
192
193
194 ## Add phase information after the name of sequence in header
195 #
196 # @param phase integer representing phase (1, 2, 3, -1, -2, -3)
197 #
198 def setFrameInfoOnHeader(self, phase):
199 if " " in self.header:
200 name, desc = self.header.split(" ", 1)
201 name = name + "_" + str(phase)
202 self.header = name + " " + desc
203 else:
204 self.header = self.header + "_" + str(phase)
205
206
207 ## Fill Bioseq attributes with fasta file
208 #
209 # @param faFileHandler file handler of a fasta file
210 #
211 def read( self, faFileHandler ):
212 line = faFileHandler.readline()
213 if line == "":
214 self.header = None
215 self.sequence = None
216 return
217 while line == "\n":
218 line = faFileHandler.readline()
219 if line[0] == '>':
220 self.header = string.rstrip(line[1:])
221 else:
222 print "error, line is",string.rstrip(line)
223 return
224 line = " "
225 seq = cStringIO.StringIO()
226 while line:
227 prev_pos = faFileHandler.tell()
228 line = faFileHandler.readline()
229 if line == "":
230 break
231 if line[0] == '>':
232 faFileHandler.seek( prev_pos )
233 break
234 seq.write( string.rstrip(line) )
235 self.sequence = seq.getvalue()
236
237
238 ## Create a subsequence with a modified header
239 #
240 # @param s integer start a required subsequence
241 # @param e integer end a required subsequence
242 #
243 # @return a Bioseq instance, a subsequence of current sequence
244 #
245 def subseq( self, s, e=0 ):
246 if e == 0 :
247 e=len( self.sequence )
248 if s > e :
249 print "error: start must be < or = to end"
250 return
251 if s <= 0 :
252 print "error: start must be > 0"
253 return
254 sub = Bioseq()
255 sub.header = self.header + " fragment " + str(s) + ".." + str(e)
256 sub.sequence = self.sequence[(s-1):e]
257 return sub
258
259
260 ## Get the nucleotide or aminoacid at the given position
261 #
262 # @param pos integer nucleotide or aminoacid position
263 #
264 # @return a string
265 #
266 def getNtFromPosition(self, pos):
267 result = None
268 if not (pos < 1 or pos > self.getLength()):
269 result = self.sequence[pos - 1]
270 return result
271
272
273 ## Print in stdout the Bioseq in fasta format with 60 characters lines
274 #
275 # @param l length of required sequence default is whole sequence
276 #
277 def view(self,l=0):
278 print '>'+self.header
279 i=0
280 if(l==0):
281 l=len(self.sequence)
282 seq=self.sequence[0:l]
283
284 while i<len(seq):
285 print seq[i:i+60]
286 i=i+60
287
288
289 ## Get length of sequence
290 #
291 # @param avoidN boolean don't count 'N' nucleotides
292 #
293 # @return length of current sequence
294 #
295 def getLength( self, countN = True ):
296 if countN:
297 return len(self.sequence)
298 else:
299 return len(self.sequence) - self.countNt('N')
300
301
302 ## Return the proportion of a specific character
303 #
304 # @param nt character that we want to count
305 #
306 def propNt( self, nt ):
307 return self.countNt( nt ) / float( self.getLength() )
308
309
310 ## Count occurrence of specific character
311 #
312 # @param nt character that we want to count
313 #
314 # @return nb of occurrences
315 #
316 def countNt( self, nt ):
317 return self.sequence.count( nt )
318
319
320 ## Count occurrence of each nucleotide in current seq
321 #
322 # @return a dict, keys are nucleotides, values are nb of occurrences
323 #
324 def countAllNt( self ):
325 dNt2Count = {}
326 for nt in ["A","T","G","C","N"]:
327 dNt2Count[ nt ] = self.countNt( nt )
328 return dNt2Count
329
330
331 ## Return a dict with the number of occurrences for each combination of ATGC of specified size and number of word found
332 #
333 # @param size integer required length word
334 #
335 def occ_word( self, size ):
336 occ = {}
337 if size == 0:
338 return occ,0
339 nbword = 0
340 srch = re.compile('[^ATGC]+')
341 wordlist = self._createWordList( size )
342 for i in wordlist:
343 occ[i] = 0
344 lenseq = len(self.sequence)
345 i = 0
346 while i < lenseq-size+1:
347 word = self.sequence[i:i+size].upper()
348 m = srch.search(word)
349 if m == None:
350 occ[word] = occ[word]+1
351 nbword = nbword + 1
352 i = i + 1
353 else:
354 i = i + m.end(0)
355 return occ, nbword
356
357
358 ## Return a dictionary with the frequency of occurs for each combination of ATGC of specified size
359 #
360 # @param size integer required length word
361 #
362 def freq_word( self, size ):
363 dOcc, nbWords = self.occ_word( size )
364 freq = {}
365 for word in dOcc.keys():
366 freq[word] = float(dOcc[word]) / nbWords
367 return freq
368
369
370 ## Find ORF in each phase
371 #
372 # @return: a dict, keys are phases, values are stop codon positions.
373 #
374 def findORF (self):
375 orf = {0:[],1:[],2:[]}
376 length = len (self.sequence)
377 for i in xrange(0,length):
378 triplet = self.sequence[i:i+3]
379 if ( triplet == "TAA" or triplet == "TAG" or triplet == "TGA"):
380 phase = i % 3
381 orf[phase].append(i)
382 return orf
383
384
385 ## Convert the sequence into upper case
386 #
387 def upCase( self ):
388 self.sequence = self.sequence.upper()
389
390
391 ## Convert the sequence into lower case
392 #
393 def lowCase( self ):
394 self.sequence = self.sequence.lower()
395
396
397 ## Extract the cluster of the fragment (output from Grouper)
398 #
399 # @return cluster id (string)
400 #
401 def getClusterID( self ):
402 data = self.header.split()
403 return data[0].split("Cl")[1]
404
405
406 ## Extract the group of the sequence (output from Grouper)
407 #
408 # @return group id (string)
409 #
410 def getGroupID( self ):
411 data = self.header.split()
412 return data[0].split("Gr")[1].split("Cl")[0]
413
414
415 ## Get the header of the full sequence (output from Grouper)
416 #
417 # @example 'Dmel_Grouper_3091_Malign_3:LARD' from '>MbS1566Gr81Cl81 Dmel_Grouper_3091_Malign_3:LARD {Fragment} 1..5203'
418 # @return header (string)
419 #
420 def getHeaderFullSeq( self ):
421 data = self.header.split()
422 return data[1]
423
424
425 ## Get the strand of the fragment (output from Grouper)
426 #
427 # @return: strand (+ or -)
428 #
429 def getFragStrand( self ):
430 data = self.header.split()
431 coord = data[3].split("..")
432 if int(coord[0]) < int(coord[-1]):
433 return "+"
434 else:
435 return "-"
436
437
438 ## Get A, T, G, C or N from an IUPAC letter
439 # IUPAC = ['A','T','G','C','U','R','Y','M','K','W','S','B','D','H','V','N']
440 #
441 # @return A, T, G, C or N
442 #
443 def getATGCNFromIUPAC( self, nt ):
444 subset = ["A","T","G","C","N"]
445
446 if nt in subset:
447 return nt
448 elif nt == "U":
449 return "T"
450 elif nt == "R":
451 return random.choice( "AG" )
452 elif nt == "Y":
453 return random.choice( "CT" )
454 elif nt == "M":
455 return random.choice( "CA" )
456 elif nt == "K":
457 return random.choice( "TG" )
458 elif nt == "W":
459 return random.choice( "TA" )
460 elif nt == "S":
461 return random.choice( "CG" )
462 elif nt == "B":
463 return random.choice( "CTG" )
464 elif nt == "D":
465 return random.choice( "ATG" )
466 elif nt == "H":
467 return random.choice( "ATC" )
468 elif nt == "V":
469 return random.choice( "ACG" )
470 else:
471 return "N"
472
473 ## Get nucleotide from an IUPAC letter and a nucleotide
474 # Works only for IUPAC code with two possibilities ['R','Y','M','K','W','S']
475 # Examples:
476 # Y and C returns T
477 # Y and T returns C
478 # B and C throws RepetException
479 #
480 # @return A, T, G, C
481 #
482 def getATGCNFromIUPACandATGCN(self, IUPACCode, nt):
483 if IUPACCode == "R":
484 possibleNt = set(["A", "G"])
485 if nt not in possibleNt:
486 raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
487 return (possibleNt - set(nt)).pop()
488
489 elif IUPACCode == "Y":
490 possibleNt = set(["C", "T"])
491 if nt not in possibleNt:
492 raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
493 return (possibleNt - set(nt)).pop()
494
495 elif IUPACCode == "M":
496 possibleNt = set(["A", "C"])
497 if nt not in possibleNt:
498 raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
499 return (possibleNt - set(nt)).pop()
500
501 elif IUPACCode == "K":
502 possibleNt = set(["T", "G"])
503 if nt not in possibleNt:
504 raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
505 return (possibleNt - set(nt)).pop()
506
507 elif IUPACCode == "W":
508 possibleNt = set(["A", "T"])
509 if nt not in possibleNt:
510 raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
511 return (possibleNt - set(nt)).pop()
512
513 elif IUPACCode == "S":
514 possibleNt = set(["C", "G"])
515 if nt not in possibleNt:
516 raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
517 return (possibleNt - set(nt)).pop()
518
519 else:
520 raise RepetException("Can't retrieve the third nucleotide from IUPAC code '%s' and nucleotide '%s'" % (IUPACCode, nt))
521
522 def getSeqWithOnlyATGCN( self ):
523 newSeq = ""
524 for nt in self.sequence:
525 newSeq += self.getATGCNFromIUPAC( nt )
526 return newSeq
527
528
529 ## Replace any symbol not in (A,T,G,C,N) by another nucleotide it represents
530 #
531 def partialIUPAC( self ):
532 self.sequence = self.getSeqWithOnlyATGCN()
533
534
535 ## Remove non Unix end-of-line symbols, if any
536 #
537 def checkEOF( self ):
538 symbol = "\r" # corresponds to '^M' from Windows
539 if symbol in self.sequence:
540 print "WARNING: Windows EOF removed in '%s'" % ( self.header )
541 sys.stdout.flush()
542 newSeq = self.sequence.replace( symbol, "" )
543 self.sequence = newSeq
544
545
546 ## Write Bioseq instance into a fasta file handler
547 #
548 # @param faFileHandler file handler of a fasta file
549 #
550 def write( self, faFileHandler ):
551 faFileHandler.write( ">%s\n" % ( self.header ) )
552 self.writeSeqInFasta( faFileHandler )
553
554
555 ## Write only the sequence of Bioseq instance into a fasta file handler
556 #
557 # @param faFileHandler file handler of a fasta file
558 #
559 def writeSeqInFasta( self, faFileHandler ):
560 i = 0
561 while i < self.getLength():
562 faFileHandler.write( "%s\n" % ( self.sequence[i:i+60] ) )
563 i += 60
564
565
566 ## Append Bioseq instance to a fasta file
567 #
568 # @param faFile name of a fasta file as a string
569 # @param mode 'write' or 'append'
570 #
571 def save( self, faFile, mode="a" ):
572 faFileHandler = open( faFile, mode )
573 self.write( faFileHandler )
574 faFileHandler.close()
575
576
577 ## Append Bioseq instance to a fasta file
578 #
579 # @param faFile name of a fasta file as a string
580 #
581 def appendBioseqInFile( self, faFile ):
582 self.save( faFile, "a" )
583
584
585 ## Write Bioseq instance into a fasta file handler
586 #
587 # @param faFileHandler file handler on a file with writing right
588 #
589 def writeABioseqInAFastaFile( self, faFileHandler ):
590 self.write( faFileHandler )
591
592
593 ## Write Bioseq instance with other header into a fasta file handler
594 #
595 # @param faFileHandler file handler on a file with writing right
596 # @param otherHeader a string representing a new header (without the > and the \n)
597 #
598 def writeWithOtherHeader( self, faFileHandler, otherHeader ):
599 self.header = otherHeader
600 self.write( faFileHandler )
601
602
603 ## Append Bioseq header and Bioseq sequence in a fasta file
604 #
605 # @param faFileHandler file handler on a file with writing right
606 # @param otherHeader a string representing a new header (without the > and the \n)
607 #
608 def writeABioseqInAFastaFileWithOtherHeader( self, faFileHandler, otherHeader ):
609 self.writeWithOtherHeader( faFileHandler, otherHeader )
610
611
612 ## get the list of Maps corresponding to seq without gap
613 #
614 # @warning This method was called getMap() in pyRepet.Bioseq
615 # @return a list of Map object
616 #
617 def getLMapWhithoutGap( self ):
618 lMaps = []
619 countSite = 1
620 countSubseq = 1
621 inGap = False
622 startMap = -1
623 endMap = -1
624
625 # initialize with the first site
626 if self.sequence[0] == "-":
627 inGap = True
628 else:
629 startMap = countSite
630
631 # for each remaining site
632 for site in self.sequence[1:]:
633 countSite += 1
634
635 # if it is a gap
636 if site == "-":
637
638 # if this is the beginning of a gap, record the previous subsequence
639 if inGap == False:
640 inGap = True
641 endMap = countSite - 1
642 lMaps.append( Map( "%s_subSeq%i" % (self.header,countSubseq), self.header, startMap, endMap ) )
643 countSubseq += 1
644
645 # if it is NOT a gap
646 if site != "-":
647
648 # if it is the end of a gap, begin the next subsequence
649 if inGap == True:
650 inGap = False
651 startMap = countSite
652
653 # if it is the last site
654 if countSite == self.getLength():
655 endMap = countSite
656 lMaps.append( Map( "%s_subSeq%i" % (self.header,countSubseq), self.header, startMap, endMap ) )
657
658 return lMaps
659
660
661 ## get the percentage of GC
662 #
663 # @return a percentage
664 #
665 def getGCpercentage( self ):
666 tmpSeq = self.getSeqWithOnlyATGCN()
667 nbGC = tmpSeq.count( "G" ) + tmpSeq.count( "C" )
668 return 100 * nbGC / float( self.getLength() )
669
670 ## get the percentage of GC of a sequence without counting N in sequence length
671 #
672 # @return a percentage
673 #
674 def getGCpercentageInSequenceWithoutCountNInLength(self):
675 tmpSeq = self.getSeqWithOnlyATGCN()
676 nbGC = tmpSeq.count( "G" ) + tmpSeq.count( "C" )
677 return 100 * nbGC / float( self.getLength() - self.countNt("N") )
678
679 ## get the 5 prime subsequence of a given length at the given position
680 #
681 # @param position integer
682 # @param flankLength integer subsequence length
683 # @return a sequence string
684 #
685 def get5PrimeFlank(self, position, flankLength):
686 if(position == 1):
687 return ""
688 else:
689 startOfFlank = 1
690 endOfFlank = position -1
691
692 if((position - flankLength) > 0):
693 startOfFlank = position - flankLength
694 else:
695 startOfFlank = 1
696
697 return self.subseq(startOfFlank, endOfFlank).sequence
698
699
700 ## get the 3 prime subsequence of a given length at the given position
701 # In the case of indels, the polymorphism length can be specified
702 #
703 # @param position integer
704 # @param flankLength integer subsequence length
705 # @param polymLength integer polymorphism length
706 # @return a sequence string
707 #
708 def get3PrimeFlank(self, position, flankLength, polymLength = 1):
709 if((position + polymLength) > len( self.sequence )):
710 return ""
711 else:
712 startOfFlank = position + polymLength
713
714 if((position+polymLength+flankLength) > len( self.sequence )):
715 endOfFlank = len( self.sequence )
716 else:
717 endOfFlank = position+polymLength+flankLength-1
718
719 return self.subseq(startOfFlank, endOfFlank).sequence
720
721
722 def _createWordList(self,size,l=['A','T','G','C']):
723 if size == 1 :
724 return l
725 else:
726 l2 = []
727 for i in l:
728 for j in ['A','T','G','C']:
729 l2.append( i + j )
730 return self._createWordList(size-1,l2)
731
732
733 def removeSymbol( self, symbol ):
734 tmp = self.sequence.replace( symbol, "" )
735 self.sequence = tmp