Mercurial > repos > yufei-luo > s_mart
view commons/tools/GFF3Maker.py @ 19:9bcfa7936eec
Deleted selected files
author | m-zytnicki |
---|---|
date | Mon, 29 Apr 2013 03:23:29 -0400 |
parents | 94ab73e8a190 |
children |
line wrap: on
line source
#!/usr/bin/env python ##@file GFF3Maker.py # Copyright INRA (Institut National de la Recherche Agronomique) # http://www.inra.fr # http://urgi.versailles.inra.fr # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. from commons.core.utils.RepetOptionParser import RepetOptionParser from commons.core.utils.FileUtils import FileUtils from commons.core.sql.DbFactory import DbFactory from commons.core.sql.TableSeqAdaptator import TableSeqAdaptator from commons.core.sql.TablePathAdaptator import TablePathAdaptator import sys import os ## GFF3Maker exports annotations from a 'path' table into a GFF3 file. 
#
class GFF3Maker(object):
    """Export annotations from 'path' and/or 'set' tables into GFF3 files,
    one file per input sequence (or per sequence and annotation tier with -s)."""

    def __init__(self, inFastaName = "", tablesFileName = "", classifTableName = "", isChado = False, isGFF3WithoutAnnotation = False, isWithSequence = False, areMatchPartsCompulsory = False, configFileName = "", verbose = 0, doMergeIdenticalMatches = False, doSplit = False):
        self._inFastaName = inFastaName
        self._tablesFileName = tablesFileName
        self._classifTableName = classifTableName
        self._isChado = isChado
        self._isGFF3WithoutAnnotation = isGFF3WithoutAnnotation
        self._isWithSequence = isWithSequence
        self._areMatchPartsCompulsory = areMatchPartsCompulsory
        self._configFileName = configFileName
        self._doMergeIdenticalMatches = doMergeIdenticalMatches
        self._doSplit = doSplit
        self._iDB = None  # database connection, created lazily in run()
        self._verbose = verbose

    def setAttributesFromCmdLine(self):
        description = "GFF3Maker exports annotations from 'path', 'set' and/or 'classif' tables into a GFF3 file\n"
        parser = RepetOptionParser(description = description)
        parser.add_option("-f", "--inseq", dest = "inFastaName", action = "store", type = "string", help = "'seq' table recording the input sequences", default = "")
        parser.add_option("-t", "--tablesfile", dest = "tablesFileName", action = "store", type = "string", help = "tabulated file of table name to use to create the gff3 files (fields: tier name, format, table name)", default = "")
        parser.add_option("-w", "--withSequence", dest = "isWithSequence", action = "store_true", help = "write the sequence at the end of GFF3 file", default = False)
        parser.add_option("-a", "--withoutAnnotation", dest = "isGFF3WithoutAnnotation", action = "store_true", help = "write GFF3 files even if no annotation", default = False)
        parser.add_option("-p", "--matchPart", dest = "areMatchPartsCompulsory", action = "store_true", help = "always associate a match_part to a match", default = False)
        parser.add_option("-i", "--classifTable", dest = "classifTable", action = "store", type = "string", help = "name of the TE library classification table [optional]", default = "")
        parser.add_option("-c", "--chado", dest = "isChado", action = "store_true", help = "Chado compliance", default = False)
        parser.add_option("-m", "--doMergeIdenticalMatches", dest = "doMergeIdenticalMatches", action = "store_true", help = "merge identical matches based on query start, query end, score", default = False)
        parser.add_option("-s", "--doSplit", dest = "doSplit", action = "store_true", help = "split each GFF3 per annotation type", default = False)
        parser.add_option("-C", "--config", dest = "configFileName", action = "store", type = "string", help = "configuration file for database connection", default = "")
        parser.add_option("-v", "--verbose", dest = "verbose", action = "store", type = "int", help = "verbosity level (default=0, else 1 or 2)", default = 0)
        options = parser.parse_args()[0]
        self._setAttributesFromOptions(options)

    #TODO: write a "setAttributesFromConfig"
    def _setAttributesFromOptions(self, options):
        self.setInFastaName(options.inFastaName)
        self.setTablesFileName(options.tablesFileName)
        self.setClassifTable(options.classifTable)
        self.setIsChado(options.isChado)
        self.setDoMergeIdenticalMatches(options.doMergeIdenticalMatches)
        self.setIsWithSequence(options.isWithSequence)
        self.setIsGFF3WithoutAnnotation(options.isGFF3WithoutAnnotation)
        self.setAreMatchPartCompulsory(options.areMatchPartsCompulsory)
        self.setDoSplit(options.doSplit)
        self.setConfigFileName(options.configFileName)
        self.setVerbose(options.verbose)

    def setInFastaName(self, inFastaName):
        self._inFastaName = inFastaName

    def setTablesFileName(self, tablesFileName):
        self._tablesFileName = tablesFileName

    def setIsWithSequence(self, isWithSequence):
        self._isWithSequence = isWithSequence

    def setIsGFF3WithoutAnnotation(self, isGFF3WithoutAnnotation):
        self._isGFF3WithoutAnnotation = isGFF3WithoutAnnotation

    def setAreMatchPartCompulsory(self, areMatchPartsCompulsory):
        self._areMatchPartsCompulsory = areMatchPartsCompulsory

    def setClassifTable(self, classifTable):
        self._classifTableName = classifTable

    def setIsChado(self, isChado):
        self._isChado = isChado

    def setDoMergeIdenticalMatches(self, doMergeIdenticalMatches):
        self._doMergeIdenticalMatches = doMergeIdenticalMatches

    def setDoSplit(self, doSplit):
        self._doSplit = doSplit

    def setConfigFileName(self, configFileName):
        self._configFileName = configFileName

    def setVerbose(self, verbose):
        self._verbose = verbose

    ## Validate mandatory options; requires self._iDB to be connected
    #  when a classification table is given.
    #
    def checkOptions(self):
        if self._inFastaName == "":
            raise Exception("ERROR: options -f required")
        if self._configFileName != "":
            if not FileUtils.isRessourceExists(self._configFileName):
                raise Exception("ERROR: configuration file does not exist!")
        if self._classifTableName and not self._iDB.doesTableExist(self._classifTableName):
            raise Exception("ERROR: classification table '%s' does not exist!" % self._classifTableName)

    ## Retrieve the features to write in the GFF3 file.
    #
    # @param pathTable string name of the table recording the annotations (i.e. the features)
    # @param seqTable string name of the 'seq' table used to compute TargetLength (may be empty)
    # @param seqName string name of the sequence (the source feature) on which we want to visualize the matches (the features)
    # @param source string the program that generated the feature (i.e. REPET)
    # @param feature string unused here, kept for signature symmetry with _getSetFeatures
    # @param frame string "." by default (or "+", "-")
    # @return pathString string which will be printed in path file
    #
    def _getPathFeatures(self, pathTable, seqTable, seqName, source, feature, frame):
        pathString = ""
        iTPA = TablePathAdaptator(self._iDB, pathTable)
        lPaths = iTPA.getPathListSortedByQueryCoordAndScoreFromQuery(seqName)
        # organise them into 'match' and 'match_part'
        if lPaths:
            dPathID2Data = self._gatherSamePathFeatures(lPaths)
            # build the output string
            for pathID in dPathID2Data:
                pathString += self._organizeEachPathFeature(pathID, dPathID2Data[pathID], seqName, source, frame, seqTable)
        return pathString

    ## Gather matches with the same path ID, optionally merging identical
    #  matches (same query start/end and score, different subject) into
    #  the 'otherTargets' list of the first one.
    #
    # @param lPaths list of Path instances sorted by query coordinate and score
    # @return dPathID2Matchs dict whose keys are path IDs and values are matches data
    #
    def _gatherSamePathFeatures(self, lPaths):
        dPathID2Matchs = {}
        iPreviousPath = lPaths[0]
        iPreviousPath.otherTargets = []
        for iPath in lPaths[1:]:
            if self._doMergeIdenticalMatches \
                    and iPreviousPath.getQueryStart() == iPath.getQueryStart() \
                    and iPreviousPath.getQueryEnd() == iPath.getQueryEnd() \
                    and iPreviousPath.getScore() == iPath.getScore() \
                    and iPreviousPath.getSubjectName() != iPath.getSubjectName():
                iPreviousPath.otherTargets.append("%s %d %d" % (iPath.getSubjectName(), iPath.getSubjectStart(), iPath.getSubjectEnd()))
            else:
                self._addPathToMatchDict(dPathID2Matchs, iPreviousPath)
                iPreviousPath = iPath
                iPreviousPath.otherTargets = []
        self._addPathToMatchDict(dPathID2Matchs, iPreviousPath)
        return dPathID2Matchs

    ## Look up the classification evidence of a consensus in the classif table.
    #  Best-effort: returns "" when no row matches.
    #
    # @param seqName string consensus name (possibly suffixed, e.g. '..._reversed')
    # @return evidence string (empty if not found)
    #
    def _getClassifEvidenceBySeqName(self, seqName):
        lseqName = seqName.split('_')
        # build a LIKE pattern keeping the first and last name components
        seqNameStructure = '%s_%%' % lseqName[0]
        if lseqName[-1] == 'reversed':
            seqNameStructure += '%s_%s' % (lseqName[-2], lseqName[-1])
        else:
            seqNameStructure += lseqName[-1]
        # NOTE(review): query is built by string interpolation; table/seq names
        # come from the project's own tables, but parameterizing would be safer.
        qry = "SELECT evidence FROM %s WHERE seq_name like \"%s\"" % (self._classifTableName, seqNameStructure)
        self._iDB.execute(qry)
        result = ""
        try:
            result = "".join(self._iDB.fetchall()[0])
        except (IndexError, TypeError):
            # no matching row (or NULL evidence): keep the empty default
            pass
        return result

    ## Append one path's data to the per-pathID match dictionary.
    #  Subject coordinates are normalized so start <= end.
    #
    def _addPathToMatchDict(self, dPathID2Matchs, iPreviousPath):
        pathId = iPreviousPath.getIdentifier()
        subjectStart = iPreviousPath.getSubjectStart()
        subjectEnd = iPreviousPath.getSubjectEnd()
        strand = iPreviousPath.getSubjectStrand()
        if subjectStart > subjectEnd:
            subjectStart, subjectEnd = subjectEnd, subjectStart
        queryStart = iPreviousPath.getQueryStart()
        queryEnd = iPreviousPath.getQueryEnd()
        subjectName = iPreviousPath.getSubjectName()
        eValue = iPreviousPath.getEvalue()
        identity = iPreviousPath.getIdentity()
        otherTargets = iPreviousPath.otherTargets
        dPathID2Matchs.setdefault(pathId, []).append([queryStart, queryEnd, strand, subjectName, subjectStart, subjectEnd, eValue, identity, otherTargets])

    ## Return the length of a consensus, looked up by its description in a 'seq' table.
    #
    def _getConsensusLengthByTargetName(self, targetName, seqTableName):
        iTableSeqAdaptator = TableSeqAdaptator(self._iDB, seqTableName)
        return iTableSeqAdaptator.getSeqLengthFromDescription(targetName)

    ## For a specific path ID, organize match data according to the GFF3 format.
    #
    # @param pathID string path ID
    # @param lMatches match list; each item is
    #        [queryStart, queryEnd, strand, subjectName, subjectStart, subjectEnd, eValue, identity, otherTargets]
    # @param seqName string name of the source feature
    # @param source string 'source' field for GFF3 format
    # @param frame string 'frame' field for GFF3 format
    # @param seqTable string 'seq' table for TargetLength (empty to skip)
    # @return lines string to write in the GFF3 file
    #
    def _organizeEachPathFeature(self, pathID, lMatches, seqName, source, frame, seqTable = ""):
        lines = ""
        # overall span of the 'match' feature on query and subject
        minStart = lMatches[0][0]
        maxEnd = lMatches[0][1]
        minStartSubject = lMatches[0][4]
        maxEndSubject = lMatches[0][5]
        strand = lMatches[0][2]
        for i in lMatches:
            minStart = min(minStart, i[0])
            maxEnd = max(maxEnd, i[1])
            minStartSubject = min(minStartSubject, i[4])
            maxEndSubject = max(maxEndSubject, i[5])
        target = lMatches[0][3]
        targetDescTag = ""
        if self._classifTableName != "":
            targetDescTag = self._getClassifEvidenceBySeqName(target)
            if targetDescTag != "":
                # escape GFF3 attribute separators in the free-text evidence
                targetDescTag = ";TargetDescription=%s" % targetDescTag.replace('=', ':').replace(';', '').replace(',', ' |')
        targetLengthTag = ""
        if seqTable != "":
            targetLengthTag = ";TargetLength=%i" % self._getConsensusLengthByTargetName(target, seqTable)
        attributes = "ID=ms%s_%s_%s" % (pathID, seqName, target)
        if self._isChado:
            # Chado wants '+'-separated Target values
            attributes += ";Target=%s+%s+%s" % (target, minStartSubject, maxEndSubject)
        else:
            attributes += ";Target=%s %s %s" % (target, minStartSubject, maxEndSubject)
        if lMatches[0][8]:
            attributes += ";OtherTargets=%s" % ", ".join(lMatches[0][8])
        else:
            attributes += targetLengthTag
            attributes += targetDescTag
            if len(lMatches) == 1:
                attributes += ";Identity=%s" % lMatches[0][7]
        lines += "%s\t%s\tmatch\t%s\t%s\t0.0\t%s\t%s\t%s\n" % (seqName, source, minStart, maxEnd, strand, frame, attributes)
        # emit match_part children when there are several fragments,
        # or always when -p/--matchPart was requested
        if len(lMatches) > 1 or self._areMatchPartsCompulsory:
            count = 1
            for i in lMatches:
                attributes = "ID=mp%s-%i_%s_%s" % (pathID, count, seqName, target)
                attributes += ";Parent=ms%s_%s_%s" % (pathID, seqName, target)
                if self._isChado:
                    attributes += ";Target=%s+%s+%s" % (target, i[4], i[5])
                else:
                    attributes += ";Target=%s %s %s" % (target, i[4], i[5])
                if not i[8]:
                    attributes += ";Identity=%s" % i[7]
                lines += "%s\t%s\tmatch_part\t%s\t%s\t%s\t%s\t%s\t%s\n" % (seqName, source, i[0], i[1], i[6], i[2], frame, attributes)
                count += 1
        return lines

    ## Retrieve the features to write in the GFF3 file.
    #
    # @param table string name of the table recording the annotations (i.e. the features)
    # @param key string name of the sequence (the source feature) on which we want to visualize the matches (the features)
    # @param source string the program that generated the feature (i.e. REPET)
    # @param feature string unused here, kept for signature symmetry with _getPathFeatures
    # @param frame string "." by default (or "+", "-")
    # @return setString string which will be printed in set file
    #
    def _getSetFeatures(self, table, key, source, feature, frame):
        setString = ""
        # retrieve all the data about the matches
        qry = "SELECT DISTINCT path,name,start,end FROM %s WHERE chr=\"%s\"" % (table, key)
        self._iDB.execute(qry)
        data = self._iDB.fetchall()
        # organise them into 'match' and 'match_part'
        dPathID2Data = self._gatherSameSetFeatures(data)
        # build the output string
        for pathID in dPathID2Data:
            setString += self._organizeEachSetFeature(pathID, dPathID2Data[pathID], key, source, frame)
        return setString

    ## Gather matches with the same set ID.
    #
    # @param data list of (path, name, start, end) rows from a SQL request
    # @return dSetID2Matchs dict whose keys are set IDs and values are [name, start, end, strand] lists
    #
    def _gatherSameSetFeatures(self, data):
        dSetID2Matchs = {}
        for i in data:
            setID = i[0]
            name = i[1]
            start = i[2]
            end = i[3]
            matchStart = min(start, end)
            matchEnd = max(start, end)
            strand = self._getStrand(start, end)
            dSetID2Matchs.setdefault(setID, []).append([name, matchStart, matchEnd, strand])
        return dSetID2Matchs

    ## For a specific set ID, organize match data according to the GFF3 format.
    #
    # @param setID string set ID
    # @param lMatches match list; each item is [name, start, end, strand]
    # @param seqName string name of the source feature
    # @param source string 'source' field for GFF3 format
    # @param frame string 'frame' field for GFF3 format
    # @return lines string to write in the GFF3 file
    #
    def _organizeEachSetFeature(self, setID, lMatches, seqName, source, frame):
        lines = ""
        minStart = min(lMatches[0][1], lMatches[0][2])
        maxEnd = max(lMatches[0][1], lMatches[0][2])
        strand = lMatches[0][3]
        for i in lMatches:
            minStart = min(minStart, i[1], i[2])
            maxEnd = max(maxEnd, i[1], i[2])
        # strip characters that are illegal in GFF3 attribute values
        target = lMatches[0][0].replace("(", "").replace(")", "").replace("#", "")
        attributes = "ID=ms%s_%s_%s" % (setID, seqName, target)
        if self._isChado:
            attributes += ";Target=%s+%s+%s" % (target, "1", abs(minStart - maxEnd) + 1)
        else:
            attributes += ";Target=%s %s %s" % (target, "1", abs(minStart - maxEnd) + 1)
        lines += "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n" % (seqName, source, "match", minStart, maxEnd, "0.0", strand, frame, attributes)
        if len(lMatches) > 1 or self._areMatchPartsCompulsory:
            count = 1
            for i in lMatches:
                attributes = "ID=mp%s-%i_%s_%s" % (setID, count, seqName, target)
                attributes += ";Parent=ms%s_%s_%s" % (setID, seqName, target)
                if self._isChado:
                    attributes += ";Target=%s+%s+%s" % (target, "1", abs(i[1] - i[2]) + 1)
                else:
                    attributes += ";Target=%s %s %s" % (target, "1", abs(i[1] - i[2]) + 1)
                lines += "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n" % (seqName, source, "match_part", i[1], i[2], "0.0", i[3], frame, attributes)
                count += 1
        return lines

    ## Return the strand ('+' if start < end, '-' otherwise).
    #
    # @param start integer start coordinate
    # @param end integer end coordinate
    # @return strand string "+" or "-"
    #
    def _getStrand(self, start, end):
        if start < end:
            return "+"
        else:
            return "-"

    ## Parse one line of the annotation-tables file and return the GFF3
    #  annotation lines it yields for the given sequence.
    #
    # @param tok list of tokens: [source, format, table] or [source, format, table, seqTable]
    # @return annotations string (may be empty)
    #
    def _getAnnotationsForLine(self, tok, seqName, feature, frame):
        source = tok[0]
        fmt = tok[1]  # renamed from 'format' to avoid shadowing the builtin
        table = tok[2]
        tableseq = tok[3] if len(tok) == 4 else ""
        if fmt == 'path':
            return self._getPathFeatures(table, tableseq, seqName, source, feature, frame)
        elif fmt == 'set':
            return self._getSetFeatures(table, seqName, source, feature, frame)
        raise Exception("Wrong format : %s" % fmt)

    ## Post-process a written GFF3 file: delete it when it holds only the
    #  two header lines (unless -a), otherwise optionally append the FASTA.
    #
    def _finalizeGFF3File(self, fileName, iTSA, seqName):
        #TODO: check getNbLinesInSingleFile() to handle big files
        if not self._isGFF3WithoutAnnotation and FileUtils.getNbLinesInSingleFile(fileName) == 2:
            os.remove(fileName)
        elif self._isWithSequence:
            outFile = open(fileName, "a")
            outFile.write("##FASTA\n")
            iBioseq = iTSA.getBioseqFromHeader(seqName)
            iBioseq.write(outFile)
            outFile.close()

    ## Main entry point: connect to the database and write one GFF3 file per
    #  sequence (or per sequence and annotation tier when -s/--doSplit is set).
    #
    def run(self):
        #TODO: cat all gff in one gff file in option
        if self._configFileName == "":
            self._iDB = DbFactory.createInstance()
        else:
            self._iDB = DbFactory.createInstance(self._configFileName)
        self.checkOptions()
        if self._verbose > 0:
            print("START GFF3Maker")
            sys.stdout.flush()
        tablesFile = open(self._tablesFileName, "r")
        linesFromAnnotationTablesFile = tablesFile.readlines()
        tablesFile.close()
        feature = "region"
        frame = "."
        iTSA = TableSeqAdaptator(self._iDB, self._inFastaName)
        lTuples = iTSA.getAccessionAndLengthList()
        for seqName, length in lTuples:
            if not self._doSplit:
                # one file per sequence, all tiers concatenated
                fileName = "%s.gff3" % seqName
                outFile = open(fileName, "w")
                outFile.write("##gff-version 3\n")
                outFile.write("##sequence-region %s 1 %s\n" % (seqName, length))
                for line in linesFromAnnotationTablesFile:
                    if line[0] == "#":
                        continue
                    tok = line.split()
                    if len(tok) == 0:
                        break  # blank line ends the table list (original behavior)
                    outFile.write(self._getAnnotationsForLine(tok, seqName, feature, frame))
                outFile.close()
                self._finalizeGFF3File(fileName, iTSA, seqName)
            else:
                # one file per sequence AND per annotation tier
                count = 1
                for line in linesFromAnnotationTablesFile:
                    if line[0] == "#":
                        continue
                    tok = line.split()
                    if len(tok) == 0:
                        break
                    fileName = "%s_Annot%i.gff3" % (seqName, count)
                    outFile = open(fileName, "w")
                    outFile.write("##gff-version 3\n")
                    outFile.write("##sequence-region %s 1 %s\n" % (seqName, length))
                    outFile.write(self._getAnnotationsForLine(tok, seqName, feature, frame))
                    outFile.close()
                    self._finalizeGFF3File(fileName, iTSA, seqName)
                    count += 1
        self._iDB.close()
        if self._verbose > 0:
            print("END GFF3Maker")
            sys.stdout.flush()


if __name__ == "__main__":
    iGFF3Maker = GFF3Maker()
    iGFF3Maker.setAttributesFromCmdLine()
    iGFF3Maker.run()