Mercurial > repos > petr-novak > repeat_annotation_pipeline2
comparison get_contigs_from_re_archive.py @ 0:cf3cea0a3039 draft
Uploaded
| author | petr-novak |
|---|---|
| date | Thu, 07 Oct 2021 06:07:34 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:cf3cea0a3039 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 ''' | |
| 3 parse .aln file - output from cap3 program. Output is fasta file and | |
| 4 profile file | |
| 5 ''' | |
| 6 import argparse | |
| 7 import re | |
| 8 import zipfile | |
| 9 import tempfile | |
| 10 import textwrap | |
| 11 | |
| 12 def parse_args(): | |
| 13 '''Argument parsin''' | |
| 14 description = """ | |
| 15 parsing cap3 assembly aln output | |
| 16 """ | |
| 17 | |
| 18 parser = argparse.ArgumentParser( | |
| 19 description=description, formatter_class=argparse.RawTextHelpFormatter) | |
| 20 parser.add_argument('-re', | |
| 21 '--re_file', | |
| 22 default=None, | |
| 23 required=True, | |
| 24 help="RepeatExlorer archive or directory", | |
| 25 type=str, | |
| 26 action='store') | |
| 27 parser.add_argument('-f', | |
| 28 '--fasta', | |
| 29 default=None, | |
| 30 required=True, | |
| 31 help="fasta output file name", | |
| 32 type=str, | |
| 33 action='store') | |
| 34 parser.add_argument('-m', | |
| 35 '--min_coverage', | |
| 36 default=5, | |
| 37 required=False, | |
| 38 help="minimum contig coverage", | |
| 39 type=int, | |
| 40 action="store") | |
| 41 parser.add_argument('-L', | |
| 42 '--min_contig_length', | |
| 43 default=50, | |
| 44 required=False, | |
| 45 help="minimum contig length", | |
| 46 type=int, | |
| 47 action="store") | |
| 48 return parser.parse_args() | |
| 49 | |
| 50 | |
| 51 def get_header(f): | |
| 52 aln_header = " . : . : . : . : . : . :" | |
| 53 contig_lead = "******************" | |
| 54 aln_start = -1 | |
| 55 while True: | |
| 56 line = f.readline() | |
| 57 if not line: | |
| 58 return None, None | |
| 59 if line[0:18] == contig_lead: | |
| 60 line2 = f.readline() | |
| 61 else: | |
| 62 continue | |
| 63 if aln_header in line2: | |
| 64 aln_start = line2.index(aln_header) | |
| 65 break | |
| 66 contig_name = line.split()[1] + line.split()[2] | |
| 67 return contig_name, aln_start | |
| 68 | |
| 69 | |
| 70 def segment_start(f): | |
| 71 pos = f.tell() | |
| 72 line = f.readline() | |
| 73 # detect next contig or end of file | |
| 74 if "********" in line or line == "" or "Number of segment pairs = " in line: | |
| 75 segment = False | |
| 76 else: | |
| 77 segment = True | |
| 78 f.seek(pos) | |
| 79 return segment | |
| 80 | |
| 81 | |
| 82 def get_segment(f, seq_start): | |
| 83 if not segment_start(f): | |
| 84 return None, None | |
| 85 aln = [] | |
| 86 while True: | |
| 87 line = f.readline() | |
| 88 if ". : . :" in line: | |
| 89 continue | |
| 90 if "__________" in line: | |
| 91 consensus = f.readline().rstrip('\n')[seq_start:] | |
| 92 f.readline() # empty line | |
| 93 break | |
| 94 else: | |
| 95 aln.append(line.rstrip('\n')[seq_start:]) | |
| 96 return aln, consensus | |
| 97 | |
| 98 | |
| 99 def aln2coverage(aln): | |
| 100 coverage = [0] * len(aln[0]) | |
| 101 for a in aln: | |
| 102 for i, c in enumerate(a): | |
| 103 if c not in " -": | |
| 104 coverage[i] += 1 | |
| 105 return coverage | |
| 106 | |
| 107 | |
| 108 def read_contig(f, seq_start): | |
| 109 contig = "" | |
| 110 coverage = [] | |
| 111 while True: | |
| 112 aln, consensus = get_segment(f, seq_start) | |
| 113 if aln: | |
| 114 contig += consensus | |
| 115 coverage += aln2coverage(aln) | |
| 116 else: | |
| 117 break | |
| 118 return contig, coverage | |
| 119 | |
| 120 | |
| 121 def remove_gaps(consensus, coverage): | |
| 122 if "-" not in consensus: | |
| 123 return consensus, coverage | |
| 124 new_coverage = [ | |
| 125 cov for cons, cov in zip(consensus, coverage) if cons != "-" | |
| 126 ] | |
| 127 new_consensus = consensus.replace("-", "") | |
| 128 return new_consensus, new_coverage | |
| 129 | |
| 130 | |
| 131 def extract_contigs_from_re_archive(archive, aln_output): | |
| 132 with zipfile.ZipFile(archive, 'r') as zip_object, open(aln_output, | |
| 133 'w') as fout: | |
| 134 flist = zip_object.infolist() | |
| 135 for fn in flist: | |
| 136 if re.match('seqclust.+[.]aln$', fn.filename): | |
| 137 with zip_object.open(fn.filename) as aln: | |
| 138 for l in aln: | |
| 139 fout.write(l.decode('utf-8')) | |
| 140 return aln_output | |
| 141 | |
| 142 def read_tarean_fasta(fobj): | |
| 143 ids = [] | |
| 144 s = [] | |
| 145 for i in fobj: | |
| 146 ii = i.decode('utf-8') | |
| 147 if ii[0] == ">": | |
| 148 ids.append(ii) | |
| 149 s.append("") | |
| 150 else: | |
| 151 s[-1] = s[-1] + ii.strip() | |
| 152 return ids, s | |
| 153 | |
| 154 def extract_tarean_contigs_from_re_archive(archive): | |
| 155 with zipfile.ZipFile(archive, 'r') as zip_object: | |
| 156 flist = zip_object.infolist() | |
| 157 for fn in flist: | |
| 158 if re.match("seqclust.+dir_CL[0-9]+[/]tarean_contigs.fasta", fn.filename): | |
| 159 print(fn.filename) | |
| 160 with zip_object.open(fn.filename) as fobj: | |
| 161 ids, seqs = read_tarean_fasta(fobj) | |
| 162 # wrap sequences | |
| 163 seqs = ["\n".join(textwrap.wrap(s + s, 80)) for s in seqs] | |
| 164 return ids, seqs | |
| 165 | |
| 166 | |
| 167 def extract_contigs_from_re_directory(dir, aln_output): | |
| 168 # TODO | |
| 169 pass | |
| 170 | |
| 171 | |
| 172 def filter_contigs(consensus, coverage, min_coverage=5): | |
| 173 x = "".join([ | |
| 174 s if cov >= min_coverage else " " | |
| 175 for s, cov in zip(consensus, coverage) | |
| 176 ]).strip() | |
| 177 consensus_N = "\n".join(textwrap.wrap(x.replace(" ", "N"),80)) | |
| 178 return consensus_N | |
| 179 | |
| 180 | |
| 181 def main(): | |
| 182 args = parse_args() | |
| 183 # extract aln from archive | |
| 184 ids, seqs = extract_tarean_contigs_from_re_archive(args.re_file) | |
| 185 aln_file = extract_contigs_from_re_archive( | |
| 186 args.re_file, | |
| 187 tempfile.NamedTemporaryFile().name) | |
| 188 with open(aln_file, 'r') as f1, open(args.fasta, 'w') as ffasta: | |
| 189 while True: | |
| 190 contig_name, seq_start = get_header(f1) | |
| 191 if contig_name: | |
| 192 consensus, coverage = remove_gaps(*read_contig(f1, seq_start)) | |
| 193 clean_consensus = filter_contigs(consensus, coverage, | |
| 194 args.min_coverage) | |
| 195 if len(clean_consensus) >= args.min_contig_length: | |
| 196 ffasta.write(">{}\n".format(contig_name)) | |
| 197 ffasta.write("{}\n".format(clean_consensus)) | |
| 198 else: | |
| 199 break | |
| 200 | |
| 201 # write tarean sequences: | |
| 202 for i, s in zip(ids, seqs): | |
| 203 ffasta.write(i) | |
| 204 ffasta.write(s + "\n") | |
| 205 | |
| 206 | |
| 207 | |
| 208 if __name__ == "__main__": | |
| 209 | |
| 210 main() |
