view SMART/Java/Python/toolLauncher/RnaFoldLauncher.py @ 38:2c0c0a89fad7

Uploaded
author m-zytnicki
date Thu, 02 May 2013 09:56:47 -0400
parents 769e306b7933
children
line wrap: on
line source

#
# Copyright INRA-URGI 2009-2010
# 
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
# 
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
# 
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
# 
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
#
import os
import sys
import random
import subprocess
from SMART.Java.Python.structure.TranscriptList import TranscriptList
from SMART.Java.Python.structure.Transcript import Transcript
from SMART.Java.Python.misc.Progress import Progress
from commons.core.parsing.FastaParser import FastaParser


class RnaFoldStructure(object):
    """
    A structure to store the output of RNAFold
    @ivar name:         the name of the sequence
    @type name:         string
    @ivar sequence:     the sequence (with gaps)
    @type sequence:     string
    @ivar structure:    the bracket structure
    @type structure:    string
    @ivar energy:       the energy of the fold
    @type energy:       float
    @ivar interactions: the interactions inside the structure
    @ivar interactions: the interactions inside the structure
    """

    def __init__(self, name, sequence, structure, energy):
        """
        Initialize the structure
        @param name       the name of the sequence
        @type  name:      string
        @param sequence:  the sequence (with gaps)
        @type  sequence:  string
        @param structure: the bracket structure
        @type  structure: string
        @param energy:    the energy of the fold
        @type  energy:    float
        """
        self.name         = name        
        self.sequence     = sequence
        self.structure    = structure
        self.energy       = energy
        self.interactions = None

    
    def analyze(self):
        """
        Analyze the output, assign the interactions
        """
        if len(self.sequence) != len(self.structure):
            sys.exit("Sizes of sequence and structure differ ('%s' and '%s')!\n" % (self.sequence, self.structure))
        stack                         = []
        self.interactions = [None for i in range(len(self.sequence))]
        for i in range(len(self.sequence)):
            if self.structure[i] == "(":
                stack.append(i)
            elif self.structure[i] == ")":
                if not stack:
                    sys.exit("Something wrong in the interaction line '%s'!\n" % (self.structure))
                otherI = stack.pop()
                self.interactions[i]      = otherI
                self.interactions[otherI] = i
        if stack:
            sys.exit("Something wrong in the interaction line '%s'!\n" % (self.structure))


    def getNbBulges(self, start = None, end = None):
        """
        Get the number of insertions in a given range (in number of letters)
        @param start: where to start the count
        @type    start: int
        @param end:     where to end the co
        @type    end:     int
        """
        if start == None:
            start = 0
        if end == None:
            end = len(self.sequence)
        previousInt = None
        nbBulges    = 0
        inRegion    = False
        for i in range(len(self.sequence)):
            if i == start:
                inRegion = True
            if i > end:
                return nbBulges
            if inRegion:
                if self.interactions[i] == None:
                    nbBulges += 1
                elif previousInt != None and abs(self.interactions[i] - previousInt) != 1:
                    nbBulges += 1
            previousInt = self.interactions[i]
        return nbBulges


    def getStar(self, start = None, end = None):
        """
        Get the supposed miRNA*
        @param start: where to start the count
        @type    start: int
        @param end:     where to end the co
        @type    end:     int
        """
        if start == None:
            start = 0
        if end == None:
            end = len(self.sequence)
        minStar  = 1000000
        maxStar  = 0
        inRegion = False
        for i in range(len(self.sequence)):
            if i == start:
                inRegion = True
            if i > end:
                return (minStar, maxStar)
            if inRegion:
                if self.interactions[i] != None:
                    minStar = min(minStar, self.interactions[i])
                    maxStar = max(maxStar, self.interactions[i])
        return (minStar, maxStar)



class RnaFoldLauncher(object):
    """
    Start and parse a RNAFold instance
    @ivar inputTranscriptList:  a list of transcripts
    @type inputTranscriptList:  class L{TranscriptList<TranscriptList>}
    @ivar genomeFileParser:     a parser to the genome file
    @type genomeFileParser:     class L{SequenceListParser<SequenceListParser>}
    @ivar bothStrands:          whether folding is done on both strands
    @type bothStrands:          bool
    @ivar fivePrimeExtension:   extension towards the 5' end
    @type fivePrimeExtension:   int
    @ivar threePrimeExtension:  extension towards the 3' end
    @type threePrimeExtension:  int
    @ivar inputTranscriptList:  the input list of transcripts
    @type inputTranscriptList:  class L{TranscriptList<TranscriptList>}
    @ivar outputTranscriptList: the output list of transcripts
    @type outputTranscriptList: class L{TranscriptList<TranscriptList>}
    @ivar tmpInputFileName:     the name of the temporary input file for RNAFold
    @type tmpInputFileName:     string
    @ivar tmpOutputFileName:    the name of the temporary output file for RNAFold
    @type tmpOutputFileName:    string
    @ivar verbosity:            verbosity
    @type verbosity:            int [default: 0]
    """

    def __init__(self, verbosity = 0):
        """
        Constructor
        @param verbosity: verbosity
        @type  verbosity: int
        """
        self.verbosity            = verbosity
        self.transcriptNames      = []
        randomNumber              = random.randint(0, 100000)
        self.bothStrands          = True
        self.tmpInputFileName     = "tmpInput_%d.fas" % (randomNumber)
        self.tmpOutputFileName    = "tmpOutput_%d.fas" % (randomNumber)
        self.outputTranscriptList = None
        self.fivePrimeExtension   = 0
        self.threePrimeExtension  = 0


    def __del__(self):
        for file in (self.tmpInputFileName, self.tmpOutputFileName):
            if os.path.isfile(file):
                os.remove(file)


    def setTranscriptList(self, inputTranscriptList):
        """
        Set the list of transcripts
        @ivar inputTranscriptList: a list of transcripts
        @type inputTranscriptList: class L{TranscriptList<TranscriptList>}
        """
        self.inputTranscriptList = inputTranscriptList


    def setExtensions(self, fivePrime, threePrime):
        """
        Set extension sizes
        @ivar fivePrime:  extension towards the 5' end
        @type fivePrime:  int
        @ivar threePrime: extension towards the 3' end
        @type threePrime: int
        """
        self.fivePrimeExtension  = fivePrime
        self.threePrimeExtension = threePrime


    def setNbStrands(self, nbStrands):
        """
        Set number of strands
        @ivar nbStrands: if 2, the work is done on both strands
        @type nbStrands: int
        """
        self.nbStrands = nbStrands


    def setGenomeFile(self, fileName):
        """
        Set the genome file
        @ivar genomeFileName: the multi-FASTA file containing the genome
        @type genomeFileName: a string
        """
        self.genomeFileParser = FastaParser(fileName, self.verbosity)


    def writeInputFile(self, transcript, reverse, fivePrimeExtension, threePrimeExtension):
        """
        Write the RNAFold input file, containing the sequence corresponding to the transcript
        @ivar transcript: a transcript
        @type transcript: class L{Transcript<Transcript>}
        @ivar reverse:    invert the extensions
        @type reverse:    bool
        """
        transcriptCopy = Transcript(transcript)
        transcriptCopy.removeExons()
        if not reverse:
            transcriptCopy.extendStart(fivePrimeExtension)
            transcriptCopy.extendEnd(threePrimeExtension)
        else:
            transcriptCopy.extendStart(threePrimeExtension)
            transcriptCopy.extendEnd(fivePrimeExtension)
        sequence = transcriptCopy.extractSequence(self.genomeFileParser)
        handle   = open(self.tmpInputFileName, "w")
        handle.write(">%s\n%s\n" % (sequence.getName().replace(":", "_").replace(".", "_"), sequence.getSequence()))
        handle.close()


    def startRnaFold(self):
        """
        Start RNAFold
        """
        command = "RNAfold < %s > %s" % (self.tmpInputFileName, self.tmpOutputFileName)
        if self.verbosity > 100:
            print "Launching command '%s'" % (command)
        status = subprocess.call(command, shell=True)
        if status != 0:
            raise Exception("Problem with RNAFold! Input file is %s, status is: %s" % (self.tmpInputFileName, status))

    
    def parseRnaFoldOutput(self):
        """
        Parse an output file of RNAFold
        @return: an RnaFoldStructure
        """
        lines = open(self.tmpOutputFileName).readlines()
        if len(lines) != 3:
            raise Exception("Problem in RNAfold output! '%s'" % (lines))
        name      = lines[0].strip()[1:].split()[0]
        sequence  = lines[1].strip()
        structure = lines[2].strip().split()[0]
        energy    = float(lines[2].strip().split(" ", 1)[1][1:-1].strip())
        if self.verbosity > 100:
            print "Getting sequence %s, structure %s" % (sequence, structure)
        return RnaFoldStructure(name, sequence, structure, energy)


    def analyzeRnaFoldOutput(self, transcript, rnaFoldOutput, reverse, fivePrimeExtension, threePrimeExtension):
        """
        Analyze the RNAFold
        @ivar transcript:    a transcript
        @type transcript:    class L{Transcript<Transcript>}
        @ivar rnaFoldOutput: the output of RNAFold
        @type rnaFoldOutput: class L{RnaFoldStructure<RnaFoldStructure>}
        @ivar reverse:       invert the extensions
        @type reverse:       bool
        @return:             a t-uple of energy, number of insertions, number of bulges, strand
        """
        rnaFoldOutput.analyze()
        transcriptSize     = transcript.end - transcript.start + 1
        start              = fivePrimeExtension if not reverse else threePrimeExtension
        end                = start + transcriptSize
        energy             = rnaFoldOutput.energy
        nbBulges           = rnaFoldOutput.getNbBulges(start, end)
        (minStar, maxStar) = rnaFoldOutput.getStar(start, end)
        minStar           += transcript.start - start
        maxStar           += transcript.start - start
        if self.verbosity > 100:
            print "Getting structure with energy %d, nbBulges %d, miRna* %d-%d, strand %s" % (energy, nbBulges, minStar, maxStar, "-" if reverse else "+")
        return (energy, nbBulges, minStar, maxStar, reverse)

    
    def fold(self, transcript):
        """
        Fold a transcript (in each strand)
        @ivar transcript: a transcript
        @type transcript: class L{Transcript<Transcript>}
        @return:          a t-uple of energy, number of insertions, number of bulges, strand
        """
        results     = [None] * self.nbStrands
        strands     = [False, True] if self.nbStrands == 2 else [False]
        minNbBulges = 1000000
        for i, reverse in enumerate(strands):
            self.writeInputFile(transcript, reverse, self.fivePrimeExtension, self.threePrimeExtension)
            self.startRnaFold()
            output = self.parseRnaFoldOutput()
            results[i]  = self.analyzeRnaFoldOutput(transcript, output, reverse, self.fivePrimeExtension, self.threePrimeExtension)
            minNbBulges = min(minNbBulges, results[i][1])
        for result in results:
            if result[1] == minNbBulges:
                return result
        return None


    def refold(self, transcript):
        """
        Fold a transcript, knowing where the miRNA starts and end
        @ivar transcript: a transcript
        @type transcript: class L{Transcript<Transcript>}
        @return:          the energy
        """
        miStar              = transcript.getTagValue("miRnaStar")
        startMiStar         = int(miStar.split("-")[0])
        endMiStart          = int(miStar.split("-")[1])
        fivePrimeExtension  = max(0, transcript.start - startMiStar) + 5
        threePrimeExtension = max(0, endMiStart - transcript.end) + 5
        self.writeInputFile(transcript, False, fivePrimeExtension, threePrimeExtension)
        self.startRnaFold()
        output = self.parseRnaFoldOutput()
        result = self.analyzeRnaFoldOutput(transcript, output, False, fivePrimeExtension, threePrimeExtension)
        return result[0]


    def computeResults(self):
        """
        Fold all and fill an output transcript list with the values
        """
        progress = Progress(self.inputTranscriptList.getNbTranscripts(), "Handling transcripts", self.verbosity)
        self.outputTranscriptList = TranscriptList()
        for transcript in self.inputTranscriptList.getIterator():
            result = self.fold(transcript)
            transcript.setTagValue("nbBulges", result[1])
            transcript.setTagValue("miRnaStar", "%d-%d" % (result[2], result[3]))
            transcript.setTagValue("miRNAstrand", result[4])
            transcript.setTagValue("energy", self.refold(transcript))
            self.outputTranscriptList.addTranscript(transcript)
            progress.inc()
        progress.done()


    def getResults(self):
        """
        Get an output transcript list with the values
        """
        if self.outputTranscriptList == None:
            self.computeResults()
        return self.outputTranscriptList