Mercurial > repos > yufei-luo > s_mart
diff smart_toolShed/SMART/Java/Python/structure/Interval.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/smart_toolShed/SMART/Java/Python/structure/Interval.py Thu Jan 17 10:52:14 2013 -0500 @@ -0,0 +1,707 @@ +# +# Copyright INRA-URGI 2009-2010 +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. +# + +from SMART.Java.Python.structure.Bins import * +from commons.core.coord.Range import Range + +class Interval(Range): + """ + Store a genomic interval + @ivar name: name of the interval [optional] + @type name: string + @ivar id: id of the interval [optional] + @type id: int + @ivar bin: bin in which the interval should be if stored in a database [computed] + @type bin: int + @ival tags: information about the transcript [optional] + @type tags: dict + @ivar verbosity: verbosity + @type verbosity: int [default: 0] + """ + + def __init__(self, interval = None, verbosity = 0): + """ + Constructor + @param interval: interval to be copied + @type interval: class L{Interval<Interval>} + @param verbosity: verbosity + @type verbosity: int + """ + Range.__init__(self) + self.name = None + self.id = None + self.bin = None + self.verbosity = verbosity + self.tags = {} + if interval != None: + self.copy(interval) + + #!!!! Warning: two methods getStart() and getEnd() give the information maximum and minimum in interval.!!!!# + #In case strand = "+", start < end; strand = "-", start > end + def getStart(self): + if self.start == -1: + return -1 + if self.end == -1: + return self.start + return self.getMin() + + + def getEnd(self): + if self.end == -1: + return -1 + if self.start == -1: + return self.end + return self.getMax() + + + def getChromosome(self): + return self.getSeqname() + + + def getDirection(self): + return 1 if self.getStrand() == "+" else -1 + + + def getName(self): + return self.name + + + def isSet(self): + """ + Check if the interval is set + """ + return self.getStart() == None and self.getEnd() == None + + + def copy(self, interval): + """ + Copy method + @param interval: interval to be copied + @type interval: class L{Interval<Interval>} + """ + self.setStart(interval.getStart()) + self.setEnd(interval.getEnd()) + self.setChromosome(interval.getChromosome()) + self.setDirection(interval.getDirection()) + self.name = interval.name + self.id = interval.id + self.bin = interval.bin + self.tags = {} + for tag in interval.tags: + self.tags[tag] = interval.tags[tag] + self.verbosity = interval.verbosity + + + def setName(self, name): + """ + Set the name + @param name: name of the interval + @type name: string + """ + if len(name) > 100: + name = name[:100] + self.name = name + + + def setChromosome(self, chromosome=""): + """ + Set the chromosome + @param chromosome: chromosome on which the interval is + @type chromosome: string + """ + if not chromosome: + self.seqname = None + else: + self.seqname = chromosome.replace(".", "_").replace("|", "_") + + + def setStart(self, start): + """ + Set the start point + Possibly reset bin + @param start: start point of the interval + @type start: int + """ + self.bin = None + direction = self.getDirection() + if self.start == -1: + self.start = start + elif self.end == -1: + self.end = start + else: + if direction == 1: + self.start = start + else: + self.end = start + if direction == 1: + self.start, self.end = min(self.start, self.end), max(self.start, self.end) + else: + self.start, self.end = max(self.start, self.end), min(self.start, self.end) + + + def setEnd(self, end): + """ + Set the end point + Possibly reset bin + @param end: end point of the interval of the interval + @type end: int + """ + self.bin = None + direction = self.getDirection() + if self.end == -1: + self.end = end + elif self.start == -1: + self.start = end + else: + if direction == 1: + self.end = end + else: + self.start = end + if direction == 1: + self.start, self.end = min(self.start, self.end), max(self.start, self.end) + else: + self.start, self.end = max(self.start, self.end), min(self.start, self.end) + + + def setSize(self, size): + """ + Possibly modify the end point + @param size: size of the transcript + @type size: int + """ + if self.end == None and self.start != None: + self.setEnd(self.start + self.getSize() - 1) + elif self.start == None and self.end != None: + self.setStart(self.end - self.getSize() + 1) + + + def getSize(self): + """ + Get the size + """ + return self.getEnd() - self.getStart() + 1 + + + def _setDirection(self, direction): + """ + Set the direction of the interval (connection to Range) + @param direction: direction of the transcript (+ / -) + @type direction: int (1 or -1) + """ + if direction * self.getDirection() < 0: + self.reverse() + + + def setDirection(self, direction): + """ + Set the direction of the interval + Possibly parse different formats + @param direction: direction of the transcript (+ / -) + @type direction: int or string + """ + if type(direction).__name__ == 'int': + self._setDirection(direction / abs(direction)) + elif type(direction).__name__ == 'str': + if direction == "+": + self._setDirection(1) + elif direction == "-": + self._setDirection(-1) + elif direction == "1" or direction == "-1": + self._setDirection(int(direction)) + elif direction.lower() == "plus": + self._setDirection(1) + elif direction.lower() == "minus": + self._setDirection(-1) + else: + raise Exception("Cannot understand direction %s" % (direction)) + else: + raise Exception("Cannot understand direction %s" % (direction)) + + + def extendStart(self, size): + """ + Extend the interval by the 5' end + @param size: the size to be exended + @type size: int + """ + if self.getDirection() == 1: + self.setStart(max(0, self.getStart() - size)) + else: + self.setEnd(self.getEnd() + size) + self.bin = None + + + def extendEnd(self, size): + """ + Extend the interval by the 3' end + @param size: the size to be exended + @type size: int + """ + if self.getDirection() == 1: + self.setEnd(self.getEnd() + size) + else: + self.setStart(max(0, self.getStart() - size)) + self.bin = None + + + def restrictStart(self, size = 1): + """ + Restrict the interval by some nucleotides, start from its start position + Remove the exons + @param size: the size to be restricted to + @type size: int + """ + if self.getDirection() == 1: + self.setEnd(min(self.getEnd(), self.getStart() + size - 1)) + else: + self.setStart(max(self.getStart(), self.getEnd() - size + 1)) + self.bin = None + + + def restrictEnd(self, size = 1): + """ + Restrict the interval by some nucleotides, end from its end position + Remove the exons + @param size: the size to be restricted to + @type size: int + """ + if self.getDirection() == 1: + self.setStart(max(self.getStart(), self.getEnd() - size + 1)) + else: + self.setEnd(min(self.getEnd(), self.getStart() + size - 1)) + self.bin = None + + + + def setTagValue(self, name, value): + """ + Set a tag + @param name: name of the tag + @type name: string + @param value: value of the tag + @type value: int or string + """ + self.tags[name] = value + + + def getTagNames(self): + """ + Get all the names of the tags + """ + return self.tags.keys() + + + def getTagValue(self, tag): + """ + Get the value of a tag + @param tag: name of a tag + @type tag: string + """ + if tag not in self.tags: + return None + return self.tags[tag] + + + def getTagValues(self, tagSep = "; ", fieldSep = " ", surrounder = ""): + """ + Get the formatted tag values + @param tagSep: separator between tags + @type tagSep: string + @param fieldSep: separator between tag name and tag value + @type fieldSep: string + @param surrounder: string which optionally surround values + @type surrounder: string + """ + tags = [] + for name in self.tags: + value = self.tags[name] + if value == None: + continue + if isinstance(value, basestring): + tags.append("%s%s%s%s%s" % (name, fieldSep, surrounder, self.tags[name], surrounder)) + elif type(value) is int: + tags.append("%s%s%s%i%s" % (name, fieldSep, surrounder, self.tags[name], surrounder)) + elif type(value) is float: + tags.append("%s%s%s%f%s" % (name, fieldSep, surrounder, self.tags[name], surrounder)) + else: + raise Exception("Do not know how to print '" + value + "'.") + if self.getName() != None: + tags.append("%s%s%s%s%s" % ("Name", fieldSep, surrounder, self.getName(), surrounder)) + return tagSep.join(tags) + + + def setTagValues(self, tags, tagSep = "; ", fieldSep = " "): + """ + Set the tag values using given string + @param tags: the tags, concatenated + @type tags: string + @param tagSep: separator between tags + @type tagSep: string + @param fieldSep: separator between tag name and tag value + @type fieldSep: string + """ + if tags == "": + self.tags = {} + return + for splittedTag in tags.split(tagSep): + if fieldSep not in splittedTag: + raise Exception("Weird field '%s' in tags '%s'" % (splittedTag, tags)) + tag, value = splittedTag.split(fieldSep, 1) + if tag == "Name": + self.setName(value) + continue + try: + intValue = int(value) + self.tags[tag] = intValue + except ValueError: + try: + floatValue = float(value) + self.tags[tag] = floatValue + except ValueError: + self.tags[tag] = value + + + def deleteTag(self, tag): + """ + Remove a tag + @param tag: the tag to be removed + @type tag: string + """ + if tag in self.tags: + del self.tags[tag] + + + def setNbOccurrences(self, nbOccurrences): + """ + Set the number of occurrences of the interval + @param nbOccurrences: number of occurrences of the interval + @type nbOccurrences: int + """ + self.setTagValue("nbOccurrences", nbOccurrences) + + + def setOccurrence(self, occurrence): + """ + Set the occurrence of this interval + @param occurrence: an occurrence for this transcript + @type occurrence: int + """ + self.setTagValue("occurrence", occurrence) + + def __eq__(self, interval): + """ + Whether two intervals are equal (start and end at same position) + @param interval: object to be compared to + @type interval: class L{Interval<Interval>} + """ + if not interval: + return False + return self.getChromosome() == interval.getChromosome() and self.getStart() == interval.getStart() and self.getEnd() == interval.getEnd() and self.getDirection() == interval.getDirection() + + + def overlapWith(self, interval, nbNucleotides = 1): + """ + Whether two intervals overlap + @param interval: object to be compared to + @type interval: class L{Interval<Interval>} + @param nbNucleotides: minimum number of nucleotides to declare and overlap + @type nbNucleotides: int + """ + if self.getChromosome() != interval.getChromosome(): + return False + return (min(self.getEnd(), interval.getEnd()) - max(self.getStart(), interval.getStart()) + 1 >= nbNucleotides) + + def isIncludeIn(self, interval): + return interval.include(self) + + + def include(self, interval): + """ + Whether this interval includes the other one + @param interval: object to be compared to + @type interval: class L{Interval<Interval>} + """ + if self.getChromosome() != interval.getChromosome(): + return False + return ((self.getStart() <= interval.getStart()) and (self.getEnd() >= interval.getEnd())) + + + def getDifference(self, interval, sameStrand = False): + """ + Get the difference between this cluster and another one + @param interval: object to be compared to + @type interval: class L{Interval<Interval>} + @param sameStrand: do the comparison iff the intervals are on the same strand + @type sameStrand: boolean + @return: a (possibly empty) list of intervals + """ + newInterval = Interval() + newInterval.copy(self) + if self.getChromosome() != interval.getChromosome(): + return [newInterval] + if not self.overlapWith(interval): + return [newInterval] + if sameStrand and self.getDirection() != interval.getDirection(): + return [newInterval] + intervals = [] + if self.getStart() < interval.getStart(): + newInterval = Interval() + newInterval.copy(self) + newInterval.setEnd(min(self.getEnd(), interval.getStart() - 1)) + intervals.append(newInterval) + if self.getEnd() > interval.getEnd(): + newInterval = Interval() + newInterval.copy(self) + newInterval.setStart(max(self.getStart(), interval.getEnd() + 1)) + intervals.append(newInterval) + return intervals + + + def getIntersection(self, interval): + """ + Get the intersection between this interval and another one + @param interval: object to be compared to + @type interval: class L{Interval<Interval>} + @return: an other interval + """ + if not self.overlapWith(interval): + return None + newInterval = Interval() + newInterval.setChromosome(self.getChromosome()) + newInterval.setDirection(self.getDirection()) + newInterval.setName("%s_intersect_%s" % (self.getName(), interval.getName())) + newInterval.setStart(max(self.getStart(), interval.getStart())) + newInterval.setEnd(min(self.getEnd(), interval.getEnd())) + return newInterval + + + def getDistance(self, interval): + """ + Get the distance between two intervals (a non-negative value) + @param interval: another interval + @type interval: class L{Interval<Interval>} + """ + if self.overlapWith(interval): + return 0 + if self.getChromosome() != interval.getChromosome(): + raise Exception("Cannot get the distance between %s and %s" % (str(self), str(interval))) + return min(abs(self.getStart() - interval.getEnd()), abs(self.getEnd() - interval.getStart())) + + + def getRelativeDistance(self, interval): + """ + Get the distance between two intervals (negative if first interval is before) + @param interval: another interval + @type interval: class L{Interval<Interval>} + """ + if self.overlapWith(interval): + return 0 + if self.getChromosome() != interval.getChromosome(): + raise Exception("Cannot get the distance between %s and %s" % (str(self), str(interval))) + if self.getEnd() < interval.getStart(): + distance = interval.getStart() - self.getEnd() + else: + distance = interval.getEnd() - self.getStart() + distance *= self.getDirection() + return distance + + + def merge(self, interval, normalization = False): + """ + Merge two intervals + @param interval: another interval + @type interval: class L{Interval<Interval>} + @param normalization: whether the sum of the merge should be normalized wrt the number of mappings of each elements + @type normalization: boolean + """ + if self.getChromosome() != interval.getChromosome(): + raise Exception("Cannot merge '%s' and '%s' for they are on different chromosomes." % (str(self), str(interval))) + direction = None + if self.getStart() == self.getEnd(): + direction = interval.getDirection() + elif interval.getStart() == interval.getEnd(): + direction = self.getDirection() + elif self.getDirection() != interval.getDirection(): + raise Exception("Cannot merge '%s' and '%s' for they are on different strands." % (str(self), str(interval))) + self.setStart(min(self.getStart(), interval.getStart())) + self.setEnd(max(self.getEnd(), interval.getEnd())) + if direction != None: + self.setDirection(direction) + nbElements = 0.0 + for element in (self, interval): + for tagName in ("nbElements", "nbOccurrences"): + if tagName not in element.getTagNames(): + element.setTagValue(tagName, 1) + nbElements += float(element.getTagValue("nbElements")) / float(element.getTagValue("nbOccurrences")) if normalization else float(element.getTagValue("nbElements")) + self.setTagValue("nbElements", nbElements) + self.bin = None + for tagName in ("identity", "nbOccurrences", "occurrence", "nbMismatches", "nbGaps", "rank", "evalue", "bestRegion"): + if tagName in self.getTagNames(): + del self.tags[tagName] + + + def getBin(self): + """ + Get the bin of the interval + Computed on the fly + """ + if self.bin == None: + self.bin = getBin(self.getStart(), self.getEnd()) + return self.bin + + + def getBins(self): + """ + Get all the bin this interval could fall into + """ + return getOverlappingBins(self.getStart(), self.getEnd()) + + + def getSqlVariables(cls): + """ + Get the properties of the object that should be saved in a database + """ + variables = ["name", "chromosome", "start", "end", "direction", "tags", "bin"] + return variables + getSqlVariables = classmethod(getSqlVariables) + + + def setSqlValues(self, array): + """ + Set the values of the properties of this object as given by a results line of a SQL query + """ + self.id = array[0] + self.name = array[1].strip("'") + self.setChromosome(array[2].strip("'")) + self.setStart(array[3]) + self.setEnd(array[4]) + self.setDirection(array[5]) + self.setTagValues(array[6].strip("'"), ";", "=") + self.bin = array[7] + + + def getSqlValues(self): + """ + Get the values of the properties that should be saved in a database + """ + values = dict() + values["name"] = self.name + values["chromosome"] = self.getChromosome() + values["start"] = self.getStart() + values["end"] = self.getEnd() + values["direction"] = self.getDirection() + values["tags"] = self.getTagValues(";", "=") + values["bin"] = self.getBin() + return values + + + def getSqlTypes(cls): + """ + Get the values of the properties that should be saved in a database + """ + types = dict() + types["name"] = "varchar" + types["chromosome"] = "varchar" + types["start"] = "int" + types["end"] = "int" + types["direction"] = "tinyint" + types["tags"] = "varchar" + types["bin"] = "int" + return types + getSqlTypes = classmethod(getSqlTypes) + + + def getSqlSizes(cls): + """ + Get the sizes of the properties that should be saved in a database + """ + sizes = dict() + sizes["name"] = 255 + sizes["chromosome"] = 255 + sizes["start"] = 11 + sizes["end"] = 11 + sizes["direction"] = 4 + sizes["tags"] = 1023 + sizes["bin"] = 11 + return sizes + getSqlSizes = classmethod(getSqlSizes) + + + def printCoordinates(self): + """ + Print start and end positions (depending on the direction of the interval) + """ + if self.getDirection() == 1: + return "%d-%d" % (self.getStart(), self.getEnd()) + else: + return "%d-%d" % (self.getEnd(), self.getStart()) + + + def extractSequence(self, parser): + """ + Get the sequence corresponding to this interval + @param parser: a parser to a FASTA file + @type parser: class L{SequenceListParser<SequenceListParser>} + @return : a instance of L{Sequence<Sequence>} + """ + return parser.getSubSequence(self.getChromosome(), self.getStart(), self.getEnd(), self.getDirection(), self.name) + + + def extractWigData(self, parser): + """ + Get the data retrieved from a wig file + @param parser: a parser class to a WIG file + @type parser: class L{WigParser<WigParser>} + """ + data = parser.getRange(self.getChromosome(), self.getStart(), self.getEnd()) + if self.getDirection() == -1: + if parser.strands: + newData = {} + for strand in data: + data[strand].reverse() + newData[-strand] = data[strand] + data = newData + else: + data.reverse() + return data + + + def __str__(self): + """ + Output a simple representation of this interval + """ + direction = "+" + if self.getDirection() == -1: + direction = "-" + string = "%s:%d-%d (%s)" % (self.getChromosome(), self.getStart(), self.getEnd(), direction) + if self.name != "": + string = "(%s) %s" % (self.name, string) + return string +