Mercurial > repos > yufei-luo > s_mart
view SMART/Java/Python/structure/Interval.py @ 44:5f796c5c579f
Uploaded
author | m-zytnicki |
---|---|
date | Wed, 18 Sep 2013 08:32:38 -0400 |
parents | 44d5973c188c |
children | 169d364ddd91 |
line wrap: on
line source
# # Copyright INRA-URGI 2009-2010 # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. # from SMART.Java.Python.structure.Bins import * from commons.core.coord.Range import Range class Interval(Range): """ Store a genomic interval @ivar name: name of the interval [optional] @type name: string @ivar id: id of the interval [optional] @type id: int @ivar bin: bin in which the interval should be if stored in a database [computed] @type bin: int @ival tags: information about the transcript [optional] @type tags: dict @ivar verbosity: verbosity @type verbosity: int [default: 0] """ def __init__(self, interval = None, verbosity = 0): """ Constructor @param interval: interval to be copied @type interval: class L{Interval<Interval>} @param verbosity: verbosity @type verbosity: int """ Range.__init__(self) self.name = None self.id = None self.bin = None self.verbosity = verbosity self.tags = {} if interval != None: self.copy(interval) #!!!! Warning: two methods getStart() and getEnd() give the information maximum and minimum in interval.!!!!# #In case strand = "+", start < end; strand = "-", start > end def getStart(self): if self.start == -1: return -1 if self.end == -1: return self.start return self.getMin() def getEnd(self): if self.end == -1: return -1 if self.start == -1: return self.end return self.getMax() def getChromosome(self): return self.getSeqname() def getDirection(self): return 1 if self.getStrand() == "+" else -1 def getName(self): return self.name def isSet(self): """ Check if the interval is set """ return self.getStart() == None and self.getEnd() == None def copy(self, interval): """ Copy method @param interval: interval to be copied @type interval: class L{Interval<Interval>} """ self.setStart(interval.getStart()) self.setEnd(interval.getEnd()) self.setChromosome(interval.getChromosome()) self.setDirection(interval.getDirection()) self.name = interval.name self.id = interval.id self.bin = interval.bin self.tags = {} for tag in interval.tags: self.tags[tag] = interval.tags[tag] self.verbosity = interval.verbosity def setName(self, name): """ Set the name @param name: name of the interval @type name: string """ if len(name) > 100: name = name[:100] self.name = name def setChromosome(self, chromosome=""): """ Set the chromosome @param chromosome: chromosome on which the interval is @type chromosome: string """ if not chromosome: self.seqname = None else: self.seqname = chromosome.replace(".", "_").replace("|", "_") def setStart(self, start): """ Set the start point Possibly reset bin @param start: start point of the interval @type start: int """ self.bin = None direction = self.getDirection() if self.start == -1: self.start = start elif self.end == -1: self.end = start else: if direction == 1: self.start = start else: self.end = start if direction == 1: self.start, self.end = min(self.start, self.end), max(self.start, self.end) else: self.start, self.end = max(self.start, self.end), min(self.start, self.end) def setEnd(self, end): """ Set the end point Possibly reset bin @param end: end point of the interval of the interval @type end: int """ self.bin = None direction = self.getDirection() if self.end == -1: self.end = end elif self.start == -1: self.start = end else: if direction == 1: self.end = end else: self.start = end if direction == 1: self.start, self.end = min(self.start, self.end), max(self.start, self.end) else: self.start, self.end = max(self.start, self.end), min(self.start, self.end) def setSize(self, size): """ Possibly modify the end point @param size: size of the transcript @type size: int """ if self.end == None and self.start != None: self.setEnd(self.start + self.getSize() - 1) elif self.start == None and self.end != None: self.setStart(self.end - self.getSize() + 1) def getSize(self): """ Get the size """ return self.getEnd() - self.getStart() + 1 def _setDirection(self, direction): """ Set the direction of the interval (connection to Range) @param direction: direction of the transcript (+ / -) @type direction: int (1 or -1) """ if direction * self.getDirection() < 0: self.reverse() def setDirection(self, direction): """ Set the direction of the interval Possibly parse different formats @param direction: direction of the transcript (+ / -) @type direction: int or string """ if type(direction).__name__ == 'int': self._setDirection(direction / abs(direction)) elif type(direction).__name__ == 'str': if direction == "+": self._setDirection(1) elif direction == "-": self._setDirection(-1) elif direction == "1" or direction == "-1": self._setDirection(int(direction)) elif direction.lower() == "plus": self._setDirection(1) elif direction.lower() == "minus": self._setDirection(-1) else: raise Exception("Cannot understand direction %s" % (direction)) else: raise Exception("Cannot understand direction %s" % (direction)) def extendStart(self, size): """ Extend the interval by the 5' end @param size: the size to be exended @type size: int """ if self.getDirection() == 1: self.setStart(max(0, self.getStart() - size)) else: self.setEnd(self.getEnd() + size) self.bin = None def extendEnd(self, size): """ Extend the interval by the 3' end @param size: the size to be exended @type size: int """ if self.getDirection() == 1: self.setEnd(self.getEnd() + size) else: self.setStart(max(0, self.getStart() - size)) self.bin = None def restrictStart(self, size = 1): """ Restrict the interval by some nucleotides, start from its start position Remove the exons @param size: the size to be restricted to @type size: int """ if self.getDirection() == 1: self.setEnd(min(self.getEnd(), self.getStart() + size - 1)) else: self.setStart(max(self.getStart(), self.getEnd() - size + 1)) self.bin = None def restrictEnd(self, size = 1): """ Restrict the interval by some nucleotides, end from its end position Remove the exons @param size: the size to be restricted to @type size: int """ if self.getDirection() == 1: self.setStart(max(self.getStart(), self.getEnd() - size + 1)) else: self.setEnd(min(self.getEnd(), self.getStart() + size - 1)) self.bin = None def setTagValue(self, name, value): """ Set a tag @param name: name of the tag @type name: string @param value: value of the tag @type value: int or string """ self.tags[name] = value def getTagNames(self): """ Get all the names of the tags """ return self.tags.keys() def getTagValue(self, tag): """ Get the value of a tag @param tag: name of a tag @type tag: string """ if tag not in self.tags: return None return self.tags[tag] def getTagValues(self, tagSep = "; ", fieldSep = " ", surrounder = ""): """ Get the formatted tag values @param tagSep: separator between tags @type tagSep: string @param fieldSep: separator between tag name and tag value @type fieldSep: string @param surrounder: string which optionally surround values @type surrounder: string """ tags = [] for name, value in self.tags.iteritems(): if value == None: continue if isinstance(value, basestring): tags.append("%s%s%s%s%s" % (name, fieldSep, surrounder, value.replace("'", "\\'"), surrounder)) elif type(value) is int: tags.append("%s%s%s%i%s" % (name, fieldSep, surrounder, value, surrounder)) elif type(value) is float: tags.append("%s%s%s%f%s" % (name, fieldSep, surrounder, value, surrounder)) else: raise Exception("Do not know how to print '" + value + "'.") if self.getName() != None: tags.append("%s%s%s%s%s" % ("Name", fieldSep, surrounder, self.getName(), surrounder)) return tagSep.join(tags) def setTagValues(self, tags, tagSep = "; ", fieldSep = " "): """ Set the tag values using given string @param tags: the tags, concatenated @type tags: string @param tagSep: separator between tags @type tagSep: string @param fieldSep: separator between tag name and tag value @type fieldSep: string """ if tags == "": self.tags = {} return for splittedTag in tags.split(tagSep): if fieldSep not in splittedTag: raise Exception("Weird field '%s' in tags '%s'" % (splittedTag, tags)) tag, value = splittedTag.split(fieldSep, 1) if tag == "Name": self.setName(value) continue try: intValue = int(value) self.tags[tag] = intValue except ValueError: try: floatValue = float(value) self.tags[tag] = floatValue except ValueError: self.tags[tag] = value def deleteTag(self, tag): """ Remove a tag @param tag: the tag to be removed @type tag: string """ if tag in self.tags: del self.tags[tag] def setNbOccurrences(self, nbOccurrences): """ Set the number of occurrences of the interval @param nbOccurrences: number of occurrences of the interval @type nbOccurrences: int """ self.setTagValue("nbOccurrences", nbOccurrences) def setOccurrence(self, occurrence): """ Set the occurrence of this interval @param occurrence: an occurrence for this transcript @type occurrence: int """ self.setTagValue("occurrence", occurrence) def __eq__(self, interval): """ Whether two intervals are equal (start and end at same position) @param interval: object to be compared to @type interval: class L{Interval<Interval>} """ if not interval: return False return self.getChromosome() == interval.getChromosome() and self.getStart() == interval.getStart() and self.getEnd() == interval.getEnd() and self.getDirection() == interval.getDirection() def overlapWith(self, interval, nbNucleotides = 1): """ Whether two intervals overlap @param interval: object to be compared to @type interval: class L{Interval<Interval>} @param nbNucleotides: minimum number of nucleotides to declare and overlap @type nbNucleotides: int """ if self.getChromosome() != interval.getChromosome(): return False return (min(self.getEnd(), interval.getEnd()) - max(self.getStart(), interval.getStart()) + 1 >= nbNucleotides) def isIncludeIn(self, interval): return interval.include(self) def include(self, interval): """ Whether this interval includes the other one @param interval: object to be compared to @type interval: class L{Interval<Interval>} """ if self.getChromosome() != interval.getChromosome(): return False return ((self.getStart() <= interval.getStart()) and (self.getEnd() >= interval.getEnd())) def getDifference(self, interval, sameStrand = False): """ Get the difference between this cluster and another one @param interval: object to be compared to @type interval: class L{Interval<Interval>} @param sameStrand: do the comparison iff the intervals are on the same strand @type sameStrand: boolean @return: a (possibly empty) list of intervals """ newInterval = Interval() newInterval.copy(self) if self.getChromosome() != interval.getChromosome(): return [newInterval] if not self.overlapWith(interval): return [newInterval] if sameStrand and self.getDirection() != interval.getDirection(): return [newInterval] intervals = [] if self.getStart() < interval.getStart(): newInterval = Interval() newInterval.copy(self) newInterval.setEnd(min(self.getEnd(), interval.getStart() - 1)) intervals.append(newInterval) if self.getEnd() > interval.getEnd(): newInterval = Interval() newInterval.copy(self) newInterval.setStart(max(self.getStart(), interval.getEnd() + 1)) intervals.append(newInterval) return intervals def getIntersection(self, interval): """ Get the intersection between this interval and another one @param interval: object to be compared to @type interval: class L{Interval<Interval>} @return: an other interval """ if not self.overlapWith(interval): return None newInterval = Interval() newInterval.setChromosome(self.getChromosome()) newInterval.setDirection(self.getDirection()) newInterval.setName("%s_intersect_%s" % (self.getName(), interval.getName())) newInterval.setStart(max(self.getStart(), interval.getStart())) newInterval.setEnd(min(self.getEnd(), interval.getEnd())) return newInterval def getDistance(self, interval): """ Get the distance between two intervals (a non-negative value) @param interval: another interval @type interval: class L{Interval<Interval>} """ if self.overlapWith(interval): return 0 if self.getChromosome() != interval.getChromosome(): raise Exception("Cannot get the distance between %s and %s" % (str(self), str(interval))) return min(abs(self.getStart() - interval.getEnd()), abs(self.getEnd() - interval.getStart())) def getRelativeDistance(self, interval): """ Get the distance between two intervals (negative if first interval is before) @param interval: another interval @type interval: class L{Interval<Interval>} """ if self.overlapWith(interval): return 0 if self.getChromosome() != interval.getChromosome(): raise Exception("Cannot get the distance between %s and %s" % (str(self), str(interval))) if self.getEnd() < interval.getStart(): distance = interval.getStart() - self.getEnd() else: distance = interval.getEnd() - self.getStart() distance *= self.getDirection() return distance def merge(self, interval, normalization = False): """ Merge two intervals @param interval: another interval @type interval: class L{Interval<Interval>} @param normalization: whether the sum of the merge should be normalized wrt the number of mappings of each elements @type normalization: boolean """ if self.getChromosome() != interval.getChromosome(): raise Exception("Cannot merge '%s' and '%s' for they are on different chromosomes." % (str(self), str(interval))) direction = None if self.getStart() == self.getEnd(): direction = interval.getDirection() elif interval.getStart() == interval.getEnd(): direction = self.getDirection() elif self.getDirection() != interval.getDirection(): raise Exception("Cannot merge '%s' and '%s' for they are on different strands." % (str(self), str(interval))) self.setStart(min(self.getStart(), interval.getStart())) self.setEnd(max(self.getEnd(), interval.getEnd())) if direction != None: self.setDirection(direction) nbElements = 0.0 for element in (self, interval): for tagName in ("nbElements", "nbOccurrences"): if tagName not in element.getTagNames(): element.setTagValue(tagName, 1) nbElements += float(element.getTagValue("nbElements")) / float(element.getTagValue("nbOccurrences")) if normalization else float(element.getTagValue("nbElements")) self.setTagValue("nbElements", nbElements) self.bin = None for tagName in ("identity", "nbOccurrences", "occurrence", "nbMismatches", "nbGaps", "rank", "evalue", "bestRegion"): if tagName in self.getTagNames(): del self.tags[tagName] def getBin(self): """ Get the bin of the interval Computed on the fly """ if self.bin == None: self.bin = getBin(self.getStart(), self.getEnd()) return self.bin def getBins(self): """ Get all the bin this interval could fall into """ return getOverlappingBins(self.getStart(), self.getEnd()) def getSqlVariables(cls): """ Get the properties of the object that should be saved in a database """ variables = ["name", "chromosome", "start", "end", "direction", "tags", "bin"] return variables getSqlVariables = classmethod(getSqlVariables) def setSqlValues(self, array): """ Set the values of the properties of this object as given by a results line of a SQL query """ self.id = array[0] self.name = array[1].strip("'") self.setChromosome(array[2].strip("'")) self.setStart(array[3]) self.setEnd(array[4]) self.setDirection(array[5]) self.setTagValues(array[6].strip("'"), ";", "=") self.bin = array[7] def getSqlValues(self): """ Get the values of the properties that should be saved in a database """ values = dict() values["name"] = self.name values["chromosome"] = self.getChromosome() values["start"] = self.getStart() values["end"] = self.getEnd() values["direction"] = self.getDirection() values["tags"] = self.getTagValues(";", "=") values["bin"] = self.getBin() return values def getSqlTypes(cls): """ Get the values of the properties that should be saved in a database """ types = dict() types["name"] = "varchar" types["chromosome"] = "varchar" types["start"] = "int" types["end"] = "int" types["direction"] = "tinyint" types["tags"] = "varchar" types["bin"] = "int" return types getSqlTypes = classmethod(getSqlTypes) def getSqlSizes(cls): """ Get the sizes of the properties that should be saved in a database """ sizes = dict() sizes["name"] = 255 sizes["chromosome"] = 255 sizes["start"] = 11 sizes["end"] = 11 sizes["direction"] = 4 sizes["tags"] = 1023 sizes["bin"] = 11 return sizes getSqlSizes = classmethod(getSqlSizes) def printCoordinates(self): """ Print start and end positions (depending on the direction of the interval) """ if self.getDirection() == 1: return "%d-%d" % (self.getStart(), self.getEnd()) else: return "%d-%d" % (self.getEnd(), self.getStart()) def extractSequence(self, parser): """ Get the sequence corresponding to this interval @param parser: a parser to a FASTA file @type parser: class L{SequenceListParser<SequenceListParser>} @return : a instance of L{Sequence<Sequence>} """ return parser.getSubSequence(self.getChromosome(), self.getStart(), self.getEnd(), self.getDirection(), self.name) def extractWigData(self, parser): """ Get the data retrieved from a wig file @param parser: a parser class to a WIG file @type parser: class L{WigParser<WigParser>} """ data = parser.getRange(self.getChromosome(), self.getStart(), self.getEnd()) if self.getDirection() == -1: if parser.strands: newData = {} for strand in data: data[strand].reverse() newData[-strand] = data[strand] data = newData else: data.reverse() return data def __str__(self): """ Output a simple representation of this interval """ direction = "+" if self.getDirection() == -1: direction = "-" string = "%s:%d-%d (%s)" % (self.getChromosome(), self.getStart(), self.getEnd(), direction) if self.name != "": string = "(%s) %s" % (self.name, string) return string