view ivar_variants_to_vcf.py @ 8:aea7008fe1f1 draft

"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ivar/ commit 6dae6f97a45a61b1f10be4227d978584624c3b3d"
author iuc
date Thu, 05 Aug 2021 12:44:05 +0000
parents 147465efa99c
children 045d6d00f606
line wrap: on
line source

#!/usr/bin/env python
import argparse
import errno
import os
import re
import sys


def parse_args(args=None):
    Description = "Convert iVar variants tsv file to vcf format."
    Epilog = """Example usage: python ivar_variants_to_vcf.py <FILE_IN> <FILE_OUT>"""

    parser = argparse.ArgumentParser(description=Description, epilog=Epilog)
    parser.add_argument("FILE_IN", help="Input tsv file.")
    parser.add_argument("FILE_OUT", help="Full path to output vcf file.")
    parser.add_argument(
        "-po",
        "--pass_only",
        dest="PASS_ONLY",
        help="Only output variants that PASS all filters.",
        action="store_true",
    )

    return parser.parse_args(args)


def make_dir(path):
    if not len(path) == 0:
        try:
            os.makedirs(path)
        except OSError as exception:
            if exception.errno != errno.EEXIST:
                raise


def info_line(info_keys, kv):
    info_strings = []
    for key in info_keys:
        if key not in kv:
            raise KeyError(
                'Expected key {} missing from INFO field key value pairs'
                .format(key)
            )
        if kv[key] is False:
            # a FLAG element, which should not be set
            continue
        if kv[key] is True:
            # a FLAG element => write the key only
            info_strings.append(key)
        else:
            info_strings.append('{}={}'.format(key, kv[key]))
    return ';'.join(info_strings)


def ivar_variants_to_vcf(FileIn, FileOut, passOnly=False):
    filename = os.path.splitext(FileIn)[0]
    header = (
        "##fileformat=VCFv4.2\n"
        "##source=iVar\n"
        '##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">\n'
        '##INFO=<ID=REF_DP,Number=1,Type=Integer,Description="Depth of reference base">\n'
        '##INFO=<ID=REF_RV,Number=1,Type=Integer,Description="Depth of reference base on reverse reads">\n'
        '##INFO=<ID=REF_QUAL,Number=1,Type=Integer,Description="Mean quality of reference base">\n'
        '##INFO=<ID=ALT_DP,Number=1,Type=Integer,Description="Depth of alternate base">\n'
        '##INFO=<ID=ALT_RV,Number=1,Type=Integer,Description="Deapth of alternate base on reverse reads">\n'
        '##INFO=<ID=ALT_QUAL,Number=1,Type=Integer,Description="Mean quality of alternate base">\n'
        '##INFO=<ID=AF,Number=1,Type=Float,Description="Frequency of alternate base">\n'
        '##INFO=<ID=INDEL,Number=0,Type=Flag,Description="Indicates that the variant is an INDEL.">\n'
        '##FILTER=<ID=PASS,Description="Result of p-value <= 0.05">\n'
        '##FILTER=<ID=FAIL,Description="Result of p-value > 0.05">\n'
    )
    info_keys = [
        re.match(r'##INFO=<ID=([^,]+),', line).group(1)
        for line in header.splitlines()
        if line.startswith('##INFO=')
    ]
    header += (
        "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n"
    )

    vars_seen = set()
    varCountDict = {"SNP": 0, "INS": 0, "DEL": 0}
    OutDir = os.path.dirname(FileOut)
    make_dir(OutDir)
    with open(FileIn) as f, open(FileOut, "w") as fout:
        fout.write(header)
        for line in f:
            if line.startswith("REGION"):
                continue

            line = line.split("\t")
            CHROM = line[0]
            POS = line[1]
            ID = "."
            REF = line[2]
            ALT = line[3]
            if ALT[0] == "+":
                ALT = REF + ALT[1:]
                var_type = "INS"
            elif ALT[0] == "-":
                REF += ALT[1:]
                ALT = line[2]
                var_type = "DEL"
            else:
                var_type = "SNP"
            QUAL = "."
            pass_test = line[13]
            if pass_test == "TRUE":
                FILTER = "PASS"
            else:
                FILTER = "FAIL"

            if (passOnly and FILTER != "PASS"):
                continue
            var = (CHROM, POS, REF, ALT)
            if var in vars_seen:
                continue

            info_elements = {
                'DP': line[11],
                'REF_DP': line[4],
                'REF_RV': line[5],
                'REF_QUAL': line[6],
                'ALT_DP': line[7],
                'ALT_RV': line[8],
                'ALT_QUAL': line[9],
                'AF': line[10]
            }
            if var_type in ['INS', 'DEL']:
                # add INDEL FLAG
                info_elements['INDEL'] = True
            else:
                info_elements['INDEL'] = False
            INFO = info_line(info_keys, info_elements)

            vars_seen.add(var)
            varCountDict[var_type] += 1
            fout.write(
                '\t'.join(
                    [CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO]
                ) + '\n'
            )

    # Print variant counts to pass to MultiQC
    varCountList = [(k, str(v)) for k, v in sorted(varCountDict.items())]
    print("\t".join(["sample"] + [x[0] for x in varCountList]))
    print("\t".join([filename] + [x[1] for x in varCountList]))


def main(args=None):
    args = parse_args(args)
    ivar_variants_to_vcf(
        args.FILE_IN, args.FILE_OUT, args.PASS_ONLY
    )


if __name__ == "__main__":
    sys.exit(main())