diff commons/tools/AnnotationStats.py @ 18:94ab73e8a190
Uploaded
author | m-zytnicki
---|---
date | Mon, 29 Apr 2013 03:20:15 -0400
parents |
children |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/commons/tools/AnnotationStats.py	Mon Apr 29 03:20:15 2013 -0400
@@ -0,0 +1,374 @@
+#!/usr/bin/env python
+
+# Copyright INRA (Institut National de la Recherche Agronomique)
+# http://www.inra.fr
+# http://urgi.versailles.inra.fr
+#
+# This software is governed by the CeCILL license under French law and
+# abiding by the rules of distribution of free software. You can use,
+# modify and/ or redistribute the software under the terms of the CeCILL
+# license as circulated by CEA, CNRS and INRIA at the following URL
+# "http://www.cecill.info".
+#
+# As a counterpart to the access to the source code and rights to copy,
+# modify and redistribute granted by the license, users are provided only
+# with a limited warranty and the software's author, the holder of the
+# economic rights, and the successive licensors have only limited
+# liability.
+#
+# In this respect, the user's attention is drawn to the risks associated
+# with loading, using, modifying and/or developing or reproducing the
+# software by the user in light of its specific status of free software,
+# that may mean that it is complicated to manipulate, and that also
+# therefore means that it is reserved for developers and experienced
+# professionals having in-depth computer knowledge. Users are therefore
+# encouraged to load and test the software's suitability as regards their
+# requirements in conditions enabling the security of their systems and/or
+# data to be ensured and, more generally, to use and operate it in the
+# same conditions as regards security.
+#
+# The fact that you are presently reading this means that you have had
+# knowledge of the CeCILL license and that you accept its terms.
+
+##@file
+# Give summary information on a TE annotation table.
+# options:
+# -h: this help
+# -t: analysis type (default = 1, 1: per transposable element (TE), 2: per cluster, 3: per classification, 4: with map input file)
+# -p: name of the table (_path) or file (.path) with the annotated TE copies
+# -s: name of the table (_seq) or file (.fasta or .fa) with the TE reference sequences
+# -g: length of the genome (in bp)
+# -m: name of the file with the group and the corresponding TE names (format = 'map')
+# -o: name of the output file (default = pathTableName + '_stats.txt')
+# -C: name of the configuration file to access MySQL (e.g. 'TEannot.cfg')
+# -c: remove map files and blastclust file (if analysis type is 2 or 3)
+# -I: identity coverage threshold (default = 0)
+# -L: length coverage threshold (default = 0.8)
+# -v: verbosity level (default = 0)
+
+from commons.core.LoggerFactory import LoggerFactory
+from commons.core.stat.Stat import Stat
+from commons.core.sql.DbFactory import DbFactory
+from commons.core.sql.TablePathAdaptator import TablePathAdaptator
+from commons.core.sql.TableSeqAdaptator import TableSeqAdaptator
+from commons.tools.getCumulLengthFromTEannot import getCumulLengthFromTEannot
+
+LOG_DEPTH = "repet.tools"
+
+#TODO: use templating engine instead of raw strings for AnnotationStatsWriter
+class AnnotationStats( object ):
+
+    def __init__(self, analysisName="TE", clusterFileName="", seqTableName="", pathTableName="", genomeLength=0, statsFileName="", globalStatsFileName="", verbosity=3):
+        self._analysisName = analysisName
+        self._clusterFileName = clusterFileName
+        self._seqTableName = seqTableName
+        self._pathTableName = pathTableName
+        self._genomeLength = genomeLength
+        self._statsFileName = statsFileName
+        self._globalStatsFileName = globalStatsFileName
+        self._iDb = None
+        self._iTablePathAdaptator = None
+        self._iTableSeqAdaptator = None
+        self._save = False
+        self._clean = False
+        self._verbosity = verbosity
+        self._log = LoggerFactory.createLogger("%s.%s" % (LOG_DEPTH, self.__class__.__name__), self._verbosity)
+
+    def _logAndRaise(self, errorMsg):
+        self._log.error(errorMsg)
+        raise Exception(errorMsg)
+
+    def setCoverageThreshold( self, lengthThresh ):
+        self._coverageThreshold = float(lengthThresh)
+
+    def setIdentityThreshold( self, identityThresh ):
+        self._identityThreshold = int(identityThresh)
+
+    def setAnalyseType(self, analyseType):
+        self._analyseType = str(analyseType)
+
+    def setPathTableName(self, pathTableName):
+        self._pathTableName = pathTableName
+
+    def setDBInstance(self, iDb):
+        self._iDb = iDb
+
+    def setTablePathAdaptator(self, iTablePathAdaptator):
+        self._iTablePathAdaptator = iTablePathAdaptator
+
+    def setTableSeqAdaptator(self, iTableSeqAdaptator):
+        self._iTableSeqAdaptator = iTableSeqAdaptator
+
+    ## Get the coverage of TE copies for a given family (using 'mapOp')
+    #
+    # @param consensus string name of a TE family ('subject_name' in the 'path' table)
+    # @return cumulCoverage integer cumulative coverage
+    #
+    def getCumulCoverage( self, consensus = "" ):
+        gclft = getCumulLengthFromTEannot()
+        gclft.setInputTable( self._pathTableName )
+        gclft.setTErefseq( consensus )
+        gclft.setClean()
+        gclft._db = self._iDb
+        gclft._tpA = self._iTablePathAdaptator
+        mapFileName = gclft.getAllSubjectsAsMapOfQueries()
+        mergeFileName = gclft.mergeRanges( mapFileName )
+        cumulCoverage = gclft.getCumulLength( mergeFileName ) #self._iTablePathAdaptator.getCumulPathLength_from_subject( consensus )
+        return cumulCoverage
+
+    ## Get the number of full-lengths (95% <= L <= 105%)
+    #
+    # @param consensusLength integer
+    # @param lLengths list of integers
+    # @return fullLengthConsensusNb integer
+    #
+    def getNbFullLengths( self, consensusLength, lLengths ):
+        fullLengthConsensusNb = 0
+        for i in lLengths:
+            if i / float(consensusLength) >= 0.95 and i / float(consensusLength) <= 1.05:
+                fullLengthConsensusNb += 1
+        return fullLengthConsensusNb
+
+    def getStatPerTE(self, consensusName):
+        dConsensusStats = {}
+        lLengthPerFragment = self._iTablePathAdaptator.getPathLengthListFromSubject(consensusName)
+        lLengthPerCopy = self._iTablePathAdaptator.getChainLengthListFromSubject(consensusName)
+        lIdentityPerCopy = self._iTablePathAdaptator.getChainIdentityListFromSubject(consensusName)
+        dConsensusStats["length"] = self._iTableSeqAdaptator.getSeqLengthFromAccession(consensusName)
+        dConsensusStats["cumulCoverage"] = self.getCumulCoverage(consensusName)
+        dConsensusStats["nbFragments"] = len(lLengthPerFragment)
+        dConsensusStats["nbFullLengthFragments"] = self.getNbFullLengths(dConsensusStats["length"], lLengthPerFragment)
+        dConsensusStats["nbCopies"] = len(lLengthPerCopy)
+        dConsensusStats["nbFullLengthCopies"] = self.getNbFullLengths(dConsensusStats["length"], lLengthPerCopy)
+        dConsensusStats["statsIdentityPerChain"] = Stat()
+        dConsensusStats["statsLengthPerChain"] = Stat()
+        dConsensusStats["statsLengthPerChainPerc"] = Stat()
+        self._statsForIdentityAndLength(dConsensusStats, lLengthPerCopy, lIdentityPerCopy)
+        return dConsensusStats
+
+    def getStatPerCluster(self, lConsensusNames):
+        dConsensusClusterStats = {}
+        lLengthPerFragment = []
+        lLengthPerCopy = []
+        cumulCoverageLength = 0
+        for consensusName in lConsensusNames:
+            cumulCoverageLength += self.getCumulCoverage(consensusName)
+            lLengthPerFragment.extend(self._iTablePathAdaptator.getPathLengthListFromSubject(consensusName))
+            lLengthPerCopy.extend(self._iTablePathAdaptator.getChainLengthListFromSubject(consensusName))
+        dConsensusClusterStats["cumulCoverage"] = cumulCoverageLength
+        dConsensusClusterStats["nbFragments"] = len(lLengthPerFragment)
+        dConsensusClusterStats["nbCopies"] = len(lLengthPerCopy)
+        return dConsensusClusterStats
+
+    def getClusterListFromFile(self):
+        lClusters = []
+        with open(self._clusterFileName) as fCluster:
+            for line in fCluster:
+                lConsensusNames = line.rstrip().split("\t")
+                lClusters.append(lConsensusNames)
+        return lClusters
+
+    def run(self):
+        LoggerFactory.setLevel(self._log, self._verbosity)
+        self._iDb = DbFactory.createInstance()
+        self._iTablePathAdaptator = TablePathAdaptator(self._iDb, self._pathTableName)
+        self._iTableSeqAdaptator = TableSeqAdaptator(self._iDb, self._seqTableName)
+
+        iASW = AnnotationStatsWriter()
+        if self._analysisName == "TE":
+            with open(self._statsFileName, "w") as fStats:
+                string = "%s\tlength\tcovg\tfrags\tfullLgthFrags\tcopies\tfullLgthCopies\tmeanId\tmeanLgth\tmeanLgthPerc\n" % self._analysisName
+                fStats.write(string)
+
+                lNamesTErefseq = self._iTableSeqAdaptator.getAccessionsList()
+                lDistinctSubjects = self._iTablePathAdaptator.getSubjectList()
+                totalCumulCoverage = self.getCumulCoverage()
+
+                with open(self._globalStatsFileName, "w") as fG:
+                    fG.write("%s\n" % iASW.printResume(lNamesTErefseq, lDistinctSubjects, totalCumulCoverage, self._genomeLength))
+                    for consensusName in lNamesTErefseq:
+                        self._log.debug("processing '%s'..." % consensusName)
+                        dStatForOneConsensus = self.getStatPerTE(consensusName)
+                        iASW.addCalculsOfOneTE(dStatForOneConsensus)
+                        fStats.write("%s\n" % iASW.getStatAsString(consensusName, dStatForOneConsensus))
+                    fG.write(iASW.printStatsForAllTEs(len(lNamesTErefseq)))
+
+        elif self._analysisName == "Cluster":
+            lClusters = self.getClusterListFromFile()
+            lClusters.sort(key=lambda k: len(k), reverse=True)
+            with open(self._statsFileName, "w") as fStats:
+                string = "%s\tcovg\tfrags\tcopies\n" % self._analysisName
+                #TODO: add fullLgthFrags and fullLgthCopies ? Is addition of previous results significant ?
+                fStats.write(string)
+                for index, lConsensus in enumerate(lClusters):
+                    self._log.debug("processing '%s'..." % lConsensus)
+                    dStatForOneCluster = self.getStatPerCluster(lConsensus)
+                    fStats.write("%s\n" % iASW.getStatAsStringForCluster(str(index + 1), dStatForOneCluster))
+
+        if self._save:
+            outTableName = "%s_statsPer%s" % (self._pathTableName, self._analysisName)
+            self._iDb.createTable(outTableName, "pathstat", self._statsFileName)
+
+        self._iDb.close()
+        self._log.info("END %s" % type(self).__name__)
+
+    def _statsForIdentityAndLength(self, dStat, lLengthPerCopy, lIdentityPerCopy):
+        for i in lIdentityPerCopy:
+            dStat["statsIdentityPerChain"].add(i)
+        lLengthPercPerCopy = []
+        for i in lLengthPerCopy:
+            dStat["statsLengthPerChain"].add(i)
+            lperc = 100 * i / float(dStat["length"])
+            lLengthPercPerCopy.append(lperc)
+            dStat["statsLengthPerChainPerc"].add(lperc)
+
+class AnnotationStatsWriter(object):
+
+    def __init__(self):
+        self._dAllTErefseqs = { "sumCumulCoverage": 0,
+                                "totalNbFragments": 0,
+                                "totalNbFullLengthFragments": 0,
+                                "totalNbCopies": 0,
+                                "totalNbFullLengthCopies": 0,
+                                "nbFamWithFullLengthFragments": 0,
+                                "nbFamWithOneFullLengthFragment": 0,
+                                "nbFamWithTwoFullLengthFragments": 0,
+                                "nbFamWithThreeFullLengthFragments": 0,
+                                "nbFamWithMoreThanThreeFullLengthFragments": 0,
+                                "nbFamWithFullLengthCopies": 0,
+                                "nbFamWithOneFullLengthCopy": 0,
+                                "nbFamWithTwoFullLengthCopies": 0,
+                                "nbFamWithThreeFullLengthCopies": 0,
+                                "nbFamWithMoreThanThreeFullLengthCopies": 0,
+                                "statsAllCopiesMedIdentity": Stat(),
+                                "statsAllCopiesMedLengthPerc": Stat()
+                                }
+
+    def getAllTEsRefSeqDict(self):
+        return self._dAllTErefseqs
+
+    def getStatAsString( self, name, d ):
+        """
+        Return a string with all data properly formatted.
+        """
+        string = ""
+        string += "%s" % name
+        string += "\t%i" % d["length"]
+        string += "\t%i" % d["cumulCoverage"]
+        string += "\t%i" % d["nbFragments"]
+        string += "\t%i" % d["nbFullLengthFragments"]
+        string += "\t%i" % d["nbCopies"]
+        string += "\t%i" % d["nbFullLengthCopies"]
+
+        if d["statsIdentityPerChain"].getValuesNumber() != 0:
+            string += "\t%.2f" % d["statsIdentityPerChain"].mean()
+        else:
+            string += "\tNA"
+
+        if d["statsLengthPerChain"].getValuesNumber() != 0:
+            string += "\t%.2f" % d["statsLengthPerChain"].mean()
+        else:
+            string += "\tNA"
+
+        if d["statsLengthPerChainPerc"].getValuesNumber() != 0:
+            string += "\t%.2f" % d["statsLengthPerChainPerc"].mean()
+        else:
+            string += "\tNA"
+
+        return string
+
+    def getStatAsStringForCluster( self, name, d ):
+        """
+        Return a string with all data properly formatted.
+        """
+        string = ""
+        string += "%s" % name
+        string += "\t%i" % d["cumulCoverage"]
+        string += "\t%i" % d["nbFragments"]
+        string += "\t%i" % d["nbCopies"]
+
+        return string
+
+    def addCalculsOfOneTE(self, dOneTErefseq):
+        self._dAllTErefseqs[ "sumCumulCoverage" ] += dOneTErefseq[ "cumulCoverage" ]
+
+        self._dAllTErefseqs[ "totalNbFragments" ] += dOneTErefseq[ "nbFragments" ]
+        self._dAllTErefseqs[ "totalNbFullLengthFragments" ] += dOneTErefseq[ "nbFullLengthFragments" ]
+        if dOneTErefseq[ "nbFullLengthFragments" ] > 0:
+            self._dAllTErefseqs[ "nbFamWithFullLengthFragments" ] += 1
+            if dOneTErefseq[ "nbFullLengthFragments" ] == 1:
+                self._dAllTErefseqs[ "nbFamWithOneFullLengthFragment" ] += 1
+            elif dOneTErefseq[ "nbFullLengthFragments" ] == 2:
+                self._dAllTErefseqs[ "nbFamWithTwoFullLengthFragments" ] += 1
+            elif dOneTErefseq[ "nbFullLengthFragments" ] == 3:
+                self._dAllTErefseqs[ "nbFamWithThreeFullLengthFragments" ] += 1
+            elif dOneTErefseq[ "nbFullLengthFragments" ] > 3:
+                self._dAllTErefseqs[ "nbFamWithMoreThanThreeFullLengthFragments" ] += 1
+
+        self._dAllTErefseqs[ "totalNbCopies" ] += dOneTErefseq[ "nbCopies" ]
+        self._dAllTErefseqs[ "totalNbFullLengthCopies" ] += dOneTErefseq[ "nbFullLengthCopies" ]
+        if dOneTErefseq[ "nbFullLengthCopies" ] > 0:
+            self._dAllTErefseqs[ "nbFamWithFullLengthCopies" ] += 1
+            if dOneTErefseq[ "nbFullLengthCopies" ] == 1:
+                self._dAllTErefseqs[ "nbFamWithOneFullLengthCopy" ] += 1
+            elif dOneTErefseq[ "nbFullLengthCopies" ] == 2:
+                self._dAllTErefseqs[ "nbFamWithTwoFullLengthCopies" ] += 1
+            elif dOneTErefseq[ "nbFullLengthCopies" ] == 3:
+                self._dAllTErefseqs[ "nbFamWithThreeFullLengthCopies" ] += 1
+            elif dOneTErefseq[ "nbFullLengthCopies" ] > 3:
+                self._dAllTErefseqs[ "nbFamWithMoreThanThreeFullLengthCopies" ] += 1
+
+        if dOneTErefseq[ "statsIdentityPerChain" ].getValuesNumber() != 0:
+            self._dAllTErefseqs[ "statsAllCopiesMedIdentity" ].add( dOneTErefseq[ "statsIdentityPerChain" ].median() )
+
+        if dOneTErefseq[ "statsLengthPerChainPerc" ].getValuesNumber() != 0:
+            self._dAllTErefseqs[ "statsAllCopiesMedLengthPerc" ].add( dOneTErefseq[ "statsLengthPerChainPerc" ].median() )
+
+    def printStatsForAllTEs(self, TEnb):
+#        statString += "(sum of cumulative coverages: %i bp)" % ( self._dAllTErefseqs[ "sumCumulCoverage" ] )
+        statString = "total nb of TE fragments: %i\n" % ( self._dAllTErefseqs[ "totalNbFragments" ] )
+
+        if self._dAllTErefseqs[ "totalNbFragments" ] != 0:
+
+            statString += "total nb full-length fragments: %i (%.2f%%)\n" % \
+                          ( self._dAllTErefseqs[ "totalNbFullLengthFragments" ], \
+                            100*self._dAllTErefseqs[ "totalNbFullLengthFragments" ] / float(self._dAllTErefseqs[ "totalNbFragments" ]) )
+
+            statString += "total nb of TE copies: %i\n" % ( self._dAllTErefseqs[ "totalNbCopies" ] )
+
+            statString += "total nb full-length copies: %i (%.2f%%)\n" % \
+                          ( self._dAllTErefseqs[ "totalNbFullLengthCopies" ], \
+                            100*self._dAllTErefseqs[ "totalNbFullLengthCopies" ] / float(self._dAllTErefseqs[ "totalNbCopies" ]) )
+
+            statString += "families with full-length fragments: %i (%.2f%%)\n" % \
+                          ( self._dAllTErefseqs[ "nbFamWithFullLengthFragments" ], \
+                            100*self._dAllTErefseqs[ "nbFamWithFullLengthFragments" ] / float(TEnb) )
+            statString += " with only one full-length fragment: %i\n" % ( self._dAllTErefseqs[ "nbFamWithOneFullLengthFragment" ] )
+            statString += " with only two full-length fragments: %i\n" % ( self._dAllTErefseqs[ "nbFamWithTwoFullLengthFragments" ] )
+            statString += " with only three full-length fragments: %i\n" % ( self._dAllTErefseqs[ "nbFamWithThreeFullLengthFragments" ] )
"nbFamWithThreeFullLengthFragments" ] ) + statString += " with more than three full-length fragments: %i\n" % ( self._dAllTErefseqs[ "nbFamWithMoreThanThreeFullLengthFragments" ] ) + + statString += "families with full-length copies: %i (%.2f%%)\n" % \ + ( self._dAllTErefseqs[ "nbFamWithFullLengthCopies" ], \ + 100*self._dAllTErefseqs[ "nbFamWithFullLengthCopies" ] / float(TEnb) ) + statString += " with only one full-length copy: %i\n" % ( self._dAllTErefseqs[ "nbFamWithOneFullLengthCopy" ] ) + statString += " with only two full-length copies: %i\n" % ( self._dAllTErefseqs[ "nbFamWithTwoFullLengthCopies" ] ) + statString += " with only three full-length copies: %i\n" % ( self._dAllTErefseqs[ "nbFamWithThreeFullLengthCopies" ] ) + statString += " with more than three full-length copies: %i\n" % ( self._dAllTErefseqs[ "nbFamWithMoreThanThreeFullLengthCopies" ] ) + + statString += "mean of median identity of all families: %.2f +- %.2f\n" % \ + ( self._dAllTErefseqs[ "statsAllCopiesMedIdentity" ].mean(), \ + self._dAllTErefseqs[ "statsAllCopiesMedIdentity" ].sd() ) + + statString += "mean of median length percentage of all families: %.2f +- %.2f\n" % \ + ( self._dAllTErefseqs[ "statsAllCopiesMedLengthPerc" ].mean(), \ + self._dAllTErefseqs[ "statsAllCopiesMedLengthPerc" ].sd() ) + return statString + + def printResume(self, lNamesTErefseq, lDistinctSubjects, totalCumulCoverage, genomeLength): + statString = "nb of sequences: %i\n" % len(lNamesTErefseq) + statString += "nb of matched sequences: %i\n" % len(lDistinctSubjects) + statString += "cumulative coverage: %i bp\n" % totalCumulCoverage + statString += "coverage percentage: %.2f%%\n" % ( 100 * totalCumulCoverage / float(genomeLength) ) +# statString += "processing the %i TE families..." % len(lNamesTErefseq) + return statString