diff bin/maf-cull @ 0:06f8460885ff

migrate from GitHub
author yutaka-saito
date Sun, 19 Apr 2015 20:51:13 +0900
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bin/maf-cull	Sun Apr 19 20:51:13 2015 +0900
@@ -0,0 +1,112 @@
+#! /usr/bin/env python
+
+# Read MAF-format alignments.  Write them, omitting alignments whose
+# coordinates in the top-most sequence are contained in those of >=
+# cullingLimit higher-scoring alignments.
+
+# Alignments on opposite strands are not considered to contain each
+# other.
+
+# The alignments must be sorted by sequence name, then strand, then
+# start coordinate.
+
+# This algorithm is not theoretically optimal, but it is simple and
+# probably fast in practice.  Optimal algorithms are described in:
+# Winnowing sequences from a database search.
+# Berman P, Zhang Z, Wolf YI, Koonin EV, Miller W.
+# J Comput Biol. 2000 Feb-Apr;7(1-2):293-302.
+# (Use a "priority search tree" or an "interval tree".)
+
+# Seems to work with Python 2.x, x>=4
+
+import fileinput, itertools, operator, optparse, os, signal, sys
+
+# The intervals must have size > 0.
+
+def isFresh(oldInterval, newInterval):
+    return oldInterval.end > newInterval.start
+
+def freshIntervals(storedIntervals, newInterval):
+    """Yields those storedIntervals that overlap newInterval."""
+    return [i for i in storedIntervals if isFresh(i, newInterval)]
+
+def isDominated(dog, queen):
+    return dog.score < queen.score and dog.end <= queen.end
+
+def isWanted(newInterval, storedIntervals, cullingLimit):
+    """Is newInterval dominated by < cullingLimit storedIntervals?"""
+    dominators = (i for i in storedIntervals if isDominated(newInterval, i))
+    return len(list(dominators)) < cullingLimit
+
+# Check that the intervals are sorted by start position, and further
+# sort them in descending order of score.
+def sortedIntervals(intervals):
+    oldStart = ()
+    for k, v in itertools.groupby(intervals, operator.attrgetter("start")):
+        if k < oldStart: raise Exception("the input is not sorted properly")
+        oldStart = k
+        for i in sorted(v, key=operator.attrgetter("score"), reverse=True):
+            yield i
+
+def culledIntervals(intervals, cullingLimit):
+    """Yield intervals contained in < cullingLimit higher-scoring intervals."""
+    storedIntervals = []
+    for i in sortedIntervals(intervals):
+        storedIntervals = freshIntervals(storedIntervals, i)
+        if isWanted(i, storedIntervals, cullingLimit):
+            yield i
+            storedIntervals.append(i)
+
+class Maf:
+    def __init__(self, lines):
+        self.lines = lines
+        try:
+            aLine = lines[0]
+            aWords = aLine.split()
+            scoreGen = (i for i in aWords if i.startswith("score="))
+            scoreWord = scoreGen.next()
+            self.score = float(scoreWord.split("=")[1])
+        except: raise Exception("missing score")
+        try:
+            sLine = lines[1]
+            sWords = sLine.split()
+            seqName = sWords[1]
+            alnStart = int(sWords[2])
+            alnSize = int(sWords[3])
+            strand = sWords[4]
+            self.start = seqName, strand, alnStart
+            self.end = seqName, strand, alnStart + alnSize
+        except: raise Exception("can't interpret the MAF data")
+
+def mafInput(lines):
+    for k, v in itertools.groupby(lines, str.isspace):
+        if not k: yield Maf(list(v))
+
+def filterComments(lines):
+    for i in lines:
+        if i.startswith("#"): print i,
+        else: yield i
+
+def mafCull(opts, args):
+    inputLines = fileinput.input(args)
+    inputMafs = mafInput(filterComments(inputLines))
+    for maf in culledIntervals(inputMafs, opts.limit):
+        for i in maf.lines: print i,
+        print  # blank line after each alignment
+
+if __name__ == "__main__":
+    signal.signal(signal.SIGPIPE, signal.SIG_DFL)  # avoid silly error message
+
+    usage = "%prog [options] my-alignments.maf"
+    description = "Cull alignments whose top-sequence coordinates are contained in LIMIT or more higher-scoring alignments."
+    op = optparse.OptionParser(usage=usage, description=description)
+    op.add_option("-l", "--limit", type="int", default=2,
+                  help="culling limit (default: %default)")
+    (opts, args) = op.parse_args()
+    if opts.limit < 1: op.error("option -l: should be >= 1")
+
+    try: mafCull(opts, args)
+    except KeyboardInterrupt: pass  # avoid silly error message
+    except Exception, e:
+        prog = os.path.basename(sys.argv[0])
+        sys.exit(prog + ": error: " + str(e))