diff commons/core/parsing/Multifasta2SNPFile.py @ 38:2c0c0a89fad7
Uploaded
author | m-zytnicki
date | Thu, 02 May 2013 09:56:47 -0400
parents | 769e306b7933
children |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/commons/core/parsing/Multifasta2SNPFile.py	Thu May 02 09:56:47 2013 -0400
@@ -0,0 +1,846 @@
+import re
+import os
+import logging
+from commons.core.utils.FileUtils import FileUtils
+from commons.core.seq.BioseqDB import BioseqDB
+from commons.core.seq.Bioseq import Bioseq
+from commons.core.LoggerFactory import LoggerFactory
+
+DNA_ALPHABET_WITH_N_AND_DELS = set (['A','T','G','C','N','-'])
+IUPAC = set(['A','T','G','C','U','R','Y','M','K','W','S','B','D','H','V','N', '-', 'a','t','g','c','u','r','y','m','k','w','s','b','d','h','v','n'])
+
+class Multifasta2SNPFile( object ):
+
+    POLYM_TYPE_4_SNP = "SNP"
+    POLYM_TYPE_4_INSERTION = "INSERTION"
+    POLYM_TYPE_4_DELETION = "DELETION"
+    POLYM_DEFAULT_CONFIDENCE_VALUE = "A"
+    SNP_LENGTH = 1
+    FLANK_LENGTH = 250
+
+    def __init__(self, taxon, batchName="", geneName=""):
+
+        if(batchName):
+            self._batchName = batchName
+
+        if(geneName):
+            self._geneName = geneName
+
+        self._taxon = taxon
+        self._outSubSNPFileName = "SubSNP.csv"
+        self._outAlleleFileName = "Allele.csv"
+        self._outIndividualFileName = "Individual.csv"
+        self._outSequenceFSAFileName = "Sequences.fsa"
+        self._outSequenceCSVFileName = "Sequences.csv"
+        self._outBatchFileName = "Batch.txt"
+        self._outBatchLineFileName = "BatchLine.csv"
+        self._logFileName = "multifasta2SNP.log"
+
+        self._lBatchFileResults = []
+        self._lSubSNPFileResults = []
+        self._lRefSequences = []
+        self._lIndividualFileResults = []
+        self._lBatchLineFileResults = []
+        self._dIndividualNumbers4SubSNPResults = {}
+        self._dAlleleFileResults = {}
+
+
+        self.dcurrentIndel = {}
+        self.lIndelsOfTheCurrentLine = []
+        self.lIndelsOverAllLines = []
+        self.dSNPsPositions = {}
+
+        self._iCurrentLineNumber = 0
+        self._currentBatchNumber = 1
+        self.currentLineName = ""
+        self.currentNucleotide = ""
+        self.currentPosition = 0
+        self._sPolymConfidenceValue = Multifasta2SNPFile.POLYM_DEFAULT_CONFIDENCE_VALUE
+        self._sPolymType = Multifasta2SNPFile.POLYM_TYPE_4_SNP
+        self._iPolymLength = Multifasta2SNPFile.SNP_LENGTH
+        self._fileUtils = FileUtils()
+
+        if self._fileUtils.isRessourceExists(self._logFileName):
+            os.remove(self._logFileName)
+        self._logFile = LoggerFactory.createLogger(self._logFileName, logging.INFO, "%(asctime)s %(levelname)s: %(message)s")
+
+    def runOneBatch( self, inFileName):
+        self._currentFileName = inFileName
+        #TODO: methode a virer; n'utiliser au final que runOneBatchWithoutWriting
+        self._wrapper = self.createWrapperFromFile(inFileName)
+        self._lBatchFileResults = self.completeBatchList()
+        self.detectSNPsAndIndels(self._wrapper)
+        self._writeAllOutputFiles()
+        self._currentBatchNumber += 1
+
+    def runOneBatchWithoutWriting( self, inFileName):
+        self.lIndelsOverAllLines = []
+        self._currentFileName = inFileName
+        self._wrapper = self.createWrapperFromFile(inFileName)
+        self._lBatchFileResults = self.completeBatchList()
+        self.detectSNPsAndIndels(self._wrapper)
+        self._currentBatchNumber += 1
+
+
+    def _cleanOutputsInTheCurrentDir(self):
+        #TODO: create a list of files to be deleted
+        FileUtils.removeFilesByPattern("*.csv")
+        if (FileUtils.isRessourceExists(self._outBatchFileName)):
+            os.remove(self._outBatchFileName)
+        if (FileUtils.isRessourceExists(self._outSequenceFSAFileName)):
+            os.remove(self._outSequenceFSAFileName)
+
+
+    def _createOutputObjectsIteratingOnCurrentDir(self):
+        #TODO: gerer les extensions multiples
+        extList = [".fasta", ".fsa"]
+        for dirname, dirnames, filenames in os.walk("."):
+            filenames.sort()
+            for filename in filenames:
+                if os.path.splitext(filename)[1] in extList:
+                    self._geneName = os.path.splitext(filename)[0]
+                    self._batchName = "Batch_" + self._geneName
+                    self.runOneBatchWithoutWriting(filename)
+
+    def runSeveralBatches( self, inputDir):
+        #TODO: enlever les chdirs, appeler les fichiers en absolu et modifier les tests en consequences
+        os.chdir(inputDir)
+        self._cleanOutputsInTheCurrentDir()
+        self._createOutputObjectsIteratingOnCurrentDir()
+        self._writeAllOutputFiles()
+        os.chdir("../")
+
+
+    def _treatADeletionClosingWithAnotherBaseThanRefSeq(self, lineName, nucleotide, position):
+        if (self.isTheIndelOpen4ThisLine):
+            self._closeTheCurrentIndel(lineName, nucleotide, position)
+        self._manageSNPs(lineName, nucleotide, position)
+        self.addOnePolymorphicPosition(position)
+
+    def _treatNucleotideDifferentThanRefSeqCase(self, refSeq, lineName, index, nucleotide, position):
+        if (nucleotide == "-" or refSeq[index] == "-"):
+            if (self.isTheIndelOpen4ThisLine):
+                self._expandTheCurrentIndel(position, nucleotide)
+            else:
+                self._startAnIndel(position, nucleotide)
+        else:
+            self._treatADeletionClosingWithAnotherBaseThanRefSeq(lineName, nucleotide, position)
+
+
+    def _treatSameNucleotideInOneIndel(self, refSeq, lineName, index, nucleotide, position):
+        if (self._sPolymType == Multifasta2SNPFile.POLYM_TYPE_4_DELETION):
+            self._closeTheCurrentIndel(lineName, nucleotide, position)
+        elif (self._sPolymType == Multifasta2SNPFile.POLYM_TYPE_4_INSERTION):
+            if (refSeq[index] == "-"):
+                self._expandTheCurrentIndel(position, nucleotide)
+            else:
+                self._closeTheCurrentIndel(lineName, nucleotide, position)
+
+    def detectSNPsAndIndels(self, iRefAndLines):
+        refSeq = iRefAndLines.getReferenceSequence()
+        refSeqLength = len ( refSeq )
+        self.dSNPsPositions = {}
+
+        for iLineBioseq in iRefAndLines.getLinesBioseqInstances():
+            lineSequence = iLineBioseq.sequence
+            self.currentLineName = iLineBioseq.header
+            self._manageCurrentIndividual(self.currentLineName)
+
+            index = 0
+            self.isTheIndelOpen4ThisLine = 0
+            self.lIndelsOfTheCurrentLine = []
+            for nucleotide in lineSequence:
+                position = index + 1
+                if (index < refSeqLength) and self._isSNPDetected(refSeq, index, nucleotide):
+                    self._treatNucleotideDifferentThanRefSeqCase(refSeq, self.currentLineName, index, nucleotide, position)
+                elif(index < refSeqLength and self.isTheIndelOpen4ThisLine) :
+                    self._treatSameNucleotideInOneIndel(refSeq, self.currentLineName, index, nucleotide, position)
+                index = index + 1
+                self.currentNucleotide = nucleotide
+                self.currentPosition = position
+
+            self.lIndelsOverAllLines = self.lIndelsOverAllLines + self.lIndelsOfTheCurrentLine
+
+        self._postTraitementDetectSNP(self.currentLineName, self.currentNucleotide, self.currentPosition)
+
+    def _manageCurrentIndividual(self, lineName):
+        self._lIndividualFileResults = self._completeIndividualListWithCurrentIndividual(self._lIndividualFileResults, lineName)
+        self._lBatchLineFileResults = self._completeBatchLineListWithCurrentIndividual(self._lBatchLineFileResults, self._lIndividualFileResults, lineName)
+        if not self._dIndividualNumbers4SubSNPResults.__contains__(lineName):
+            self._dIndividualNumbers4SubSNPResults[lineName] = len(self._lIndividualFileResults)
+
+
+    def _manageLastPositionIndels(self, lineName, nucleotide, position):
+        if (self.isTheIndelOpen4ThisLine):
+            self._closeTheCurrentIndel(lineName, nucleotide, position)
+            self.lIndelsOverAllLines.append(self.lIndelsOfTheCurrentLine.pop())
+
+    def _postTraitementDetectSNP(self, lineName, nucleotide, position):
+        self._manageLastPositionIndels(lineName, nucleotide, position)
+
+        self._mergeAllelesAndSubSNPsFromOverlappingIndels()
+        self._addMissingsAllelesAndSubSNPs()
+
+        self._lSubSNPFileResults = self._sortSubSNPResultByBatchPositionAndLineName(self._lSubSNPFileResults)
+
+    def _manageSNPs(self, lineName, nucleotide, position):
+        self._dAlleleFileResults = self._completeAlleleSetWithCurrentAllele(self._dAlleleFileResults, nucleotide)
+        truePosition = self.getUngappedPositionInRefSeq(position)
+        subSNPName = self._formatSubSNPName(lineName, truePosition, Multifasta2SNPFile.POLYM_TYPE_4_SNP)
+        iAlleleNumber = self._dAlleleFileResults[nucleotide]
+        self._sPolymType = Multifasta2SNPFile.POLYM_TYPE_4_SNP
+        flank5Prime, flank3Prime = self.getFlanksOfASubSNP(lineName, position, Multifasta2SNPFile.SNP_LENGTH, Multifasta2SNPFile.FLANK_LENGTH)
+        dSubSNPResult = {'subSNPName':subSNPName, 'position':truePosition, 'lineName':self._dIndividualNumbers4SubSNPResults[lineName],
+                         'allele':iAlleleNumber, 'batchNumber': self._currentBatchNumber, 'confidenceValue':self._sPolymConfidenceValue,
+                         'type':self._sPolymType, 'length': Multifasta2SNPFile.SNP_LENGTH,
+                         '5flank':flank5Prime, '3flank':flank3Prime}
+        if(not self.subSNPExistsInSubSNPList(dSubSNPResult, self._lSubSNPFileResults)):
+            self._lSubSNPFileResults.append(dSubSNPResult)
+
+    def _startAnIndel(self, position, nucleotide):
+        self.dcurrentIndel['start'] = position
+        self.dcurrentIndel['end'] = position
+        self.sCurrentIndelAllele = nucleotide
+        if(nucleotide == "-"):
+            self._sPolymType = Multifasta2SNPFile.POLYM_TYPE_4_DELETION
+        else:
+            self._sPolymType = Multifasta2SNPFile.POLYM_TYPE_4_INSERTION
+        self.isTheIndelOpen4ThisLine = 1
+
+    def _expandTheCurrentIndel(self, position, nucleotide):
+        self.sCurrentIndelAllele = self.sCurrentIndelAllele + nucleotide
+        self.dcurrentIndel['end'] = position
+
+    def _closeTheCurrentIndel(self, lineName, nucleotide, position):
+        subSNPName = self._formatSubSNPName(lineName, self.dcurrentIndel['start'], self._sPolymType)
+
+        dIndel4TheLine = {'name': subSNPName, 'lineName': lineName, 'start': self.dcurrentIndel['start'],'end' :self.dcurrentIndel['end'],
+                          'allele': self.sCurrentIndelAllele, 'type': self._sPolymType, 'length': self._iPolymLength}
+
+        dIndel4TheLine['length'] = self.getAnIndelLength(dIndel4TheLine)
+
+        self.lIndelsOfTheCurrentLine.append(dIndel4TheLine)
+
+        self.dcurrentIndel.clear()
+        self.isTheIndelOpen4ThisLine = 0
+
+
+    def _mergeAllelesAndSubSNPsFromOverlappingIndels(self):
+        lIndelList = []
+        for dIndel in self.lIndelsOverAllLines:
+            lIndelList = self.clusteriseIndels(dIndel, self.lIndelsOverAllLines)
+
+        for dIndel in lIndelList:
+            oldAllele = dIndel['allele']
+            start = dIndel['start']
+            stop = dIndel['end']
+            lineName = dIndel['lineName']
+
+            LineBioSeq = self._wrapper._iLinesBioseqDB.fetch(lineName)
+            dIndel = self.updateAllele(oldAllele, start, stop, LineBioSeq, dIndel)
+            dSubSNPResult = self.createSubSNPFromAMissingPolym(dIndel, lineName)
+            if(not self.subSNPExistsInSubSNPList(dSubSNPResult, self._lSubSNPFileResults)):
+                self._lSubSNPFileResults.append(dSubSNPResult)
+
+    def updateAllele(self, oldAllele, start, stop, LineBioSeq, dIndel):
+        #TODO: creer le test
+        newAllele = LineBioSeq.subseq(start, stop).sequence
+        if newAllele != oldAllele:
+            dIndel['allele'] = newAllele
+            self._dAlleleFileResults = self._completeAlleleSetWithCurrentAllele(self._dAlleleFileResults, newAllele)
+        return dIndel
+
+
+    def getFlanksOfASubSNP(self, lineName, subsnpPosition, polymLength, flankLength):
+        bioSeqOfTheLine = self._wrapper._iLinesBioseqDB.fetch(lineName)
+        flank5Prime = bioSeqOfTheLine.get5PrimeFlank(subsnpPosition, flankLength)
+        flank3Prime = bioSeqOfTheLine.get3PrimeFlank(subsnpPosition, flankLength, polymLength)
+
+        return flank5Prime, flank3Prime
+
+    def createSubSNPFromAMissingPolym(self, dIndel, lineName):
+        if(dIndel['type'] == Multifasta2SNPFile.POLYM_TYPE_4_INSERTION):
+            start = self.getUngappedPositionInRefSeq(dIndel['start']-1)
+        else:
+            start = self.getUngappedPositionInRefSeq(dIndel['start'])
+
+        subSNPName = self._formatSubSNPName(dIndel['lineName'], start, dIndel['type'])
+
+        iAlleleNumber = self._dAlleleFileResults[dIndel['allele']]
+
+        iPolymLength = self.getAnIndelLength(dIndel)
+
+        flank5Prime, flank3Prime = self.getFlanksOfASubSNP(lineName, dIndel['start'], iPolymLength, Multifasta2SNPFile.FLANK_LENGTH)
+
+        dSubSNPResult = {'subSNPName':subSNPName, 'position':start, 'lineName':self._dIndividualNumbers4SubSNPResults[lineName], 'allele':iAlleleNumber,
+                         'batchNumber': self._currentBatchNumber, 'confidenceValue':self._sPolymConfidenceValue, 'type':dIndel['type'],
+                         'length': iPolymLength, '5flank':flank5Prime, '3flank':flank3Prime}
+
+        return dSubSNPResult
+
+    def clusteriseIndels(self, dIndel, lIndelsOverAllLines):
+        iIndice = 0
+        for dIndel in lIndelsOverAllLines:
+            iIndice2Compare = 0
+            for dIndel2Compare in lIndelsOverAllLines:
+                dIndel, dIndel2Compare = self.mergeBoundsForTwoOverlappingIndels(dIndel, dIndel2Compare)
+                lIndelsOverAllLines = self.updateBoundsForAnIndelInAnIndelList(lIndelsOverAllLines, dIndel)
+                lIndelsOverAllLines = self.updateBoundsForAnIndelInAnIndelList(lIndelsOverAllLines, dIndel2Compare)
+                iIndice2Compare = iIndice2Compare + 1
+            iIndice = iIndice + 1
+
+        return lIndelsOverAllLines
+
+    def mergeBoundsForTwoOverlappingIndels(self, dIndel1, dIndel2):
+        if((dIndel2['start'] <= dIndel1['start']) and (dIndel2['end'] >= dIndel1['start']) or
+           (dIndel1['start'] <= dIndel2['start']) and (dIndel1['end'] >= dIndel2['start'])):
+            if(dIndel1['start'] <= dIndel2['start']):
+                iStart = dIndel1['start']
+            else:
+                iStart = dIndel2['start']
+
+            if(dIndel1['end'] >= dIndel2['end']):
+                iEnd = dIndel1['end']
+            else:
+                iEnd = dIndel2['end']
+
+            dIndel1['start'] = iStart
+            dIndel1['end'] = iEnd
+            dIndel2['start'] = iStart
+            dIndel2['end'] = iEnd
+
+        return dIndel1, dIndel2
+
+    def updateBoundsForAnIndelInAnIndelList(self, lIndelsList, dIndelWithNewBounds):
+        name = dIndelWithNewBounds['name']
+        dIndelInTheList, iIndice = self.findAnIndelInAListWithHisName(name, lIndelsList)
+        lIndelsList.remove(dIndelInTheList)
+        lIndelsList.insert(iIndice, dIndelWithNewBounds)
+        return lIndelsList
+
+
+    def findASubSNPInAListWithHisName(self, name, lSubSNPList):
+        dSubSNP2Find = {}
+        indice = 0
+        indice2Find = -1
+        for dSubSNP in lSubSNPList:
+            if(dSubSNP['subSNPName'] == name):
+                dSubSNP2Find = dSubSNP
+                indice2Find = indice
+            indice = indice + 1
+
+        if dSubSNP2Find == {} or indice2Find == -1:
+            msg = "trying to find a SubSNP not existing: " + name
+            self._logFile.error(msg)
+            raise Exception ("trying to find a SubSNP not existing: " + name)
+        else:
+            return dSubSNP2Find, indice2Find
+
+    def subSNPExistsInSubSNPList(self, dSubSNP2Find, lSubSNPList):
+        flag = 0
+        for dSubSNP in lSubSNPList:
+            if(dSubSNP2Find['subSNPName'] == dSubSNP['subSNPName']):
+                flag = 1
+
+        if flag == 1:
+            return True
+        else:
+            return False
+
+
+    def findAnIndelInAListWithHisName(self, name, lIndelList):
+        dIndel2Find = {}
+        indice = 0
+        indice2Find = -1
+        for dIndel in lIndelList:
+            if(dIndel['name'] == name):
+                dIndel2Find = dIndel
+                indice2Find = indice
+            indice = indice + 1
+
+        if dIndel2Find == {} or indice2Find == -1:
+            msg = "trying to find an indel not existing: " + name
+            self._logFile.error(msg)
+            raise Exception (msg)
+        else:
+            return dIndel2Find, indice2Find
+
+    def _addMissingsAllelesAndSubSNPs(self):
+        for dIndel in self.lIndelsOverAllLines:
+            start = dIndel['start']
+            end = dIndel['end']
+            type = dIndel['type']
+            self.addMissingAllelesAndSubSNPsForOnePolym(start, end, type)
+
+        for position in self.dSNPsPositions:
+            self.addMissingAllelesAndSubSNPsForOnePolym(position, position, "SNP")
+
+    def addMissingAllelesAndSubSNPsForOnePolym(self, start, end, polymType):
+        refSeqAllele = self._wrapper._iReferenceBioseq.subseq(start, end).sequence
+        BioSeqDb = self._wrapper.getLinesBioseqInstances()
+        lBioSeqDbAlleles = self.getAllelesOfASubSeq(BioSeqDb, start, end)
+        for subSequence in lBioSeqDbAlleles:
+            if(subSequence['allele'] == refSeqAllele):
+                lineName = subSequence['header']
+                dMissingPolym = {'lineName': lineName, 'start': start,'end' :end,
+                                 'allele': subSequence['allele'], 'type':polymType}
+                self._dAlleleFileResults = self._completeAlleleSetWithCurrentAllele(self._dAlleleFileResults, subSequence['allele'])
+                dSubSNPResult = self.createSubSNPFromAMissingPolym(dMissingPolym, lineName)
+                if(not self.subSNPExistsInSubSNPList(dSubSNPResult, self._lSubSNPFileResults)):
+                    self._lSubSNPFileResults.append(dSubSNPResult)
+
+    def addOnePolymorphicPosition(self, position):
+        if(not self.dSNPsPositions.has_key(position)):
+            self.dSNPsPositions[position] = 1
+
+    def getUngappedPositionInRefSeq(self, position):
+        if(position ==1):
+            nbOfGaps = 0
+        else:
+            seqIn5Prime = self._wrapper._iReferenceBioseq.subseq(1, position-1).sequence
+            nbOfGaps = seqIn5Prime.count("-")
+
+        return position - nbOfGaps
+
+    def getAllelesOfASubSeq(self, BioSeqDb, start, end):
+        lAlleles = []
+        for iBioSeq in BioSeqDb:
+            dAlleles = {}
+            dAlleles['header'] = iBioSeq.header
+            dAlleles['allele'] = iBioSeq.subseq(start, end).sequence
+            lAlleles.append(dAlleles)
+
+        return lAlleles
+
+    def getAnIndelLength(self, dIndel):
+        length = 0
+        if(dIndel['type'] == Multifasta2SNPFile.POLYM_TYPE_4_DELETION):
+            length = dIndel['end'] - dIndel['start'] + 1
+        else:
+            length = len(dIndel['allele'])
+
+        return length
+
+    def createWrapperFromFile(self, inFileName):
+        faF = open(inFileName, "r")
+        iBioSeqDB = self._extractSequencesFromInputFile(faF)
+        faF.close()
+
+        iBioSeqDB.upCase()
+        referenceBioseq = iBioSeqDB[0]
+        linesBioSeqDB = iBioSeqDB.extractPart(1, iBioSeqDB.getSize() - 1)
+
+        try:
+            if(FileUtils.isEmpty(inFileName)):
+                msg = "The input file is empty!"
+                self._logFile.error(self._prefixeWithLineNumber (msg))
+                raise Exception (self._prefixeWithFileName (msg))
+            if(self.isHeaderInRefSeqList(referenceBioseq.header)):
+                msg = "This reference sequence already exists in one previous file!"
+                self._logFile.error(self._prefixeWithLineNumber (msg))
+                raise Exception (self._prefixeWithLineNumber (msg))
+        except Exception, e :
+            raise Exception ("Problem with one input file: \n" + str(e))
+
+        self._lRefSequences.append(referenceBioseq)
+
+        return ReferenceBioseqAndLinesBioseqDBWrapper(referenceBioseq, linesBioSeqDB, self._logFile, inFileName)
+
+    def isHeaderInRefSeqList(self, header):
+        isHeader = False
+        for item in self._lRefSequences:
+            if item.header == header:
+                isHeader = True
+        return isHeader
+
+    def completeBatchList(self):
+        dBatchResults = {'BatchNumber' : self._currentBatchNumber, 'BatchName' : self._batchName, 'GeneName' : self._geneName,'ContactNumber' : "1",
+                         'ProtocolNumber' : "1", 'ThematicNumber' : "1", 'RefSeqName': self._wrapper._iReferenceBioseq.header}
+
+        self._lBatchFileResults.append(dBatchResults)
+
+        return self._lBatchFileResults
+
+    def getLineAsAHeader(self, lineToBeCheck, lineNumber = 0):
+        """
+        header line begin with the tag(or token) '>' tag
+        ended with an carriage return
+        contain The name of sequence must respect this alphabet [a-zA-Z0-9_-:]
+        """
+        obsHeader = lineToBeCheck
+        if obsHeader[0]!=">" :
+            msg = "tag '>' omitted before header"
+            self._logFile.error(self._prefixeWithLineNumber (msg))
+            raise Exception (self._prefixeWithLineNumber (msg))
+        else :
+            obsHeader = obsHeader[1:]
+        obsHeader = obsHeader.replace ("\n","")
+        obsHeader = self._removeRepeatedBlanksInAStr(obsHeader)
+        obsHeader = self._replaceBlankByUnderScoreInAStr(obsHeader)
+        if self.checkHeaderAlphabet(obsHeader) :
+            return obsHeader
+        self._logFile.error(self._prefixeWithLineNumber ("fatal error on header"))
+        raise Exception (self._prefixeWithLineNumber ("fatal error on header"))
+
+    def getLineAsASeq(self, lineToBeCheck):
+        """
+        Sequence line
+        ended with an carriage return
+        contain only character of the IUPAC alphabet
+        """
+        obsSeq = str.upper(lineToBeCheck)
+        obsSeq = obsSeq.replace ("\n","")
+        obsSeq = obsSeq.replace ("\r","")
+        obsLine = obsSeq.replace("-","")
+        if not self.isIUPAC_bases(obsLine) :
+            msg = "the sequence contain a non nucleic character "
+            self._logFile.error(self._prefixeWithLineNumber (msg))
+            raise Exception (self._prefixeWithLineNumber (msg))
+        return obsSeq
+
+    def checkHeaderAlphabet( self, strToCheck):
+        """
+        Check the string
+        the string is not a header when founding a pattern not corresponding to the regexp
+        \W Matches any non-alphanumeric character; this is equivalent to the class [^a-zA-Z0-9_-:].
+        """
+        if strToCheck=="":
+            return False
+        p = re.compile('[^a-zA-Z0-9_:\-]', re.IGNORECASE) #p = re.compile('(\W|-|:)+', re.IGNORECASE)
+        errList=p.findall(strToCheck)
+        if len( errList ) > 0 :
+            return False
+        else:
+            return True
+
+    ## Check the string is nucleotides sequence from the DNA_ALPHABET_WITH_N = ["A","T","G","C","N"] of IUPAC nomenclature.
+    # @return True if sequence contain A, T, G, C or N False otherwise
+    #
+    def isDNA_bases( self, sequence):
+        if sequence == "" :
+            return False
+
+        setFromString = set()
+
+        for nt in sequence :
+            setFromString.add(nt)
+
+        return setFromString.issubset(DNA_ALPHABET_WITH_N_AND_DELS)
+
+    ## Check if the string is nucleotides sequence from the IUPAC ALPHABET .
+    # @return True if sequence contain IUPAC letters False otherwise
+    #
+    def isIUPAC_bases( self, sequence):
+        if sequence == "" :
+            return False
+
+        setFromString = set()
+
+        for nt in sequence :
+            setFromString.add(nt)
+
+        return setFromString.issubset(IUPAC)
+
+    def _writeAllOutputFiles(self):
+        writer = Multifasta2SNPFileWriter()
+        writer.write(self)
+
+    def _sortSubSNPResultByBatchPositionAndLineName(self, lSubSNPResults):
+        return sorted(lSubSNPResults, key=lambda SNPresults: (SNPresults['batchNumber'], SNPresults['position'], SNPresults['lineName']))
+
+    def _formatSubSNPName(self, currentLineHeader, position, polymType):
+        shortPolymType = polymType[:3]
+        return self._batchName + "_" + shortPolymType + "_" + str(position) + "_" + currentLineHeader
+
+    def _isSNPDetected(self, referenceSequence, index, nt):
+        if((nt != referenceSequence[index]) and (nt.upper() != "N") and (referenceSequence[index].upper() != "N")):
+            return True
+        else:
+            return False
+
+    def _extractSequencesFromInputFile(self, inFile):
+        # attention : DNA_ALPHABET_WITH_N_AND_DELS = Set (['A','T','G','C','N']) no including "gap"
+        lInFileLines = inFile.readlines()
+        nbOfLines = len(lInFileLines) - 1
+        #premiere lecture
+        self._iCurrentLineNumber = 0
+        isSameSeq = False
+        newSeq = ""
+        bioseqDB = BioseqDB ()
+        while self._iCurrentLineNumber < nbOfLines :
+            bioseq = Bioseq()
+            bioseq.header = self.getLineAsAHeader( lInFileLines[self._iCurrentLineNumber] )
+            isSameSeq = True
+            while isSameSeq and (self._iCurrentLineNumber < nbOfLines) :
+                self._iCurrentLineNumber +=1
+                if lInFileLines[self._iCurrentLineNumber][0] == ">" :
+                    isSameSeq = False
+                else :
+                    newSeq = newSeq + self.getLineAsASeq( lInFileLines[self._iCurrentLineNumber] )
+                    isSameSeq = True
+            bioseq.setSequence(newSeq)
+            newSeq = ""
+            bioseqDB.add(bioseq)
+        return bioseqDB
+
+    def _removeRepeatedBlanksInAStr (self, StrToClean ):
+        resStr=StrToClean.expandtabs(2)
+        compResStr=resStr.replace ("  "," ")
+        while compResStr != resStr :
+            resStr=compResStr
+            compResStr=resStr.replace ("  "," ")
+        return resStr
+
+    def _replaceBlankByUnderScoreInAStr (self, StrToClean ):
+        resStr = StrToClean.replace (" ","_")
+        return resStr
+
+    def _prefixeWithLineNumber (self, strMsg):
+        resStr = "File: " + self._currentFileName + "\t"
+        resStr += "Line %i " % (self._iCurrentLineNumber+1 ) + strMsg
+        return resStr
+
+    def _prefixeWithFileName (self, strMsg):
+        resStr = "File: " + self._currentFileName + "\n" + strMsg
+        return resStr
+
+
+    def _completeAlleleSetWithCurrentAllele(self, dAlleleFileResults, dnaBase):
+        if dAlleleFileResults.has_key(dnaBase):
+            return dAlleleFileResults
+        else:
+            iAlleleNumber = len(dAlleleFileResults) + 1
+            dAlleleFileResults[dnaBase] = iAlleleNumber
+        return dAlleleFileResults
+
+    def _completeIndividualListWithCurrentIndividual(self, lIndividualResults, lineName):
+        if lIndividualResults == []:
+            iIndividualNumber = 1
+        else:
+            iIndividualNumber = len(lIndividualResults) + 1
+
+        #TODO: transformer la liste de dictionnaire en liste d'objets
+        if not (self._checkIfALineExistInList(lIndividualResults, lineName)):
+            dIndividual2Add = {'individualNumber': iIndividualNumber, 'individualName': lineName, 'scientificName': self._taxon}
+            lIndividualResults.append(dIndividual2Add)
+
+        return lIndividualResults
+
+    def _completeBatchLineListWithCurrentIndividual(self, lBatchLineResults, lIndividualResults, lineName):
+        lineDict = self._getALineDictFromADictListWithALineName(lIndividualResults, lineName)
+
+        if len(lineDict) != 0:
+            if(lineDict.has_key('individualNumber')):
+                indivNumberOfTheLineDict = lineDict['individualNumber']
+            else:
+                msg = "Problem with the batchLine results construction: individual named " + lineName + " has no individual number!"
+                self._logFile.error(msg)
+                raise Exception (msg)
+        else:
+            msg = "Problem with the batchLine results construction: individual named " + lineName + " not in individual list!"
+            self._logFile.error(msg)
+            raise Exception (msg)
+
+        dResults2Add = {'IndividualNumber': str(indivNumberOfTheLineDict), 'BatchNumber' : self._currentBatchNumber}
+        lBatchLineResults.append(dResults2Add)
+        return lBatchLineResults
+
+    def _getALineDictFromADictListWithALineName(self, lDictList, lineName):
+        dictToReturn = {}
+        for myDict in lDictList:
+            if myDict['individualName'] == lineName:
+                dictToReturn = myDict
+
+        return dictToReturn
+
+    def _checkIfALineExistInList(self, lDictList, lineName):
+        for myDict in lDictList:
+            if myDict['individualName'] == lineName:
+                return True
+        return False
+
+    def _getCurrentBatchResult(self):
+        return self._lBatchFileResults[self._currentBatchNumber-1]
+
+
+
+
+class ReferenceBioseqAndLinesBioseqDBWrapper (object):
+
+    def __init__ (self, iReferenceBioseq, iLinesBioSeqDB, logger, fileName):
+        self._iReferenceBioseq = iReferenceBioseq
+        self._iLinesBioseqDB = iLinesBioSeqDB
+        self._logger = logger
+        self._currentFileName = fileName
+        self._checkAllSeqs()
+
+
+    def _checkAllSeqs(self):
+        self._iReferenceBioseq.checkEOF()
+        refSeqLen = self._iReferenceBioseq.getLength()
+
+        for seq in self._iLinesBioseqDB.db:
+            seq.checkEOF()
+            if(not seq.getLength() == refSeqLen):
+                msg = "File: " + self._currentFileName + ", problem with the sequence " + seq.header + ": its length is different from the reference seq! All the sequences must have the same length.\n"
+                msg += "refseq length: " + str(refSeqLen) + "\n"
+                msg += "seq length: " + str(seq.getLength()) + "\n"
+                self._logger.error(msg)
+                raise Exception (msg)
+
+    def getLinesBioseqInstances(self):
+        return self._iLinesBioseqDB.db
+
+    def getReferenceSequence(self):
+        return self._iReferenceBioseq.sequence
+
+class Multifasta2SNPFileWriter(object):
+
+    SUB_SNP_FILE_HEADER = ["SubSNPName","ConfidenceValue","Type","Position","5flank",
+                           "3flank","Length","BatchNumber","IndividualNumber","PrimerType","PrimerNumber","Forward_or_Reverse","AlleleNumber"]
+
+    ALLELE_FILE_HEADER = ["AlleleNumber","Value","Motif","NbCopy","Comment"]
+
+    INDIVIDUAL_FILE_HEADER = ["IndividualNumber","IndividualName","Description","AberrAneuploide",
+                              "FractionLength","DeletionLineSynthesis","UrlEarImage","TypeLine","ChromNumber","ArmChrom","DeletionBin","ScientificName",
+                              "local_germplasm_name","submitter_code","local_institute","donor_institute","donor_acc_id"]
+
+    SEQUENCE_CSV_FILE_HEADER = ["SequenceName","SeqType","BankName","BankVersion","ACNumber","Locus","ScientificName"]
+
+    BATCH_TXT_FILE_HEADER = ["BatchNumber", "BatchName", "GeneName", "Description", "ContactNumber", "ProtocolNumber", "ThematicNumber", "RefSeqName", "AlignmentFileName", "SeqName"]
+
+    BATCH_LINE_FILE_HEADER = ["IndividualNumber", "Pos5", "Pos3", "BatchNumber", "Sequence"]
+
+    def __init__(self):
+        self._csvFieldSeparator = ";"
+        self._txtSubFieldSeparator = ": "
+        self._txtFieldSeparator = "\n"
+        self._primerType = "Sequence"
+        self._csvLineSeparator = "\n"
+        self._txtLineSeparator = "//\n"
+
+    def write(self, iMultifasta2SNPFile):
+        self._writeSubSNPFile(iMultifasta2SNPFile._outSubSNPFileName, iMultifasta2SNPFile._lSubSNPFileResults)
+        self._writeAlleleFile(iMultifasta2SNPFile._outAlleleFileName, iMultifasta2SNPFile._dAlleleFileResults)
+        self._writeIndividualFile(iMultifasta2SNPFile._outIndividualFileName, iMultifasta2SNPFile._lIndividualFileResults)
+        self._writeSequenceFiles(iMultifasta2SNPFile._outSequenceFSAFileName, iMultifasta2SNPFile._outSequenceCSVFileName, iMultifasta2SNPFile._lRefSequences, iMultifasta2SNPFile._taxon)
+        self._writeBatchFile(iMultifasta2SNPFile._outBatchFileName, iMultifasta2SNPFile._lBatchFileResults)
+        self._writeBatchLineFile(iMultifasta2SNPFile._outBatchLineFileName, iMultifasta2SNPFile._lBatchLineFileResults)
+
+    def sortAlleleResultByAlleleNumber(self, dAlleleFileResults):
+        return sorted(dAlleleFileResults.items(), key=lambda(k,v):(v,k))
+
+    def _writeSubSNPFile(self, subSNPFileName, lSNP2Write):
+        outF = open(subSNPFileName, "w")
+        self._writeSNPFileHeader(outF)
+        for dSNP in lSNP2Write:
+            self._writeSNPFileLine(outF, dSNP)
+        outF.close()
+
+    def _writeAlleleFile(self, alleleFileName, dAllele2Write):
+        outF = open(alleleFileName, "w")
+        self._writeAlleleFileHeader(outF)
+        lAlleleSortedResults = self.sortAlleleResultByAlleleNumber(dAllele2Write)
+        for tAllele in lAlleleSortedResults:
+            self._writeAlleleFileLine(outF, tAllele[0], tAllele[1])
+
+        outF.close()
+
+    def _writeIndividualFile(self, individualFileName, lIndividual2Write):
+        sorted(lIndividual2Write, key=lambda Individual: (Individual['individualNumber']))
+        outF = open(individualFileName, "w")
+        self._writeIndividualFileHeader(outF)
+
+        for dIndiv in lIndividual2Write:
+            self._writeIndividualFileLine(outF, dIndiv)
+
+        outF.close()
+
+    def _writeSequenceFiles(self, sequenceFSAFileName, sequenceCSVFileName, lRefSequences, taxon):
+        outFSA = open(sequenceFSAFileName, "w")
+        outCSV = open(sequenceCSVFileName, "w")
+        self._writeSequenceCSVHeader(outCSV)
+
+        for refSeq in lRefSequences:
+            refSeq.cleanGap()
+            self._writeSequenceFSAFile(outFSA, refSeq)
+            self._writeSequenceCSVLine(outCSV, refSeq, taxon)
+
+        outFSA.close()
+        outCSV.close()
+
+    def _writeSequenceFSAFile(self, outF, refSeq):
+        outF.write( ">%s\n" % ( refSeq.header ) )
+        outF.write( "%s\n" % ( refSeq.sequence[0:refSeq.getLength()] ) )
+
+
+    def _writeBatchFile(self, batchFileName, lBatchResults):
+        outF = open(batchFileName, "w")
+        for dBatchResults in lBatchResults:
+            for head in Multifasta2SNPFileWriter.BATCH_TXT_FILE_HEADER[:]:
+                if dBatchResults.has_key(head):
+                    outF.write(head + self._txtSubFieldSeparator + str(dBatchResults[head]) + self._txtFieldSeparator)
+                else:
+                    outF.write(head + self._txtSubFieldSeparator + self._txtFieldSeparator)
+
+            outF.write(self._txtLineSeparator)
+
+        outF.close()
+
+    def _writeBatchLineFile(self, batchLineFileName, lBatchLineResults):
+        outF = open(batchLineFileName, "w")
+        self._writeBatchLineFileHeader(outF)
+        for dResult in lBatchLineResults:
+            self._writeBatchLineFileLine(outF, dResult)
+        outF.close()
+
+    def _writeSNPFileHeader(self, outF):
+        for head in Multifasta2SNPFileWriter.SUB_SNP_FILE_HEADER[:-1]:
+            outF.write(head + self._csvFieldSeparator)
+        outF.write(Multifasta2SNPFileWriter.SUB_SNP_FILE_HEADER[-1] + self._csvLineSeparator)
+
+    def _writeAlleleFileHeader(self, outF):
+        for head in Multifasta2SNPFileWriter.ALLELE_FILE_HEADER[:-1]:
+            outF.write(head + self._csvFieldSeparator)
+        outF.write(Multifasta2SNPFileWriter.ALLELE_FILE_HEADER[-1] + self._csvLineSeparator)
+
+    def _writeIndividualFileHeader(self, outF):
+        for head in Multifasta2SNPFileWriter.INDIVIDUAL_FILE_HEADER[:-1]:
+            outF.write(head + self._csvFieldSeparator)
+        outF.write(Multifasta2SNPFileWriter.INDIVIDUAL_FILE_HEADER[-1] + self._csvLineSeparator)
+
+    def _writeSequenceCSVHeader(self, outF):
+        for head in Multifasta2SNPFileWriter.SEQUENCE_CSV_FILE_HEADER[:-1]:
+            outF.write(head + self._csvFieldSeparator)
+        outF.write(Multifasta2SNPFileWriter.SEQUENCE_CSV_FILE_HEADER[-1] + self._csvLineSeparator)
+
+    def _writeBatchLineFileHeader(self, outF):
+        for head in Multifasta2SNPFileWriter.BATCH_LINE_FILE_HEADER[:-1]:
+            outF.write(head + self._csvFieldSeparator)
+        outF.write(Multifasta2SNPFileWriter.BATCH_LINE_FILE_HEADER[-1] + self._csvLineSeparator)
+
+    def _writeSNPFileLine(self, outF, dSNP):
+        outF.write(dSNP['subSNPName'] + self._csvFieldSeparator)
+        outF.write(dSNP['confidenceValue'] + self._csvFieldSeparator + dSNP['type'] + self._csvFieldSeparator)
+        outF.write(str(dSNP['position']) + self._csvFieldSeparator + dSNP['5flank'] + self._csvFieldSeparator + dSNP['3flank'] + self._csvFieldSeparator)
+        outF.write(str(dSNP['length']) + self._csvFieldSeparator + str(dSNP['batchNumber']) + self._csvFieldSeparator)
+        outF.write(str(dSNP['lineName']) + self._csvFieldSeparator)
+        outF.write(self._primerType + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + str(dSNP['allele']) + self._csvLineSeparator)
+
+    def _writeAlleleFileLine(self, outF, sAllele2Write, iAlleleNumber):
+        outF.write(str(iAlleleNumber) + self._csvFieldSeparator)
+        outF.write(sAllele2Write + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvLineSeparator)
+
+    def _writeIndividualFileLine(self, outF, dIndividual):
+        outF.write(str(dIndividual['individualNumber']) + self._csvFieldSeparator)
+        outF.write(dIndividual['individualName'] + self._csvFieldSeparator + self._csvFieldSeparator+ self._csvFieldSeparator+ self._csvFieldSeparator+ self._csvFieldSeparator+ self._csvFieldSeparator+ self._csvFieldSeparator+ self._csvFieldSeparator+ self._csvFieldSeparator+ self._csvFieldSeparator)
+        outF.write(dIndividual['scientificName'] + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator+ self._csvFieldSeparator + self._csvFieldSeparator + self._csvLineSeparator)
+
+    def _writeSequenceCSVLine(self, outF, refSeq, taxon):
+        outF.write(refSeq.header + self._csvFieldSeparator)
+        outF.write("Reference" + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator)
+        outF.write(taxon + self._csvLineSeparator)
+
+    def _writeBatchLineFileLine(self, outF, dResult):
+        outF.write(str(dResult['IndividualNumber']) + self._csvFieldSeparator + self._csvFieldSeparator + self._csvFieldSeparator)
+        outF.write(str(dResult['BatchNumber']) + self._csvFieldSeparator + self._csvLineSeparator)
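
Below is a minimal usage sketch, not part of the changeset, showing how the class added above is driven: the constructor signature `(taxon, batchName, geneName)`, `runOneBatch` and `runSeveralBatches` are taken from the code, while the file names, directory and taxon string are placeholder assumptions. The first sequence of each aligned multifasta is used as the reference, as `createWrapperFromFile` does, and the CSV/FSA/TXT outputs (SubSNP.csv, Allele.csv, Individual.csv, Sequences.fsa, Sequences.csv, Batch.txt, BatchLine.csv) are written in the working directory.

```python
# Hypothetical driver for Multifasta2SNPFile (Python 2, commons package on PYTHONPATH).
from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFile

# One alignment = one batch: "myGene.fasta" (placeholder name) holds the gapped
# reference as its first sequence, followed by the aligned lines to compare.
converter = Multifasta2SNPFile(taxon="Triticum aestivum",  # placeholder taxon
                               batchName="Batch_myGene",
                               geneName="myGene")
converter.runOneBatch("myGene.fasta")

# Several alignments: every *.fasta / *.fsa file found in "alignments/" (placeholder
# directory) becomes one batch; batch and gene names are derived from the file names
# and a single set of output files is written for all batches.
multiConverter = Multifasta2SNPFile(taxon="Triticum aestivum")
multiConverter.runSeveralBatches("alignments")
```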