Mercurial > repos > petr-novak > dante
comparison parse_aln.py @ 15:3151a72a6671 draft
Uploaded
author | petr-novak |
---|---|
date | Tue, 03 Sep 2019 05:20:02 -0400 |
parents | a6c55d1bdb6c |
children | 0e820310d4dc |
comparison
equal
deleted
inserted
replaced
14:a6c55d1bdb6c | 15:3151a72a6671 |
---|---|
2 ''' | 2 ''' |
3 parse .aln file - output from cap3 program. Output is fasta file and | 3 parse .aln file - output from cap3 program. Output is fasta file and |
4 profile file | 4 profile file |
5 ''' | 5 ''' |
6 import argparse | 6 import argparse |
7 import re | |
7 | 8 |
8 | 9 |
9 def parse_args(): | 10 def parse_args(): |
10 '''Argument parsin''' | 11 '''Argument parsin''' |
11 description = """ | 12 description = """ |
12 parsing cap3 assembly aln output | 13 parsing cap3 assembly aln output |
13 """ | 14 """ |
14 parser = argparse.ArgumentParser(description=description, | 15 |
15 formatter_class=argparse.RawTextHelpFormatter) | 16 parser = argparse.ArgumentParser( |
16 parser.add_argument( | 17 description=description, |
17 '-a', '--aln_file', | 18 formatter_class=argparse.RawTextHelpFormatter) |
18 default=None, required=True, | 19 parser.add_argument('-a', |
19 help="Aln file input", | 20 '--aln_file', |
20 type=str, | 21 default=None, |
21 action='store') | 22 required=True, |
22 parser.add_argument( | 23 help="Aln file input", |
23 '-f', '--fasta', | 24 type=str, |
24 default=None, required=True, | 25 action='store') |
25 help="fasta output file name", | 26 parser.add_argument('-f', |
26 type=str, | 27 '--fasta', |
27 action='store') | 28 default=None, |
28 parser.add_argument( | 29 required=True, |
29 '-p', '--profile', | 30 help="fasta output file name", |
30 default=None, required=True, | 31 type=str, |
31 help="output file for coverage profile", | 32 action='store') |
32 type=str, | 33 parser.add_argument('-p', |
33 action="store" | 34 '--profile', |
34 ) | 35 default=None, |
36 required=True, | |
37 help="output file for coverage profile", | |
38 type=str, | |
39 action="store") | |
35 return parser.parse_args() | 40 return parser.parse_args() |
41 | |
42 | |
43 def get_header(f): | |
44 aln_header = ". : . : . : . : . : . :" | |
45 contig_lead = "******************" | |
46 aln_start = -1 | |
47 while True: | |
48 line = f.readline() | |
49 if not line: | |
50 return None, None | |
51 if line[0:18] == contig_lead: | |
52 line2 = f.readline() | |
53 else: | |
54 continue | |
55 if aln_header in line2: | |
56 aln_start = line2.index(aln_header) | |
57 break | |
58 contig_name = line.split()[1] + line.split()[2] | |
59 return contig_name, aln_start | |
60 | |
61 | |
62 def segment_start(f): | |
63 pos = f.tell() | |
64 line = f.readline() | |
65 # detect next contig or end of file | |
66 if "********" in line or line == "": | |
67 segment = False | |
68 else: | |
69 segment = True | |
70 f.seek(pos) | |
71 return segment | |
72 | |
73 | |
74 def get_segment(f, seq_start): | |
75 if not segment_start(f): | |
76 return None, None | |
77 aln = [] | |
78 while True: | |
79 line = f.readline() | |
80 if ". : . :" in line: | |
81 continue | |
82 if "__________" in line: | |
83 consensus = f.readline().rstrip('\n')[seq_start:] | |
84 f.readline() # empty line | |
85 break | |
86 else: | |
87 aln.append(line.rstrip('\n')[seq_start:]) | |
88 return aln, consensus | |
89 | |
90 | |
91 def aln2coverage(aln): | |
92 coverage = [0] * len(aln[0]) | |
93 for a in aln: | |
94 for i, c in enumerate(a): | |
95 if c not in " -": | |
96 coverage[i] += 1 | |
97 return coverage | |
98 | |
99 | |
100 def read_contig(f, seq_start): | |
101 contig = "" | |
102 coverage = [] | |
103 while True: | |
104 aln, consensus = get_segment(f, seq_start) | |
105 if aln: | |
106 contig += consensus | |
107 coverage += aln2coverage(aln) | |
108 else: | |
109 break | |
110 return contig, coverage | |
111 | |
112 def remove_gaps(consensus, coverage): | |
113 if "-" not in consensus: | |
114 return consensus, coverage | |
115 new_coverage = [cov for cons, cov in zip(consensus, coverage) | |
116 if cons != "-"] | |
117 new_consensus = consensus.replace("-", "") | |
118 return new_consensus, new_coverage | |
119 | |
120 def main(): | |
121 args = parse_args() | |
122 with open(args.aln_file, 'r') as f1, open(args.fasta, 'w') as ffasta, open(args.profile, 'w') as fprofile: | |
123 while True: | |
124 contig_name, seq_start = get_header(f1) | |
125 if contig_name: | |
126 consensus, coverage = remove_gaps(*read_contig(f1, seq_start)) | |
127 ffasta.write(">{}\n".format(contig_name)) | |
128 ffasta.write("{}\n".format(consensus)) | |
129 fprofile.write(">{}\n".format(contig_name)) | |
130 fprofile.write("{}\n".format(" ".join([str(i) for i in coverage]))) | |
131 else: | |
132 break | |
36 | 133 |
37 | 134 |
38 if __name__ == "__main__": | 135 if __name__ == "__main__": |
39 | 136 |
40 args = parse_args() | 137 main() |
41 print(args.profile) | |
42 | |
43 |