Mercurial > repos > yufei-luo > s_mart
diff commons/core/parsing/VarscanToVCF.py @ 18:94ab73e8a190
Uploaded
author | m-zytnicki |
---|---|
date | Mon, 29 Apr 2013 03:20:15 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/commons/core/parsing/VarscanToVCF.py Mon Apr 29 03:20:15 2013 -0400 @@ -0,0 +1,152 @@ +#!/usr/bin/env python + +# Copyright INRA (Institut National de la Recherche Agronomique) +# http://www.inra.fr +# http://urgi.versailles.inra.fr +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. 
import math
from commons.core.LoggerFactory import LoggerFactory
from commons.core.utils.RepetOptionParser import RepetOptionParser
from commons.core.utils.FileUtils import FileUtils
from commons.core.parsing.VarscanFile import VarscanFile
from commons.core.seq.Bioseq import Bioseq

LOG_DEPTH = "core.parsing"


## Launcher converting a Varscan file into a VCF file.
#
# Each data line of the input is turned into one tab-separated VCF record
# (CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO). It can be driven from the
# command line (setAttributesFromCmdLine) or programmatically (constructor
# and setters), followed by run().
#
class VarscanToVCF(object):

    ## Constructor
    #
    # @param varscanFileName string name of the input Varscan file
    # @param vcfFileName string name of the output VCF file; when empty, "<varscanFileName>.vcf" is used
    # @param doClean boolean stored as is; not read by any method of this class
    # @param verbosity int verbosity level given to the logger
    #
    def __init__(self, varscanFileName = "", vcfFileName = "", doClean = False, verbosity = 0):
        self._varscanFileName = varscanFileName
        self.setvcfFileName(vcfFileName)
        self._doClean = doClean
        self._verbosity = verbosity

        self._vcfRevision = "VCFv4.1"
        self._vcfHeader = "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO"

        self._log = LoggerFactory.createLogger("%s.%s" % (LOG_DEPTH, self.__class__.__name__), self._verbosity)

    ## Parse the command line and set the attributes accordingly
    #
    def setAttributesFromCmdLine(self):
        description = "Convert Varscan file to VCF file."
        epilog = "\t$ python VarscanToVCF.py -i varscanFileName -v 2"
        parser = RepetOptionParser(description = description, epilog = epilog)
        parser.add_option("-i", "--Varscan", dest = "varscanFileName", action = "store", type = "string", help = "input Varscan file name [compulsory] [format: varscan2.2.8]", default = "")
        parser.add_option("-o", "--vcfFileName",dest = "vcfFileName", action = "store", type = "string", help = "vcfFileName file name [default: <input>.vcf]", default = "")
        parser.add_option("-c", "--clean", dest = "doClean", action = "store_true", help = "clean temporary files [optional] [default: False]", default = False)
        parser.add_option("-v", "--verbosity", dest = "verbosity", action = "store", type = "int", help = "verbosity [optional] [default: 1]", default = 1)
        options = parser.parse_args()[0]
        self._setAttributesFromOptions(options)

    ## Copy the parsed options into the attributes
    #
    # @param options object exposing varscanFileName, vcfFileName, doClean and verbosity
    #
    def _setAttributesFromOptions(self, options):
        # The input name is set first so that a defaulted output name derives from it.
        self.setvarscanFileName(options.varscanFileName)
        self.setvcfFileName(options.vcfFileName)
        self.setDoClean(options.doClean)
        self.setVerbosity(options.verbosity)

    ## Set the input Varscan file name
    #
    # When the output name was left to its default, it is recomputed here so
    # that it always follows the current input name.
    #
    # @param varscanFileName string name of the input Varscan file
    #
    def setvarscanFileName(self, varscanFileName):
        self._varscanFileName = varscanFileName
        # getattr keeps the setter usable on an instance whose output name was never set.
        if getattr(self, "_isVcfFileNameDefault", False):
            self._vcfFileName = "%s.vcf" % self._varscanFileName

    ## Set the output VCF file name
    #
    # @param vcfFileName string name of the output file; an empty string selects "<varscanFileName>.vcf"
    #
    def setvcfFileName(self, vcfFileName):
        # Remember whether the name is derived, so setvarscanFileName() can keep it in sync.
        self._isVcfFileNameDefault = (vcfFileName == "")
        if self._isVcfFileNameDefault:
            self._vcfFileName = "%s.vcf" % self._varscanFileName
        else:
            self._vcfFileName = vcfFileName

    ## Set the clean flag
    #
    # @param doClean boolean
    #
    def setDoClean(self, doClean):
        self._doClean = doClean

    ## Set the verbosity level (applied to the logger when run() starts)
    #
    # @param verbosity int
    #
    def setVerbosity(self, verbosity):
        self._verbosity = verbosity

    ## Check that an input file name was given and that the file exists
    #
    # Logs the problem and raises an Exception otherwise.
    #
    def _checkOptions(self):
        if self._varscanFileName == "":
            self._logAndRaise("ERROR: Missing input file name")
        elif not FileUtils.isRessourceExists(self._varscanFileName):
            self._logAndRaise("ERROR: Input Varscan file '%s' does not exist!" % self._varscanFileName)

    ## Log an error message, then raise an Exception carrying the same message
    #
    # @param errorMsg string message to log and raise
    #
    def _logAndRaise(self, errorMsg):
        self._log.error(errorMsg)
        raise Exception(errorMsg)

    ## Convert one Varscan data line into one VCF record
    #
    # QUAL is -10*log10(p-value); INFO carries AF (variant frequency as a
    # fraction), DP (reference reads + variant reads), RBQ and ABQ (the
    # values returned by getQualRef() and getQualVar()).
    #
    # @param varscanLine string one data line of the Varscan file
    # @param lineNumber int number of this line in the input file, used in messages
    # @return string tab-separated VCF record, newline-terminated
    # @throws ValueError when the p-value is not strictly positive (QUAL would be undefined)
    #
    def _convertVarscanLineToVCFRecord(self, varscanLine, lineNumber):
        iVarscanFile = VarscanFile()
        iVarscanFile.setTypeOfVarscanFile("Varscan_2_2_8")
        iVarscanHit = iVarscanFile.createVarscanObjectFromLine(varscanLine, lineNumber)

        Chrom = iVarscanHit.getChrom()
        Pos = int(iVarscanHit.getPosition())
        ID = "."
        Ref = iVarscanHit.getRef()
        Alt = iVarscanHit.getVar()

        pValue = float(iVarscanHit.getPValue())
        if pValue <= 0:
            # log10 is undefined here; fail with the offending line rather than a bare "math domain error".
            errorMsg = "ERROR: Cannot compute QUAL for Varscan file line '%d': p-value '%s' is not strictly positive" % (lineNumber, iVarscanHit.getPValue())
            self._log.error(errorMsg)
            raise ValueError(errorMsg)
        Qual = -10 * math.log10(pValue)
        Filter = "."

        # The last character of the frequency is dropped before conversion
        # (presumably a trailing '%' sign -- to be confirmed against VarscanFile).
        AF = float(iVarscanHit.getVarFreq()[:-1]) / 100
        DP = int(iVarscanHit.getReadsRef()) + int(iVarscanHit.getReadsVar())
        RBQ = iVarscanHit.getQualRef()
        ABQ = iVarscanHit.getQualVar()
        Info = ";".join(["AF=%.4f" % AF, "DP=%d" % DP, "RBQ=%s" % RBQ, "ABQ=%s" % ABQ])

        # The allele derived from the consensus call and the reference takes
        # precedence over the variant allele reported on the line.
        allel = Bioseq().getATGCNFromIUPACandATGCN(iVarscanHit.getCns(), Ref)
        if allel != Alt:
            self._log.warning("'VarAllele' attribute of Varscan file line '%d' was not correct. Correcting using '%s' instead of '%s'." % (lineNumber, allel, Alt))
            Alt = allel

        vcfLine = "%s\t%s\t%s\t%s\t%s\t%.9f\t%s\t%s\n" % (Chrom, Pos, ID, Ref, Alt, Qual, Filter, Info)
        return vcfLine

    ## Run the conversion: check the options, write the VCF header, then one record per data line
    #
    # Lines starting with '#', the Varscan column-header line and blank lines
    # are not converted. Line numbers count every line of the input file.
    #
    def run(self):
        LoggerFactory.setLevel(self._log, self._verbosity)
        self._checkOptions()
        self._log.info("START Varscan To VCF")
        self._log.debug("Input file name: %s" % self._varscanFileName)

        with open(self._vcfFileName, "w") as fVCF:
            fVCF.write("##fileformat=%s\n" % self._vcfRevision)
            fVCF.write("%s\n" % self._vcfHeader)

            with open(self._varscanFileName, "r") as fVarscan:
                for lineNumber, line in enumerate(fVarscan, 1):
                    if not line.strip():
                        # Blank lines carry no record to convert.
                        continue
                    if line.startswith("#") or "Chrom\tPosition\tRef\tCons" in line:
                        continue
                    fVCF.write(self._convertVarscanLineToVCFRecord(line, lineNumber))

        self._log.info("END Varscan To VCF")


if __name__ == "__main__":
    iLaunch = VarscanToVCF()
    iLaunch.setAttributesFromCmdLine()
    iLaunch.run()