Mercurial > repos > yufei-luo > s_mart
view SMART/Java/Python/GetFlanking.py @ 36:44d5973c188c
Uploaded
author | m-zytnicki |
---|---|
date | Tue, 30 Apr 2013 15:02:29 -0400 |
parents | |
children | 169d364ddd91 |
line wrap: on
line source
#! /usr/bin/env python # # Copyright INRA-URGI 2009-2011 # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. # from optparse import OptionParser from commons.core.parsing.ParserChooser import ParserChooser from commons.core.writer.TranscriptWriter import TranscriptWriter from SMART.Java.Python.structure.Transcript import Transcript from SMART.Java.Python.structure.Interval import Interval from SMART.Java.Python.misc.Progress import Progress QUERY = 0 REFERENCE = 1 INPUTS = (QUERY, REFERENCE) STRANDS = (-1, 1) TAG_DISTANCE = "distance_" TAG_SENSE = "_sense" TAG_REGION = "_region" TAGS_REGION = {-1: "_upstream", 0: "", 1: "_downstream"} TAGS_RREGION = {-1: "upstream", 0: "overlapping", 1: "downstream"} TAGS_SENSE = {-1: "antisense", 0: "", 1: "collinear"} STRANDSTOSTR = {-1: "(-)", 0: "", 1: "(+)"} def getOrderKey(transcript, direction, input): if direction == 1: if input == QUERY: return (transcript.getEnd(), -transcript.getStart()) return (transcript.getStart(), -transcript.getEnd()) if input == QUERY: return (-transcript.getStart(), transcript.getEnd()) return (-transcript.getEnd(), transcript.getStart()) class GetFlanking(object): def __init__(self, verbosity): self.verbosity = verbosity self.transcripts = dict([id, {}] for id in INPUTS) self.directions = [] self.noOverlap = False self.colinear = False self.antisense = False self.distance = None self.minDistance = None self.maxDistance = None self.tagName = "flanking" def setInputFile(self, fileName, format, id): chooser = ParserChooser(self.verbosity) chooser.findFormat(format) parser = chooser.getParser(fileName) for transcript in parser.getIterator(): chromosome = transcript.getChromosome() if chromosome not in self.transcripts[id]: self.transcripts[id][chromosome] = [] self.transcripts[id][chromosome].append(transcript) def setOutputFile(self, fileName): self.writer = TranscriptWriter(fileName, "gff3", self.verbosity) def addUpstreamDirection(self, upstream): if upstream: self.directions.append(-1) def addDownstreamDirection(self, downstream): if downstream: self.directions.append(1) def setColinear(self, colinear): self.colinear = colinear def setAntisense(self, antisense): self.antisense = antisense def setNoOverlap(self, noOverlap): self.noOverlap = noOverlap def setMinDistance(self, distance): self.minDistance = distance def setMaxDistance(self, distance): self.maxDistance = distance def setNewTagName(self, tagName): self.tagName = tagName def match(self, transcriptQuery, transcriptRef, direction): #print "comparing", transcriptQuery, "with", transcriptRef, "on direction", direction if direction == 1 and transcriptRef.getEnd() < transcriptQuery.getStart(): return False if direction == -1 and transcriptQuery.getEnd() < transcriptRef.getStart(): return False if self.noOverlap and transcriptRef.overlapWith(transcriptQuery): return False if self.colinear and transcriptRef.getDirection() != transcriptQuery.getDirection(): return False if self.antisense and transcriptRef.getDirection() == transcriptQuery.getDirection(): return False if self.minDistance != None or self.maxDistance != None: distance = transcriptRef.getDistance(transcriptQuery) if self.minDistance != None and distance < self.minDistance: return False if self.maxDistance != None and distance > self.maxDistance: return False return True def getFlanking(self, chromosome, direction): if chromosome not in self.transcripts[REFERENCE]: return sortedTranscripts = dict([id, {}] for id in INPUTS) for id in INPUTS: sortedTranscripts[id] = sorted(self.transcripts[id][chromosome], key = lambda t: getOrderKey(t, direction, id)) refIndex = 0 progress = Progress(len(sortedTranscripts[QUERY]), "Reading chr %s %s" % (chromosome, STRANDSTOSTR[direction]), self.verbosity) for query in sortedTranscripts[QUERY]: #print "Q: ", query #print "R1: ", sortedTranscripts[REFERENCE][refIndex] while not self.match(query, sortedTranscripts[REFERENCE][refIndex], direction): refIndex += 1 if refIndex == len(sortedTranscripts[REFERENCE]): progress.done() #print "done" return #print "R2: ", sortedTranscripts[REFERENCE][refIndex] self.flankings[query][direction] = sortedTranscripts[REFERENCE][refIndex] progress.inc() progress.done() def setTags(self, query, reference, direction): refName = reference.getTagValue("ID") if refName == None: refName = reference.getName() if refName == None: refName = reference.__str__() query.setTagValue("%s%s" % (self.tagName, TAGS_REGION[direction*query.getDirection()]), refName) query.setTagValue("%s_%s%s" % (TAG_DISTANCE, self.tagName, TAGS_REGION[direction*query.getDirection()]), query.getDistance(reference)) query.setTagValue("%s_%s" % (TAG_SENSE, self.tagName), TAGS_SENSE[query.getDirection() * reference.getDirection()]) if direction == 0: query.setTagValue("%s_%s" % (TAG_REGION, self.tagName), TAGS_RREGION[cmp(query.getRelativeDistance(reference), 0)]) for tag in reference.getTagNames(): if tag not in ("quality", "feature"): query.setTagValue("%s%s_%s" % (self.tagName, TAGS_REGION[direction*query.getDirection()], tag), reference.getTagValue(tag)) return query def write(self): progress = Progress(len(self.flankings.keys()), "Printing data", self.verbosity) for transcriptQuery in self.flankings.keys(): if not self.flankings[transcriptQuery]: self.writer.addTranscript(transcriptQuery) elif self.directions: for direction in self.directions: #relativeDirection = direction if transcriptQuery.getDirection() == 1 else - direction relativeDirection = direction * transcriptQuery.getDirection() if relativeDirection in self.flankings[transcriptQuery]: transcriptRef = self.flankings[transcriptQuery][relativeDirection] transcriptQuery = self.setTags(transcriptQuery, transcriptRef, relativeDirection) self.writer.addTranscript(transcriptQuery) else: transcriptRef = sorted(self.flankings[transcriptQuery].values(), key = lambda transcriptRef: transcriptQuery.getDistance(transcriptRef))[0] self.writer.addTranscript(self.setTags(transcriptQuery, transcriptRef, 0)) progress.inc() progress.done() def run(self): for chromosome in sorted(self.transcripts[QUERY].keys()): self.flankings = dict([query, {}] for query in self.transcripts[QUERY][chromosome]) for direction in STRANDS: #print "comparison", chromosome, direction self.getFlanking(chromosome, direction) self.write() self.writer.close() if __name__ == "__main__": description = "Get Flanking v1.0.1: Get the flanking regions of a set of reference. [Category: Data Selection]" parser = OptionParser(description = description) parser.add_option("-i", "--input1", dest="inputFileName1", action="store", type="string", help="query input file [compulsory] [format: file in transcript format given by -f]") parser.add_option("-f", "--format1", dest="format1", action="store", type="string", help="format of previous file [compulsory] [format: transcript file format]") parser.add_option("-j", "--input2", dest="inputFileName2", action="store", type="string", help="reference input file [compulsory] [format: file in transcript format given by -g]") parser.add_option("-g", "--format2", dest="format2", action="store", type="string", help="format of previous file [compulsory] [format: transcript file format]") parser.add_option("-5", "--upstream", dest="upstream", action="store_true", default=False, help="output upstream elements [format: boolean] [default: False]") parser.add_option("-3", "--downstream", dest="downstream", action="store_true", default=False, help="output downstream elements [format: boolean] [default: False]") parser.add_option("-c", "--colinear", dest="colinear", action="store_true", default=False, help="find first colinear element [format: boolean] [default: False]") parser.add_option("-a", "--antisense", dest="antisense", action="store_true", default=False, help="find first anti-sense element [format: boolean] [default: False]") parser.add_option("-e", "--noOverlap", dest="noOverlap", action="store_true", default=False, help="do not consider elements which are overlapping reference elements [format: boolean] [default: False]") parser.add_option("-d", "--minDistance", dest="minDistance", action="store", default=None, type="int", help="minimum distance between 2 elements [format: int]") parser.add_option("-D", "--maxDistance", dest="maxDistance", action="store", default=None, type="int", help="maximum distance between 2 elements [format: int]") parser.add_option("-t", "--tag", dest="tagName", action="store", default="flanking", type="string", help="name of the new tag [format: string] [default: flanking]") parser.add_option("-o", "--output", dest="outputFileName", action="store", type="string", help="output file [format: output file in GFF3 format]") parser.add_option("-v", "--verbosity", dest="verbosity", action="store", default=1, type="int", help="trace level [format: int]") (options, args) = parser.parse_args() gf = GetFlanking(options.verbosity) gf.setInputFile(options.inputFileName1, options.format1, QUERY) gf.setInputFile(options.inputFileName2, options.format2, REFERENCE) gf.setOutputFile(options.outputFileName) gf.addUpstreamDirection(options.upstream) gf.addDownstreamDirection(options.downstream) gf.setColinear(options.colinear) gf.setAntisense(options.antisense) gf.setNoOverlap(options.noOverlap) gf.setMinDistance(options.minDistance) gf.setMaxDistance(options.maxDistance) gf.setNewTagName(options.tagName) gf.run()