Mercurial > repos > rreumerman > snptools
diff trams.py @ 4:bd5692103d5b draft
Uploaded
author | rreumerman |
---|---|
date | Fri, 05 Apr 2013 05:00:40 -0400 |
parents | |
children | 8de0ffc2166f |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/trams.py Fri Apr 05 05:00:40 2013 -0400 @@ -0,0 +1,497 @@ +################# +transl_table = 11 +intro_message = ''' +------------------------------------------------------------------+ + | Tool for Rapid Annotation of Microbial SNPs (TRAMS): a simple | + | program for rapid annotation of genomic variation in prokaryotes | + | | + | Developed by: Richard A. Reumerman, Paul R. Herron, | + | Paul A. Hoskisson and Vartul Sangal | + +------------------------------------------------------------------+\n''' +################# + +import sys +import time +start = time.clock() + +# Command line files: SNP REF REF-TYPE ANNOT OVERL SUM; +if len(sys.argv) < 7: + exit("\nNot enough arguments given.\nUsage: TRAMS_Galaxy.py [SNP.] [REF.] [ANNOT.] [OVERL.] [SUM.]") +try: + file_snps = open(sys.argv[1], "rU") +except IOError as e: + exit("Error trying to open '"+sys.argv[1]+"': {1}".format(e.errno, e.strerror)) +try: + file_ref = open(sys.argv[2], "rU") +except IOError as e: + exit("Error trying to open '"+sys.argv[2]+"': {1}".format(e.errno, e.strerror)) + +filetype_reference = sys.argv[3] + +try: + file_out = open(sys.argv[4], "w") +except IOError as e: + exit("Error trying to open '"+sys.argv[4]+"': {1}".format(e.errno, e.strerror)) +try: + file_overlap = open(sys.argv[5], "w") +except IOError as e: + exit("Error trying to open '"+sys.argv[5]+"': {1}".format(e.errno, e.strerror)) +try: + file_summary = open(sys.argv[6], "w") +except IOError as e: + exit("Error trying to open '"+sys.argv[6]+"': {1}".format(e.errno, e.strerror)) + +import Bio +from Bio import SeqIO, SeqFeature +from Bio.SeqRecord import SeqRecord +from Bio.Seq import Seq +from Bio.Alphabet import generic_dna, IUPAC +from Bio.Data import CodonTable + +modules_loaded = time.clock() + +def non_coding_calc(gene, pos = 0): + '''This function takes a pseudogene and returns the number of bases + located in between the sub-features before 'pos'. Returns 0 if 'pseudo' = False. + Input: {start, subfeats, pseudo}, pos (default = 0)''' + if not gene['pseudo']: return 0 + + non_coding_bases = 0 + prev_subfeat_end = gene['start'] + if gene['strand'] == -1: + for subfeature in gene['subfeats']: + if subfeature.location._start.position < pos: + prev_subfeat_end = subfeature.location._end.position + continue + non_coding_bases += (subfeature.location._start.position - prev_subfeat_end) + prev_subfeat_end = subfeature.location._end.position + else: + for subfeature in gene['subfeats']: + non_coding_bases += (subfeature.location._start.position - prev_subfeat_end) + prev_subfeat_end = subfeature.location._end.position + if prev_subfeat_end >= pos and pos != 0: break + + return non_coding_bases + + +def region_calc(bounds,length): + regions = [] + lastend=i=0 + while i < len(bounds): + if bounds[i]['start'] > lastend:# Intergenic region present; + regions.append([lastend,bounds[i]['start'],-1]) + lastend = bounds[i]['start'] + else: + regions.append([bounds[i]['start'],bounds[i]['end'],i]) + if bounds[i]['end'] > lastend: + lastend = bounds[i]['end'] + i += 1 + + if regions[-1][1] < length:# Final tail of genome; + regions.append([lastend,length,-1]) + + return regions + + +def overlap_calc(bounds): + '''This function takes an array of feature starts and ends and + returns an array of starts and ends of all overlapping regions. + Input: [{start,end}]''' + i = 0 + overlaps = [] + while i < len(bounds) - 1: + for downstr in bounds[i+1:]: + if downstr[0] < bounds[i][1]:# Features overlap; + if downstr[1] < bounds[i][1]:# Complete overlap; + overlaps.append([downstr[0],downstr[1],bounds[i][2],downstr[2],[0,0]]) + else:# Partial overlap; + overlaps.append([downstr[0],bounds[i][1],bounds[i][2],downstr[2],[0,0]]) + else:# No use looking further; + break + + i += 1 + + return overlaps + + +def match_feature(bounds,pos,prev=0): + '''This function checks if a position is located inside a feature and + returns the feature's number if found or -1 if none is found. + Input: {start,end},pos,prev_feat (default = 0)''' + for i in range(prev, len(bounds)): + if (pos >= bounds[i]['start']) and (pos < bounds[i]['end']): + return i + elif pos < bounds[i]['start']:# No use looking further + return -1 + + return -1 + + +def write_output(line,target=file_out): + '''This function takes the 2 dimensional array containing all the SNP + data. It contains an array of information on the feature and an array + for each strain for which SNPs are given. + Input: [[pos],[ref],[cells],[cells],etc]''' + target.write('\n'+str(line[0][0])) + for cell in line[1]: + target.write('\t'+str(cell)) + for strain in line[2:]: + target.write('\t') + for cell in strain: + target.write('\t'+str(cell)) + + target.flush() + + +def new_codon_calc(ref_codon, new_base, pos_in_cod): + return str(ref_codon[0:pos_in_cod-1]+new_base+ref_codon[pos_in_cod:len(ref_codon)]) + + +def mut_type_check(ref_res, ref_codon, pos_in_gene, new_base, new_codon): + if str(new_codon).lower() == str(ref_codon).lower(): + return ['','','',''] + new_residue = Seq(new_codon).translate(table=transl_table) + if str(new_residue) == str(ref_res): + mut_type = 'synonymous' + elif (pos_in_gene / 3) < 1 and str(ref_codon).upper() in CodonTable.unambiguous_dna_by_id[transl_table].start_codons:# position 0,1 or 2 and SNP is in start codon; + if str(new_codon).upper() in CodonTable.unambiguous_dna_by_id[transl_table].start_codons: mut_type = 'nonsynonymous' + else: mut_type = 'nonstart' + elif str(new_residue) == '*': mut_type = 'nonsense' + elif str(ref_res) == '*': mut_type = 'nonstop' + else: mut_type = 'nonsynonymous' + + return [mut_type,new_base,new_codon,new_residue] + + +def codon_process(codon): + '''This function processes a codon. It loops through it 3 times, + once to determine which is the highest position mutated, once to + fill in the cells for the output file and once to output all lines. + Input: [empty,start_pos,[line1],[line2],[line3],strand] + It also uses global variable strain_nr''' + lastposition = [-1]*(strain_nr-1) + new_codons = ['']*(strain_nr-1) + if codon[-1] == -1:# Change codon position order for -1 features; + temp = codon [1:-1] + temp.reverse() + codon[1:-1] = temp + for i,line in enumerate(codon[1:-1],1): + if line == '': continue + for j,strain in enumerate(line[2:]): + if strain[1] in ['a','g','c','t']: + lastposition[j] = i + new_codons[j] = codon[i][1][8] + + for i,line in enumerate(codon[1:-1],1): + if codon[-1] == -1: pos_in_cod = 4-i + else: pos_in_cod = i + + if line == '': continue + for j,strain in enumerate(line[2:]): + if i == lastposition[j]: # Check for synonymous etc.; + new_codons[j] = new_codon_calc(new_codons[j],strain[1],pos_in_cod) + codon[i][j+2] = mut_type_check(line[1][9],line[1][8],codon[0],strain[1],new_codons[j]) + straininfo[j][codon[i][j+2][0]] += 1# Counting; + elif strain[1] in ['a','g','c','t']: + codon[i][j+2] = ['MNP',strain[1],'',''] + straininfo[j]['mnps'] += 1 + new_codons[j] = new_codon_calc(new_codons[j],strain[1],pos_in_cod) + elif strain[0] == 'Allele missing': codon[i][j+2] = strain + else: codon[i][j+2] = ['']*4 + + for line in codon[1:-1]: + if line != '': write_output(line) + +def feature_props(feature): + properties = {'type':feature.type,'strand':feature.location._strand, + 'sequence':feature.extract(seq_record.seq),'pseudo': False, + 'locus_tag':'','gene_name':'','product':'', + 'start':int(feature.location._start.position), + 'end':int(feature.location._end.position)} + if 'pseudo' in feature.qualifiers: + properties['pseudo'] = True + properties['type'] = 'pseudogene' + properties['pure_seq'] = properties['sequence'] + if properties['strand'] == -1: + properties['sequence'] = seq_record.seq[feature.location._start.position:feature.location._end.position].reverse_complement() + else: + properties['sequence'] = seq_record.seq[feature.location._start.position:feature.location._end.position] + if feature.sub_features: properties['subfeats'] = feature.sub_features + if 'locus_tag' in feature.qualifiers: properties['locus_tag'] = feature.qualifiers['locus_tag'][0] + if 'gene' in feature.qualifiers: properties['gene_name']= feature.qualifiers['gene'][0] + if feature.type in ['tRNA','rRNA','CDS']: properties['product'] = feature.qualifiers['product'][0] + + return properties + +# Read embl/genbank file for information on sequence features; +try: + seq_record = SeqIO.parse(file_ref, filetype_reference).next() +except: + file_ref.close() + quit("Error reading "+sys.argv[2]+", please check file for errors.") +file_ref.close() + +# Loop through genome features and save relevant properties; +feats = []# Dictionary of properties; + +feature_types = {'intergenic':0,'gene':0,'pseudogene':0} +feat_temp_store = '' +for feature in seq_record.features: + # Check if gene is defined as other feature (e.g. CDS). Else, save info from 'gene'; + if feat_temp_store != '': + if (feature.location._start.position == feat_temp_store.location._start.position and + feature.location._end.position == feat_temp_store.location._end.position):# Gene also defined as other feature; + feat_temp_store = '' + else:# Gene not also defined as CDS; + feats.append(feature_props(feat_temp_store)) + feat_temp_store = '' + elif feature.type == 'gene': + feat_temp_store = feature + + if not feature.type in ['source','gene','misc_feature']: + if not feature.type in feature_types and feature.type != 'CDS': feature_types[feature.type] = 0 + feats.append(feature_props(feature)) + + +feat_props = sorted(feats, key=lambda cells:int(cells['start'])) +feat_boundaries = [{'start':item['start'],'end':item['end']} for item in feat_props] +regions = region_calc(feat_boundaries,len(seq_record.seq)) +feat_overlap = overlap_calc(regions) + +reference_loaded = time.clock() + +# Create array of SNPs from input file for processing; +lines = [line.split('\t') for line in file_snps if line.strip()] +file_snps.close() +# First line contains headers, extract number of strains etc; +headers = lines[0] +snp_table = sorted(lines[1:], key=lambda cells:int(cells[0])) + +snps_loaded = time.clock() + +# Print output file headers; +headers[-1] = headers[-1].rstrip()# Remove newline character; +strain_nr = len(headers)-1 +strains_found = 'Found '+str(strain_nr)+' strains: '+headers[1]+' (reference)' +first_line = '\t'+headers[1]+'\t'*9 +second_line = 'Position\tFeature\tLocus tag\tGene\tProduct\tStart\tEnd\tStrand\tRef. base\tRef. codon\tRef. res.' +straininfo = [0]*(len(headers[2:])) +for i,strain in enumerate(headers[2:]): + straininfo[i] = {'snps':0,'mnps':0,'synonymous':0,'nonsynonymous':0,'nonstart':0,'nonstop':0,'nonsense':0} + straininfo[i].update(feature_types) + strains_found += ', '+strain + first_line += '\t\t'+strain+'\t'*3 + second_line += '\t\tSNP type\tNew base\tNew codon\tNew res.' + +file_out.write(first_line+'\n'+second_line) +file_out.flush() + +# Loop through SNPs from array and process them; +props = {}# Properties of a feature; +prev_snp = ''# Position of previous SNP; +to_write = []# Information of current SNP; +compl_bases = {'a':'t','t':'a','g':'c','c':'g'} +firstsnp = True# First snp of region, or of codon in cases of 3 positions in codon mutated; +prev_start=j=k=0 +overlap_snps = [] +codon = ['']*5# Array of codon positions. First item is position of first base of codon in the gene; + +for region in regions: + firstsnp = True + i = prev_start + while i < len(snp_table):# Loop through SNPs + snp_entry = snp_table[i] + if not str(snp_entry[0]).isdigit():# Not a valid line, skip; + i += 1 + continue + + pos = int(snp_entry[0])-1 + if pos < region[0]:# Not inside region yet; + i += 1 + continue + elif firstsnp and pos < region[1]: + prev_start = i + elif pos >= region[1]:# End of region, process and next; + if not firstsnp and codon != ['','','','','']: + codon_process(codon) + break + + # Documentation of SNPs in feature overlaps; + while j < len(feat_overlap)-1 and pos > feat_overlap[j][1]: j += 1 + k = j + while k < len(feat_overlap) and pos >= feat_overlap[k][0]: + if pos < feat_overlap[k][1]: + if feat_overlap[k][4][0] == 0: + feat_overlap[k][4][0] = pos + feat_overlap[k][4][1] = pos + k += 1 + + + snp_entry[-1] = snp_entry[-1].rstrip()# Remove newline character at end of line; + + # Prevent crash upon encounter nonstandard base character; + if snp_entry[1] not in ['a','g','c','t']:# Ref base + i += 1 + continue + for m,base in enumerate(snp_entry[2:],2): + if base.lower() not in ['a','g','c','t']: + snp_entry[m] = '' + # Crash prevented + + mnp=in_feat=False + snp_feat = region[2] + ref_base = snp_entry[1] + + to_write = [[pos+1]] + + # Output feature properties and reference situation; + if snp_feat == -1: + codon = ['']*5 + to_write.append(['intergenic','','','','','','',ref_base.upper(),'','']) + elif feat_props[snp_feat]['type'] not in ['CDS','gene','pseudogene']:# In feature, but non-coding; + codon = ['']*5 + props = feat_props[snp_feat] + if props['strand'] == -1: ref_base = (compl_bases[snp_entry[1].lower()]) + else: ref_base = snp_entry[1] + to_write.append([props['type'],props['locus_tag'],props['gene_name'], + props['product'],props['start']+1,props['end'], + '',ref_base.upper(),'','']) + else:# in CDS/gene feature, check codon etc; + props = feat_props[snp_feat] + sequence = props['sequence'] + if props['strand'] == -1: + pos_in_gene = props['end'] - pos - 1# Python counting + ref_base = (compl_bases[snp_entry[1].lower()]) + else: + pos_in_gene = pos - props['start']# Python counting + ref_base = snp_entry[1] + + in_feat = True + if props['pseudo'] and 'subfeats' in props:# Pseudogene that needs special attention; + in_feat = False + subfeat_boundaries = [{'start':item.location._start.position,'end':item.location._end.position} + for item in props['subfeats']] + snp_subfeat = match_feature(subfeat_boundaries,pos) + if snp_subfeat != -1: + in_feat = True + pos_in_gene -= non_coding_calc({'start':props['start'],'subfeats':props['subfeats'], + 'pseudo':True,'strand':props['strand']},pos) + sequence = props['pure_seq'] + + if not in_feat:# In pseudogene non-coding region; + codon = ['']*5 + to_write.append(['non coding',props['locus_tag'],props['gene_name'],props['product'], + props['start']+1,props['end'],props['strand'],ref_base.upper(), + '','']) + else:# In coding region; + pos_in_cod = (pos_in_gene+1)%3 + if pos_in_cod == 0: pos_in_cod = 3# Remainder of division 0 means 3rd place in codon; + + old_codon = sequence[pos_in_gene-pos_in_cod+1:pos_in_gene-pos_in_cod+4].upper() + old_residue = old_codon.translate(table=transl_table) + to_write.append([props['type'],props['locus_tag'],props['gene_name'],props['product'], + props['start']+1,props['end'],props['strand'],ref_base.upper(), + old_codon,old_residue]) + + if in_feat and not firstsnp and (pos >= prev_snp):# Check if snp is in same codon as previous snp. Position check for overlapping features; + if props['strand'] == 1 and (pos - prev_snp + 1) < pos_in_cod:# Same codon (Positive strand); + mnp = True + elif props['strand'] == -1 and (pos - prev_snp + 1) <= (3 - pos_in_cod):# Same codon (negative strand); + mnp = True + + # Process previous codon if not MNP; + if in_feat and not mnp: + if not firstsnp: + codon_process(codon) + codon = [pos_in_gene-pos_in_cod+1,'','','',props['strand']] + + + for l, snp in enumerate(snp_entry[2:]):# Loop through SNPs/strains; + + snp = snp.lower() + if snp == '':# Empty cell; + to_write.append(['','','','']) + continue + + if snp == '-': # Feature not present in this strain; + to_write.append(['Allele missing','','','']) + continue + + if snp_feat == -1:# Intergenic; + if snp == ref_base.lower(): + to_write.append(['']*4) + else: + to_write.append(['',snp,'','']) + straininfo[l]['intergenic'] += 1 + straininfo[l]['snps'] += 1 + continue + + if props['strand'] == -1: + snp = compl_bases[snp] + + if snp == ref_base.lower(): + to_write.append(['']*4) + else: + to_write.append(['',snp,'','']) + straininfo[l]['snps'] += 1 + if props['type'] != 'CDS': + straininfo[l][props['type']] += 1 + + + + if props['type'] in ['CDS','gene','pseudogene'] and in_feat: + codon[pos_in_cod] = to_write + else: + write_output(to_write) + + if firstsnp: firstsnp = False + prev_snp = pos+1 + i += 1 + + +if codon != ['','','','','']: codon_process(codon) + +file_out.close() + +end = time.clock() + +file_summary.write("\n") +file_summary.write(intro_message) +file_summary.write('\n'+strains_found+'.\n') + +file_summary.write("\nFinished annotation. Total time: %s s\n\n" % round(end-start,1)) + + +file_overlap.write('SNP start\tSNP end\tFeature 1\tLocus tag\tProduct\t\tFeature 2\tLocus tag\tProduct') +for overlap in feat_overlap: + if overlap[4] != [0,0]: + overlap[4][0]+=1 + overlap[4][1]+=1 + if overlap[4][0] == overlap[4][1]: overlap[4][1] = '' + write_output([[str(overlap[4][0])],[str(overlap[4][1]),feat_props[overlap[2]]['type'],feat_props[overlap[2]]['locus_tag'],feat_props[overlap[2]]['product']], + [feat_props[overlap[3]]['type'],feat_props[overlap[3]]['locus_tag'],feat_props[overlap[3]]['product']]], + file_overlap) + + +for i,strain in enumerate(headers[2:]): + file_summary.write("\n") + info = straininfo[i] + file_summary.write("+ Strain %s:\n" % strain) + file_summary.write(" %s SNPs found\n" % info['snps']) + file_summary.write(" Number of SNPs found CDS features: %s\n" % (info['mnps']+info['nonstart']+info['nonstop']+info['nonsense']+ + info['synonymous']+info['nonsynonymous'])) + file_summary.write(" (of which in pseudogenes: %s)\n" % info['pseudogene']) + file_summary.write(" - MNPs: %s\n" % info['mnps']) + file_summary.write(" - Synonymous: %s\n" % info['synonymous']) + file_summary.write(" - Nonsynonymous: %s\n" % info['nonsynonymous']) + file_summary.write(" - Nonsense: %s\n" % info['nonsense']) + file_summary.write(" - Nonstart: %s\n" % info['nonstart']) + file_summary.write(" - Nonstop: %s\n" % info['nonstop']) + file_summary.write(" Intergenic: %s\n" % info['intergenic']) + + for typ in feature_types: + if typ not in ['intergenic','pseudogene'] and info[typ] != 0: + file_summary.write(" %s: %s\n" % (typ,info[typ])) + file_summary.flush() + +file_overlap.close() +file_summary.close()