view fasta-dinucleotide-shuffle.py @ 12:2f4ea569f048

Uploaded
author xuebing
date Sat, 10 Mar 2012 08:10:44 -0500
parents b7f1d9f8f3bc
children
line wrap: on
line source

#!/usr/bin/python

import sys, string, random
import sequence

# 
# turn on psyco to speed up by 3X
#
if __name__=='__main__': 
  try:
    import psyco
    #psyco.log()
    psyco.full()
    psyco_found = True
  except ImportError:
#    psyco_found = False
    pass
#  print >> sys.stderr, "psyco_found", psyco_found


# altschulEriksonDinuclShuffle.py
# P. Clote, Oct 2003

def computeCountAndLists(s):

  #Initialize lists and mono- and dinucleotide dictionaries
  List = {} #List is a dictionary of lists
  List['A'] = []; List['C'] = [];
  List['G'] = []; List['T'] = [];
  # FIXME: is this ok?
  List['N'] = []
  nuclList   = ["A","C","G","T","N"]
  s       = s.upper()
  #s       = s.replace("U","T")
  nuclCnt    = {}  #empty dictionary
  dinuclCnt  = {}  #empty dictionary
  for x in nuclList:
    nuclCnt[x]=0
    dinuclCnt[x]={}
    for y in nuclList:
      dinuclCnt[x][y]=0

  #Compute count and lists
  nuclCnt[s[0]] = 1
  nuclTotal     = 1
  dinuclTotal   = 0
  for i in range(len(s)-1):
    x = s[i]; y = s[i+1]
    List[x].append( y )
    nuclCnt[y] += 1; nuclTotal  += 1
    dinuclCnt[x][y] += 1; dinuclTotal += 1
  assert (nuclTotal==len(s))
  assert (dinuclTotal==len(s)-1)
  return nuclCnt,dinuclCnt,List
 
 
def chooseEdge(x,dinuclCnt):
  z = random.random()
  denom=dinuclCnt[x]['A']+dinuclCnt[x]['C']+dinuclCnt[x]['G']+dinuclCnt[x]['T']+dinuclCnt[x]['N']
  numerator = dinuclCnt[x]['A']
  if z < float(numerator)/float(denom):
    dinuclCnt[x]['A'] -= 1
    return 'A'
  numerator += dinuclCnt[x]['C']
  if z < float(numerator)/float(denom):
    dinuclCnt[x]['C'] -= 1
    return 'C'
  numerator += dinuclCnt[x]['G']
  if z < float(numerator)/float(denom):
    dinuclCnt[x]['G'] -= 1
    return 'G'
  numerator += dinuclCnt[x]['T']
  if z < float(numerator)/float(denom):
    dinuclCnt[x]['T'] -= 1
    return 'T'
  dinuclCnt[x]['N'] -= 1
  return 'N'

def connectedToLast(edgeList,nuclList,lastCh):
  D = {}
  for x in nuclList: D[x]=0
  for edge in edgeList:
    a = edge[0]; b = edge[1]
    if b==lastCh: D[a]=1
  for i in range(3):
    for edge in edgeList:
      a = edge[0]; b = edge[1]
      if D[b]==1: D[a]=1
  ok = 0
  for x in nuclList:
    if x!=lastCh and D[x]==0: return 0
  return 1

def eulerian(s):
  nuclCnt,dinuclCnt,List = computeCountAndLists(s)
  #compute nucleotides appearing in s
  nuclList = []
  for x in ["A","C","G","T","N"]:
    if x in s: nuclList.append(x)
  #create dinucleotide shuffle L 
  firstCh = s[0]  #start with first letter of s
  lastCh  = s[-1]
  edgeList = []
  for x in nuclList:
    if x!= lastCh: edgeList.append( [x,chooseEdge(x,dinuclCnt)] )
  ok = connectedToLast(edgeList,nuclList,lastCh)
  return ok,edgeList,nuclList,lastCh


def shuffleEdgeList(L):
  n = len(L); barrier = n
  for i in range(n-1):
    z = int(random.random() * barrier)
    tmp = L[z]
    L[z]= L[barrier-1]
    L[barrier-1] = tmp
    barrier -= 1
  return L

def dinuclShuffle(s):
  ok = 0
  while not ok:
    ok,edgeList,nuclList,lastCh = eulerian(s)
  nuclCnt,dinuclCnt,List = computeCountAndLists(s)

  #remove last edges from each vertex list, shuffle, then add back
  #the removed edges at end of vertex lists.
  for [x,y] in edgeList: List[x].remove(y)
  for x in nuclList: shuffleEdgeList(List[x])
  for [x,y] in edgeList: List[x].append(y)

  #construct the eulerian path
  L = [s[0]]; prevCh = s[0]
  for i in range(len(s)-2):
    ch = List[prevCh][0] 
    L.append( ch )
    del List[prevCh][0]
    prevCh = ch
  L.append(s[-1])
  t = string.join(L,"")
  return t
 
def main():

	#
	# defaults
	#
	file_name = None
	seed = 1
	copies = 1

	#
	# get command line arguments
	#
	usage = """USAGE: 
	%s [options]

        -f <filename>   file name (required)
        -t <tag>        added to shuffled sequence names
        -s <seed>	random seed; default: %d
	-c <n>		make <n> shuffled copies of each sequence; default: %d
        -h              print this usage message
	""" % (sys.argv[0], seed, copies)

        # no arguments: print usage
	if len(sys.argv) == 1:
		print >> sys.stderr, usage; sys.exit(1)

        tag = "";

        # parse command line
        i = 1
        while i < len(sys.argv):
                arg = sys.argv[i]
                if (arg == "-f"):
                        i += 1
                        try: file_name = sys.argv[i]
                        except: print >> sys.stderr, usage; sys.exit(1)
                elif (arg == "-t"):
                        i += 1
                        try: tag = sys.argv[i]
                        except: print >> sys.stderr, usage; sys.exit(1)
                elif (arg == "-s"):
                        i += 1
                        try: seed = string.atoi(sys.argv[i])
                        except: print >> sys.stderr, usage; sys.exit(1)
                elif (arg == "-c"):
                        i += 1
                        try: copies = string.atoi(sys.argv[i])
                        except: print >> sys.stderr, usage; sys.exit(1)
                elif (arg == "-h"):
                        print >> sys.stderr, usage; sys.exit(1)
                else:
                        print >> sys.stderr, "Unknown command line argument: " + arg
                        sys.exit(1)
                i += 1

        # check that required arguments given
        if (file_name == None):
        	print >> sys.stderr, usage; sys.exit(1)

	random.seed(seed)

	# read sequences
	seqs = sequence.readFASTA(file_name,'Extended DNA')

	for s in seqs:
		str = s.getString()
		#FIXME altschul can't handle ambigs
		name = s.getName()

		#print >> sys.stderr, ">%s" % name

		for i in range(copies):

			shuffledSeq = dinuclShuffle(str)

			if (copies == 1):
				print >> sys.stdout, ">%s\n%s" % (name+tag, shuffledSeq)
			else:
				print >> sys.stdout, ">%s_%d\n%s" % (name+tag, i, shuffledSeq)
	
if __name__ == '__main__': main()