Mercurial repository yufei-luo/s_mart
commons/core/parsing/Multifasta2SNPFile.py @ changeset 41:e57682cd6997
author:  m-zytnicki
date:    Thu, 30 May 2013 03:04:47 -0400
parent:  769e306b7933
import re
import os
import logging
from commons.core.utils.FileUtils import FileUtils
from commons.core.seq.BioseqDB import BioseqDB
from commons.core.seq.Bioseq import Bioseq
from commons.core.LoggerFactory import LoggerFactory

DNA_ALPHABET_WITH_N_AND_DELS = set(['A', 'T', 'G', 'C', 'N', '-'])
IUPAC = set(['A', 'T', 'G', 'C', 'U', 'R', 'Y', 'M', 'K', 'W', 'S', 'B', 'D', 'H', 'V', 'N', '-',
             'a', 't', 'g', 'c', 'u', 'r', 'y', 'm', 'k', 'w', 's', 'b', 'd', 'h', 'v', 'n'])


class Multifasta2SNPFile(object):

    POLYM_TYPE_4_SNP = "SNP"
    POLYM_TYPE_4_INSERTION = "INSERTION"
    POLYM_TYPE_4_DELETION = "DELETION"
    POLYM_DEFAULT_CONFIDENCE_VALUE = "A"
    SNP_LENGTH = 1
    FLANK_LENGTH = 250

    def __init__(self, taxon, batchName="", geneName=""):
        if(batchName):
            self._batchName = batchName
        if(geneName):
            self._geneName = geneName
        self._taxon = taxon
        self._outSubSNPFileName = "SubSNP.csv"
        self._outAlleleFileName = "Allele.csv"
        self._outIndividualFileName = "Individual.csv"
        self._outSequenceFSAFileName = "Sequences.fsa"
        self._outSequenceCSVFileName = "Sequences.csv"
        self._outBatchFileName = "Batch.txt"
        self._outBatchLineFileName = "BatchLine.csv"
        self._logFileName = "multifasta2SNP.log"
        self._lBatchFileResults = []
        self._lSubSNPFileResults = []
        self._lRefSequences = []
        self._lIndividualFileResults = []
        self._lBatchLineFileResults = []
        self._dIndividualNumbers4SubSNPResults = {}
        self._dAlleleFileResults = {}
        self.dcurrentIndel = {}
        self.lIndelsOfTheCurrentLine = []
        self.lIndelsOverAllLines = []
        self.dSNPsPositions = {}
        self._iCurrentLineNumber = 0
        self._currentBatchNumber = 1
        self.currentLineName = ""
        self.currentNucleotide = ""
        self.currentPosition = 0
        self._sPolymConfidenceValue = Multifasta2SNPFile.POLYM_DEFAULT_CONFIDENCE_VALUE
        self._sPolymType = Multifasta2SNPFile.POLYM_TYPE_4_SNP
        self._iPolymLength = Multifasta2SNPFile.SNP_LENGTH
        self._fileUtils = FileUtils()
        if self._fileUtils.isRessourceExists(self._logFileName):
            os.remove(self._logFileName)
        self._logFile = LoggerFactory.createLogger(self._logFileName, logging.INFO, "%(asctime)s %(levelname)s: %(message)s")

    def runOneBatch(self, inFileName):
        self._currentFileName = inFileName
        #TODO: method to be removed; in the end only runOneBatchWithoutWriting should be used
        self._wrapper = self.createWrapperFromFile(inFileName)
        self._lBatchFileResults = self.completeBatchList()
        self.detectSNPsAndIndels(self._wrapper)
        self._writeAllOutputFiles()
        self._currentBatchNumber += 1

    def runOneBatchWithoutWriting(self, inFileName):
        self.lIndelsOverAllLines = []
        self._currentFileName = inFileName
        self._wrapper = self.createWrapperFromFile(inFileName)
        self._lBatchFileResults = self.completeBatchList()
        self.detectSNPsAndIndels(self._wrapper)
        self._currentBatchNumber += 1

    def _cleanOutputsInTheCurrentDir(self):
        #TODO: create a list of files to be deleted
        FileUtils.removeFilesByPattern("*.csv")
        if (FileUtils.isRessourceExists(self._outBatchFileName)):
            os.remove(self._outBatchFileName)
        if (FileUtils.isRessourceExists(self._outSequenceFSAFileName)):
            os.remove(self._outSequenceFSAFileName)

    def _createOutputObjectsIteratingOnCurrentDir(self):
        #TODO: handle multiple extensions
        extList = [".fasta", ".fsa"]
        for dirname, dirnames, filenames in os.walk("."):
            filenames.sort()
            for filename in filenames:
                if os.path.splitext(filename)[1] in extList:
                    self._geneName = os.path.splitext(filename)[0]
                    self._batchName = "Batch_" + self._geneName
                    self.runOneBatchWithoutWriting(filename)

    def runSeveralBatches(self, inputDir):
        #TODO: remove the chdir calls, use absolute file paths and update the tests accordingly
        os.chdir(inputDir)
        self._cleanOutputsInTheCurrentDir()
        self._createOutputObjectsIteratingOnCurrentDir()
        self._writeAllOutputFiles()
        os.chdir("../")

    def _treatADeletionClosingWithAnotherBaseThanRefSeq(self, lineName, nucleotide, position):
        if (self.isTheIndelOpen4ThisLine):
            self._closeTheCurrentIndel(lineName, nucleotide, position)
        self._manageSNPs(lineName, nucleotide, position)
        self.addOnePolymorphicPosition(position)

    def _treatNucleotideDifferentThanRefSeqCase(self, refSeq, lineName, index, nucleotide, position):
        if (nucleotide == "-" or refSeq[index] == "-"):
            if (self.isTheIndelOpen4ThisLine):
                self._expandTheCurrentIndel(position, nucleotide)
            else:
                self._startAnIndel(position, nucleotide)
        else:
            self._treatADeletionClosingWithAnotherBaseThanRefSeq(lineName, nucleotide, position)

    def _treatSameNucleotideInOneIndel(self, refSeq, lineName, index, nucleotide, position):
        if (self._sPolymType == Multifasta2SNPFile.POLYM_TYPE_4_DELETION):
            self._closeTheCurrentIndel(lineName, nucleotide, position)
        elif (self._sPolymType == Multifasta2SNPFile.POLYM_TYPE_4_INSERTION):
            if (refSeq[index] == "-"):
                self._expandTheCurrentIndel(position, nucleotide)
            else:
                self._closeTheCurrentIndel(lineName, nucleotide, position)

    def detectSNPsAndIndels(self, iRefAndLines):
        refSeq = iRefAndLines.getReferenceSequence()
        refSeqLength = len(refSeq)
        self.dSNPsPositions = {}
        for iLineBioseq in iRefAndLines.getLinesBioseqInstances():
            lineSequence = iLineBioseq.sequence
            self.currentLineName = iLineBioseq.header
            self._manageCurrentIndividual(self.currentLineName)
            index = 0
            self.isTheIndelOpen4ThisLine = 0
            self.lIndelsOfTheCurrentLine = []
            for nucleotide in lineSequence:
                position = index + 1
                if (index < refSeqLength) and self._isSNPDetected(refSeq, index, nucleotide):
                    self._treatNucleotideDifferentThanRefSeqCase(refSeq, self.currentLineName, index, nucleotide, position)
                elif(index < refSeqLength and self.isTheIndelOpen4ThisLine):
                    self._treatSameNucleotideInOneIndel(refSeq, self.currentLineName, index, nucleotide, position)
                index = index + 1
                self.currentNucleotide = nucleotide
                self.currentPosition = position
            self.lIndelsOverAllLines = self.lIndelsOverAllLines + self.lIndelsOfTheCurrentLine
        self._postTraitementDetectSNP(self.currentLineName, self.currentNucleotide, self.currentPosition)

    def _manageCurrentIndividual(self, lineName):
        self._lIndividualFileResults = self._completeIndividualListWithCurrentIndividual(self._lIndividualFileResults, lineName)
        self._lBatchLineFileResults = self._completeBatchLineListWithCurrentIndividual(self._lBatchLineFileResults, self._lIndividualFileResults, lineName)
        if not self._dIndividualNumbers4SubSNPResults.__contains__(lineName):
            self._dIndividualNumbers4SubSNPResults[lineName] = len(self._lIndividualFileResults)

    def _manageLastPositionIndels(self, lineName, nucleotide, position):
        if (self.isTheIndelOpen4ThisLine):
            self._closeTheCurrentIndel(lineName, nucleotide, position)
            self.lIndelsOverAllLines.append(self.lIndelsOfTheCurrentLine.pop())

    def _postTraitementDetectSNP(self, lineName, nucleotide, position):
        self._manageLastPositionIndels(lineName, nucleotide, position)
        self._mergeAllelesAndSubSNPsFromOverlappingIndels()
        self._addMissingsAllelesAndSubSNPs()
        self._lSubSNPFileResults = self._sortSubSNPResultByBatchPositionAndLineName(self._lSubSNPFileResults)

    def _manageSNPs(self, lineName, nucleotide, position):
        self._dAlleleFileResults = self._completeAlleleSetWithCurrentAllele(self._dAlleleFileResults, nucleotide)
        truePosition = self.getUngappedPositionInRefSeq(position)
        subSNPName = self._formatSubSNPName(lineName, truePosition, Multifasta2SNPFile.POLYM_TYPE_4_SNP)
        iAlleleNumber = self._dAlleleFileResults[nucleotide]
        self._sPolymType = Multifasta2SNPFile.POLYM_TYPE_4_SNP
        flank5Prime, flank3Prime = self.getFlanksOfASubSNP(lineName, position, Multifasta2SNPFile.SNP_LENGTH, Multifasta2SNPFile.FLANK_LENGTH)
        dSubSNPResult = {'subSNPName': subSNPName, 'position': truePosition,
                         'lineName': self._dIndividualNumbers4SubSNPResults[lineName],
                         'allele': iAlleleNumber, 'batchNumber': self._currentBatchNumber,
                         'confidenceValue': self._sPolymConfidenceValue, 'type': self._sPolymType,
                         'length': Multifasta2SNPFile.SNP_LENGTH,
                         '5flank': flank5Prime, '3flank': flank3Prime}
        if(not self.subSNPExistsInSubSNPList(dSubSNPResult, self._lSubSNPFileResults)):
            self._lSubSNPFileResults.append(dSubSNPResult)

    def _startAnIndel(self, position, nucleotide):
        self.dcurrentIndel['start'] = position
        self.dcurrentIndel['end'] = position
        self.sCurrentIndelAllele = nucleotide
        if(nucleotide == "-"):
            self._sPolymType = Multifasta2SNPFile.POLYM_TYPE_4_DELETION
        else:
            self._sPolymType = Multifasta2SNPFile.POLYM_TYPE_4_INSERTION
        self.isTheIndelOpen4ThisLine = 1

    def _expandTheCurrentIndel(self, position, nucleotide):
        self.sCurrentIndelAllele = self.sCurrentIndelAllele + nucleotide
        self.dcurrentIndel['end'] = position

    def _closeTheCurrentIndel(self, lineName, nucleotide, position):
        subSNPName = self._formatSubSNPName(lineName, self.dcurrentIndel['start'], self._sPolymType)
        dIndel4TheLine = {'name': subSNPName, 'lineName': lineName,
                          'start': self.dcurrentIndel['start'], 'end': self.dcurrentIndel['end'],
                          'allele': self.sCurrentIndelAllele, 'type': self._sPolymType,
                          'length': self._iPolymLength}
        dIndel4TheLine['length'] = self.getAnIndelLength(dIndel4TheLine)
        self.lIndelsOfTheCurrentLine.append(dIndel4TheLine)
        self.dcurrentIndel.clear()
        self.isTheIndelOpen4ThisLine = 0

    def _mergeAllelesAndSubSNPsFromOverlappingIndels(self):
        lIndelList = []
        for dIndel in self.lIndelsOverAllLines:
            lIndelList = self.clusteriseIndels(dIndel, self.lIndelsOverAllLines)
        for dIndel in lIndelList:
            oldAllele = dIndel['allele']
            start = dIndel['start']
            stop = dIndel['end']
            lineName = dIndel['lineName']
            LineBioSeq = self._wrapper._iLinesBioseqDB.fetch(lineName)
            dIndel = self.updateAllele(oldAllele, start, stop, LineBioSeq, dIndel)
            dSubSNPResult = self.createSubSNPFromAMissingPolym(dIndel, lineName)
            if(not self.subSNPExistsInSubSNPList(dSubSNPResult, self._lSubSNPFileResults)):
                self._lSubSNPFileResults.append(dSubSNPResult)

    def updateAllele(self, oldAllele, start, stop, LineBioSeq, dIndel):
        #TODO: create the test
        newAllele = LineBioSeq.subseq(start, stop).sequence
        if newAllele != oldAllele:
            dIndel['allele'] = newAllele
            self._dAlleleFileResults = self._completeAlleleSetWithCurrentAllele(self._dAlleleFileResults, newAllele)
        return dIndel

    def getFlanksOfASubSNP(self, lineName, subsnpPosition, polymLength, flankLength):
        bioSeqOfTheLine = self._wrapper._iLinesBioseqDB.fetch(lineName)
        flank5Prime = bioSeqOfTheLine.get5PrimeFlank(subsnpPosition, flankLength)
        flank3Prime = bioSeqOfTheLine.get3PrimeFlank(subsnpPosition, flankLength, polymLength)
        return flank5Prime, flank3Prime

    def createSubSNPFromAMissingPolym(self, dIndel, lineName):
        if(dIndel['type'] == Multifasta2SNPFile.POLYM_TYPE_4_INSERTION):
            start = self.getUngappedPositionInRefSeq(dIndel['start'] - 1)
        else:
            start = self.getUngappedPositionInRefSeq(dIndel['start'])
        subSNPName = self._formatSubSNPName(dIndel['lineName'], start, dIndel['type'])
        iAlleleNumber = self._dAlleleFileResults[dIndel['allele']]
        iPolymLength = self.getAnIndelLength(dIndel)
        flank5Prime, flank3Prime = self.getFlanksOfASubSNP(lineName, dIndel['start'], iPolymLength, Multifasta2SNPFile.FLANK_LENGTH)
        dSubSNPResult = {'subSNPName': subSNPName, 'position': start,
                         'lineName': self._dIndividualNumbers4SubSNPResults[lineName],
                         'allele': iAlleleNumber, 'batchNumber': self._currentBatchNumber,
                         'confidenceValue': self._sPolymConfidenceValue, 'type': dIndel['type'],
                         'length': iPolymLength,
                         '5flank': flank5Prime, '3flank': flank3Prime}
        return dSubSNPResult

    def clusteriseIndels(self, dIndel, lIndelsOverAllLines):
        iIndice = 0
        for dIndel in lIndelsOverAllLines:
            iIndice2Compare = 0
            for dIndel2Compare in lIndelsOverAllLines:
                dIndel, dIndel2Compare = self.mergeBoundsForTwoOverlappingIndels(dIndel, dIndel2Compare)
                lIndelsOverAllLines = self.updateBoundsForAnIndelInAnIndelList(lIndelsOverAllLines, dIndel)
                lIndelsOverAllLines = self.updateBoundsForAnIndelInAnIndelList(lIndelsOverAllLines, dIndel2Compare)
                iIndice2Compare = iIndice2Compare + 1
            iIndice = iIndice + 1
        return lIndelsOverAllLines

    def mergeBoundsForTwoOverlappingIndels(self, dIndel1, dIndel2):
        if((dIndel2['start'] <= dIndel1['start']) and (dIndel2['end'] >= dIndel1['start'])
           or (dIndel1['start'] <= dIndel2['start']) and (dIndel1['end'] >= dIndel2['start'])):
            if(dIndel1['start'] <= dIndel2['start']):
                iStart = dIndel1['start']
            else:
                iStart = dIndel2['start']
            if(dIndel1['end'] >= dIndel2['end']):
                iEnd = dIndel1['end']
            else:
                iEnd = dIndel2['end']
            dIndel1['start'] = iStart
            dIndel1['end'] = iEnd
            dIndel2['start'] = iStart
            dIndel2['end'] = iEnd
        return dIndel1, dIndel2

    def updateBoundsForAnIndelInAnIndelList(self, lIndelsList, dIndelWithNewBounds):
        name = dIndelWithNewBounds['name']
        dIndelInTheList, iIndice = self.findAnIndelInAListWithHisName(name, lIndelsList)
        lIndelsList.remove(dIndelInTheList)
        lIndelsList.insert(iIndice, dIndelWithNewBounds)
        return lIndelsList

    def findASubSNPInAListWithHisName(self, name, lSubSNPList):
        dSubSNP2Find = {}
        indice = 0
        indice2Find = -1
        for dSubSNP in lSubSNPList:
            if(dSubSNP['subSNPName'] == name):
                dSubSNP2Find = dSubSNP
                indice2Find = indice
            indice = indice + 1
        if dSubSNP2Find == {} or indice2Find == -1:
            msg = "trying to find a SubSNP not existing: " + name
            self._logFile.error(msg)
            raise Exception("trying to find a SubSNP not existing: " + name)
        else:
            return dSubSNP2Find, indice2Find

    def subSNPExistsInSubSNPList(self, dSubSNP2Find, lSubSNPList):
        flag = 0
        for dSubSNP in lSubSNPList:
            if(dSubSNP2Find['subSNPName'] == dSubSNP['subSNPName']):
                flag = 1
        if flag == 1:
            return True
        else:
            return False

    def findAnIndelInAListWithHisName(self, name, lIndelList):
        dIndel2Find = {}
        indice = 0
        indice2Find = -1
        for dIndel in lIndelList:
            if(dIndel['name'] == name):
                dIndel2Find = dIndel
                indice2Find = indice
            indice = indice + 1
        if dIndel2Find == {} or indice2Find == -1:
            msg = "trying to find an indel not existing: " + name
            self._logFile.error(msg)
            raise Exception(msg)
        else:
            return dIndel2Find, indice2Find

    def _addMissingsAllelesAndSubSNPs(self):
        for dIndel in self.lIndelsOverAllLines:
            start = dIndel['start']
            end = dIndel['end']
            type = dIndel['type']
            self.addMissingAllelesAndSubSNPsForOnePolym(start, end, type)
        for position in self.dSNPsPositions:
            self.addMissingAllelesAndSubSNPsForOnePolym(position, position, "SNP")

    def addMissingAllelesAndSubSNPsForOnePolym(self, start, end, polymType):
        refSeqAllele = self._wrapper._iReferenceBioseq.subseq(start, end).sequence
        BioSeqDb = self._wrapper.getLinesBioseqInstances()
        lBioSeqDbAlleles = self.getAllelesOfASubSeq(BioSeqDb, start, end)
        for subSequence in lBioSeqDbAlleles:
            if(subSequence['allele'] == refSeqAllele):
                lineName = subSequence['header']
                dMissingPolym = {'lineName': lineName, 'start': start, 'end': end,
                                 'allele': subSequence['allele'], 'type': polymType}
                self._dAlleleFileResults = self._completeAlleleSetWithCurrentAllele(self._dAlleleFileResults, subSequence['allele'])
                dSubSNPResult = self.createSubSNPFromAMissingPolym(dMissingPolym, lineName)
                if(not self.subSNPExistsInSubSNPList(dSubSNPResult, self._lSubSNPFileResults)):
                    self._lSubSNPFileResults.append(dSubSNPResult)

    def addOnePolymorphicPosition(self, position):
        if(not self.dSNPsPositions.has_key(position)):
            self.dSNPsPositions[position] = 1

    def getUngappedPositionInRefSeq(self, position):
        if(position == 1):
            nbOfGaps = 0
        else:
            seqIn5Prime = self._wrapper._iReferenceBioseq.subseq(1, position - 1).sequence
            nbOfGaps = seqIn5Prime.count("-")
        return position - nbOfGaps

    def getAllelesOfASubSeq(self, BioSeqDb, start, end):
        lAlleles = []
        for iBioSeq in BioSeqDb:
            dAlleles = {}
            dAlleles['header'] = iBioSeq.header
            dAlleles['allele'] = iBioSeq.subseq(start, end).sequence
            lAlleles.append(dAlleles)
        return lAlleles

    def getAnIndelLength(self, dIndel):
        length = 0
        if(dIndel['type'] == Multifasta2SNPFile.POLYM_TYPE_4_DELETION):
            length = dIndel['end'] - dIndel['start'] + 1
        else:
            length = len(dIndel['allele'])
        return length

    def createWrapperFromFile(self, inFileName):
        faF = open(inFileName, "r")
        iBioSeqDB = self._extractSequencesFromInputFile(faF)
        faF.close()
        iBioSeqDB.upCase()
        referenceBioseq = iBioSeqDB[0]
        linesBioSeqDB = iBioSeqDB.extractPart(1, iBioSeqDB.getSize() - 1)
        try:
            if(FileUtils.isEmpty(inFileName)):
                msg = "The input file is empty!"
                self._logFile.error(self._prefixeWithLineNumber(msg))
                raise Exception(self._prefixeWithFileName(msg))
            if(self.isHeaderInRefSeqList(referenceBioseq.header)):
                msg = "This reference sequence already exists in one previous file!"
                self._logFile.error(self._prefixeWithLineNumber(msg))
                raise Exception(self._prefixeWithLineNumber(msg))
        except Exception, e:
            raise Exception("Problem with one input file: \n" + str(e))
        self._lRefSequences.append(referenceBioseq)
        return ReferenceBioseqAndLinesBioseqDBWrapper(referenceBioseq, linesBioSeqDB, self._logFile, inFileName)

    def isHeaderInRefSeqList(self, header):
        isHeader = False
        for item in self._lRefSequences:
            if item.header == header:
                isHeader = True
        return isHeader

    def completeBatchList(self):
        dBatchResults = {'BatchNumber': self._currentBatchNumber, 'BatchName': self._batchName,
                         'GeneName': self._geneName, 'ContactNumber': "1", 'ProtocolNumber': "1",
                         'ThematicNumber': "1", 'RefSeqName': self._wrapper._iReferenceBioseq.header}
        self._lBatchFileResults.append(dBatchResults)
        return self._lBatchFileResults

    def getLineAsAHeader(self, lineToBeCheck, lineNumber=0):
        """
        A header line begins with the tag (or token) '>' and ends with a carriage return.
        The sequence name it contains must respect the alphabet [a-zA-Z0-9_:-].
        """
        obsHeader = lineToBeCheck
        if obsHeader[0] != ">":
            msg = "tag '>' omitted before header"
            self._logFile.error(self._prefixeWithLineNumber(msg))
            raise Exception(self._prefixeWithLineNumber(msg))
        else:
            obsHeader = obsHeader[1:]
        obsHeader = obsHeader.replace("\n", "")
        obsHeader = self._removeRepeatedBlanksInAStr(obsHeader)
        obsHeader = self._replaceBlankByUnderScoreInAStr(obsHeader)
        if self.checkHeaderAlphabet(obsHeader):
            return obsHeader
        self._logFile.error(self._prefixeWithLineNumber("fatal error on header"))
        raise Exception(self._prefixeWithLineNumber("fatal error on header"))

    def getLineAsASeq(self, lineToBeCheck):
        """
        A sequence line ends with a carriage return and contains only characters
        of the IUPAC alphabet.
        """
        obsSeq = str.upper(lineToBeCheck)
        obsSeq = obsSeq.replace("\n", "")
        obsSeq = obsSeq.replace("\r", "")
        obsLine = obsSeq.replace("-", "")
        if not self.isIUPAC_bases(obsLine):
            msg = "the sequence contain a non nucleic character "
            self._logFile.error(self._prefixeWithLineNumber(msg))
            raise Exception(self._prefixeWithLineNumber(msg))
        return obsSeq

    def checkHeaderAlphabet(self, strToCheck):
        """
        Check the string: it is not a valid header if it contains a character
        outside the class [a-zA-Z0-9_:-] (i.e. matching the regexp [^a-zA-Z0-9_:\-]).
        """
        if strToCheck == "":
            return False
        p = re.compile('[^a-zA-Z0-9_:\-]', re.IGNORECASE)
        #p = re.compile('(\W|-|:)+', re.IGNORECASE)
        errList = p.findall(strToCheck)
        if len(errList) > 0:
            return False
        else:
            return True

    ## Check that the string is a nucleotide sequence over DNA_ALPHABET_WITH_N_AND_DELS = ['A','T','G','C','N','-'] of the IUPAC nomenclature.
    # @return True if the sequence contains only A, T, G, C, N or '-', False otherwise
    #
    def isDNA_bases(self, sequence):
        if sequence == "":
            return False
        setFromString = set()
        for nt in sequence:
            setFromString.add(nt)
        return setFromString.issubset(DNA_ALPHABET_WITH_N_AND_DELS)

    ## Check that the string is a nucleotide sequence over the IUPAC alphabet.
    # @return True if the sequence contains only IUPAC letters, False otherwise
    #
    def isIUPAC_bases(self, sequence):
        if sequence == "":
            return False
        setFromString = set()
        for nt in sequence:
            setFromString.add(nt)
        return setFromString.issubset(IUPAC)

    def _writeAllOutputFiles(self):
        writer = Multifasta2SNPFileWriter()
        writer.write(self)

    def _sortSubSNPResultByBatchPositionAndLineName(self, lSubSNPResults):
        return sorted(lSubSNPResults, key=lambda SNPresults: (SNPresults['batchNumber'], SNPresults['position'], SNPresults['lineName']))

    def _formatSubSNPName(self, currentLineHeader, position, polymType):
        shortPolymType = polymType[:3]
        return self._batchName + "_" + shortPolymType + "_" + str(position) + "_" + currentLineHeader

    def _isSNPDetected(self, referenceSequence, index, nt):
        if((nt != referenceSequence[index]) and (nt.upper() != "N") and (referenceSequence[index].upper() != "N")):
            return True
        else:
            return False

    def _extractSequencesFromInputFile(self, inFile):
        # note: DNA_ALPHABET_WITH_N_AND_DELS = set(['A','T','G','C','N','-']), gap character included
        lInFileLines = inFile.readlines()
        nbOfLines = len(lInFileLines) - 1
        # first pass
        self._iCurrentLineNumber = 0
        isSameSeq = False
        newSeq = ""
        bioseqDB = BioseqDB()
        while self._iCurrentLineNumber < nbOfLines:
            bioseq = Bioseq()
            bioseq.header = self.getLineAsAHeader(lInFileLines[self._iCurrentLineNumber])
            isSameSeq = True
            while isSameSeq and (self._iCurrentLineNumber < nbOfLines):
                self._iCurrentLineNumber += 1
                if lInFileLines[self._iCurrentLineNumber][0] == ">":
                    isSameSeq = False
                else:
                    newSeq = newSeq + self.getLineAsASeq(lInFileLines[self._iCurrentLineNumber])
                    isSameSeq = True
            bioseq.setSequence(newSeq)
            newSeq = ""
            bioseqDB.add(bioseq)
        return bioseqDB

    def _removeRepeatedBlanksInAStr(self, StrToClean):
        resStr = StrToClean.expandtabs(2)
        compResStr = resStr.replace("  ", " ")
        while compResStr != resStr:
            resStr = compResStr
            compResStr = resStr.replace("  ", " ")
        return resStr

    def _replaceBlankByUnderScoreInAStr(self, StrToClean):
        resStr = StrToClean.replace(" ", "_")
        return resStr

    def _prefixeWithLineNumber(self, strMsg):
        resStr = "File: " + self._currentFileName + "\t"
        resStr += "Line %i " % (self._iCurrentLineNumber + 1) + strMsg
        return resStr

    def _prefixeWithFileName(self, strMsg):
        resStr = "File: " + self._currentFileName + "\n" + strMsg
        return resStr

    def _completeAlleleSetWithCurrentAllele(self, dAlleleFileResults, dnaBase):
        if dAlleleFileResults.has_key(dnaBase):
            return dAlleleFileResults
        else:
            iAlleleNumber = len(dAlleleFileResults) + 1
            dAlleleFileResults[dnaBase] = iAlleleNumber
            return dAlleleFileResults

    def _completeIndividualListWithCurrentIndividual(self, lIndividualResults, lineName):
        if lIndividualResults == []:
            iIndividualNumber = 1
        else:
            iIndividualNumber = len(lIndividualResults) + 1
        #TODO: turn the list of dictionaries into a list of objects
        if not (self._checkIfALineExistInList(lIndividualResults, lineName)):
            dIndividual2Add = {'individualNumber': iIndividualNumber, 'individualName': lineName, 'scientificName': self._taxon}
            lIndividualResults.append(dIndividual2Add)
        return lIndividualResults

    def _completeBatchLineListWithCurrentIndividual(self, lBatchLineResults, lIndividualResults, lineName):
        lineDict = self._getALineDictFromADictListWithALineName(lIndividualResults, lineName)
        if len(lineDict) != 0:
            if(lineDict.has_key('individualNumber')):
                indivNumberOfTheLineDict = lineDict['individualNumber']
            else:
                msg = "Problem with the batchLine results construction: individual named " + lineName + " has no individual number!"
                self._logFile.error(msg)
                raise Exception(msg)
        else:
            msg = "Problem with the batchLine results construction: individual named " + lineName + " not in individual list!"
            self._logFile.error(msg)
            raise Exception(msg)
        dResults2Add = {'IndividualNumber': str(indivNumberOfTheLineDict), 'BatchNumber': self._currentBatchNumber}
        lBatchLineResults.append(dResults2Add)
        return lBatchLineResults

    def _getALineDictFromADictListWithALineName(self, lDictList, lineName):
        dictToReturn = {}
        for myDict in lDictList:
            if myDict['individualName'] == lineName:
                dictToReturn = myDict
        return dictToReturn

    def _checkIfALineExistInList(self, lDictList, lineName):
        for myDict in lDictList:
            if myDict['individualName'] == lineName:
                return True
        return False

    def _getCurrentBatchResult(self):
        return self._lBatchFileResults[self._currentBatchNumber - 1]


class ReferenceBioseqAndLinesBioseqDBWrapper(object):

    def __init__(self, iReferenceBioseq, iLinesBioSeqDB, logger, fileName):
        self._iReferenceBioseq = iReferenceBioseq
        self._iLinesBioseqDB = iLinesBioSeqDB
        self._logger = logger
        self._currentFileName = fileName
        self._checkAllSeqs()

    def _checkAllSeqs(self):
        self._iReferenceBioseq.checkEOF()
        refSeqLen = self._iReferenceBioseq.getLength()
        for seq in self._iLinesBioseqDB.db:
            seq.checkEOF()
            if(not seq.getLength() == refSeqLen):
                msg = "File: " + self._currentFileName + ", problem with the sequence " + seq.header + ": its length is different from the reference seq! All the sequences must have the same length.\n"
                msg += "refseq length: " + str(refSeqLen) + "\n"
                msg += "seq length: " + str(seq.getLength()) + "\n"
                self._logger.error(msg)
                raise Exception(msg)

    def getLinesBioseqInstances(self):
        return self._iLinesBioseqDB.db

    def getReferenceSequence(self):
        return self._iReferenceBioseq.sequence


class Multifasta2SNPFileWriter(object):

    SUB_SNP_FILE_HEADER = ["SubSNPName", "ConfidenceValue", "Type", "Position", "5flank",
                           "3flank", "Length", "BatchNumber", "IndividualNumber", "PrimerType",
                           "PrimerNumber", "Forward_or_Reverse", "AlleleNumber"]
    ALLELE_FILE_HEADER = ["AlleleNumber", "Value", "Motif", "NbCopy", "Comment"]
    INDIVIDUAL_FILE_HEADER = ["IndividualNumber", "IndividualName", "Description", "AberrAneuploide",
                              "FractionLength", "DeletionLineSynthesis", "UrlEarImage", "TypeLine",
                              "ChromNumber", "ArmChrom", "DeletionBin", "ScientificName",
                              "local_germplasm_name", "submitter_code", "local_institute",
                              "donor_institute", "donor_acc_id"]
    SEQUENCE_CSV_FILE_HEADER = ["SequenceName", "SeqType", "BankName", "BankVersion", "ACNumber", "Locus", "ScientificName"]
    BATCH_TXT_FILE_HEADER = ["BatchNumber", "BatchName", "GeneName", "Description", "ContactNumber",
                             "ProtocolNumber", "ThematicNumber", "RefSeqName", "AlignmentFileName", "SeqName"]
    BATCH_LINE_FILE_HEADER = ["IndividualNumber", "Pos5", "Pos3", "BatchNumber", "Sequence"]

    def __init__(self):
        self._csvFieldSeparator = ";"
        self._txtSubFieldSeparator = ": "
        self._txtFieldSeparator = "\n"
        self._primerType = "Sequence"
        self._csvLineSeparator = "\n"
        self._txtLineSeparator = "//\n"

    def write(self, iMultifasta2SNPFile):
        self._writeSubSNPFile(iMultifasta2SNPFile._outSubSNPFileName, iMultifasta2SNPFile._lSubSNPFileResults)
        self._writeAlleleFile(iMultifasta2SNPFile._outAlleleFileName, iMultifasta2SNPFile._dAlleleFileResults)
        self._writeIndividualFile(iMultifasta2SNPFile._outIndividualFileName, iMultifasta2SNPFile._lIndividualFileResults)
        self._writeSequenceFiles(iMultifasta2SNPFile._outSequenceFSAFileName, iMultifasta2SNPFile._outSequenceCSVFileName, iMultifasta2SNPFile._lRefSequences, iMultifasta2SNPFile._taxon)
        self._writeBatchFile(iMultifasta2SNPFile._outBatchFileName, iMultifasta2SNPFile._lBatchFileResults)
        self._writeBatchLineFile(iMultifasta2SNPFile._outBatchLineFileName, iMultifasta2SNPFile._lBatchLineFileResults)

    def sortAlleleResultByAlleleNumber(self, dAlleleFileResults):
        return sorted(dAlleleFileResults.items(), key=lambda(k, v): (v, k))

    def _writeSubSNPFile(self, subSNPFileName, lSNP2Write):
        outF = open(subSNPFileName, "w")
        self._writeSNPFileHeader(outF)
        for dSNP in lSNP2Write:
            self._writeSNPFileLine(outF, dSNP)
        outF.close()

    def _writeAlleleFile(self, alleleFileName, dAllele2Write):
        outF = open(alleleFileName, "w")
        self._writeAlleleFileHeader(outF)
        lAlleleSortedResults = self.sortAlleleResultByAlleleNumber(dAllele2Write)
        for tAllele in lAlleleSortedResults:
            self._writeAlleleFileLine(outF, tAllele[0], tAllele[1])
        outF.close()

    def _writeIndividualFile(self, individualFileName, lIndividual2Write):
        # keep the individuals ordered by their number (the original call discarded the sorted result)
        lIndividual2Write = sorted(lIndividual2Write, key=lambda Individual: (Individual['individualNumber']))
        outF = open(individualFileName, "w")
        self._writeIndividualFileHeader(outF)
        for dIndiv in lIndividual2Write:
            self._writeIndividualFileLine(outF, dIndiv)
        outF.close()

    def _writeSequenceFiles(self, sequenceFSAFileName, sequenceCSVFileName, lRefSequences, taxon):
        outFSA = open(sequenceFSAFileName, "w")
        outCSV = open(sequenceCSVFileName, "w")
        self._writeSequenceCSVHeader(outCSV)
        for refSeq in lRefSequences:
            refSeq.cleanGap()
            self._writeSequenceFSAFile(outFSA, refSeq)
            self._writeSequenceCSVLine(outCSV, refSeq, taxon)
        outFSA.close()
        outCSV.close()

    def _writeSequenceFSAFile(self, outF, refSeq):
        outF.write(">%s\n" % (refSeq.header))
        outF.write("%s\n" % (refSeq.sequence[0:refSeq.getLength()]))

    def _writeBatchFile(self, batchFileName, lBatchResults):
        outF = open(batchFileName, "w")
        for dBatchResults in lBatchResults:
            for head in Multifasta2SNPFileWriter.BATCH_TXT_FILE_HEADER[:]:
                if dBatchResults.has_key(head):
                    outF.write(head + self._txtSubFieldSeparator + str(dBatchResults[head]) + self._txtFieldSeparator)
                else:
                    outF.write(head + self._txtSubFieldSeparator + self._txtFieldSeparator)
            outF.write(self._txtLineSeparator)
        outF.close()

    def _writeBatchLineFile(self, batchLineFileName, lBatchLineResults):
        outF = open(batchLineFileName, "w")
        self._writeBatchLineFileHeader(outF)
        for dResult in lBatchLineResults:
            self._writeBatchLineFileLine(outF, dResult)
        outF.close()

    def _writeSNPFileHeader(self, outF):
        for head in Multifasta2SNPFileWriter.SUB_SNP_FILE_HEADER[:-1]:
            outF.write(head + self._csvFieldSeparator)
        outF.write(Multifasta2SNPFileWriter.SUB_SNP_FILE_HEADER[-1] + self._csvLineSeparator)

    def _writeAlleleFileHeader(self, outF):
        for head in Multifasta2SNPFileWriter.ALLELE_FILE_HEADER[:-1]:
            outF.write(head + self._csvFieldSeparator)
        outF.write(Multifasta2SNPFileWriter.ALLELE_FILE_HEADER[-1] + self._csvLineSeparator)

    def _writeIndividualFileHeader(self, outF):
        for head in Multifasta2SNPFileWriter.INDIVIDUAL_FILE_HEADER[:-1]:
            outF.write(head + self._csvFieldSeparator)
        outF.write(Multifasta2SNPFileWriter.INDIVIDUAL_FILE_HEADER[-1] + self._csvLineSeparator)

    def _writeSequenceCSVHeader(self, outF):
        for head in Multifasta2SNPFileWriter.SEQUENCE_CSV_FILE_HEADER[:-1]:
            outF.write(head + self._csvFieldSeparator)
        outF.write(Multifasta2SNPFileWriter.SEQUENCE_CSV_FILE_HEADER[-1] + self._csvLineSeparator)

    def _writeBatchLineFileHeader(self, outF):
        for head in Multifasta2SNPFileWriter.BATCH_LINE_FILE_HEADER[:-1]:
            outF.write(head + self._csvFieldSeparator)
        outF.write(Multifasta2SNPFileWriter.BATCH_LINE_FILE_HEADER[-1] + self._csvLineSeparator)
    def _writeSNPFileLine(self, outF, dSNP):
        outF.write(dSNP['subSNPName'] + self._csvFieldSeparator)
        outF.write(dSNP['confidenceValue'] + self._csvFieldSeparator + dSNP['type'] + self._csvFieldSeparator)
        outF.write(str(dSNP['position']) + self._csvFieldSeparator + dSNP['5flank'] + self._csvFieldSeparator + dSNP['3flank'] + self._csvFieldSeparator)
        outF.write(str(dSNP['length']) + self._csvFieldSeparator + str(dSNP['batchNumber']) + self._csvFieldSeparator)
        outF.write(str(dSNP['lineName']) + self._csvFieldSeparator)
        outF.write(self._primerType + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + str(dSNP['allele']) + self._csvLineSeparator)

    def _writeAlleleFileLine(self, outF, sAllele2Write, iAlleleNumber):
        outF.write(str(iAlleleNumber) + self._csvFieldSeparator)
        outF.write(sAllele2Write + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvLineSeparator)

    def _writeIndividualFileLine(self, outF, dIndividual):
        outF.write(str(dIndividual['individualNumber']) + self._csvFieldSeparator)
        outF.write(dIndividual['individualName'] + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator)
        outF.write(dIndividual['scientificName'] + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvLineSeparator)

    def _writeSequenceCSVLine(self, outF, refSeq, taxon):
        outF.write(refSeq.header + self._csvFieldSeparator)
        outF.write("Reference" + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator)
        outF.write(taxon + self._csvLineSeparator)

    def _writeBatchLineFileLine(self, outF, dResult):
        outF.write(str(dResult['IndividualNumber']) + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator)
        outF.write(str(dResult['BatchNumber']) + self._csvFieldSeparator + self._csvLineSeparator)
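
# A minimal usage sketch, not part of the original module: it assumes an aligned
# multi-FASTA file "myGene.fasta" (first sequence = reference, all sequences of
# equal length) in the current directory; the file name, the "alignments"
# directory and the taxon value are placeholders. Only public methods defined
# above (runOneBatch, runSeveralBatches) are used.
if __name__ == "__main__":
    converter = Multifasta2SNPFile(taxon="Triticum aestivum", batchName="Batch_myGene", geneName="myGene")
    # Writes SubSNP.csv, Allele.csv, Individual.csv, Sequences.fsa, Sequences.csv,
    # Batch.txt and BatchLine.csv in the current directory.
    converter.runOneBatch("myGene.fasta")
    # Alternatively, process every *.fasta/*.fsa file of a directory, one batch each:
    #converter = Multifasta2SNPFile(taxon="Triticum aestivum")
    #converter.runSeveralBatches("alignments")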