view SMART/Java/Python/structure/Interval.py @ 11:2da30502c2f1

Updated CompareOverlappingSmallQuery.xml
author m-zytnicki
date Thu, 14 Mar 2013 05:37:08 -0400
parents 769e306b7933
children 94ab73e8a190
line wrap: on
line source

#
# Copyright INRA-URGI 2009-2010
# 
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
# 
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
# 
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
# 
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
#

from SMART.Java.Python.structure.Bins import *
from commons.core.coord.Range import Range

class Interval(Range):
    """
    Store a genomic interval
    @ivar name:          name of the interval [optional]
    @type name:          string
    @ivar id:            id of the interval [optional]
    @type id:            int
    @ivar bin:           bin in which the interval should be if stored in a database [computed]
    @type bin:           int 
    @ival tags:          information about the transcript [optional]
    @type tags:          dict
    @ivar verbosity:     verbosity
    @type verbosity:     int [default: 0]
    """

    def __init__(self, interval = None, verbosity = 0):
        """
        Constructor
        @param interval:    interval to be copied
        @type    interval:    class L{Interval<Interval>}
        @param verbosity: verbosity
        @type    verbosity: int
        """
        Range.__init__(self)
        self.name          = None
        self.id            = None
        self.bin           = None
        self.verbosity     = verbosity
        self.tags          = {}
        if interval != None:
            self.copy(interval)

    #!!!! Warning: two methods getStart() and getEnd() give the information maximum and minimum in interval.!!!!#
    #In case strand = "+", start < end; strand = "-", start > end        
    def getStart(self):
        if self.start == -1:
            return -1
        if self.end == -1:
            return self.start
        return self.getMin()

    
    def getEnd(self):
        if self.end == -1:
            return -1
        if self.start == -1:
            return self.end
        return self.getMax()


    def getChromosome(self):
        return self.getSeqname()


    def getDirection(self):
        return 1 if self.getStrand() == "+" else -1


    def getName(self):
        return self.name


    def isSet(self):
        """
        Check if the interval is set
        """
        return self.getStart() == None and self.getEnd() == None


    def copy(self, interval):
        """
        Copy method
        @param interval: interval to be copied
        @type    interval: class L{Interval<Interval>}
        """
        self.setStart(interval.getStart())
        self.setEnd(interval.getEnd())
        self.setChromosome(interval.getChromosome())
        self.setDirection(interval.getDirection())
        self.name      = interval.name     
        self.id        = interval.id
        self.bin       = interval.bin
        self.tags      = {}
        for tag in interval.tags:
            self.tags[tag] = interval.tags[tag]
        self.verbosity     = interval.verbosity


    def setName(self, name):
        """
        Set the name
        @param name: name of the interval
        @type    name: string
        """
        if len(name) > 100:
            name = name[:100]
        self.name = name


    def setChromosome(self, chromosome=""):
        """
        Set the chromosome
        @param chromosome: chromosome on which the interval is
        @type    chromosome: string
        """
        if not chromosome:
            self.seqname = None
        else:
            self.seqname = chromosome.replace(".", "_").replace("|", "_")


    def setStart(self, start):
        """
        Set the start point
        Possibly reset bin
        @param start: start point of the interval
        @type    start: int
        """
        self.bin  = None
        direction = self.getDirection()
        if self.start == -1:
            self.start = start
        elif self.end == -1:
            self.end = start
        else:
            if direction == 1:
                self.start = start
            else:
                self.end = start
        if direction == 1:
            self.start, self.end = min(self.start, self.end), max(self.start, self.end)
        else:
            self.start, self.end = max(self.start, self.end), min(self.start, self.end)


    def setEnd(self, end):
        """
        Set the end point
        Possibly reset bin
        @param end: end point of the interval of the interval
        @type    end: int
        """
        self.bin  = None
        direction = self.getDirection()
        if self.end == -1:
            self.end = end
        elif self.start == -1:
            self.start = end
        else:
            if direction == 1:
                self.end = end
            else:
                self.start = end
        if direction == 1:
            self.start, self.end = min(self.start, self.end), max(self.start, self.end)
        else:
            self.start, self.end = max(self.start, self.end), min(self.start, self.end)


    def setSize(self, size):
        """
        Possibly modify the end point
        @param size: size of the transcript
        @type    size: int
        """
        if self.end == None and self.start != None:
            self.setEnd(self.start + self.getSize() - 1)
        elif self.start == None and self.end != None:
            self.setStart(self.end - self.getSize() + 1)


    def getSize(self):
        """
        Get the size
        """
        return self.getEnd() - self.getStart() + 1


    def _setDirection(self, direction):
        """
        Set the direction of the interval (connection to Range)
        @param direction: direction of the transcript (+ / -)
        @type  direction: int (1 or -1)
        """
        if direction * self.getDirection() < 0:
            self.reverse()


    def setDirection(self, direction):
        """
        Set the direction of the interval
        Possibly parse different formats
        @param direction: direction of the transcript (+ / -)
        @type    direction: int or string
        """
        if type(direction).__name__ == 'int':
            self._setDirection(direction / abs(direction))
        elif type(direction).__name__ == 'str':
            if direction == "+":
                self._setDirection(1)
            elif direction == "-":
                self._setDirection(-1)
            elif direction == "1" or direction == "-1":
                self._setDirection(int(direction))
            elif direction.lower() == "plus":
                self._setDirection(1)
            elif direction.lower() == "minus":
                self._setDirection(-1)
            else:
                raise Exception("Cannot understand direction %s" % (direction))
        else:
            raise Exception("Cannot understand direction %s" % (direction))


    def extendStart(self, size):
        """
        Extend the interval by the 5' end
        @param size: the size to be exended
        @type    size: int
        """
        if self.getDirection() == 1:
            self.setStart(max(0, self.getStart() - size))
        else:
            self.setEnd(self.getEnd() + size)
        self.bin  = None
        
        
    def extendEnd(self, size):
        """
        Extend the interval by the 3' end
        @param size: the size to be exended
        @type    size: int
        """
        if self.getDirection() == 1:
            self.setEnd(self.getEnd() + size)
        else:
            self.setStart(max(0, self.getStart() - size))
        self.bin  = None
        
        
    def restrictStart(self, size = 1):
        """
        Restrict the interval by some nucleotides, start from its start position
        Remove the exons
        @param size: the size to be restricted to
        @type    size: int
        """
        if self.getDirection() == 1:
            self.setEnd(min(self.getEnd(), self.getStart() + size - 1))
        else:
            self.setStart(max(self.getStart(), self.getEnd() - size + 1))
        self.bin  = None
        
        
    def restrictEnd(self, size = 1):
        """
        Restrict the interval by some nucleotides, end from its end position
        Remove the exons
        @param size: the size to be restricted to
        @type    size: int
        """
        if self.getDirection() == 1:
            self.setStart(max(self.getStart(), self.getEnd() - size + 1))
        else:
            self.setEnd(min(self.getEnd(), self.getStart() + size - 1))
        self.bin  = None

            
    
    def setTagValue(self, name, value):
        """
        Set a tag
        @param name:    name of the tag
        @type    name:    string
        @param value: value of the tag
        @type    value: int or string
        """
        self.tags[name] = value
    
    
    def getTagNames(self):
        """
        Get all the names of the tags
        """
        return self.tags.keys()


    def getTagValue(self, tag):
        """
        Get the value of a tag
        @param tag: name of a tag
        @type    tag: string
        """
        if tag not in self.tags:
            return None
        return self.tags[tag]


    def getTagValues(self, tagSep = "; ", fieldSep = " ", surrounder = ""):
        """
        Get the formatted tag values
        @param tagSep:     separator between tags
        @type  tagSep:     string
        @param fieldSep:   separator between tag name and tag value
        @type  fieldSep:   string
        @param surrounder: string which optionally surround values
        @type  surrounder: string
        """
        tags = []
        for name in self.tags:
            value = self.tags[name]
            if value == None:
                continue
            if isinstance(value, basestring):
                tags.append("%s%s%s%s%s" % (name, fieldSep, surrounder, self.tags[name], surrounder))
            elif type(value) is int:
                tags.append("%s%s%s%i%s" % (name, fieldSep, surrounder, self.tags[name], surrounder))
            elif type(value) is float:
                tags.append("%s%s%s%f%s" % (name, fieldSep, surrounder, self.tags[name], surrounder))
            else:
                raise Exception("Do not know how to print '" + value + "'.")
        if self.getName() != None:
            tags.append("%s%s%s%s%s" % ("Name", fieldSep, surrounder, self.getName(), surrounder))
        return tagSep.join(tags)

    
    def setTagValues(self, tags, tagSep = "; ", fieldSep = " "):
        """
        Set the tag values using given string
        @param tags:     the tags, concatenated
        @type  tags:     string
        @param tagSep:   separator between tags
        @type  tagSep:   string
        @param fieldSep: separator between tag name and tag value
        @type  fieldSep: string
        """
        if tags == "":
            self.tags = {}
            return
        for splittedTag in tags.split(tagSep):
            if fieldSep not in splittedTag:
                raise Exception("Weird field '%s' in tags '%s'" % (splittedTag, tags))
            tag, value = splittedTag.split(fieldSep, 1)
            if tag == "Name":
                self.setName(value)
                continue
            try:
                intValue       = int(value)
                self.tags[tag] = intValue
            except ValueError:
                try:
                    floatValue     = float(value)
                    self.tags[tag] = floatValue
                except ValueError:
                    self.tags[tag] = value


    def deleteTag(self, tag):
        """
        Remove a tag
        @param tag: the tag to be removed
        @type    tag: string
        """
        if tag in self.tags:
            del self.tags[tag]

    
    def setNbOccurrences(self, nbOccurrences):
        """
        Set the number of occurrences of the interval
        @param nbOccurrences: number of occurrences of the interval
        @type    nbOccurrences: int
        """
        self.setTagValue("nbOccurrences", nbOccurrences)
    
        
    def setOccurrence(self, occurrence):
        """
        Set the occurrence of this interval
        @param occurrence: an occurrence for this transcript
        @type    occurrence: int
        """
        self.setTagValue("occurrence", occurrence)
    
    def __eq__(self, interval):
        """
        Whether two intervals are equal (start and end at same position)
        @param interval: object to be compared to
        @type    interval: class L{Interval<Interval>}
        """
        if not interval:
            return False
        return self.getChromosome() == interval.getChromosome() and self.getStart() == interval.getStart() and self.getEnd() == interval.getEnd() and self.getDirection() == interval.getDirection()


    def overlapWith(self, interval, nbNucleotides = 1):
        """
        Whether two intervals overlap
        @param interval:        object to be compared to
        @type    interval:        class L{Interval<Interval>}
        @param nbNucleotides: minimum number of nucleotides to declare and overlap
        @type    nbNucleotides: int
        """    
        if self.getChromosome() != interval.getChromosome():
            return False
        return (min(self.getEnd(), interval.getEnd()) - max(self.getStart(), interval.getStart()) + 1 >= nbNucleotides)

    def isIncludeIn(self, interval):
        return interval.include(self)


    def include(self, interval):
        """
        Whether this interval includes the other one
        @param interval:      object to be compared to
        @type  interval:      class L{Interval<Interval>}
        """    
        if self.getChromosome() != interval.getChromosome():
            return False
        return ((self.getStart() <= interval.getStart()) and (self.getEnd() >= interval.getEnd()))
    
    
    def getDifference(self, interval, sameStrand = False):
        """
        Get the difference between this cluster and another one
        @param interval:   object to be compared to
        @type  interval:   class L{Interval<Interval>}
        @param sameStrand: do the comparison iff the intervals are on the same strand
        @type  sameStrand: boolean
        @return:           a (possibly empty) list of intervals
        """    
        newInterval = Interval()
        newInterval.copy(self)
        if self.getChromosome() != interval.getChromosome():
            return [newInterval]
        if not self.overlapWith(interval):
            return [newInterval]
        if sameStrand and self.getDirection() != interval.getDirection():
            return [newInterval]
        intervals = []
        if self.getStart() < interval.getStart():
            newInterval = Interval()
            newInterval.copy(self)
            newInterval.setEnd(min(self.getEnd(), interval.getStart() - 1))
            intervals.append(newInterval)
        if self.getEnd() > interval.getEnd():
            newInterval = Interval()
            newInterval.copy(self)
            newInterval.setStart(max(self.getStart(), interval.getEnd() + 1))
            intervals.append(newInterval)
        return intervals
    
    
    def getIntersection(self, interval):
        """
        Get the intersection between this interval and another one
        @param interval: object to be compared to
        @type  interval: class L{Interval<Interval>}
        @return:         an other interval
        """    
        if not self.overlapWith(interval):
            return None
        newInterval = Interval()
        newInterval.setChromosome(self.getChromosome())
        newInterval.setDirection(self.getDirection())
        newInterval.setName("%s_intersect_%s" % (self.getName(), interval.getName()))
        newInterval.setStart(max(self.getStart(), interval.getStart()))
        newInterval.setEnd(min(self.getEnd(), interval.getEnd()))
        return newInterval
    
    
    def getDistance(self, interval):
        """
        Get the distance between two intervals (a non-negative value)
        @param interval: another interval
        @type    interval: class L{Interval<Interval>}
        """    
        if self.overlapWith(interval):
            return 0
        if self.getChromosome() != interval.getChromosome():
            raise Exception("Cannot get the distance between %s and %s" % (str(self), str(interval)))
        return min(abs(self.getStart() - interval.getEnd()), abs(self.getEnd() - interval.getStart()))


    def getRelativeDistance(self, interval):
        """
        Get the distance between two intervals (negative if first interval is before)
        @param interval: another interval
        @type    interval: class L{Interval<Interval>}
        """    
        if self.overlapWith(interval):
            return 0
        if self.getChromosome() != interval.getChromosome():
            raise Exception("Cannot get the distance between %s and %s" % (str(self), str(interval)))
        if self.getEnd() < interval.getStart():
            distance = interval.getStart() - self.getEnd()
        else:
            distance = interval.getEnd() - self.getStart()
        distance *= self.getDirection()
        return distance


    def merge(self, interval, normalization = False):
        """
        Merge two intervals
        @param interval:        another interval
        @type    interval:        class L{Interval<Interval>}
        @param normalization: whether the sum of the merge should be normalized wrt the number of mappings of each elements
        @type    normalization: boolean
        """
        if self.getChromosome() != interval.getChromosome():
            raise Exception("Cannot merge '%s' and '%s' for they are on different chromosomes." % (str(self), str(interval)))
        direction = None
        if self.getStart() == self.getEnd():
            direction = interval.getDirection()
        elif interval.getStart() == interval.getEnd():
            direction = self.getDirection()
        elif self.getDirection() != interval.getDirection():
            raise Exception("Cannot merge '%s' and '%s' for they are on different strands." % (str(self), str(interval)))
        self.setStart(min(self.getStart(), interval.getStart()))
        self.setEnd(max(self.getEnd(), interval.getEnd()))
        if direction != None:
            self.setDirection(direction)
        nbElements = 0.0
        for element in (self, interval):
            for tagName in ("nbElements", "nbOccurrences"):
                if tagName not in element.getTagNames():
                    element.setTagValue(tagName, 1)
            nbElements += float(element.getTagValue("nbElements")) / float(element.getTagValue("nbOccurrences")) if normalization else float(element.getTagValue("nbElements"))
        self.setTagValue("nbElements", nbElements)
        self.bin = None
        for tagName in ("identity", "nbOccurrences", "occurrence", "nbMismatches", "nbGaps", "rank", "evalue", "bestRegion"):
            if tagName in self.getTagNames():
                del self.tags[tagName]


    def getBin(self):
        """
        Get the bin of the interval
        Computed on the fly
        """
        if self.bin == None:
            self.bin = getBin(self.getStart(), self.getEnd())
        return self.bin


    def getBins(self):
        """
        Get all the bin this interval could fall into
        """
        return getOverlappingBins(self.getStart(), self.getEnd())


    def getSqlVariables(cls):
        """
        Get the properties of the object that should be saved in a database
        """
        variables = ["name", "chromosome", "start", "end", "direction", "tags", "bin"]
        return variables
    getSqlVariables = classmethod(getSqlVariables)


    def setSqlValues(self, array):
        """
        Set the values of the properties of this object as given by a results line of a SQL query
        """
        self.id         = array[0]
        self.name       = array[1].strip("'")
        self.setChromosome(array[2].strip("'"))
        self.setStart(array[3])
        self.setEnd(array[4])
        self.setDirection(array[5])
        self.setTagValues(array[6].strip("'"), ";", "=")
        self.bin        = array[7]


    def getSqlValues(self):
        """
        Get the values of the properties that should be saved in a database
        """
        values = dict()
        values["name"]       = self.name
        values["chromosome"] = self.getChromosome()
        values["start"]      = self.getStart()
        values["end"]        = self.getEnd()
        values["direction"]  = self.getDirection()
        values["tags"]       = self.getTagValues(";", "=")
        values["bin"]        = self.getBin()
        return values


    def getSqlTypes(cls):
        """
        Get the values of the properties that should be saved in a database
        """
        types = dict()
        types["name"]       = "varchar"
        types["chromosome"] = "varchar"
        types["start"]      = "int"
        types["end"]        = "int"
        types["direction"]  = "tinyint"
        types["tags"]       = "varchar"
        types["bin"]        = "int"
        return types
    getSqlTypes = classmethod(getSqlTypes)
    

    def getSqlSizes(cls):
        """
        Get the sizes of the properties that should be saved in a database
        """
        sizes = dict()
        sizes["name"]       = 255
        sizes["chromosome"] = 255
        sizes["start"]      = 11
        sizes["end"]        = 11
        sizes["direction"]  = 4
        sizes["tags"]       = 1023
        sizes["bin"]        = 11
        return sizes
    getSqlSizes = classmethod(getSqlSizes)
    

    def printCoordinates(self):
        """
        Print start and end positions (depending on the direction of the interval)
        """
        if self.getDirection() == 1:
            return "%d-%d" % (self.getStart(), self.getEnd())
        else:
            return "%d-%d" % (self.getEnd(), self.getStart())

    
    def extractSequence(self, parser):
        """
        Get the sequence corresponding to this interval
        @param parser: a parser to a FASTA file
        @type    parser: class L{SequenceListParser<SequenceListParser>}
        @return        : a instance of L{Sequence<Sequence>}
        """
        return parser.getSubSequence(self.getChromosome(), self.getStart(), self.getEnd(), self.getDirection(), self.name)
    
    
    def extractWigData(self, parser):
        """
        Get the data retrieved from a wig file
        @param parser: a parser class to a WIG file
        @type    parser: class L{WigParser<WigParser>}
        """
        data = parser.getRange(self.getChromosome(), self.getStart(), self.getEnd())
        if self.getDirection() == -1:
            if parser.strands:
                newData = {}
                for strand in data:
                    data[strand].reverse()
                    newData[-strand] = data[strand]
                data = newData
            else:
                data.reverse()
        return data


    def __str__(self):
        """
        Output a simple representation of this interval
        """
        direction = "+"
        if self.getDirection() == -1:
            direction = "-"
        string = "%s:%d-%d (%s)" % (self.getChromosome(), self.getStart(), self.getEnd(), direction)
        if self.name != "":
            string = "(%s) %s" % (self.name, string)
        return string