diff mytools/fasta-dinucleotide-shuffle.py @ 9:87eb5c5ddfe9

Uploaded
author xuebing
date Fri, 09 Mar 2012 20:01:43 -0500
parents f0dc65e7f6c0
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mytools/fasta-dinucleotide-shuffle.py	Fri Mar 09 20:01:43 2012 -0500
@@ -0,0 +1,223 @@
+#!/usr/bin/python
+
+import sys, string, random
+import sequence
+
+# 
+# turn on psyco to speed up by 3X
+#
+if __name__=='__main__': 
+  try:
+    import psyco
+    #psyco.log()
+    psyco.full()
+    psyco_found = True
+  except ImportError:
+#    psyco_found = False
+    pass
+#  print >> sys.stderr, "psyco_found", psyco_found
+
+
+# altschulEriksonDinuclShuffle.py
+# P. Clote, Oct 2003
+
+def computeCountAndLists(s):
+
+  #Initialize lists and mono- and dinucleotide dictionaries
+  List = {} #List is a dictionary of lists
+  List['A'] = []; List['C'] = [];
+  List['G'] = []; List['T'] = [];
+  # FIXME: is this ok?
+  List['N'] = []
+  nuclList   = ["A","C","G","T","N"]
+  s       = s.upper()
+  #s       = s.replace("U","T")
+  nuclCnt    = {}  #empty dictionary
+  dinuclCnt  = {}  #empty dictionary
+  for x in nuclList:
+    nuclCnt[x]=0
+    dinuclCnt[x]={}
+    for y in nuclList:
+      dinuclCnt[x][y]=0
+
+  #Compute count and lists
+  nuclCnt[s[0]] = 1
+  nuclTotal     = 1
+  dinuclTotal   = 0
+  for i in range(len(s)-1):
+    x = s[i]; y = s[i+1]
+    List[x].append( y )
+    nuclCnt[y] += 1; nuclTotal  += 1
+    dinuclCnt[x][y] += 1; dinuclTotal += 1
+  assert (nuclTotal==len(s))
+  assert (dinuclTotal==len(s)-1)
+  return nuclCnt,dinuclCnt,List
+ 
+ 
+def chooseEdge(x,dinuclCnt):
+  z = random.random()
+  denom=dinuclCnt[x]['A']+dinuclCnt[x]['C']+dinuclCnt[x]['G']+dinuclCnt[x]['T']+dinuclCnt[x]['N']
+  numerator = dinuclCnt[x]['A']
+  if z < float(numerator)/float(denom):
+    dinuclCnt[x]['A'] -= 1
+    return 'A'
+  numerator += dinuclCnt[x]['C']
+  if z < float(numerator)/float(denom):
+    dinuclCnt[x]['C'] -= 1
+    return 'C'
+  numerator += dinuclCnt[x]['G']
+  if z < float(numerator)/float(denom):
+    dinuclCnt[x]['G'] -= 1
+    return 'G'
+  numerator += dinuclCnt[x]['T']
+  if z < float(numerator)/float(denom):
+    dinuclCnt[x]['T'] -= 1
+    return 'T'
+  dinuclCnt[x]['N'] -= 1
+  return 'N'
+
+def connectedToLast(edgeList,nuclList,lastCh):
+  D = {}
+  for x in nuclList: D[x]=0
+  for edge in edgeList:
+    a = edge[0]; b = edge[1]
+    if b==lastCh: D[a]=1
+  for i in range(3):
+    for edge in edgeList:
+      a = edge[0]; b = edge[1]
+      if D[b]==1: D[a]=1
+  ok = 0
+  for x in nuclList:
+    if x!=lastCh and D[x]==0: return 0
+  return 1
+
+def eulerian(s):
+  nuclCnt,dinuclCnt,List = computeCountAndLists(s)
+  #compute nucleotides appearing in s
+  nuclList = []
+  for x in ["A","C","G","T","N"]:
+    if x in s: nuclList.append(x)
+  #create dinucleotide shuffle L 
+  firstCh = s[0]  #start with first letter of s
+  lastCh  = s[-1]
+  edgeList = []
+  for x in nuclList:
+    if x!= lastCh: edgeList.append( [x,chooseEdge(x,dinuclCnt)] )
+  ok = connectedToLast(edgeList,nuclList,lastCh)
+  return ok,edgeList,nuclList,lastCh
+
+
+def shuffleEdgeList(L):
+  n = len(L); barrier = n
+  for i in range(n-1):
+    z = int(random.random() * barrier)
+    tmp = L[z]
+    L[z]= L[barrier-1]
+    L[barrier-1] = tmp
+    barrier -= 1
+  return L
+
+def dinuclShuffle(s):
+  ok = 0
+  while not ok:
+    ok,edgeList,nuclList,lastCh = eulerian(s)
+  nuclCnt,dinuclCnt,List = computeCountAndLists(s)
+
+  #remove last edges from each vertex list, shuffle, then add back
+  #the removed edges at end of vertex lists.
+  for [x,y] in edgeList: List[x].remove(y)
+  for x in nuclList: shuffleEdgeList(List[x])
+  for [x,y] in edgeList: List[x].append(y)
+
+  #construct the eulerian path
+  L = [s[0]]; prevCh = s[0]
+  for i in range(len(s)-2):
+    ch = List[prevCh][0] 
+    L.append( ch )
+    del List[prevCh][0]
+    prevCh = ch
+  L.append(s[-1])
+  t = string.join(L,"")
+  return t
+ 
+def main():
+
+	#
+	# defaults
+	#
+	file_name = None
+	seed = 1
+	copies = 1
+
+	#
+	# get command line arguments
+	#
+	usage = """USAGE: 
+	%s [options]
+
+        -f <filename>   file name (required)
+        -t <tag>        added to shuffled sequence names
+        -s <seed>	random seed; default: %d
+	-c <n>		make <n> shuffled copies of each sequence; default: %d
+        -h              print this usage message
+	""" % (sys.argv[0], seed, copies)
+
+        # no arguments: print usage
+	if len(sys.argv) == 1:
+		print >> sys.stderr, usage; sys.exit(1)
+
+        tag = "";
+
+        # parse command line
+        i = 1
+        while i < len(sys.argv):
+                arg = sys.argv[i]
+                if (arg == "-f"):
+                        i += 1
+                        try: file_name = sys.argv[i]
+                        except: print >> sys.stderr, usage; sys.exit(1)
+                elif (arg == "-t"):
+                        i += 1
+                        try: tag = sys.argv[i]
+                        except: print >> sys.stderr, usage; sys.exit(1)
+                elif (arg == "-s"):
+                        i += 1
+                        try: seed = string.atoi(sys.argv[i])
+                        except: print >> sys.stderr, usage; sys.exit(1)
+                elif (arg == "-c"):
+                        i += 1
+                        try: copies = string.atoi(sys.argv[i])
+                        except: print >> sys.stderr, usage; sys.exit(1)
+                elif (arg == "-h"):
+                        print >> sys.stderr, usage; sys.exit(1)
+                else:
+                        print >> sys.stderr, "Unknown command line argument: " + arg
+                        sys.exit(1)
+                i += 1
+
+        # check that required arguments given
+        if (file_name == None):
+        	print >> sys.stderr, usage; sys.exit(1)
+
+	random.seed(seed)
+
+	# read sequences
+	seqs = sequence.readFASTA(file_name,'Extended DNA')
+
+	for s in seqs:
+		str = s.getString()
+		#FIXME altschul can't handle ambigs
+		name = s.getName()
+
+		#print >> sys.stderr, ">%s" % name
+
+		for i in range(copies):
+
+			shuffledSeq = dinuclShuffle(str)
+
+			if (copies == 1):
+				print >> sys.stdout, ">%s\n%s" % (name+tag, shuffledSeq)
+			else:
+				print >> sys.stdout, ">%s_%d\n%s" % (name+tag, i, shuffledSeq)
+	
+if __name__ == '__main__': main()