Mercurial > repos > yufei-luo > s_mart
view SMART/Java/Python/GetFlanking.py @ 67:f4de72c80eac draft
Uploaded
author | m-zytnicki |
---|---|
date | Mon, 16 Nov 2015 11:59:35 -0500 |
parents | 169d364ddd91 |
children |
line wrap: on
line source
#! /usr/bin/env python # # Copyright INRA-URGI 2009-2011 # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. # from optparse import OptionParser from commons.core.parsing.ParserChooser import ParserChooser from commons.core.writer.TranscriptWriter import TranscriptWriter from SMART.Java.Python.structure.Transcript import Transcript from SMART.Java.Python.structure.Interval import Interval from SMART.Java.Python.misc.Progress import Progress QUERY = 0 REFERENCE = 1 INPUTS = (QUERY, REFERENCE) STRANDS = (-1, 1) TAG_DISTANCE = "distance_" TAG_SENSE = "_sense" TAG_REGION = "_region" TAGS_REGION = {-1: "_upstream", 0: "", 1: "_downstream"} TAGS_RREGION = {-1: "upstream", 0: "overlapping", 1: "downstream"} TAGS_SENSE = {-1: "antisense", 0: "", 1: "colinear"} STRANDSTOSTR = {-1: "(-)", 0: "", 1: "(+)"} def getOrderKey(transcript, direction): if direction == 1: return transcript.getEnd() return - transcript.getStart() def isInGoodRegion(transcriptRef, transcriptQuery, direction): if direction == 1: return transcriptQuery.getEnd() > transcriptRef.getEnd() return transcriptQuery.getStart() < transcriptRef.getStart() class GetFlanking(object): def __init__(self, verbosity): self.verbosity = verbosity self.transcripts = dict([id, {}] for id in INPUTS) self.directions = [] self.noOverlap = False self.colinear = False self.antisense = False self.distance = None self.minDistance = None self.maxDistance = None self.tagName = "flanking" def setInputFile(self, fileName, format, id): chooser = ParserChooser(self.verbosity) chooser.findFormat(format) parser = chooser.getParser(fileName) for transcript in parser.getIterator(): chromosome = transcript.getChromosome() if chromosome not in self.transcripts[id]: self.transcripts[id][chromosome] = [] self.transcripts[id][chromosome].append(transcript) def setOutputFile(self, fileName): self.writer = TranscriptWriter(fileName, "gff3", self.verbosity) def addUpstreamDirection(self, upstream): if upstream: self.directions.append(-1) def addDownstreamDirection(self, downstream): if downstream: self.directions.append(1) def setColinear(self, colinear): self.colinear = colinear def setAntisense(self, antisense): self.antisense = antisense def setNoOverlap(self, noOverlap): self.noOverlap = noOverlap def setMinDistance(self, distance): self.minDistance = distance def setMaxDistance(self, distance): self.maxDistance = distance def setNewTagName(self, tagName): self.tagName = tagName def match(self, transcriptRef, transcriptQuery, direction): if self.noOverlap and transcriptRef.overlapWith(transcriptQuery): return False if self.colinear and transcriptRef.getDirection() != transcriptQuery.getDirection(): return False if self.antisense and transcriptRef.getDirection() == transcriptQuery.getDirection(): return False if self.minDistance != None or self.maxDistance != None: distance = transcriptRef.getDistance(transcriptQuery) if self.minDistance != None and distance < self.minDistance: return False if self.maxDistance != None and distance > self.maxDistance: return False return True def getFlanking(self, direction): for chromosome in sorted(self.transcripts[REFERENCE].keys()): if chromosome not in self.transcripts[QUERY]: continue sortedTranscripts = dict([id, {}] for id in INPUTS) for id in INPUTS: sortedTranscripts[id] = sorted(self.transcripts[id][chromosome], key = lambda t: getOrderKey(t, direction)) refIndex = 0 currentRefs = [] outputs = set() progress = Progress(len(sortedTranscripts[QUERY]), "Reading chr %s %s" % (chromosome, STRANDSTOSTR[direction]), self.verbosity) for query in sortedTranscripts[QUERY]: while refIndex < len(sortedTranscripts[REFERENCE]) and isInGoodRegion(sortedTranscripts[REFERENCE][refIndex], query, direction): currentRefs.append(sortedTranscripts[REFERENCE][refIndex]) refIndex += 1 nextCurrentRefs = [] for currentRef in currentRefs: if self.match(currentRef, query, direction): if currentRef not in self.flankings: self.flankings[currentRef] = {} self.flankings[currentRef][direction * currentRef.getDirection()] = query else: nextCurrentRefs.append(currentRef) currentRefs = nextCurrentRefs progress.inc() progress.done() def setTags(self, query, reference, direction): refName = reference.getTagValue("ID") if refName == None: refName = reference.getName() if refName == None: refName = reference.__str__() query.setTagValue("%s%s" % (self.tagName, TAGS_REGION[direction]), refName) query.setTagValue("%s_%s%s" % (TAG_DISTANCE, self.tagName, TAGS_REGION[direction]), query.getDistance(reference)) if direction == 0: query.setTagValue("%s_%s" % (TAG_SENSE, self.tagName), TAGS_SENSE[query.getDirection() * reference.getDirection()]) query.setTagValue("%s_%s" % (TAG_REGION, self.tagName), TAGS_RREGION[cmp(query.getRelativeDistance(reference), 0)]) for tag in reference.getTagNames(): if tag not in ("quality", "feature"): query.setTagValue("%s%s_%s" % (self.tagName, TAGS_REGION[direction], tag), reference.getTagValue(tag)) return query def write(self): outputs = set() progress = Progress(len(self.flankings.keys()), "Printing data", self.verbosity) for transcriptRef in self.flankings.keys(): if self.directions: for direction in self.directions: if direction in self.flankings[transcriptRef]: query = self.flankings[transcriptRef][direction] outputs.add(self.setTags(query, transcriptRef, direction)) else: if self.flankings[transcriptRef]: query = sorted(self.flankings[transcriptRef].values(), key = lambda query: query.getDistance(transcriptRef))[0] outputs.add(self.setTags(query, transcriptRef, 0)) progress.inc() for transcript in sorted(list(outputs), key = lambda flanking: (flanking.getChromosome(), flanking.getStart(), flanking.getEnd())): self.writer.addTranscript(transcript) self.writer.close() progress.done() def run(self): self.flankings = {} for direction in STRANDS: self.getFlanking(direction) self.write() if __name__ == "__main__": description = "Get Flanking v1.0.1: Get the flanking regions of a set of reference. [Category: Data Selection]" parser = OptionParser(description = description) parser.add_option("-i", "--input1", dest="inputFileName1", action="store", type="string", help="query input file [compulsory] [format: file in transcript format given by -f]") parser.add_option("-f", "--format1", dest="format1", action="store", type="string", help="format of previous file [compulsory] [format: transcript file format]") parser.add_option("-j", "--input2", dest="inputFileName2", action="store", type="string", help="reference input file [compulsory] [format: file in transcript format given by -g]") parser.add_option("-g", "--format2", dest="format2", action="store", type="string", help="format of previous file [compulsory] [format: transcript file format]") parser.add_option("-5", "--upstream", dest="upstream", action="store_true", default=False, help="output upstream elements [format: boolean] [default: False]") parser.add_option("-3", "--downstream", dest="downstream", action="store_true", default=False, help="output downstream elements [format: boolean] [default: False]") parser.add_option("-c", "--colinear", dest="colinear", action="store_true", default=False, help="find first colinear element [format: boolean] [default: False]") parser.add_option("-a", "--antisense", dest="antisense", action="store_true", default=False, help="find first anti-sense element [format: boolean] [default: False]") parser.add_option("-e", "--noOverlap", dest="noOverlap", action="store_true", default=False, help="do not consider elements which are overlapping reference elements [format: boolean] [default: False]") parser.add_option("-d", "--minDistance", dest="minDistance", action="store", default=None, type="int", help="minimum distance between 2 elements [format: int]") parser.add_option("-D", "--maxDistance", dest="maxDistance", action="store", default=None, type="int", help="maximum distance between 2 elements [format: int]") parser.add_option("-t", "--tag", dest="tagName", action="store", default="flanking", type="string", help="name of the new tag [format: string] [default: flanking]") parser.add_option("-o", "--output", dest="outputFileName", action="store", type="string", help="output file [format: output file in GFF3 format]") parser.add_option("-v", "--verbosity", dest="verbosity", action="store", default=1, type="int", help="trace level [format: int]") (options, args) = parser.parse_args() gf = GetFlanking(options.verbosity) gf.setInputFile(options.inputFileName1, options.format1, QUERY) gf.setInputFile(options.inputFileName2, options.format2, REFERENCE) gf.setOutputFile(options.outputFileName) gf.addUpstreamDirection(options.upstream) gf.addDownstreamDirection(options.downstream) gf.setColinear(options.colinear) gf.setAntisense(options.antisense) gf.setNoOverlap(options.noOverlap) gf.setMinDistance(options.minDistance) gf.setMaxDistance(options.maxDistance) gf.setNewTagName(options.tagName) gf.run()