Mercurial > repos > iuc > ivar_consensus
diff ivar_variants_to_vcf.py @ 6:3bc2ef3d4a17 draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ivar/ commit 847ec10cd36ea4f3cd4c257d5742f0fb401e364e"
author | iuc |
---|---|
date | Thu, 10 Jun 2021 22:07:43 +0000 |
parents | |
children | 17f911830a8c |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/ivar_variants_to_vcf.py Thu Jun 10 22:07:43 2021 +0000 @@ -0,0 +1,158 @@ +#!/usr/bin/env python +import argparse +import errno +import os +import re +import sys + + +def parse_args(args=None): + Description = "Convert iVar variants tsv file to vcf format." + Epilog = """Example usage: python ivar_variants_to_vcf.py <FILE_IN> <FILE_OUT>""" + + parser = argparse.ArgumentParser(description=Description, epilog=Epilog) + parser.add_argument("FILE_IN", help="Input tsv file.") + parser.add_argument("FILE_OUT", help="Full path to output vcf file.") + parser.add_argument( + "-po", + "--pass_only", + dest="PASS_ONLY", + help="Only output variants that PASS all filters.", + action="store_true", + ) + + return parser.parse_args(args) + + +def make_dir(path): + if not len(path) == 0: + try: + os.makedirs(path) + except OSError as exception: + if exception.errno != errno.EEXIST: + raise + + +def info_line(info_keys, kv): + info_strings = [] + for key in info_keys: + if key not in kv: + raise KeyError( + 'Expected key {} missing from INFO field key value pairs' + .format(key) + ) + if kv[key] is False: + # a FLAG element, which should not be set + continue + if kv[key] is True: + # a FLAG element => write the key only + info_strings.append(key) + else: + info_strings.append('{}={}'.format(key, kv[key])) + return ';'.join(info_strings) + + +def ivar_variants_to_vcf(FileIn, FileOut, passOnly=False): + filename = os.path.splitext(FileIn)[0] + header = ( + "##fileformat=VCFv4.2\n" + "##source=iVar\n" + '##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">\n' + '##INFO=<ID=REF_DP,Number=1,Type=Integer,Description="Depth of reference base">\n' + '##INFO=<ID=REF_RV,Number=1,Type=Integer,Description="Depth of reference base on reverse reads">\n' + '##INFO=<ID=REF_QUAL,Number=1,Type=Integer,Description="Mean quality of reference base">\n' + '##INFO=<ID=ALT_DP,Number=1,Type=Integer,Description="Depth of alternate base">\n' + '##INFO=<ID=ALT_RV,Number=1,Type=Integer,Description="Deapth of alternate base on reverse reads">\n' + '##INFO=<ID=ALT_QUAL,Number=1,Type=Integer,Description="Mean quality of alternate base">\n' + '##INFO=<ID=AF,Number=1,Type=Float,Description="Frequency of alternate base">\n' + '##INFO=<ID=INDEL,Number=0,Type=Flag,Description="Indicates that the variant is an INDEL.">\n' + '##FILTER=<ID=PASS,Description="Result of p-value <= 0.05">\n' + '##FILTER=<ID=FAIL,Description="Result of p-value > 0.05">\n' + ) + info_keys = [ + re.match(r'##INFO=<ID=([^,]+),', line).group(1) + for line in header.splitlines() + if line.startswith('##INFO=') + ] + header += ( + "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n" + ) + + vars_seen = set() + varCountDict = {"SNP": 0, "INS": 0, "DEL": 0} + OutDir = os.path.dirname(FileOut) + make_dir(OutDir) + with open(FileIn) as f, open(FileOut, "w") as fout: + fout.write(header) + for line in f: + if line.startswith("REGION"): + continue + + line = line.split("\t") + CHROM = line[0] + POS = line[1] + ID = "." + REF = line[2] + ALT = line[3] + if ALT[0] == "+": + ALT = REF + ALT[1:] + var_type = "INS" + elif ALT[0] == "-": + REF += ALT[1:] + ALT = line[2] + var_type = "DEL" + else: + var_type = "SNP" + QUAL = "." + pass_test = line[13] + if pass_test == "TRUE": + FILTER = "PASS" + else: + FILTER = "FAIL" + + if (passOnly and FILTER != "PASS"): + continue + var = (CHROM, POS, REF, ALT) + if var in vars_seen: + continue + + info_elements = { + 'DP': line[11], + 'REF_DP': line[4], + 'REF_RV': line[5], + 'REF_QUAL': line[6], + 'ALT_DP': line[7], + 'ALT_RV': line[8], + 'ALT_QUAL': line[9], + 'AF': line[10] + } + if var_type in ['INS', 'DEL']: + # add INDEL FLAG + info_elements['INDEL'] = True + else: + info_elements['INDEL'] = False + INFO = info_line(info_keys, info_elements) + + vars_seen.add(var) + varCountDict[var_type] += 1 + fout.write( + '\t'.join( + [CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO] + ) + '\n' + ) + + # Print variant counts to pass to MultiQC + varCountList = [(k, str(v)) for k, v in sorted(varCountDict.items())] + print("\t".join(["sample"] + [x[0] for x in varCountList])) + print("\t".join([filename] + [x[1] for x in varCountList])) + + +def main(args=None): + args = parse_args(args) + ivar_variants_to_vcf( + args.FILE_IN, args.FILE_OUT, args.PASS_ONLY + ) + + +if __name__ == "__main__": + sys.exit(main())