Mercurial > repos > petr-novak > dante
comparison parse_aln.py @ 15:3151a72a6671 draft
Uploaded
| author | petr-novak |
|---|---|
| date | Tue, 03 Sep 2019 05:20:02 -0400 |
| parents | a6c55d1bdb6c |
| children | 0e820310d4dc |
comparison
equal
deleted
inserted
replaced
| 14:a6c55d1bdb6c | 15:3151a72a6671 |
|---|---|
| 2 ''' | 2 ''' |
| 3 parse .aln file - output from cap3 program. Output is fasta file and | 3 parse .aln file - output from cap3 program. Output is fasta file and |
| 4 profile file | 4 profile file |
| 5 ''' | 5 ''' |
| 6 import argparse | 6 import argparse |
| 7 import re | |
| 7 | 8 |
| 8 | 9 |
| 9 def parse_args(): | 10 def parse_args(): |
| 10 '''Argument parsin''' | 11 '''Argument parsin''' |
| 11 description = """ | 12 description = """ |
| 12 parsing cap3 assembly aln output | 13 parsing cap3 assembly aln output |
| 13 """ | 14 """ |
| 14 parser = argparse.ArgumentParser(description=description, | 15 |
| 15 formatter_class=argparse.RawTextHelpFormatter) | 16 parser = argparse.ArgumentParser( |
| 16 parser.add_argument( | 17 description=description, |
| 17 '-a', '--aln_file', | 18 formatter_class=argparse.RawTextHelpFormatter) |
| 18 default=None, required=True, | 19 parser.add_argument('-a', |
| 19 help="Aln file input", | 20 '--aln_file', |
| 20 type=str, | 21 default=None, |
| 21 action='store') | 22 required=True, |
| 22 parser.add_argument( | 23 help="Aln file input", |
| 23 '-f', '--fasta', | 24 type=str, |
| 24 default=None, required=True, | 25 action='store') |
| 25 help="fasta output file name", | 26 parser.add_argument('-f', |
| 26 type=str, | 27 '--fasta', |
| 27 action='store') | 28 default=None, |
| 28 parser.add_argument( | 29 required=True, |
| 29 '-p', '--profile', | 30 help="fasta output file name", |
| 30 default=None, required=True, | 31 type=str, |
| 31 help="output file for coverage profile", | 32 action='store') |
| 32 type=str, | 33 parser.add_argument('-p', |
| 33 action="store" | 34 '--profile', |
| 34 ) | 35 default=None, |
| 36 required=True, | |
| 37 help="output file for coverage profile", | |
| 38 type=str, | |
| 39 action="store") | |
| 35 return parser.parse_args() | 40 return parser.parse_args() |
| 41 | |
| 42 | |
| 43 def get_header(f): | |
| 44 aln_header = ". : . : . : . : . : . :" | |
| 45 contig_lead = "******************" | |
| 46 aln_start = -1 | |
| 47 while True: | |
| 48 line = f.readline() | |
| 49 if not line: | |
| 50 return None, None | |
| 51 if line[0:18] == contig_lead: | |
| 52 line2 = f.readline() | |
| 53 else: | |
| 54 continue | |
| 55 if aln_header in line2: | |
| 56 aln_start = line2.index(aln_header) | |
| 57 break | |
| 58 contig_name = line.split()[1] + line.split()[2] | |
| 59 return contig_name, aln_start | |
| 60 | |
| 61 | |
| 62 def segment_start(f): | |
| 63 pos = f.tell() | |
| 64 line = f.readline() | |
| 65 # detect next contig or end of file | |
| 66 if "********" in line or line == "": | |
| 67 segment = False | |
| 68 else: | |
| 69 segment = True | |
| 70 f.seek(pos) | |
| 71 return segment | |
| 72 | |
| 73 | |
| 74 def get_segment(f, seq_start): | |
| 75 if not segment_start(f): | |
| 76 return None, None | |
| 77 aln = [] | |
| 78 while True: | |
| 79 line = f.readline() | |
| 80 if ". : . :" in line: | |
| 81 continue | |
| 82 if "__________" in line: | |
| 83 consensus = f.readline().rstrip('\n')[seq_start:] | |
| 84 f.readline() # empty line | |
| 85 break | |
| 86 else: | |
| 87 aln.append(line.rstrip('\n')[seq_start:]) | |
| 88 return aln, consensus | |
| 89 | |
| 90 | |
| 91 def aln2coverage(aln): | |
| 92 coverage = [0] * len(aln[0]) | |
| 93 for a in aln: | |
| 94 for i, c in enumerate(a): | |
| 95 if c not in " -": | |
| 96 coverage[i] += 1 | |
| 97 return coverage | |
| 98 | |
| 99 | |
| 100 def read_contig(f, seq_start): | |
| 101 contig = "" | |
| 102 coverage = [] | |
| 103 while True: | |
| 104 aln, consensus = get_segment(f, seq_start) | |
| 105 if aln: | |
| 106 contig += consensus | |
| 107 coverage += aln2coverage(aln) | |
| 108 else: | |
| 109 break | |
| 110 return contig, coverage | |
| 111 | |
| 112 def remove_gaps(consensus, coverage): | |
| 113 if "-" not in consensus: | |
| 114 return consensus, coverage | |
| 115 new_coverage = [cov for cons, cov in zip(consensus, coverage) | |
| 116 if cons != "-"] | |
| 117 new_consensus = consensus.replace("-", "") | |
| 118 return new_consensus, new_coverage | |
| 119 | |
| 120 def main(): | |
| 121 args = parse_args() | |
| 122 with open(args.aln_file, 'r') as f1, open(args.fasta, 'w') as ffasta, open(args.profile, 'w') as fprofile: | |
| 123 while True: | |
| 124 contig_name, seq_start = get_header(f1) | |
| 125 if contig_name: | |
| 126 consensus, coverage = remove_gaps(*read_contig(f1, seq_start)) | |
| 127 ffasta.write(">{}\n".format(contig_name)) | |
| 128 ffasta.write("{}\n".format(consensus)) | |
| 129 fprofile.write(">{}\n".format(contig_name)) | |
| 130 fprofile.write("{}\n".format(" ".join([str(i) for i in coverage]))) | |
| 131 else: | |
| 132 break | |
| 36 | 133 |
| 37 | 134 |
| 38 if __name__ == "__main__": | 135 if __name__ == "__main__": |
| 39 | 136 |
| 40 args = parse_args() | 137 main() |
| 41 print(args.profile) | |
| 42 | |
| 43 |
