Mercurial > repos > petr-novak > repeat_annotation_pipeline3
comparison get_contigs_from_re_archive.py @ 0:ea6a3059a6af draft
Uploaded
author | petr-novak |
---|---|
date | Mon, 18 Oct 2021 11:01:20 +0000 |
parents | |
children | 814cba36e435 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:ea6a3059a6af |
---|---|
1 #!/usr/bin/env python | |
2 ''' | |
3 parse .aln file - output from cap3 program. Output is fasta file and | |
4 profile file | |
5 ''' | |
6 import argparse | |
7 import re | |
8 import zipfile | |
9 import tempfile | |
10 import textwrap | |
11 | |
12 def parse_args(): | |
13 '''Argument parsin''' | |
14 description = """ | |
15 parsing cap3 assembly aln output | |
16 """ | |
17 | |
18 parser = argparse.ArgumentParser( | |
19 description=description, formatter_class=argparse.RawTextHelpFormatter) | |
20 parser.add_argument('-re', | |
21 '--re_file', | |
22 default=None, | |
23 required=True, | |
24 help="RepeatExlorer archive or directory", | |
25 type=str, | |
26 action='store') | |
27 parser.add_argument('-f', | |
28 '--fasta', | |
29 default=None, | |
30 required=True, | |
31 help="fasta output file name", | |
32 type=str, | |
33 action='store') | |
34 parser.add_argument('-m', | |
35 '--min_coverage', | |
36 default=5, | |
37 required=False, | |
38 help="minimum contig coverage", | |
39 type=int, | |
40 action="store") | |
41 parser.add_argument('-L', | |
42 '--min_contig_length', | |
43 default=50, | |
44 required=False, | |
45 help="minimum contig length", | |
46 type=int, | |
47 action="store") | |
48 return parser.parse_args() | |
49 | |
50 | |
51 def get_header(f): | |
52 aln_header = " . : . : . : . : . : . :" | |
53 contig_lead = "******************" | |
54 aln_start = -1 | |
55 while True: | |
56 line = f.readline() | |
57 if not line: | |
58 return None, None | |
59 if line[0:18] == contig_lead: | |
60 line2 = f.readline() | |
61 else: | |
62 continue | |
63 if aln_header in line2: | |
64 aln_start = line2.index(aln_header) | |
65 break | |
66 contig_name = line.split()[1] + line.split()[2] | |
67 return contig_name, aln_start | |
68 | |
69 | |
70 def segment_start(f): | |
71 pos = f.tell() | |
72 line = f.readline() | |
73 # detect next contig or end of file | |
74 if "********" in line or line == "" or "Number of segment pairs = " in line: | |
75 segment = False | |
76 else: | |
77 segment = True | |
78 f.seek(pos) | |
79 return segment | |
80 | |
81 | |
82 def get_segment(f, seq_start): | |
83 if not segment_start(f): | |
84 return None, None | |
85 aln = [] | |
86 while True: | |
87 line = f.readline() | |
88 if ". : . :" in line: | |
89 continue | |
90 if "__________" in line: | |
91 consensus = f.readline().rstrip('\n')[seq_start:] | |
92 f.readline() # empty line | |
93 break | |
94 else: | |
95 aln.append(line.rstrip('\n')[seq_start:]) | |
96 return aln, consensus | |
97 | |
98 | |
99 def aln2coverage(aln): | |
100 coverage = [0] * len(aln[0]) | |
101 for a in aln: | |
102 for i, c in enumerate(a): | |
103 if c not in " -": | |
104 coverage[i] += 1 | |
105 return coverage | |
106 | |
107 | |
108 def read_contig(f, seq_start): | |
109 contig = "" | |
110 coverage = [] | |
111 while True: | |
112 aln, consensus = get_segment(f, seq_start) | |
113 if aln: | |
114 contig += consensus | |
115 coverage += aln2coverage(aln) | |
116 else: | |
117 break | |
118 return contig, coverage | |
119 | |
120 | |
121 def remove_gaps(consensus, coverage): | |
122 if "-" not in consensus: | |
123 return consensus, coverage | |
124 new_coverage = [ | |
125 cov for cons, cov in zip(consensus, coverage) if cons != "-" | |
126 ] | |
127 new_consensus = consensus.replace("-", "") | |
128 return new_consensus, new_coverage | |
129 | |
130 | |
131 def extract_contigs_from_re_archive(archive, aln_output): | |
132 with zipfile.ZipFile(archive, 'r') as zip_object, open(aln_output, | |
133 'w') as fout: | |
134 flist = zip_object.infolist() | |
135 for fn in flist: | |
136 if re.match('seqclust.+[.]aln$', fn.filename): | |
137 with zip_object.open(fn.filename) as aln: | |
138 for l in aln: | |
139 fout.write(l.decode('utf-8')) | |
140 return aln_output | |
141 | |
142 def read_tarean_fasta(fobj): | |
143 ids = [] | |
144 s = [] | |
145 for i in fobj: | |
146 ii = i.decode('utf-8') | |
147 if ii[0] == ">": | |
148 ids.append(ii) | |
149 s.append("") | |
150 else: | |
151 s[-1] = s[-1] + ii.strip() | |
152 return ids, s | |
153 | |
154 def extract_tarean_contigs_from_re_archive(archive): | |
155 with zipfile.ZipFile(archive, 'r') as zip_object: | |
156 flist = zip_object.infolist() | |
157 seqs_all = [] | |
158 ids_all = [] | |
159 for fn in flist: | |
160 if re.match("seqclust.+dir_CL[0-9]+[/]tarean_contigs.fasta", fn.filename): | |
161 print(fn.filename) | |
162 with zip_object.open(fn.filename) as fobj: | |
163 ids, seqs = read_tarean_fasta(fobj) | |
164 # wrap sequences | |
165 seqs = ["\n".join(textwrap.wrap(s + s, 80)) for s in seqs] | |
166 seqs_all += seqs | |
167 ids_all += ids | |
168 return ids_all, seqs_all | |
169 | |
170 | |
171 def extract_contigs_from_re_directory(dir, aln_output): | |
172 # TODO | |
173 pass | |
174 | |
175 | |
176 def filter_contigs(consensus, coverage, min_coverage=5): | |
177 x = "".join([ | |
178 s if cov >= min_coverage else " " | |
179 for s, cov in zip(consensus, coverage) | |
180 ]).strip() | |
181 consensus_N = "\n".join(textwrap.wrap(x.replace(" ", "N"),80)) | |
182 return consensus_N | |
183 | |
184 | |
185 def main(): | |
186 args = parse_args() | |
187 # extract aln from archive | |
188 ids, seqs = extract_tarean_contigs_from_re_archive(args.re_file) | |
189 aln_file = extract_contigs_from_re_archive( | |
190 args.re_file, | |
191 tempfile.NamedTemporaryFile().name) | |
192 with open(aln_file, 'r') as f1, open(args.fasta, 'w') as ffasta: | |
193 while True: | |
194 contig_name, seq_start = get_header(f1) | |
195 if contig_name: | |
196 consensus, coverage = remove_gaps(*read_contig(f1, seq_start)) | |
197 clean_consensus = filter_contigs(consensus, coverage, | |
198 args.min_coverage) | |
199 if len(clean_consensus) >= args.min_contig_length: | |
200 ffasta.write(">{}\n".format(contig_name)) | |
201 ffasta.write("{}\n".format(clean_consensus)) | |
202 else: | |
203 break | |
204 | |
205 # write tarean sequences: | |
206 for i, s in zip(ids, seqs): | |
207 ffasta.write(i) | |
208 ffasta.write(s + "\n") | |
209 | |
210 | |
211 | |
212 if __name__ == "__main__": | |
213 | |
214 main() |