Mercurial > repos > iuc > ivar_consensus
view ivar_variants_to_vcf.py @ 6:3bc2ef3d4a17 draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ivar/ commit 847ec10cd36ea4f3cd4c257d5742f0fb401e364e"
author | iuc |
---|---|
date | Thu, 10 Jun 2021 22:07:43 +0000 |
parents | |
children | 17f911830a8c |
line wrap: on
line source
#!/usr/bin/env python import argparse import errno import os import re import sys def parse_args(args=None): Description = "Convert iVar variants tsv file to vcf format." Epilog = """Example usage: python ivar_variants_to_vcf.py <FILE_IN> <FILE_OUT>""" parser = argparse.ArgumentParser(description=Description, epilog=Epilog) parser.add_argument("FILE_IN", help="Input tsv file.") parser.add_argument("FILE_OUT", help="Full path to output vcf file.") parser.add_argument( "-po", "--pass_only", dest="PASS_ONLY", help="Only output variants that PASS all filters.", action="store_true", ) return parser.parse_args(args) def make_dir(path): if not len(path) == 0: try: os.makedirs(path) except OSError as exception: if exception.errno != errno.EEXIST: raise def info_line(info_keys, kv): info_strings = [] for key in info_keys: if key not in kv: raise KeyError( 'Expected key {} missing from INFO field key value pairs' .format(key) ) if kv[key] is False: # a FLAG element, which should not be set continue if kv[key] is True: # a FLAG element => write the key only info_strings.append(key) else: info_strings.append('{}={}'.format(key, kv[key])) return ';'.join(info_strings) def ivar_variants_to_vcf(FileIn, FileOut, passOnly=False): filename = os.path.splitext(FileIn)[0] header = ( "##fileformat=VCFv4.2\n" "##source=iVar\n" '##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">\n' '##INFO=<ID=REF_DP,Number=1,Type=Integer,Description="Depth of reference base">\n' '##INFO=<ID=REF_RV,Number=1,Type=Integer,Description="Depth of reference base on reverse reads">\n' '##INFO=<ID=REF_QUAL,Number=1,Type=Integer,Description="Mean quality of reference base">\n' '##INFO=<ID=ALT_DP,Number=1,Type=Integer,Description="Depth of alternate base">\n' '##INFO=<ID=ALT_RV,Number=1,Type=Integer,Description="Deapth of alternate base on reverse reads">\n' '##INFO=<ID=ALT_QUAL,Number=1,Type=Integer,Description="Mean quality of alternate base">\n' '##INFO=<ID=AF,Number=1,Type=Float,Description="Frequency of alternate base">\n' '##INFO=<ID=INDEL,Number=0,Type=Flag,Description="Indicates that the variant is an INDEL.">\n' '##FILTER=<ID=PASS,Description="Result of p-value <= 0.05">\n' '##FILTER=<ID=FAIL,Description="Result of p-value > 0.05">\n' ) info_keys = [ re.match(r'##INFO=<ID=([^,]+),', line).group(1) for line in header.splitlines() if line.startswith('##INFO=') ] header += ( "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n" ) vars_seen = set() varCountDict = {"SNP": 0, "INS": 0, "DEL": 0} OutDir = os.path.dirname(FileOut) make_dir(OutDir) with open(FileIn) as f, open(FileOut, "w") as fout: fout.write(header) for line in f: if line.startswith("REGION"): continue line = line.split("\t") CHROM = line[0] POS = line[1] ID = "." REF = line[2] ALT = line[3] if ALT[0] == "+": ALT = REF + ALT[1:] var_type = "INS" elif ALT[0] == "-": REF += ALT[1:] ALT = line[2] var_type = "DEL" else: var_type = "SNP" QUAL = "." pass_test = line[13] if pass_test == "TRUE": FILTER = "PASS" else: FILTER = "FAIL" if (passOnly and FILTER != "PASS"): continue var = (CHROM, POS, REF, ALT) if var in vars_seen: continue info_elements = { 'DP': line[11], 'REF_DP': line[4], 'REF_RV': line[5], 'REF_QUAL': line[6], 'ALT_DP': line[7], 'ALT_RV': line[8], 'ALT_QUAL': line[9], 'AF': line[10] } if var_type in ['INS', 'DEL']: # add INDEL FLAG info_elements['INDEL'] = True else: info_elements['INDEL'] = False INFO = info_line(info_keys, info_elements) vars_seen.add(var) varCountDict[var_type] += 1 fout.write( '\t'.join( [CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO] ) + '\n' ) # Print variant counts to pass to MultiQC varCountList = [(k, str(v)) for k, v in sorted(varCountDict.items())] print("\t".join(["sample"] + [x[0] for x in varCountList])) print("\t".join([filename] + [x[1] for x in varCountList])) def main(args=None): args = parse_args(args) ivar_variants_to_vcf( args.FILE_IN, args.FILE_OUT, args.PASS_ONLY ) if __name__ == "__main__": sys.exit(main())