Mercurial > repos > yufei-luo > s_mart
diff smart_toolShed/SMART/Java/Python/GetFlanking.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/smart_toolShed/SMART/Java/Python/GetFlanking.py Thu Jan 17 10:52:14 2013 -0500 @@ -0,0 +1,231 @@ +#! /usr/bin/env python +# +# Copyright INRA-URGI 2009-2011 +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. +# +from optparse import OptionParser +from commons.core.parsing.ParserChooser import ParserChooser +from commons.core.writer.TranscriptWriter import TranscriptWriter +from SMART.Java.Python.structure.Transcript import Transcript +from SMART.Java.Python.structure.Interval import Interval +from SMART.Java.Python.misc.Progress import Progress + +QUERY = 0 +REFERENCE = 1 +INPUTS = (QUERY, REFERENCE) +STRANDS = (-1, 1) +TAG_DISTANCE = "distance_" +TAG_SENSE = "_sense" +TAG_REGION = "_region" +TAGS_REGION = {-1: "_upstream", 0: "", 1: "_downstream"} +TAGS_RREGION = {-1: "upstream", 0: "overlapping", 1: "downstream"} +TAGS_SENSE = {-1: "antisense", 0: "", 1: "colinear"} +STRANDSTOSTR = {-1: "(-)", 0: "", 1: "(+)"} + + +def getOrderKey(transcript, direction): + if direction == 1: + return transcript.getEnd() + return - transcript.getStart() + +def isInGoodRegion(transcriptRef, transcriptQuery, direction): + if direction == 1: + return transcriptQuery.getEnd() > transcriptRef.getEnd() + return transcriptQuery.getStart() < transcriptRef.getStart() + + +class GetFlanking(object): + + def __init__(self, verbosity): + self.verbosity = verbosity + self.transcripts = dict([id, {}] for id in INPUTS) + self.directions = [] + self.noOverlap = False + self.colinear = False + self.antisense = False + self.distance = None + self.minDistance = None + self.maxDistance = None + self.tagName = "flanking" + + def setInputFile(self, fileName, format, id): + chooser = ParserChooser(self.verbosity) + chooser.findFormat(format) + parser = chooser.getParser(fileName) + for transcript in parser.getIterator(): + chromosome = transcript.getChromosome() + if chromosome not in self.transcripts[id]: + self.transcripts[id][chromosome] = [] + self.transcripts[id][chromosome].append(transcript) + + def setOutputFile(self, fileName): + self.writer = TranscriptWriter(fileName, "gff3", self.verbosity) + + def addUpstreamDirection(self, upstream): + if upstream: + self.directions.append(-1) + + def addDownstreamDirection(self, downstream): + if downstream: + self.directions.append(1) + + def setColinear(self, colinear): + self.colinear = colinear + + def setAntisense(self, antisense): + self.antisense = antisense + + def setNoOverlap(self, noOverlap): + self.noOverlap = noOverlap + + def setMinDistance(self, distance): + self.minDistance = distance + + def setMaxDistance(self, distance): + self.maxDistance = distance + + def setNewTagName(self, tagName): + self.tagName = tagName + + def match(self, transcriptRef, transcriptQuery, direction): + if self.noOverlap and transcriptRef.overlapWith(transcriptQuery): + return False + if self.colinear and transcriptRef.getDirection() != transcriptQuery.getDirection(): + return False + if self.antisense and transcriptRef.getDirection() == transcriptQuery.getDirection(): + return False + if self.minDistance != None or self.maxDistance != None: + distance = transcriptRef.getDistance(transcriptQuery) + if self.minDistance != None and distance < self.minDistance: + return False + if self.maxDistance != None and distance > self.maxDistance: + return False + return True + + def getFlanking(self, direction): + for chromosome in sorted(self.transcripts[REFERENCE].keys()): + if chromosome not in self.transcripts[QUERY]: + continue + sortedTranscripts = dict([id, {}] for id in INPUTS) + for id in INPUTS: + sortedTranscripts[id] = sorted(self.transcripts[id][chromosome], key = lambda t: getOrderKey(t, direction)) + refIndex = 0 + currentRefs = [] + outputs = set() + progress = Progress(len(sortedTranscripts[QUERY]), "Reading chr %s %s" % (chromosome, STRANDSTOSTR[direction]), self.verbosity) + for query in sortedTranscripts[QUERY]: + while refIndex < len(sortedTranscripts[REFERENCE]) and isInGoodRegion(sortedTranscripts[REFERENCE][refIndex], query, direction): + currentRefs.append(sortedTranscripts[REFERENCE][refIndex]) + refIndex += 1 + nextCurrentRefs = [] + for currentRef in currentRefs: + if self.match(currentRef, query, direction): + if currentRef not in self.flankings: + self.flankings[currentRef] = {} + self.flankings[currentRef][direction * currentRef.getDirection()] = query + else: + nextCurrentRefs.append(currentRef) + currentRefs = nextCurrentRefs + progress.inc() + progress.done() + + def setTags(self, query, reference, direction): + refName = reference.getTagValue("ID") + if refName == None: + refName = reference.getName() + if refName == None: + refName = reference.__str__() + query.setTagValue("%s%s" % (self.tagName, TAGS_REGION[direction]), refName) + query.setTagValue("%s_%s%s" % (TAG_DISTANCE, self.tagName, TAGS_REGION[direction]), query.getDistance(reference)) + if direction == 0: + query.setTagValue("%s_%s" % (TAG_SENSE, self.tagName), TAGS_SENSE[query.getDirection() * reference.getDirection()]) + query.setTagValue("%s_%s" % (TAG_REGION, self.tagName), TAGS_RREGION[cmp(query.getRelativeDistance(reference), 0)]) + for tag in reference.getTagNames(): + if tag not in ("quality", "feature"): + query.setTagValue("%s%s_%s" % (self.tagName, TAGS_REGION[direction], tag), reference.getTagValue(tag)) + return query + + def write(self): + outputs = set() + progress = Progress(len(self.flankings.keys()), "Printing data", self.verbosity) + for transcriptRef in self.flankings.keys(): + if self.directions: + for direction in self.directions: + if direction in self.flankings[transcriptRef]: + query = self.flankings[transcriptRef][direction] + outputs.add(self.setTags(query, transcriptRef, direction)) + else: + if self.flankings[transcriptRef]: + query = sorted(self.flankings[transcriptRef].values(), key = lambda query: query.getDistance(transcriptRef))[0] + outputs.add(self.setTags(query, transcriptRef, 0)) + progress.inc() + for transcript in sorted(list(outputs), key = lambda flanking: (flanking.getChromosome(), flanking.getStart(), flanking.getEnd())): + self.writer.addTranscript(transcript) + self.writer.close() + progress.done() + + def run(self): + self.flankings = {} + for direction in STRANDS: + self.getFlanking(direction) + self.write() + +if __name__ == "__main__": + + description = "Get Flanking v1.0.1: Get the flanking regions of a set of reference. [Category: Data Selection]" + + parser = OptionParser(description = description) + parser.add_option("-i", "--input1", dest="inputFileName1", action="store", type="string", help="query input file [compulsory] [format: file in transcript format given by -f]") + parser.add_option("-f", "--format1", dest="format1", action="store", type="string", help="format of previous file [compulsory] [format: transcript file format]") + parser.add_option("-j", "--input2", dest="inputFileName2", action="store", type="string", help="reference input file [compulsory] [format: file in transcript format given by -g]") + parser.add_option("-g", "--format2", dest="format2", action="store", type="string", help="format of previous file [compulsory] [format: transcript file format]") + parser.add_option("-5", "--upstream", dest="upstream", action="store_true", default=False, help="output upstream elements [format: boolean] [default: False]") + parser.add_option("-3", "--downstream", dest="downstream", action="store_true", default=False, help="output downstream elements [format: boolean] [default: False]") + parser.add_option("-c", "--colinear", dest="colinear", action="store_true", default=False, help="find first colinear element [format: boolean] [default: False]") + parser.add_option("-a", "--antisense", dest="antisense", action="store_true", default=False, help="find first anti-sense element [format: boolean] [default: False]") + parser.add_option("-e", "--noOverlap", dest="noOverlap", action="store_true", default=False, help="do not consider elements which are overlapping reference elements [format: boolean] [default: False]") + parser.add_option("-d", "--minDistance", dest="minDistance", action="store", default=None, type="int", help="minimum distance between 2 elements [format: int]") + parser.add_option("-D", "--maxDistance", dest="maxDistance", action="store", default=None, type="int", help="maximum distance between 2 elements [format: int]") + parser.add_option("-t", "--tag", dest="tagName", action="store", default="flanking", type="string", help="name of the new tag [format: string] [default: flanking]") + parser.add_option("-o", "--output", dest="outputFileName", action="store", type="string", help="output file [format: output file in GFF3 format]") + parser.add_option("-v", "--verbosity", dest="verbosity", action="store", default=1, type="int", help="trace level [format: int]") + (options, args) = parser.parse_args() + + gf = GetFlanking(options.verbosity) + gf.setInputFile(options.inputFileName1, options.format1, QUERY) + gf.setInputFile(options.inputFileName2, options.format2, REFERENCE) + gf.setOutputFile(options.outputFileName) + gf.addUpstreamDirection(options.upstream) + gf.addDownstreamDirection(options.downstream) + gf.setColinear(options.colinear) + gf.setAntisense(options.antisense) + gf.setNoOverlap(options.noOverlap) + gf.setMinDistance(options.minDistance) + gf.setMaxDistance(options.maxDistance) + gf.setNewTagName(options.tagName) + gf.run()