Mercurial > repos > yufei-luo > s_mart
view SMART/Java/Python/toolLauncher/RnaFoldLauncher.py @ 47:b6481845eb0d
Uploaded
author | m-zytnicki |
---|---|
date | Mon, 30 Sep 2013 05:51:28 -0400 |
parents | 769e306b7933 |
children |
line wrap: on
line source
# # Copyright INRA-URGI 2009-2010 # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. # import os import sys import random import subprocess from SMART.Java.Python.structure.TranscriptList import TranscriptList from SMART.Java.Python.structure.Transcript import Transcript from SMART.Java.Python.misc.Progress import Progress from commons.core.parsing.FastaParser import FastaParser class RnaFoldStructure(object): """ A structure to store the output of RNAFold @ivar name: the name of the sequence @type name: string @ivar sequence: the sequence (with gaps) @type sequence: string @ivar structure: the bracket structure @type structure: string @ivar energy: the energy of the fold @type energy: float @ivar interactions: the interactions inside the structure @ivar interactions: the interactions inside the structure """ def __init__(self, name, sequence, structure, energy): """ Initialize the structure @param name the name of the sequence @type name: string @param sequence: the sequence (with gaps) @type sequence: string @param structure: the bracket structure @type structure: string @param energy: the energy of the fold @type energy: float """ self.name = name self.sequence = sequence self.structure = structure self.energy = energy self.interactions = None def analyze(self): """ Analyze the output, assign the interactions """ if len(self.sequence) != len(self.structure): sys.exit("Sizes of sequence and structure differ ('%s' and '%s')!\n" % (self.sequence, self.structure)) stack = [] self.interactions = [None for i in range(len(self.sequence))] for i in range(len(self.sequence)): if self.structure[i] == "(": stack.append(i) elif self.structure[i] == ")": if not stack: sys.exit("Something wrong in the interaction line '%s'!\n" % (self.structure)) otherI = stack.pop() self.interactions[i] = otherI self.interactions[otherI] = i if stack: sys.exit("Something wrong in the interaction line '%s'!\n" % (self.structure)) def getNbBulges(self, start = None, end = None): """ Get the number of insertions in a given range (in number of letters) @param start: where to start the count @type start: int @param end: where to end the co @type end: int """ if start == None: start = 0 if end == None: end = len(self.sequence) previousInt = None nbBulges = 0 inRegion = False for i in range(len(self.sequence)): if i == start: inRegion = True if i > end: return nbBulges if inRegion: if self.interactions[i] == None: nbBulges += 1 elif previousInt != None and abs(self.interactions[i] - previousInt) != 1: nbBulges += 1 previousInt = self.interactions[i] return nbBulges def getStar(self, start = None, end = None): """ Get the supposed miRNA* @param start: where to start the count @type start: int @param end: where to end the co @type end: int """ if start == None: start = 0 if end == None: end = len(self.sequence) minStar = 1000000 maxStar = 0 inRegion = False for i in range(len(self.sequence)): if i == start: inRegion = True if i > end: return (minStar, maxStar) if inRegion: if self.interactions[i] != None: minStar = min(minStar, self.interactions[i]) maxStar = max(maxStar, self.interactions[i]) return (minStar, maxStar) class RnaFoldLauncher(object): """ Start and parse a RNAFold instance @ivar inputTranscriptList: a list of transcripts @type inputTranscriptList: class L{TranscriptList<TranscriptList>} @ivar genomeFileParser: a parser to the genome file @type genomeFileParser: class L{SequenceListParser<SequenceListParser>} @ivar bothStrands: whether folding is done on both strands @type bothStrands: bool @ivar fivePrimeExtension: extension towards the 5' end @type fivePrimeExtension: int @ivar threePrimeExtension: extension towards the 3' end @type threePrimeExtension: int @ivar inputTranscriptList: the input list of transcripts @type inputTranscriptList: class L{TranscriptList<TranscriptList>} @ivar outputTranscriptList: the output list of transcripts @type outputTranscriptList: class L{TranscriptList<TranscriptList>} @ivar tmpInputFileName: the name of the temporary input file for RNAFold @type tmpInputFileName: string @ivar tmpOutputFileName: the name of the temporary output file for RNAFold @type tmpOutputFileName: string @ivar verbosity: verbosity @type verbosity: int [default: 0] """ def __init__(self, verbosity = 0): """ Constructor @param verbosity: verbosity @type verbosity: int """ self.verbosity = verbosity self.transcriptNames = [] randomNumber = random.randint(0, 100000) self.bothStrands = True self.tmpInputFileName = "tmpInput_%d.fas" % (randomNumber) self.tmpOutputFileName = "tmpOutput_%d.fas" % (randomNumber) self.outputTranscriptList = None self.fivePrimeExtension = 0 self.threePrimeExtension = 0 def __del__(self): for file in (self.tmpInputFileName, self.tmpOutputFileName): if os.path.isfile(file): os.remove(file) def setTranscriptList(self, inputTranscriptList): """ Set the list of transcripts @ivar inputTranscriptList: a list of transcripts @type inputTranscriptList: class L{TranscriptList<TranscriptList>} """ self.inputTranscriptList = inputTranscriptList def setExtensions(self, fivePrime, threePrime): """ Set extension sizes @ivar fivePrime: extension towards the 5' end @type fivePrime: int @ivar threePrime: extension towards the 3' end @type threePrime: int """ self.fivePrimeExtension = fivePrime self.threePrimeExtension = threePrime def setNbStrands(self, nbStrands): """ Set number of strands @ivar nbStrands: if 2, the work is done on both strands @type nbStrands: int """ self.nbStrands = nbStrands def setGenomeFile(self, fileName): """ Set the genome file @ivar genomeFileName: the multi-FASTA file containing the genome @type genomeFileName: a string """ self.genomeFileParser = FastaParser(fileName, self.verbosity) def writeInputFile(self, transcript, reverse, fivePrimeExtension, threePrimeExtension): """ Write the RNAFold input file, containing the sequence corresponding to the transcript @ivar transcript: a transcript @type transcript: class L{Transcript<Transcript>} @ivar reverse: invert the extensions @type reverse: bool """ transcriptCopy = Transcript(transcript) transcriptCopy.removeExons() if not reverse: transcriptCopy.extendStart(fivePrimeExtension) transcriptCopy.extendEnd(threePrimeExtension) else: transcriptCopy.extendStart(threePrimeExtension) transcriptCopy.extendEnd(fivePrimeExtension) sequence = transcriptCopy.extractSequence(self.genomeFileParser) handle = open(self.tmpInputFileName, "w") handle.write(">%s\n%s\n" % (sequence.getName().replace(":", "_").replace(".", "_"), sequence.getSequence())) handle.close() def startRnaFold(self): """ Start RNAFold """ command = "RNAfold < %s > %s" % (self.tmpInputFileName, self.tmpOutputFileName) if self.verbosity > 100: print "Launching command '%s'" % (command) status = subprocess.call(command, shell=True) if status != 0: raise Exception("Problem with RNAFold! Input file is %s, status is: %s" % (self.tmpInputFileName, status)) def parseRnaFoldOutput(self): """ Parse an output file of RNAFold @return: an RnaFoldStructure """ lines = open(self.tmpOutputFileName).readlines() if len(lines) != 3: raise Exception("Problem in RNAfold output! '%s'" % (lines)) name = lines[0].strip()[1:].split()[0] sequence = lines[1].strip() structure = lines[2].strip().split()[0] energy = float(lines[2].strip().split(" ", 1)[1][1:-1].strip()) if self.verbosity > 100: print "Getting sequence %s, structure %s" % (sequence, structure) return RnaFoldStructure(name, sequence, structure, energy) def analyzeRnaFoldOutput(self, transcript, rnaFoldOutput, reverse, fivePrimeExtension, threePrimeExtension): """ Analyze the RNAFold @ivar transcript: a transcript @type transcript: class L{Transcript<Transcript>} @ivar rnaFoldOutput: the output of RNAFold @type rnaFoldOutput: class L{RnaFoldStructure<RnaFoldStructure>} @ivar reverse: invert the extensions @type reverse: bool @return: a t-uple of energy, number of insertions, number of bulges, strand """ rnaFoldOutput.analyze() transcriptSize = transcript.end - transcript.start + 1 start = fivePrimeExtension if not reverse else threePrimeExtension end = start + transcriptSize energy = rnaFoldOutput.energy nbBulges = rnaFoldOutput.getNbBulges(start, end) (minStar, maxStar) = rnaFoldOutput.getStar(start, end) minStar += transcript.start - start maxStar += transcript.start - start if self.verbosity > 100: print "Getting structure with energy %d, nbBulges %d, miRna* %d-%d, strand %s" % (energy, nbBulges, minStar, maxStar, "-" if reverse else "+") return (energy, nbBulges, minStar, maxStar, reverse) def fold(self, transcript): """ Fold a transcript (in each strand) @ivar transcript: a transcript @type transcript: class L{Transcript<Transcript>} @return: a t-uple of energy, number of insertions, number of bulges, strand """ results = [None] * self.nbStrands strands = [False, True] if self.nbStrands == 2 else [False] minNbBulges = 1000000 for i, reverse in enumerate(strands): self.writeInputFile(transcript, reverse, self.fivePrimeExtension, self.threePrimeExtension) self.startRnaFold() output = self.parseRnaFoldOutput() results[i] = self.analyzeRnaFoldOutput(transcript, output, reverse, self.fivePrimeExtension, self.threePrimeExtension) minNbBulges = min(minNbBulges, results[i][1]) for result in results: if result[1] == minNbBulges: return result return None def refold(self, transcript): """ Fold a transcript, knowing where the miRNA starts and end @ivar transcript: a transcript @type transcript: class L{Transcript<Transcript>} @return: the energy """ miStar = transcript.getTagValue("miRnaStar") startMiStar = int(miStar.split("-")[0]) endMiStart = int(miStar.split("-")[1]) fivePrimeExtension = max(0, transcript.start - startMiStar) + 5 threePrimeExtension = max(0, endMiStart - transcript.end) + 5 self.writeInputFile(transcript, False, fivePrimeExtension, threePrimeExtension) self.startRnaFold() output = self.parseRnaFoldOutput() result = self.analyzeRnaFoldOutput(transcript, output, False, fivePrimeExtension, threePrimeExtension) return result[0] def computeResults(self): """ Fold all and fill an output transcript list with the values """ progress = Progress(self.inputTranscriptList.getNbTranscripts(), "Handling transcripts", self.verbosity) self.outputTranscriptList = TranscriptList() for transcript in self.inputTranscriptList.getIterator(): result = self.fold(transcript) transcript.setTagValue("nbBulges", result[1]) transcript.setTagValue("miRnaStar", "%d-%d" % (result[2], result[3])) transcript.setTagValue("miRNAstrand", result[4]) transcript.setTagValue("energy", self.refold(transcript)) self.outputTranscriptList.addTranscript(transcript) progress.inc() progress.done() def getResults(self): """ Get an output transcript list with the values """ if self.outputTranscriptList == None: self.computeResults() return self.outputTranscriptList