Mercurial > repos > urgi-team > teiso
diff TEisotools-1.1.a/commons/core/seq/SequenceModificationsCollection.py @ 16:836ce3d9d47a draft default tip
Uploaded
| author | urgi-team |
|---|---|
| date | Thu, 21 Jul 2016 07:42:47 -0400 |
| parents | 255c852351c5 |
| children |
line wrap: on
line diff
--- a/TEisotools-1.1.a/commons/core/seq/SequenceModificationsCollection.py Thu Jul 21 07:36:44 2016 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,312 +0,0 @@ -#!/usr/bin/env python - -# Copyright INRA (Institut National de la Recherche Agronomique) -# http://www.inra.fr -# http://urgi.versailles.inra.fr -# -# This software is governed by the CeCILL license under French law and -# abiding by the rules of distribution of free software. You can use, -# modify and/ or redistribute the software under the terms of the CeCILL -# license as circulated by CEA, CNRS and INRIA at the following URL -# "http://www.cecill.info". -# -# As a counterpart to the access to the source code and rights to copy, -# modify and redistribute granted by the license, users are provided only -# with a limited warranty and the software's author, the holder of the -# economic rights, and the successive licensors have only limited -# liability. -# -# In this respect, the user's attention is drawn to the risks associated -# with loading, using, modifying and/or developing or reproducing the -# software by the user in light of its specific status of free software, -# that may mean that it is complicated to manipulate, and that also -# therefore means that it is reserved for developers and experienced -# professionals having in-depth computer knowledge. Users are therefore -# encouraged to load and test the software's suitability as regards their -# requirements in conditions enabling the security of their systems and/or -# data to be ensured and, more generally, to use and operate it in the -# same conditions as regards security. -# -# The fact that you are presently reading this means that you have had -# knowledge of the CeCILL license and that you accept its terms. - -import os -import time -import shutil -from commons.core.seq.BioseqDB import BioseqDB -from commons.core.seq.SequenceModifications import SequenceModifications -from commons.core.checker.RepetException import RepetException - -class SequenceModificationsCollection(object): - - def __init__(self): - self._lSeqModif = [] - - def __str__(self): - result = "" - for iSeqModif in self._lSeqModif: - result += "%s\n" % iSeqModif.__str__() - return result - - def __eq__(self, o): - if type(o) is type(self): - self.sort() - o.sort() - return self._lSeqModif == o._lSeqModif - return False - - def __ne__(self, o): - return not self.__eq__(o) - - def clear(self): - self._lSeqModif = [] - - def add(self, iSeqModif, override = False): - for seqModif in self._lSeqModif: - if seqModif.getOriginalHeader() == iSeqModif.getOriginalHeader(): - if override: - self._lSeqModif.pop(self._lSeqModif.index(seqModif)) - else: - raise RepetException("ERROR: '%s' already in SequenceModificationsCollection" % iSeqModif.getOriginalHeader()) - - self._lSeqModif.append(iSeqModif) - - def get(self, header, mutated = False): - for iSeqModif in self._lSeqModif: - if mutated: - linkToGoodMethod = iSeqModif.getMutatedHeader - else: - linkToGoodMethod = iSeqModif.getOriginalHeader - - if linkToGoodMethod() == header: - return iSeqModif - return None - - def getHeadersList(self, mutated = False): - lHeaders = [] - if mutated: - for iSeqModif in self._lSeqModif: - lHeaders.append(iSeqModif.getMutatedHeader()) - else: - for iSeqModif in self._lSeqModif: - lHeaders.append(iSeqModif.getOriginalHeader()) - lHeaders.sort(key = lambda header: header.lower()) - return lHeaders - - def sort(self): - self._lSeqModif.sort(key = lambda seqMod: seqMod.getOriginalHeader().lower(), reverse = False) - - def writeMutations(self, fileName, outFormat = ""): - self.sort() - with open(fileName, "w") as fH: - if outFormat.lower() in ["gff", "gff3"]: - fH.write("##gff-version 3\n") - for iSeqModif in self._lSeqModif: - for mutation in iSeqModif.getMutations(): - pos = mutation[0] - old = mutation[1] - new = mutation[2] - fH.write("%s\tMutateSequence\tSNP\t%i\t%i\t.\t.\t.\tName=SNP_%i;REF=%s;ALT=%s\n" % (iSeqModif.getOriginalHeader(), pos, pos, pos, old, new)) - else: - fH.write("#Mutations:\n") - fH.write("seqName\tposition\toldNt\tnewNt\n") - for iSeqModif in self._lSeqModif: - for mutation in iSeqModif.getMutations(): - fH.write("%s\t%i\t%s\t%s\n" % (iSeqModif.getOriginalHeader(), mutation[0], mutation[1], mutation[2])) - - def writeInsertions(self, fileName, outFormat = ""): - self.sort() - with open(fileName, "w") as fH: - if outFormat.lower() in ["gff", "gff3"]: - fH.write("##gff-version 3\n") - for iSeqModif in self._lSeqModif: - for iRange in iSeqModif.getInsertions(): - if iRange.getSeqname() != ".": - fH.write("%s\tMutateSequence\tinsertion\t%s\t%s\t.\t.\t.\tName=insertion_%s-%s;insert=%s\n" % (iSeqModif.getOriginalHeader(), iRange.getStart(), iRange.getEnd(), iRange.getStart(), iRange.getEnd(), iRange.getSeqname())) - else: - fH.write("%s\tMutateSequence\tinsertion\t%s\t%s\t.\t.\t.\tName=insertion_%s-%s\n" % (iSeqModif.getOriginalHeader(), iRange.getStart(), iRange.getEnd(), iRange.getStart(), iRange.getEnd())) - else: - fH.write("#Insertions:\n") - fH.write("seqName\tstart\tend\tinsertedSeqName\n") - for iSeqModif in self._lSeqModif: - for iRange in iSeqModif.getInsertions(): - fH.write("%s\t%i\t%i\t%s\n" % (iSeqModif.getOriginalHeader(), iRange.getStart(), iRange.getEnd(), iRange.getSeqname())) - - def writeDeletions(self, fileName, outFormat = ""): - self.sort() - with open(fileName, "w") as fH: - if outFormat.lower() in ["gff", "gff3"]: - fH.write("##gff-version 3\n") - for iSeqModif in self._lSeqModif: - for iRange in iSeqModif.getDeletions(): - fH.write("%s\tMutateSequence\tdeletion\t%s\t%s\t.\t.\t.\tName=deletion_%s-%s\n" % (iSeqModif.getOriginalHeader(), iRange.getStart(), iRange.getEnd(), iRange.getStart(), iRange.getEnd())) - else: - fH.write("#Deletions:\n") - fH.write("seqName\tstart\tend\n") - for iSeqModif in self._lSeqModif: - for iRange in iSeqModif.getDeletions(): - fH.write("%s\t%i\t%i\n" % (iSeqModif.getOriginalHeader(), iRange.getStart(), iRange.getEnd())) - - def write(self, mutationsFileName = "", insertionsFileName = "", deletionsFileName = "", outFormat = ""): - self.sort() - self.writeMutations(mutationsFileName, outFormat) - self.writeInsertions(insertionsFileName, outFormat) - self.writeDeletions(deletionsFileName, outFormat) - - def writeVCF(self, VCFFileName, fastaFileName, software = "MutateSequences"): - self.sort() - tmpVCFFileName = "%s.tmp" % VCFFileName - VCFFH = open(tmpVCFFileName, "w") - VCFFH.write("##fileformat=VCFv4.1\n") - VCFFH.write("##fileDate=%s\n" % time.strftime("%Y%m%d")) - VCFFH.write("##reference=%s\n" % os.path.abspath(fastaFileName)) - VCFFH.write("##INFO=<ID=SVLEN,Number=.,Type=Integer,Description=\"Difference in length between REF and ALT alleles\">\n") - VCFFH.write("##INFO=<ID=SVTYPE,Number=1,Type=String,Description=\"Type of structural variant\">\n") - VCFFH.write("##INFO=<ID=ALTSTART,Number=1,Type=Integer,Description=\"ALT start position on query sequence\">\n") - VCFFH.write("##INFO=<ID=SOFTWARE,Number=1,Type=String,Description=\"Software used to generate this VCF\">\n") - VCFFH.write("##INFO=<ID=INSERTED,Number=1,Type=String,Description=\"Inserted sequence name\">\n") - VCFFH.write("#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\n") - - iBSDB = BioseqDB(fastaFileName) - - for iSeqModif in self._lSeqModif: - for mutation in iSeqModif.getMutations(): - pos = mutation[0] - old = mutation[1] - new = mutation[2] - VCFFH.write("%s\t%s\t.\t%s\t%s\t.\t.\tAN=2;REF=%s;ALT=%s;SOFTWARE=%s\n" % (iSeqModif.getOriginalHeader(), pos, old, new, old, new, software)) - - for insRange in iSeqModif.getInsertions(): - if insRange.getStart() != 1: - refSeq = iBSDB.fetch(iSeqModif.getOriginalHeader()).getNtFromPosition(insRange.getStart() - 1) - altSeq = "." - - INFO = "SVTYPE=INS;AN=2;SVLEN=%d;SOFTWARE=%s" % (insRange.getEnd() - insRange.getStart() + 1, software) - if insRange.getSeqname() != ".": - INFO += ";INSERTED=%s" % insRange.getSeqname() - VCFLine = "%s\t%d\t.\t%s\t%s\t%s\t%s\t%s\n" % (iSeqModif.getOriginalHeader(), insRange.getStart() - 1, refSeq, altSeq, ".", ".", INFO) - - else: - refSeq = iBSDB.fetch(iSeqModif.getOriginalHeader()).getNtFromPosition(insRange.getStart()) - refSeq = "." - altSeq = "." - - INFO = "SVTYPE=INS;AN=2;SVLEN=%d;SOFTWARE=%s" % (insRange.getEnd() - insRange.getStart() + 1, software) - if insRange.getSeqname() != ".": - INFO += ";INSERTED=%s" % insRange.getSeqname() - VCFLine = "%s\t%d\t.\t%s\t%s\t%s\t%s\t%s\n" % (iSeqModif.getOriginalHeader(), insRange.getStart(), refSeq, altSeq, ".", ".", INFO) - - VCFFH.write(VCFLine) - - for delRange in iSeqModif.getDeletions(): - if delRange.getStart() != 1: - refSeq = iBSDB.fetch(iSeqModif.getOriginalHeader()).subseq(delRange.getStart() - 1, delRange.getEnd()).getSequence() - altSeq = refSeq[0] - - INFO = "SVTYPE=DEL;AN=2;SVLEN=-%d;SOFTWARE=%s" % (len(refSeq)-1, software) - VCFLine = "%s\t%d\t.\t%s\t%s\t%s\t%s\t%s\n" % (iSeqModif.getOriginalHeader(), delRange.getStart() - 1, refSeq, altSeq, ".", ".", INFO) - - else: - refSeq = iBSDB.fetch(iSeqModif.getOriginalHeader()).subseq(delRange.getStart(), delRange.getEnd() + 1).getSequence() - altSeq = refSeq[-1] - altSeq = "." - - INFO = "SVTYPE=DEL;AN=2;SVLEN=-%d;SOFTWARE=%s" % (len(refSeq)-1, software) - VCFLine = "%s\t%d\t.\t%s\t%s\t%s\t%s\t%s\n" % (iSeqModif.getOriginalHeader(), delRange.getStart(), refSeq, altSeq, ".", ".", INFO) - - VCFFH.write(VCFLine) - - #This command line can sort this VCF file properly. But can't manage to launch it properly through os.system or subprocess... -# cmd = "(head -n 9 %s && tail -n +10 %s | head -n -1 | sort -f -k1,1 -k2,2n) > %s" % (tmpVCFFileName, tmpVCFFileName, VCFFileName) - shutil.move(tmpVCFFileName, VCFFileName) - - def getCollectionBasedOnMutatedSequence(self): - transformedSeqModifCollec = SequenceModificationsCollection() - - for header in self.getHeadersList(): - currentSeqModif = self.get(header) - - lModifsTuples = [("insertion", iRange) for iRange in currentSeqModif.getInsertions()] - for iRange in currentSeqModif.getDeletions(): - lModifsTuples.append(("deletion", iRange)) - lModifsTuples.sort(key = lambda modifTuple: modifTuple[1].getStart(), reverse = False) - - sumIns = 0 - sumDel = 0 - - iseqModif = SequenceModifications(currentSeqModif.getMutatedHeader(), currentSeqModif.getOriginalHeader()) - for modifTuple in lModifsTuples: - varType = modifTuple[0] - varRange = modifTuple[1] - - if varType == "insertion": - iseqModif.addDeletion(varRange.getStart() + sumIns - sumDel, varRange.getEnd() + sumIns - sumDel) - sumIns += varRange.getLength() - - if varType == "deletion": - iseqModif.addInsertion(varRange.getStart() + sumIns - sumDel, varRange.getEnd() + sumIns - sumDel) - sumDel += varRange.getLength() - - for tSnp in currentSeqModif.getMutations(): - iseqModif.addMutation((tSnp[0], tSnp[2], tSnp[1])) - - iseqModif.sort() - transformedSeqModifCollec.add(iseqModif) - - transformedSeqModifCollec.sort() - - return transformedSeqModifCollec - - def loadSeqModifCollectionFromFiles(self, inInsertionsFileName, inDeletionsFileName, inSNPsFileName, SNPsrate = "0.020000"): - self.clear() - - with open(inInsertionsFileName, "r") as f: - line = f.readline() - while line: - if "seqName" not in line and "#" not in line: - splittedLine = line.split() - seqname = splittedLine[0] - start = int(splittedLine[1]) - end = int(splittedLine[2]) - insertedSeqName = splittedLine[3] - - if self.get(seqname) is None: - self.add(SequenceModifications(seqname)) - self.get(seqname).setMutatedHeader("%s_mutated_%s" % (seqname, SNPsrate)) - self.get(seqname).addInsertion(start, end, insertedSeqName) - line = f.readline() - - with open(inDeletionsFileName, "r") as f: - line = f.readline() - while line: - if "seqName" not in line and "#" not in line: - splittedLine = line.split() - seqname = splittedLine[0] - start = int(splittedLine[1]) - end = int(splittedLine[2]) - - if self.get(seqname) is None: - self.add(SequenceModifications(seqname)) - self.get(seqname).setMutatedHeader("%s_mutated_%s" % (seqname, SNPsrate)) - self.get(seqname).addDeletion(start, end) - line = f.readline() - - with open(inSNPsFileName, "r") as f: - line = f.readline() - while line: - if "seqName" not in line and "#" not in line: - splittedLine = line.split() - seqname = splittedLine[0] - position = int(splittedLine[1]) - oldNt = splittedLine[2] - newNt = splittedLine[3] - - if self.get(seqname) is None: - self.add(SequenceModifications(seqname)) - self.get(seqname).setMutatedHeader("%s_mutated_%s" % (seqname, SNPsrate)) - self.get(seqname).addMutation((position, oldNt, newNt)) - line = f.readline() - - for header in self.getHeadersList(): - self.get(header).sort() - self.sort() \ No newline at end of file
