view convert_VCF_info_fields.py @ 14:b2aae698b9d3 draft

"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/medaka commit 52289bc7b99bfa8a3bda46cb35cea98399419dab"
author iuc
date Thu, 18 Nov 2021 20:01:50 +0000
parents fa11aa8103b2
children
line wrap: on
line source

#!/usr/bin/env python3

# Takes in VCF file annotated with medaka tools annotate and converts
#
# Usage statement:
# python convert_VCF_info_fields.py in_vcf.vcf out_vcf.vcf

# 10/21/2020 - Nathan P. Roach, natproach@gmail.com

import re
import sys
from collections import OrderedDict
from math import log10

import scipy.stats


def pval_to_phredqual(pval):
    try:
        ret = round(-10 * log10(pval))
    except ValueError:
        ret = 2147483647  # transform pval of 0.0 to max signed 32 bit int
    return ret


def parseInfoField(info):
    info_fields = info.split(";")
    info_dict = OrderedDict()
    for info_field in info_fields:
        code, val = info_field.split("=")
        info_dict[code] = val
    return info_dict


def annotateVCF(in_vcf_filepath, out_vcf_filepath):
    """Postprocess output of medaka tools annotate.

    Splits multiallelic sites into separate records.
    Replaces medaka INFO fields that might represent information of the ref
    and multiple alternate alleles with simple ref, alt allele counterparts.
    """

    in_vcf = open(in_vcf_filepath, "r")
    # medaka INFO fields that do not make sense after splitting of
    # multi-allelic records
    # DP will be overwritten with the value of DPSP because medaka tools
    # annotate currently only calculates the latter correctly
    # (https://github.com/nanoporetech/medaka/issues/192).
    # DPS, which is as unreliable as DP, gets skipped and the code
    # calculates the spanning reads equivalent DPSPS instead.
    to_skip = {"SC", "SR", "AR", "DP", "DPSP", "DPS"}
    struct_meta_pat = re.compile("##(.+)=<ID=([^,]+)(,.+)?>")
    header_lines = []
    contig_ids = set()
    contig_ids_simple = set()
    # parse the metadata lines of the input VCF and drop:
    # - duplicate lines
    # - INFO lines declaring keys we are not going to write
    # - redundant contig information
    while True:
        line = in_vcf.readline()
        if line[:2] != "##":
            assert line.startswith("#CHROM")
            break
        if line in header_lines:
            # the annotate tool may generate lines already written by
            # medaka variant again (example: medaka version line)
            continue
        match = struct_meta_pat.match(line)
        if match:
            match_type, match_id, match_misc = match.groups()
            if match_type == "INFO":
                if match_id == "DPSP":
                    line = line.replace("DPSP", "DP")
                elif match_id in to_skip:
                    continue
            elif match_type == "contig":
                contig_ids.add(match_id)
                if not match_misc:
                    # the annotate tools writes its own contig info,
                    # which is redundant with contig info generated by
                    # medaka variant, but lacks a length value.
                    # We don't need the incomplete line.
                    contig_ids_simple.add(match_id)
                    continue
        header_lines.append(line)
    # Lets check the above assumption about each ID-only contig line
    # having a more complete counterpart.
    assert not (contig_ids_simple - contig_ids)
    header_lines.insert(1, "##convert_VCF_info_fields=0.2\n")
    header_lines += [
        '##INFO=<ID=DPSPS,Number=2,Type=Integer,Description="Depth of spanning reads by strand">\n',
        '##INFO=<ID=AF,Number=1,Type=Float,Description="Spanning Reads Allele Frequency">\n',
        '##INFO=<ID=FAF,Number=1,Type=Float,Description="Forward Spanning Reads Allele Frequency">\n',
        '##INFO=<ID=RAF,Number=1,Type=Float,Description="Reverse Spanning Reads Allele Frequency">\n',
        '##INFO=<ID=SB,Number=1,Type=Integer,Description="Phred-scaled strand bias of spanning reads at this position">\n',
        '##INFO=<ID=DP4,Number=4,Type=Integer,Description="Counts for ref-forward bases, ref-reverse, alt-forward and alt-reverse bases in spanning reads">\n',
        '##INFO=<ID=AS,Number=4,Type=Integer,Description="Total alignment score to ref and alt allele of spanning reads by strand (ref fwd, ref rev, alt fwd, alt rev) aligned with parasail match 5, mismatch -4, open 5, extend 3">\n',
        line,
    ]

    with open(out_vcf_filepath, "w") as out_vcf:
        out_vcf.writelines(header_lines)
        for line in in_vcf:
            fields = line.split("\t")
            info_dict = parseInfoField(fields[7])
            sr_list = [int(x) for x in info_dict["SR"].split(",")]
            sc_list = [int(x) for x in info_dict["SC"].split(",")]
            if len(sr_list) != len(sc_list):
                print("WARNING - SR and SC are different lengths, " "skipping variant")
                print(line.strip())  # Print the line for debugging purposes
                continue
            variant_list = fields[4].split(",")
            dpsp = int(info_dict["DPSP"])
            ref_fwd, ref_rev = 0, 1
            dpspf, dpspr = (int(x) for x in info_dict["AR"].split(","))
            for i in range(0, len(sr_list), 2):
                dpspf += sr_list[i]
                dpspr += sr_list[i + 1]
            for j, i in enumerate(range(2, len(sr_list), 2)):
                dp4 = (sr_list[ref_fwd], sr_list[ref_rev], sr_list[i], sr_list[i + 1])
                dp2x2 = [[dp4[0], dp4[1]], [dp4[2], dp4[3]]]
                _, p_val = scipy.stats.fisher_exact(dp2x2)
                sb = pval_to_phredqual(p_val)

                as_ = (sc_list[ref_fwd], sc_list[ref_rev], sc_list[i], sc_list[i + 1])

                info = []
                for code in info_dict:
                    if code in to_skip:
                        continue
                    val = info_dict[code]
                    info.append("%s=%s" % (code, val))

                info.append("DP=%d" % dpsp)
                info.append("DPSPS=%d,%d" % (dpspf, dpspr))

                if dpsp == 0:
                    info.append("AF=NaN")
                else:
                    af = (dp4[2] + dp4[3]) / dpsp
                    info.append("AF=%.6f" % af)
                if dpspf == 0:
                    info.append("FAF=NaN")
                else:
                    faf = dp4[2] / dpspf
                    info.append("FAF=%.6f" % faf)
                if dpspr == 0:
                    info.append("RAF=NaN")
                else:
                    raf = dp4[3] / dpspr
                    info.append("RAF=%.6f" % raf)
                info.append("SB=%d" % sb)
                info.append("DP4=%d,%d,%d,%d" % dp4)
                info.append("AS=%d,%d,%d,%d" % as_)
                new_info = ";".join(info)
                fields[4] = variant_list[j]
                fields[7] = new_info
                out_vcf.write("\t".join(fields))
    in_vcf.close()


if __name__ == "__main__":
    annotateVCF(sys.argv[1], sys.argv[2])