view mut2sscs.py @ 89:1a5974404d4f draft

planemo upload for repository https://github.com/Single-Molecule-Genetics/VariantAnalyzerGalaxy/tree/master/tools/variant_analyzer commit ee4a8e6cf290e6c8a4d55f9cd2839d60ab3b11c8-dirty
author mheinzl
date Tue, 25 Apr 2023 17:06:38 +0000
parents fdfe9a919ff7
children
line wrap: on
line source

#!/usr/bin/env python

"""mut2sscs.py

Author -- Gundula Povysil
Contact -- povysil@bioinf.jku.at

Takes a tabular file with mutations from DCS and a BAM file of SSCS as input
and extracts all tags of reads that carry the mutation.
Calculates statistics about number of ab/ba/duplex per mutation.

=======  ==========  =================  ================================
Version  Date        Author             Description
0.2.1    2019-10-27  Gundula Povysil    -
=======  ==========  =================  ================================

USAGE: python mut2sscs.py DCS_Mutations.tabular SSCS.bam SSCS_counts.json

"""

from __future__ import division

import argparse
import json
import os
import re
import sys

import numpy as np
import pysam
from cyvcf2 import VCF


def make_argparser():
    parser = argparse.ArgumentParser(description='Takes a vcf file with mutations and a BAM file as input and prints all tags of reads that carry the mutation to a user specified output file.')
    parser.add_argument('--mutFile',
                        help='VCR file with DCS mutations.')
    parser.add_argument('--bamFile',
                        help='BAM file with aligned SSCS reads.')
    parser.add_argument('--outputJson',
                        help='Output JSON file to store SSCS counts.')
    return parser


def mut2sscs(argv):
    parser = make_argparser()
    args = parser.parse_args(argv[1:])

    file1 = args.mutFile
    file2 = args.bamFile
    sscs_counts_json = args.outputJson

    if os.path.isfile(file1) is False:
        sys.exit("Error: Could not find '{}'".format(file1))

    if os.path.isfile(file2) is False:
        sys.exit("Error: Could not find '{}'".format(file2))

    # read SSCS bam file
#    pysam.index(file2)
    bam = pysam.AlignmentFile(file2, "rb")

    # get tags
    mut_pos_dict = {}
    ref_pos_dict = {}

    for variant in VCF(file1):
        chrom = variant.CHROM
        stop_pos = variant.start
        ref = variant.REF
        if len(variant.ALT) == 0:
            continue
        else:
            alt = variant.ALT[0]
        alt = alt.upper()
        ref = ref.upper()
        if "N" in alt:  # skip indels with N in alt allele --> it is not an indel but just a mismatch at the position where the N is (checked this in IGV)
            continue
        chrom_stop_pos = str(chrom) + "#" + str(stop_pos) + "#" + ref + "#" + alt
        for pileupcolumn in bam.pileup(chrom, stop_pos - 1, stop_pos + 1, max_depth=1000000000):
            if pileupcolumn.reference_pos == stop_pos:
                count_alt = 0
                count_ref = 0
                count_indel = 0
                for pileupread in pileupcolumn.pileups:
                    if not pileupread.is_refskip:
                        tag = pileupread.alignment.query_name
                        abba = tag[-2:]
                        if pileupread.is_del:
                            p = pileupread.query_position_or_next
                            e = p + len(alt) - 1
                        else:
                            p = pileupread.query_position
                            e = p + len(alt)
                        tag = pileupread.alignment.query_name
                        if "_" in tag:
                            tag = re.split('_', tag)[0]
                        s = p
                        split_cigar = re.split('(\d+)', pileupread.alignment.cigarstring)
                        if len(ref) < len(alt):  # insertion
                            if "I" in split_cigar:
                                all_insertions = [inser_i for inser_i, ins in enumerate(split_cigar) if ins == "I"]
                                for ai in all_insertions:  # if multiple insertions in DCS
                                    ins_index = [int(ci) for ci in split_cigar[:ai - 1] if ci.isdigit()]
                                    ins_count = split_cigar[ai - 1]  # nr of insertions should match with alt allele
                                    if "I" in split_cigar and sum(ins_index) == p + 1 and len(alt) - 1 == int(ins_count):
                                        nuc = pileupread.alignment.query_sequence[s:e]
                                        break
                                    else:
                                        nuc = pileupread.alignment.query_sequence[s]
                            else:
                                nuc = pileupread.alignment.query_sequence[s]
                        elif len(ref) > len(alt):  # deletion
                            ref_positions = pileupread.alignment.get_reference_positions(full_length=True)[s:p + len(ref)]
                            if "D" in split_cigar:
                                all_deletions = [del_i for del_i, dele in enumerate(split_cigar) if dele == "D"]
                                for di, ai in enumerate(all_deletions):  # if multiple insertions in DCS
                                    if di > 0:  # more than 1 deletion, don't count previous deletion to position
                                        all_deletions_mod = split_cigar[:ai - 1]
                                        prev_del_idx = [all_deletions_mod.index("D") - 1, all_deletions_mod.index("D")]
                                        split_cigar_no_prev = [ad for i, ad in enumerate(all_deletions_mod) if i not in prev_del_idx]
                                        del_index = [int(ci) for ci in split_cigar_no_prev[:ai - 1] if ci.isdigit()]
                                    else:  # first deletion in read, sum all previous (mis)matches and insertions to position
                                        del_index = [int(ci) for ci in split_cigar[:ai - 1] if ci.isdigit()]
                                    del_count = split_cigar[ai - 1]  # nr of deletions should match with ref allele
                                    if "D" in split_cigar and sum(del_index) == p + 1 and len(ref) - 1 == int(del_count):
                                        nuc = pileupread.alignment.query_sequence[s:e]
                                        if nuc == "":
                                            nuc = str(alt)
                                        break
                                    else:
                                        nuc = pileupread.alignment.query_sequence[s:s + len(ref)]
                            elif len(ref_positions) < len(ref):  # DCS has reference but the position is at the very end of the DCS and therefore not the full reference positions are there
                                nuc = pileupread.alignment.get_reference_sequence()[s:s + len(ref)]
                                if nuc.upper() == ref[:len(nuc)]:
                                    nuc = str(ref)
                            else:
                                nuc = pileupread.alignment.query_sequence[s:s + len(ref)]
                        else:  # SNV: query position is None if is_del or is_refskip is set.
                            nuc = pileupread.alignment.query_sequence[s]

                        nuc = nuc.upper()

                        if nuc == alt:
                            count_alt += 1
                            if chrom_stop_pos in mut_pos_dict:
                                if abba in mut_pos_dict[chrom_stop_pos]:
                                    mut_pos_dict[chrom_stop_pos][abba] += 1
                                else:
                                    mut_pos_dict[chrom_stop_pos][abba] = 1
                            else:
                                mut_pos_dict[chrom_stop_pos] = {}
                                mut_pos_dict[chrom_stop_pos][abba] = 1
                            if chrom_stop_pos not in ref_pos_dict:
                                ref_pos_dict[chrom_stop_pos] = {}
                                ref_pos_dict[chrom_stop_pos][abba] = 0
                        elif nuc == ref:
                            count_ref += 1
                            if chrom_stop_pos in ref_pos_dict:
                                if abba in ref_pos_dict[chrom_stop_pos]:
                                    ref_pos_dict[chrom_stop_pos][abba] += 1
                                else:
                                    ref_pos_dict[chrom_stop_pos][abba] = 1
                            else:
                                ref_pos_dict[chrom_stop_pos] = {}
                                ref_pos_dict[chrom_stop_pos][abba] = 1
                        else:
                            count_indel += 1
                print("coverage at pos %s = %s, ref = %s, alt = %s, other = %s,\n" %
                      (pileupcolumn.pos, count_ref + count_alt, count_ref, count_alt, count_indel))

        # if mutation is in DCS file but not in SSCS, then set counts to NA
        if chrom_stop_pos not in mut_pos_dict.keys():
            mut_pos_dict[chrom_stop_pos] = {}
            mut_pos_dict[chrom_stop_pos]["ab"] = 0
            mut_pos_dict[chrom_stop_pos]["ba"] = 0
            ref_pos_dict[chrom_stop_pos] = {}
            ref_pos_dict[chrom_stop_pos]["ab"] = 0
            ref_pos_dict[chrom_stop_pos]["ba"] = 0
    bam.close()

    # save counts
    with open(sscs_counts_json, "w") as f:
        json.dump((mut_pos_dict, ref_pos_dict), f)


if __name__ == '__main__':
    sys.exit(mut2sscs(sys.argv))