Mercurial > repos > mheinzl > variant_analyzer2
view mut2sscs.py @ 88:63e4e5d9a98f draft
planemo upload for repository https://github.com/Single-Molecule-Genetics/VariantAnalyzerGalaxy/tree/master/tools/variant_analyzer commit ee4a8e6cf290e6c8a4d55f9cd2839d60ab3b11c8-dirty
| author | mheinzl | 
|---|---|
| date | Sat, 22 Apr 2023 10:06:51 +0000 | 
| parents | fdfe9a919ff7 | 
| children | 
line wrap: on
 line source
#!/usr/bin/env python """mut2sscs.py Author -- Gundula Povysil Contact -- povysil@bioinf.jku.at Takes a tabular file with mutations from DCS and a BAM file of SSCS as input and extracts all tags of reads that carry the mutation. Calculates statistics about number of ab/ba/duplex per mutation. ======= ========== ================= ================================ Version Date Author Description 0.2.1 2019-10-27 Gundula Povysil - ======= ========== ================= ================================ USAGE: python mut2sscs.py DCS_Mutations.tabular SSCS.bam SSCS_counts.json """ from __future__ import division import argparse import json import os import re import sys import numpy as np import pysam from cyvcf2 import VCF def make_argparser(): parser = argparse.ArgumentParser(description='Takes a vcf file with mutations and a BAM file as input and prints all tags of reads that carry the mutation to a user specified output file.') parser.add_argument('--mutFile', help='VCR file with DCS mutations.') parser.add_argument('--bamFile', help='BAM file with aligned SSCS reads.') parser.add_argument('--outputJson', help='Output JSON file to store SSCS counts.') return parser def mut2sscs(argv): parser = make_argparser() args = parser.parse_args(argv[1:]) file1 = args.mutFile file2 = args.bamFile sscs_counts_json = args.outputJson if os.path.isfile(file1) is False: sys.exit("Error: Could not find '{}'".format(file1)) if os.path.isfile(file2) is False: sys.exit("Error: Could not find '{}'".format(file2)) # read SSCS bam file # pysam.index(file2) bam = pysam.AlignmentFile(file2, "rb") # get tags mut_pos_dict = {} ref_pos_dict = {} for variant in VCF(file1): chrom = variant.CHROM stop_pos = variant.start ref = variant.REF if len(variant.ALT) == 0: continue else: alt = variant.ALT[0] alt = alt.upper() ref = ref.upper() if "N" in alt: # skip indels with N in alt allele --> it is not an indel but just a mismatch at the position where the N is (checked this in IGV) continue chrom_stop_pos = str(chrom) + "#" + str(stop_pos) + "#" + ref + "#" + alt for pileupcolumn in bam.pileup(chrom, stop_pos - 1, stop_pos + 1, max_depth=1000000000): if pileupcolumn.reference_pos == stop_pos: count_alt = 0 count_ref = 0 count_indel = 0 for pileupread in pileupcolumn.pileups: if not pileupread.is_refskip: tag = pileupread.alignment.query_name abba = tag[-2:] if pileupread.is_del: p = pileupread.query_position_or_next e = p + len(alt) - 1 else: p = pileupread.query_position e = p + len(alt) tag = pileupread.alignment.query_name if "_" in tag: tag = re.split('_', tag)[0] s = p split_cigar = re.split('(\d+)', pileupread.alignment.cigarstring) if len(ref) < len(alt): # insertion if "I" in split_cigar: all_insertions = [inser_i for inser_i, ins in enumerate(split_cigar) if ins == "I"] for ai in all_insertions: # if multiple insertions in DCS ins_index = [int(ci) for ci in split_cigar[:ai - 1] if ci.isdigit()] ins_count = split_cigar[ai - 1] # nr of insertions should match with alt allele if "I" in split_cigar and sum(ins_index) == p + 1 and len(alt) - 1 == int(ins_count): nuc = pileupread.alignment.query_sequence[s:e] break else: nuc = pileupread.alignment.query_sequence[s] else: nuc = pileupread.alignment.query_sequence[s] elif len(ref) > len(alt): # deletion ref_positions = pileupread.alignment.get_reference_positions(full_length=True)[s:p + len(ref)] if "D" in split_cigar: all_deletions = [del_i for del_i, dele in enumerate(split_cigar) if dele == "D"] for di, ai in enumerate(all_deletions): # if multiple insertions in DCS if di > 0: # more than 1 deletion, don't count previous deletion to position all_deletions_mod = split_cigar[:ai - 1] prev_del_idx = [all_deletions_mod.index("D") - 1, all_deletions_mod.index("D")] split_cigar_no_prev = [ad for i, ad in enumerate(all_deletions_mod) if i not in prev_del_idx] del_index = [int(ci) for ci in split_cigar_no_prev[:ai - 1] if ci.isdigit()] else: # first deletion in read, sum all previous (mis)matches and insertions to position del_index = [int(ci) for ci in split_cigar[:ai - 1] if ci.isdigit()] del_count = split_cigar[ai - 1] # nr of deletions should match with ref allele if "D" in split_cigar and sum(del_index) == p + 1 and len(ref) - 1 == int(del_count): nuc = pileupread.alignment.query_sequence[s:e] if nuc == "": nuc = str(alt) break else: nuc = pileupread.alignment.query_sequence[s:s + len(ref)] elif len(ref_positions) < len(ref): # DCS has reference but the position is at the very end of the DCS and therefore not the full reference positions are there nuc = pileupread.alignment.get_reference_sequence()[s:s + len(ref)] if nuc.upper() == ref[:len(nuc)]: nuc = str(ref) else: nuc = pileupread.alignment.query_sequence[s:s + len(ref)] else: # SNV: query position is None if is_del or is_refskip is set. nuc = pileupread.alignment.query_sequence[s] nuc = nuc.upper() if nuc == alt: count_alt += 1 if chrom_stop_pos in mut_pos_dict: if abba in mut_pos_dict[chrom_stop_pos]: mut_pos_dict[chrom_stop_pos][abba] += 1 else: mut_pos_dict[chrom_stop_pos][abba] = 1 else: mut_pos_dict[chrom_stop_pos] = {} mut_pos_dict[chrom_stop_pos][abba] = 1 if chrom_stop_pos not in ref_pos_dict: ref_pos_dict[chrom_stop_pos] = {} ref_pos_dict[chrom_stop_pos][abba] = 0 elif nuc == ref: count_ref += 1 if chrom_stop_pos in ref_pos_dict: if abba in ref_pos_dict[chrom_stop_pos]: ref_pos_dict[chrom_stop_pos][abba] += 1 else: ref_pos_dict[chrom_stop_pos][abba] = 1 else: ref_pos_dict[chrom_stop_pos] = {} ref_pos_dict[chrom_stop_pos][abba] = 1 else: count_indel += 1 print("coverage at pos %s = %s, ref = %s, alt = %s, other = %s,\n" % (pileupcolumn.pos, count_ref + count_alt, count_ref, count_alt, count_indel)) # if mutation is in DCS file but not in SSCS, then set counts to NA if chrom_stop_pos not in mut_pos_dict.keys(): mut_pos_dict[chrom_stop_pos] = {} mut_pos_dict[chrom_stop_pos]["ab"] = 0 mut_pos_dict[chrom_stop_pos]["ba"] = 0 ref_pos_dict[chrom_stop_pos] = {} ref_pos_dict[chrom_stop_pos]["ab"] = 0 ref_pos_dict[chrom_stop_pos]["ba"] = 0 bam.close() # save counts with open(sscs_counts_json, "w") as f: json.dump((mut_pos_dict, ref_pos_dict), f) if __name__ == '__main__': sys.exit(mut2sscs(sys.argv))
