diff smart_toolShed/SMART/Java/Python/structure/Interval.py @ 0:e0f8dcca02ed

Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/smart_toolShed/SMART/Java/Python/structure/Interval.py	Thu Jan 17 10:52:14 2013 -0500
@@ -0,0 +1,707 @@
+#
+# Copyright INRA-URGI 2009-2010
+# 
+# This software is governed by the CeCILL license under French law and
+# abiding by the rules of distribution of free software. You can use,
+# modify and/ or redistribute the software under the terms of the CeCILL
+# license as circulated by CEA, CNRS and INRIA at the following URL
+# "http://www.cecill.info".
+# 
+# As a counterpart to the access to the source code and rights to copy,
+# modify and redistribute granted by the license, users are provided only
+# with a limited warranty and the software's author, the holder of the
+# economic rights, and the successive licensors have only limited
+# liability.
+# 
+# In this respect, the user's attention is drawn to the risks associated
+# with loading, using, modifying and/or developing or reproducing the
+# software by the user in light of its specific status of free software,
+# that may mean that it is complicated to manipulate, and that also
+# therefore means that it is reserved for developers and experienced
+# professionals having in-depth computer knowledge. Users are therefore
+# encouraged to load and test the software's suitability as regards their
+# requirements in conditions enabling the security of their systems and/or
+# data to be ensured and, more generally, to use and operate it in the
+# same conditions as regards security.
+# 
+# The fact that you are presently reading this means that you have had
+# knowledge of the CeCILL license and that you accept its terms.
+#
+
+from SMART.Java.Python.structure.Bins import *
+from commons.core.coord.Range import Range
+
+class Interval(Range):
+    """
+    Store a genomic interval
+    @ivar name:          name of the interval [optional]
+    @type name:          string
+    @ivar id:            id of the interval [optional]
+    @type id:            int
+    @ivar bin:           bin in which the interval should be if stored in a database [computed]
+    @type bin:           int 
+    @ival tags:          information about the transcript [optional]
+    @type tags:          dict
+    @ivar verbosity:     verbosity
+    @type verbosity:     int [default: 0]
+    """
+
+    def __init__(self, interval = None, verbosity = 0):
+        """
+        Constructor
+        @param interval:    interval to be copied
+        @type    interval:    class L{Interval<Interval>}
+        @param verbosity: verbosity
+        @type    verbosity: int
+        """
+        Range.__init__(self)
+        self.name          = None
+        self.id            = None
+        self.bin           = None
+        self.verbosity     = verbosity
+        self.tags          = {}
+        if interval != None:
+            self.copy(interval)
+
+    #!!!! Warning: two methods getStart() and getEnd() give the information maximum and minimum in interval.!!!!#
+    #In case strand = "+", start < end; strand = "-", start > end        
+    def getStart(self):
+        if self.start == -1:
+            return -1
+        if self.end == -1:
+            return self.start
+        return self.getMin()
+
+    
+    def getEnd(self):
+        if self.end == -1:
+            return -1
+        if self.start == -1:
+            return self.end
+        return self.getMax()
+
+
+    def getChromosome(self):
+        return self.getSeqname()
+
+
+    def getDirection(self):
+        return 1 if self.getStrand() == "+" else -1
+
+
+    def getName(self):
+        return self.name
+
+
+    def isSet(self):
+        """
+        Check if the interval is set
+        """
+        return self.getStart() == None and self.getEnd() == None
+
+
+    def copy(self, interval):
+        """
+        Copy method
+        @param interval: interval to be copied
+        @type    interval: class L{Interval<Interval>}
+        """
+        self.setStart(interval.getStart())
+        self.setEnd(interval.getEnd())
+        self.setChromosome(interval.getChromosome())
+        self.setDirection(interval.getDirection())
+        self.name      = interval.name     
+        self.id        = interval.id
+        self.bin       = interval.bin
+        self.tags      = {}
+        for tag in interval.tags:
+            self.tags[tag] = interval.tags[tag]
+        self.verbosity     = interval.verbosity
+
+
+    def setName(self, name):
+        """
+        Set the name
+        @param name: name of the interval
+        @type    name: string
+        """
+        if len(name) > 100:
+            name = name[:100]
+        self.name = name
+
+
+    def setChromosome(self, chromosome=""):
+        """
+        Set the chromosome
+        @param chromosome: chromosome on which the interval is
+        @type    chromosome: string
+        """
+        if not chromosome:
+            self.seqname = None
+        else:
+            self.seqname = chromosome.replace(".", "_").replace("|", "_")
+
+
+    def setStart(self, start):
+        """
+        Set the start point
+        Possibly reset bin
+        @param start: start point of the interval
+        @type    start: int
+        """
+        self.bin  = None
+        direction = self.getDirection()
+        if self.start == -1:
+            self.start = start
+        elif self.end == -1:
+            self.end = start
+        else:
+            if direction == 1:
+                self.start = start
+            else:
+                self.end = start
+        if direction == 1:
+            self.start, self.end = min(self.start, self.end), max(self.start, self.end)
+        else:
+            self.start, self.end = max(self.start, self.end), min(self.start, self.end)
+
+
+    def setEnd(self, end):
+        """
+        Set the end point
+        Possibly reset bin
+        @param end: end point of the interval of the interval
+        @type    end: int
+        """
+        self.bin  = None
+        direction = self.getDirection()
+        if self.end == -1:
+            self.end = end
+        elif self.start == -1:
+            self.start = end
+        else:
+            if direction == 1:
+                self.end = end
+            else:
+                self.start = end
+        if direction == 1:
+            self.start, self.end = min(self.start, self.end), max(self.start, self.end)
+        else:
+            self.start, self.end = max(self.start, self.end), min(self.start, self.end)
+
+
+    def setSize(self, size):
+        """
+        Possibly modify the end point
+        @param size: size of the transcript
+        @type    size: int
+        """
+        if self.end == None and self.start != None:
+            self.setEnd(self.start + self.getSize() - 1)
+        elif self.start == None and self.end != None:
+            self.setStart(self.end - self.getSize() + 1)
+
+
+    def getSize(self):
+        """
+        Get the size
+        """
+        return self.getEnd() - self.getStart() + 1
+
+
+    def _setDirection(self, direction):
+        """
+        Set the direction of the interval (connection to Range)
+        @param direction: direction of the transcript (+ / -)
+        @type  direction: int (1 or -1)
+        """
+        if direction * self.getDirection() < 0:
+            self.reverse()
+
+
+    def setDirection(self, direction):
+        """
+        Set the direction of the interval
+        Possibly parse different formats
+        @param direction: direction of the transcript (+ / -)
+        @type    direction: int or string
+        """
+        if type(direction).__name__ == 'int':
+            self._setDirection(direction / abs(direction))
+        elif type(direction).__name__ == 'str':
+            if direction == "+":
+                self._setDirection(1)
+            elif direction == "-":
+                self._setDirection(-1)
+            elif direction == "1" or direction == "-1":
+                self._setDirection(int(direction))
+            elif direction.lower() == "plus":
+                self._setDirection(1)
+            elif direction.lower() == "minus":
+                self._setDirection(-1)
+            else:
+                raise Exception("Cannot understand direction %s" % (direction))
+        else:
+            raise Exception("Cannot understand direction %s" % (direction))
+
+
+    def extendStart(self, size):
+        """
+        Extend the interval by the 5' end
+        @param size: the size to be exended
+        @type    size: int
+        """
+        if self.getDirection() == 1:
+            self.setStart(max(0, self.getStart() - size))
+        else:
+            self.setEnd(self.getEnd() + size)
+        self.bin  = None
+        
+        
+    def extendEnd(self, size):
+        """
+        Extend the interval by the 3' end
+        @param size: the size to be exended
+        @type    size: int
+        """
+        if self.getDirection() == 1:
+            self.setEnd(self.getEnd() + size)
+        else:
+            self.setStart(max(0, self.getStart() - size))
+        self.bin  = None
+        
+        
+    def restrictStart(self, size = 1):
+        """
+        Restrict the interval by some nucleotides, start from its start position
+        Remove the exons
+        @param size: the size to be restricted to
+        @type    size: int
+        """
+        if self.getDirection() == 1:
+            self.setEnd(min(self.getEnd(), self.getStart() + size - 1))
+        else:
+            self.setStart(max(self.getStart(), self.getEnd() - size + 1))
+        self.bin  = None
+        
+        
+    def restrictEnd(self, size = 1):
+        """
+        Restrict the interval by some nucleotides, end from its end position
+        Remove the exons
+        @param size: the size to be restricted to
+        @type    size: int
+        """
+        if self.getDirection() == 1:
+            self.setStart(max(self.getStart(), self.getEnd() - size + 1))
+        else:
+            self.setEnd(min(self.getEnd(), self.getStart() + size - 1))
+        self.bin  = None
+
+            
+    
+    def setTagValue(self, name, value):
+        """
+        Set a tag
+        @param name:    name of the tag
+        @type    name:    string
+        @param value: value of the tag
+        @type    value: int or string
+        """
+        self.tags[name] = value
+    
+    
+    def getTagNames(self):
+        """
+        Get all the names of the tags
+        """
+        return self.tags.keys()
+
+
+    def getTagValue(self, tag):
+        """
+        Get the value of a tag
+        @param tag: name of a tag
+        @type    tag: string
+        """
+        if tag not in self.tags:
+            return None
+        return self.tags[tag]
+
+
+    def getTagValues(self, tagSep = "; ", fieldSep = " ", surrounder = ""):
+        """
+        Get the formatted tag values
+        @param tagSep:     separator between tags
+        @type  tagSep:     string
+        @param fieldSep:   separator between tag name and tag value
+        @type  fieldSep:   string
+        @param surrounder: string which optionally surround values
+        @type  surrounder: string
+        """
+        tags = []
+        for name in self.tags:
+            value = self.tags[name]
+            if value == None:
+                continue
+            if isinstance(value, basestring):
+                tags.append("%s%s%s%s%s" % (name, fieldSep, surrounder, self.tags[name], surrounder))
+            elif type(value) is int:
+                tags.append("%s%s%s%i%s" % (name, fieldSep, surrounder, self.tags[name], surrounder))
+            elif type(value) is float:
+                tags.append("%s%s%s%f%s" % (name, fieldSep, surrounder, self.tags[name], surrounder))
+            else:
+                raise Exception("Do not know how to print '" + value + "'.")
+        if self.getName() != None:
+            tags.append("%s%s%s%s%s" % ("Name", fieldSep, surrounder, self.getName(), surrounder))
+        return tagSep.join(tags)
+
+    
+    def setTagValues(self, tags, tagSep = "; ", fieldSep = " "):
+        """
+        Set the tag values using given string
+        @param tags:     the tags, concatenated
+        @type  tags:     string
+        @param tagSep:   separator between tags
+        @type  tagSep:   string
+        @param fieldSep: separator between tag name and tag value
+        @type  fieldSep: string
+        """
+        if tags == "":
+            self.tags = {}
+            return
+        for splittedTag in tags.split(tagSep):
+            if fieldSep not in splittedTag:
+                raise Exception("Weird field '%s' in tags '%s'" % (splittedTag, tags))
+            tag, value = splittedTag.split(fieldSep, 1)
+            if tag == "Name":
+                self.setName(value)
+                continue
+            try:
+                intValue       = int(value)
+                self.tags[tag] = intValue
+            except ValueError:
+                try:
+                    floatValue     = float(value)
+                    self.tags[tag] = floatValue
+                except ValueError:
+                    self.tags[tag] = value
+
+
+    def deleteTag(self, tag):
+        """
+        Remove a tag
+        @param tag: the tag to be removed
+        @type    tag: string
+        """
+        if tag in self.tags:
+            del self.tags[tag]
+
+    
+    def setNbOccurrences(self, nbOccurrences):
+        """
+        Set the number of occurrences of the interval
+        @param nbOccurrences: number of occurrences of the interval
+        @type    nbOccurrences: int
+        """
+        self.setTagValue("nbOccurrences", nbOccurrences)
+    
+        
+    def setOccurrence(self, occurrence):
+        """
+        Set the occurrence of this interval
+        @param occurrence: an occurrence for this transcript
+        @type    occurrence: int
+        """
+        self.setTagValue("occurrence", occurrence)
+    
+    def __eq__(self, interval):
+        """
+        Whether two intervals are equal (start and end at same position)
+        @param interval: object to be compared to
+        @type    interval: class L{Interval<Interval>}
+        """
+        if not interval:
+            return False
+        return self.getChromosome() == interval.getChromosome() and self.getStart() == interval.getStart() and self.getEnd() == interval.getEnd() and self.getDirection() == interval.getDirection()
+
+
+    def overlapWith(self, interval, nbNucleotides = 1):
+        """
+        Whether two intervals overlap
+        @param interval:        object to be compared to
+        @type    interval:        class L{Interval<Interval>}
+        @param nbNucleotides: minimum number of nucleotides to declare and overlap
+        @type    nbNucleotides: int
+        """    
+        if self.getChromosome() != interval.getChromosome():
+            return False
+        return (min(self.getEnd(), interval.getEnd()) - max(self.getStart(), interval.getStart()) + 1 >= nbNucleotides)
+
+    def isIncludeIn(self, interval):
+        return interval.include(self)
+
+
+    def include(self, interval):
+        """
+        Whether this interval includes the other one
+        @param interval:      object to be compared to
+        @type  interval:      class L{Interval<Interval>}
+        """    
+        if self.getChromosome() != interval.getChromosome():
+            return False
+        return ((self.getStart() <= interval.getStart()) and (self.getEnd() >= interval.getEnd()))
+    
+    
+    def getDifference(self, interval, sameStrand = False):
+        """
+        Get the difference between this cluster and another one
+        @param interval:   object to be compared to
+        @type  interval:   class L{Interval<Interval>}
+        @param sameStrand: do the comparison iff the intervals are on the same strand
+        @type  sameStrand: boolean
+        @return:           a (possibly empty) list of intervals
+        """    
+        newInterval = Interval()
+        newInterval.copy(self)
+        if self.getChromosome() != interval.getChromosome():
+            return [newInterval]
+        if not self.overlapWith(interval):
+            return [newInterval]
+        if sameStrand and self.getDirection() != interval.getDirection():
+            return [newInterval]
+        intervals = []
+        if self.getStart() < interval.getStart():
+            newInterval = Interval()
+            newInterval.copy(self)
+            newInterval.setEnd(min(self.getEnd(), interval.getStart() - 1))
+            intervals.append(newInterval)
+        if self.getEnd() > interval.getEnd():
+            newInterval = Interval()
+            newInterval.copy(self)
+            newInterval.setStart(max(self.getStart(), interval.getEnd() + 1))
+            intervals.append(newInterval)
+        return intervals
+    
+    
+    def getIntersection(self, interval):
+        """
+        Get the intersection between this interval and another one
+        @param interval: object to be compared to
+        @type  interval: class L{Interval<Interval>}
+        @return:         an other interval
+        """    
+        if not self.overlapWith(interval):
+            return None
+        newInterval = Interval()
+        newInterval.setChromosome(self.getChromosome())
+        newInterval.setDirection(self.getDirection())
+        newInterval.setName("%s_intersect_%s" % (self.getName(), interval.getName()))
+        newInterval.setStart(max(self.getStart(), interval.getStart()))
+        newInterval.setEnd(min(self.getEnd(), interval.getEnd()))
+        return newInterval
+    
+    
+    def getDistance(self, interval):
+        """
+        Get the distance between two intervals (a non-negative value)
+        @param interval: another interval
+        @type    interval: class L{Interval<Interval>}
+        """    
+        if self.overlapWith(interval):
+            return 0
+        if self.getChromosome() != interval.getChromosome():
+            raise Exception("Cannot get the distance between %s and %s" % (str(self), str(interval)))
+        return min(abs(self.getStart() - interval.getEnd()), abs(self.getEnd() - interval.getStart()))
+
+
+    def getRelativeDistance(self, interval):
+        """
+        Get the distance between two intervals (negative if first interval is before)
+        @param interval: another interval
+        @type    interval: class L{Interval<Interval>}
+        """    
+        if self.overlapWith(interval):
+            return 0
+        if self.getChromosome() != interval.getChromosome():
+            raise Exception("Cannot get the distance between %s and %s" % (str(self), str(interval)))
+        if self.getEnd() < interval.getStart():
+            distance = interval.getStart() - self.getEnd()
+        else:
+            distance = interval.getEnd() - self.getStart()
+        distance *= self.getDirection()
+        return distance
+
+
+    def merge(self, interval, normalization = False):
+        """
+        Merge two intervals
+        @param interval:        another interval
+        @type    interval:        class L{Interval<Interval>}
+        @param normalization: whether the sum of the merge should be normalized wrt the number of mappings of each elements
+        @type    normalization: boolean
+        """
+        if self.getChromosome() != interval.getChromosome():
+            raise Exception("Cannot merge '%s' and '%s' for they are on different chromosomes." % (str(self), str(interval)))
+        direction = None
+        if self.getStart() == self.getEnd():
+            direction = interval.getDirection()
+        elif interval.getStart() == interval.getEnd():
+            direction = self.getDirection()
+        elif self.getDirection() != interval.getDirection():
+            raise Exception("Cannot merge '%s' and '%s' for they are on different strands." % (str(self), str(interval)))
+        self.setStart(min(self.getStart(), interval.getStart()))
+        self.setEnd(max(self.getEnd(), interval.getEnd()))
+        if direction != None:
+            self.setDirection(direction)
+        nbElements = 0.0
+        for element in (self, interval):
+            for tagName in ("nbElements", "nbOccurrences"):
+                if tagName not in element.getTagNames():
+                    element.setTagValue(tagName, 1)
+            nbElements += float(element.getTagValue("nbElements")) / float(element.getTagValue("nbOccurrences")) if normalization else float(element.getTagValue("nbElements"))
+        self.setTagValue("nbElements", nbElements)
+        self.bin = None
+        for tagName in ("identity", "nbOccurrences", "occurrence", "nbMismatches", "nbGaps", "rank", "evalue", "bestRegion"):
+            if tagName in self.getTagNames():
+                del self.tags[tagName]
+
+
+    def getBin(self):
+        """
+        Get the bin of the interval
+        Computed on the fly
+        """
+        if self.bin == None:
+            self.bin = getBin(self.getStart(), self.getEnd())
+        return self.bin
+
+
+    def getBins(self):
+        """
+        Get all the bin this interval could fall into
+        """
+        return getOverlappingBins(self.getStart(), self.getEnd())
+
+
+    def getSqlVariables(cls):
+        """
+        Get the properties of the object that should be saved in a database
+        """
+        variables = ["name", "chromosome", "start", "end", "direction", "tags", "bin"]
+        return variables
+    getSqlVariables = classmethod(getSqlVariables)
+
+
+    def setSqlValues(self, array):
+        """
+        Set the values of the properties of this object as given by a results line of a SQL query
+        """
+        self.id         = array[0]
+        self.name       = array[1].strip("'")
+        self.setChromosome(array[2].strip("'"))
+        self.setStart(array[3])
+        self.setEnd(array[4])
+        self.setDirection(array[5])
+        self.setTagValues(array[6].strip("'"), ";", "=")
+        self.bin        = array[7]
+
+
+    def getSqlValues(self):
+        """
+        Get the values of the properties that should be saved in a database
+        """
+        values = dict()
+        values["name"]       = self.name
+        values["chromosome"] = self.getChromosome()
+        values["start"]      = self.getStart()
+        values["end"]        = self.getEnd()
+        values["direction"]  = self.getDirection()
+        values["tags"]       = self.getTagValues(";", "=")
+        values["bin"]        = self.getBin()
+        return values
+
+
+    def getSqlTypes(cls):
+        """
+        Get the values of the properties that should be saved in a database
+        """
+        types = dict()
+        types["name"]       = "varchar"
+        types["chromosome"] = "varchar"
+        types["start"]      = "int"
+        types["end"]        = "int"
+        types["direction"]  = "tinyint"
+        types["tags"]       = "varchar"
+        types["bin"]        = "int"
+        return types
+    getSqlTypes = classmethod(getSqlTypes)
+    
+
+    def getSqlSizes(cls):
+        """
+        Get the sizes of the properties that should be saved in a database
+        """
+        sizes = dict()
+        sizes["name"]       = 255
+        sizes["chromosome"] = 255
+        sizes["start"]      = 11
+        sizes["end"]        = 11
+        sizes["direction"]  = 4
+        sizes["tags"]       = 1023
+        sizes["bin"]        = 11
+        return sizes
+    getSqlSizes = classmethod(getSqlSizes)
+    
+
+    def printCoordinates(self):
+        """
+        Print start and end positions (depending on the direction of the interval)
+        """
+        if self.getDirection() == 1:
+            return "%d-%d" % (self.getStart(), self.getEnd())
+        else:
+            return "%d-%d" % (self.getEnd(), self.getStart())
+
+    
+    def extractSequence(self, parser):
+        """
+        Get the sequence corresponding to this interval
+        @param parser: a parser to a FASTA file
+        @type    parser: class L{SequenceListParser<SequenceListParser>}
+        @return        : a instance of L{Sequence<Sequence>}
+        """
+        return parser.getSubSequence(self.getChromosome(), self.getStart(), self.getEnd(), self.getDirection(), self.name)
+    
+    
+    def extractWigData(self, parser):
+        """
+        Get the data retrieved from a wig file
+        @param parser: a parser class to a WIG file
+        @type    parser: class L{WigParser<WigParser>}
+        """
+        data = parser.getRange(self.getChromosome(), self.getStart(), self.getEnd())
+        if self.getDirection() == -1:
+            if parser.strands:
+                newData = {}
+                for strand in data:
+                    data[strand].reverse()
+                    newData[-strand] = data[strand]
+                data = newData
+            else:
+                data.reverse()
+        return data
+
+
+    def __str__(self):
+        """
+        Output a simple representation of this interval
+        """
+        direction = "+"
+        if self.getDirection() == -1:
+            direction = "-"
+        string = "%s:%d-%d (%s)" % (self.getChromosome(), self.getStart(), self.getEnd(), direction)
+        if self.name != "":
+            string = "(%s) %s" % (self.name, string)
+        return string
+