Mercurial > repos > yufei-luo > s_mart
diff SMART/Java/Python/GetFlanking.py @ 36:44d5973c188c
Uploaded
author | m-zytnicki |
---|---|
date | Tue, 30 Apr 2013 15:02:29 -0400 |
parents | |
children | 169d364ddd91 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/SMART/Java/Python/GetFlanking.py Tue Apr 30 15:02:29 2013 -0400 @@ -0,0 +1,233 @@ +#! /usr/bin/env python +# +# Copyright INRA-URGI 2009-2011 +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. +# +from optparse import OptionParser +from commons.core.parsing.ParserChooser import ParserChooser +from commons.core.writer.TranscriptWriter import TranscriptWriter +from SMART.Java.Python.structure.Transcript import Transcript +from SMART.Java.Python.structure.Interval import Interval +from SMART.Java.Python.misc.Progress import Progress + +QUERY = 0 +REFERENCE = 1 +INPUTS = (QUERY, REFERENCE) +STRANDS = (-1, 1) +TAG_DISTANCE = "distance_" +TAG_SENSE = "_sense" +TAG_REGION = "_region" +TAGS_REGION = {-1: "_upstream", 0: "", 1: "_downstream"} +TAGS_RREGION = {-1: "upstream", 0: "overlapping", 1: "downstream"} +TAGS_SENSE = {-1: "antisense", 0: "", 1: "collinear"} +STRANDSTOSTR = {-1: "(-)", 0: "", 1: "(+)"} + + +def getOrderKey(transcript, direction, input): + if direction == 1: + if input == QUERY: + return (transcript.getEnd(), -transcript.getStart()) + return (transcript.getStart(), -transcript.getEnd()) + if input == QUERY: + return (-transcript.getStart(), transcript.getEnd()) + return (-transcript.getEnd(), transcript.getStart()) + + +class GetFlanking(object): + + def __init__(self, verbosity): + self.verbosity = verbosity + self.transcripts = dict([id, {}] for id in INPUTS) + self.directions = [] + self.noOverlap = False + self.colinear = False + self.antisense = False + self.distance = None + self.minDistance = None + self.maxDistance = None + self.tagName = "flanking" + + def setInputFile(self, fileName, format, id): + chooser = ParserChooser(self.verbosity) + chooser.findFormat(format) + parser = chooser.getParser(fileName) + for transcript in parser.getIterator(): + chromosome = transcript.getChromosome() + if chromosome not in self.transcripts[id]: + self.transcripts[id][chromosome] = [] + self.transcripts[id][chromosome].append(transcript) + + def setOutputFile(self, fileName): + self.writer = TranscriptWriter(fileName, "gff3", self.verbosity) + + def addUpstreamDirection(self, upstream): + if upstream: + self.directions.append(-1) + + def addDownstreamDirection(self, downstream): + if downstream: + self.directions.append(1) + + def setColinear(self, colinear): + self.colinear = colinear + + def setAntisense(self, antisense): + self.antisense = antisense + + def setNoOverlap(self, noOverlap): + self.noOverlap = noOverlap + + def setMinDistance(self, distance): + self.minDistance = distance + + def setMaxDistance(self, distance): + self.maxDistance = distance + + def setNewTagName(self, tagName): + self.tagName = tagName + + def match(self, transcriptQuery, transcriptRef, direction): + #print "comparing", transcriptQuery, "with", transcriptRef, "on direction", direction + if direction == 1 and transcriptRef.getEnd() < transcriptQuery.getStart(): + return False + if direction == -1 and transcriptQuery.getEnd() < transcriptRef.getStart(): + return False + if self.noOverlap and transcriptRef.overlapWith(transcriptQuery): + return False + if self.colinear and transcriptRef.getDirection() != transcriptQuery.getDirection(): + return False + if self.antisense and transcriptRef.getDirection() == transcriptQuery.getDirection(): + return False + if self.minDistance != None or self.maxDistance != None: + distance = transcriptRef.getDistance(transcriptQuery) + if self.minDistance != None and distance < self.minDistance: + return False + if self.maxDistance != None and distance > self.maxDistance: + return False + return True + + def getFlanking(self, chromosome, direction): + if chromosome not in self.transcripts[REFERENCE]: + return + sortedTranscripts = dict([id, {}] for id in INPUTS) + for id in INPUTS: + sortedTranscripts[id] = sorted(self.transcripts[id][chromosome], key = lambda t: getOrderKey(t, direction, id)) + refIndex = 0 + progress = Progress(len(sortedTranscripts[QUERY]), "Reading chr %s %s" % (chromosome, STRANDSTOSTR[direction]), self.verbosity) + for query in sortedTranscripts[QUERY]: + #print "Q: ", query + #print "R1: ", sortedTranscripts[REFERENCE][refIndex] + while not self.match(query, sortedTranscripts[REFERENCE][refIndex], direction): + refIndex += 1 + if refIndex == len(sortedTranscripts[REFERENCE]): + progress.done() + #print "done" + return + #print "R2: ", sortedTranscripts[REFERENCE][refIndex] + self.flankings[query][direction] = sortedTranscripts[REFERENCE][refIndex] + progress.inc() + progress.done() + + def setTags(self, query, reference, direction): + refName = reference.getTagValue("ID") + if refName == None: + refName = reference.getName() + if refName == None: + refName = reference.__str__() + query.setTagValue("%s%s" % (self.tagName, TAGS_REGION[direction*query.getDirection()]), refName) + query.setTagValue("%s_%s%s" % (TAG_DISTANCE, self.tagName, TAGS_REGION[direction*query.getDirection()]), query.getDistance(reference)) + query.setTagValue("%s_%s" % (TAG_SENSE, self.tagName), TAGS_SENSE[query.getDirection() * reference.getDirection()]) + if direction == 0: + query.setTagValue("%s_%s" % (TAG_REGION, self.tagName), TAGS_RREGION[cmp(query.getRelativeDistance(reference), 0)]) + for tag in reference.getTagNames(): + if tag not in ("quality", "feature"): + query.setTagValue("%s%s_%s" % (self.tagName, TAGS_REGION[direction*query.getDirection()], tag), reference.getTagValue(tag)) + return query + + def write(self): + progress = Progress(len(self.flankings.keys()), "Printing data", self.verbosity) + for transcriptQuery in self.flankings.keys(): + if not self.flankings[transcriptQuery]: + self.writer.addTranscript(transcriptQuery) + elif self.directions: + for direction in self.directions: + #relativeDirection = direction if transcriptQuery.getDirection() == 1 else - direction + relativeDirection = direction * transcriptQuery.getDirection() + if relativeDirection in self.flankings[transcriptQuery]: + transcriptRef = self.flankings[transcriptQuery][relativeDirection] + transcriptQuery = self.setTags(transcriptQuery, transcriptRef, relativeDirection) + self.writer.addTranscript(transcriptQuery) + else: + transcriptRef = sorted(self.flankings[transcriptQuery].values(), key = lambda transcriptRef: transcriptQuery.getDistance(transcriptRef))[0] + self.writer.addTranscript(self.setTags(transcriptQuery, transcriptRef, 0)) + progress.inc() + progress.done() + + def run(self): + for chromosome in sorted(self.transcripts[QUERY].keys()): + self.flankings = dict([query, {}] for query in self.transcripts[QUERY][chromosome]) + for direction in STRANDS: + #print "comparison", chromosome, direction + self.getFlanking(chromosome, direction) + self.write() + self.writer.close() + +if __name__ == "__main__": + + description = "Get Flanking v1.0.1: Get the flanking regions of a set of reference. [Category: Data Selection]" + + parser = OptionParser(description = description) + parser.add_option("-i", "--input1", dest="inputFileName1", action="store", type="string", help="query input file [compulsory] [format: file in transcript format given by -f]") + parser.add_option("-f", "--format1", dest="format1", action="store", type="string", help="format of previous file [compulsory] [format: transcript file format]") + parser.add_option("-j", "--input2", dest="inputFileName2", action="store", type="string", help="reference input file [compulsory] [format: file in transcript format given by -g]") + parser.add_option("-g", "--format2", dest="format2", action="store", type="string", help="format of previous file [compulsory] [format: transcript file format]") + parser.add_option("-5", "--upstream", dest="upstream", action="store_true", default=False, help="output upstream elements [format: boolean] [default: False]") + parser.add_option("-3", "--downstream", dest="downstream", action="store_true", default=False, help="output downstream elements [format: boolean] [default: False]") + parser.add_option("-c", "--colinear", dest="colinear", action="store_true", default=False, help="find first colinear element [format: boolean] [default: False]") + parser.add_option("-a", "--antisense", dest="antisense", action="store_true", default=False, help="find first anti-sense element [format: boolean] [default: False]") + parser.add_option("-e", "--noOverlap", dest="noOverlap", action="store_true", default=False, help="do not consider elements which are overlapping reference elements [format: boolean] [default: False]") + parser.add_option("-d", "--minDistance", dest="minDistance", action="store", default=None, type="int", help="minimum distance between 2 elements [format: int]") + parser.add_option("-D", "--maxDistance", dest="maxDistance", action="store", default=None, type="int", help="maximum distance between 2 elements [format: int]") + parser.add_option("-t", "--tag", dest="tagName", action="store", default="flanking", type="string", help="name of the new tag [format: string] [default: flanking]") + parser.add_option("-o", "--output", dest="outputFileName", action="store", type="string", help="output file [format: output file in GFF3 format]") + parser.add_option("-v", "--verbosity", dest="verbosity", action="store", default=1, type="int", help="trace level [format: int]") + (options, args) = parser.parse_args() + + gf = GetFlanking(options.verbosity) + gf.setInputFile(options.inputFileName1, options.format1, QUERY) + gf.setInputFile(options.inputFileName2, options.format2, REFERENCE) + gf.setOutputFile(options.outputFileName) + gf.addUpstreamDirection(options.upstream) + gf.addDownstreamDirection(options.downstream) + gf.setColinear(options.colinear) + gf.setAntisense(options.antisense) + gf.setNoOverlap(options.noOverlap) + gf.setMinDistance(options.minDistance) + gf.setMaxDistance(options.maxDistance) + gf.setNewTagName(options.tagName) + gf.run()