Mercurial > repos > iuc > ivar_consensus
comparison ivar_variants_to_vcf.py @ 6:3bc2ef3d4a17 draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ivar/ commit 847ec10cd36ea4f3cd4c257d5742f0fb401e364e"
author | iuc |
---|---|
date | Thu, 10 Jun 2021 22:07:43 +0000 |
parents | |
children | 17f911830a8c |
comparison
equal
deleted
inserted
replaced
5:5e668dc9f379 | 6:3bc2ef3d4a17 |
---|---|
1 #!/usr/bin/env python | |
2 import argparse | |
3 import errno | |
4 import os | |
5 import re | |
6 import sys | |
7 | |
8 | |
9 def parse_args(args=None): | |
10 Description = "Convert iVar variants tsv file to vcf format." | |
11 Epilog = """Example usage: python ivar_variants_to_vcf.py <FILE_IN> <FILE_OUT>""" | |
12 | |
13 parser = argparse.ArgumentParser(description=Description, epilog=Epilog) | |
14 parser.add_argument("FILE_IN", help="Input tsv file.") | |
15 parser.add_argument("FILE_OUT", help="Full path to output vcf file.") | |
16 parser.add_argument( | |
17 "-po", | |
18 "--pass_only", | |
19 dest="PASS_ONLY", | |
20 help="Only output variants that PASS all filters.", | |
21 action="store_true", | |
22 ) | |
23 | |
24 return parser.parse_args(args) | |
25 | |
26 | |
27 def make_dir(path): | |
28 if not len(path) == 0: | |
29 try: | |
30 os.makedirs(path) | |
31 except OSError as exception: | |
32 if exception.errno != errno.EEXIST: | |
33 raise | |
34 | |
35 | |
36 def info_line(info_keys, kv): | |
37 info_strings = [] | |
38 for key in info_keys: | |
39 if key not in kv: | |
40 raise KeyError( | |
41 'Expected key {} missing from INFO field key value pairs' | |
42 .format(key) | |
43 ) | |
44 if kv[key] is False: | |
45 # a FLAG element, which should not be set | |
46 continue | |
47 if kv[key] is True: | |
48 # a FLAG element => write the key only | |
49 info_strings.append(key) | |
50 else: | |
51 info_strings.append('{}={}'.format(key, kv[key])) | |
52 return ';'.join(info_strings) | |
53 | |
54 | |
55 def ivar_variants_to_vcf(FileIn, FileOut, passOnly=False): | |
56 filename = os.path.splitext(FileIn)[0] | |
57 header = ( | |
58 "##fileformat=VCFv4.2\n" | |
59 "##source=iVar\n" | |
60 '##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">\n' | |
61 '##INFO=<ID=REF_DP,Number=1,Type=Integer,Description="Depth of reference base">\n' | |
62 '##INFO=<ID=REF_RV,Number=1,Type=Integer,Description="Depth of reference base on reverse reads">\n' | |
63 '##INFO=<ID=REF_QUAL,Number=1,Type=Integer,Description="Mean quality of reference base">\n' | |
64 '##INFO=<ID=ALT_DP,Number=1,Type=Integer,Description="Depth of alternate base">\n' | |
65 '##INFO=<ID=ALT_RV,Number=1,Type=Integer,Description="Deapth of alternate base on reverse reads">\n' | |
66 '##INFO=<ID=ALT_QUAL,Number=1,Type=Integer,Description="Mean quality of alternate base">\n' | |
67 '##INFO=<ID=AF,Number=1,Type=Float,Description="Frequency of alternate base">\n' | |
68 '##INFO=<ID=INDEL,Number=0,Type=Flag,Description="Indicates that the variant is an INDEL.">\n' | |
69 '##FILTER=<ID=PASS,Description="Result of p-value <= 0.05">\n' | |
70 '##FILTER=<ID=FAIL,Description="Result of p-value > 0.05">\n' | |
71 ) | |
72 info_keys = [ | |
73 re.match(r'##INFO=<ID=([^,]+),', line).group(1) | |
74 for line in header.splitlines() | |
75 if line.startswith('##INFO=') | |
76 ] | |
77 header += ( | |
78 "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n" | |
79 ) | |
80 | |
81 vars_seen = set() | |
82 varCountDict = {"SNP": 0, "INS": 0, "DEL": 0} | |
83 OutDir = os.path.dirname(FileOut) | |
84 make_dir(OutDir) | |
85 with open(FileIn) as f, open(FileOut, "w") as fout: | |
86 fout.write(header) | |
87 for line in f: | |
88 if line.startswith("REGION"): | |
89 continue | |
90 | |
91 line = line.split("\t") | |
92 CHROM = line[0] | |
93 POS = line[1] | |
94 ID = "." | |
95 REF = line[2] | |
96 ALT = line[3] | |
97 if ALT[0] == "+": | |
98 ALT = REF + ALT[1:] | |
99 var_type = "INS" | |
100 elif ALT[0] == "-": | |
101 REF += ALT[1:] | |
102 ALT = line[2] | |
103 var_type = "DEL" | |
104 else: | |
105 var_type = "SNP" | |
106 QUAL = "." | |
107 pass_test = line[13] | |
108 if pass_test == "TRUE": | |
109 FILTER = "PASS" | |
110 else: | |
111 FILTER = "FAIL" | |
112 | |
113 if (passOnly and FILTER != "PASS"): | |
114 continue | |
115 var = (CHROM, POS, REF, ALT) | |
116 if var in vars_seen: | |
117 continue | |
118 | |
119 info_elements = { | |
120 'DP': line[11], | |
121 'REF_DP': line[4], | |
122 'REF_RV': line[5], | |
123 'REF_QUAL': line[6], | |
124 'ALT_DP': line[7], | |
125 'ALT_RV': line[8], | |
126 'ALT_QUAL': line[9], | |
127 'AF': line[10] | |
128 } | |
129 if var_type in ['INS', 'DEL']: | |
130 # add INDEL FLAG | |
131 info_elements['INDEL'] = True | |
132 else: | |
133 info_elements['INDEL'] = False | |
134 INFO = info_line(info_keys, info_elements) | |
135 | |
136 vars_seen.add(var) | |
137 varCountDict[var_type] += 1 | |
138 fout.write( | |
139 '\t'.join( | |
140 [CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO] | |
141 ) + '\n' | |
142 ) | |
143 | |
144 # Print variant counts to pass to MultiQC | |
145 varCountList = [(k, str(v)) for k, v in sorted(varCountDict.items())] | |
146 print("\t".join(["sample"] + [x[0] for x in varCountList])) | |
147 print("\t".join([filename] + [x[1] for x in varCountList])) | |
148 | |
149 | |
150 def main(args=None): | |
151 args = parse_args(args) | |
152 ivar_variants_to_vcf( | |
153 args.FILE_IN, args.FILE_OUT, args.PASS_ONLY | |
154 ) | |
155 | |
156 | |
157 if __name__ == "__main__": | |
158 sys.exit(main()) |