Mercurial > repos > iuc > medaka_variant_pipeline
diff convert_VCF_info_fields.py @ 13:3fbefde449bc draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/medaka commit 52289bc7b99bfa8a3bda46cb35cea98399419dab"
author | iuc |
---|---|
date | Thu, 18 Nov 2021 20:01:04 +0000 |
parents | 597407d61386 |
children |
line wrap: on
line diff
--- a/convert_VCF_info_fields.py Fri Sep 17 20:22:27 2021 +0000 +++ b/convert_VCF_info_fields.py Thu Nov 18 20:01:04 2021 +0000 @@ -24,10 +24,10 @@ def parseInfoField(info): - info_fields = info.split(';') + info_fields = info.split(";") info_dict = OrderedDict() for info_field in info_fields: - code, val = info_field.split('=') + code, val = info_field.split("=") info_dict[code] = val return info_dict @@ -40,7 +40,7 @@ and multiple alternate alleles with simple ref, alt allele counterparts. """ - in_vcf = open(in_vcf_filepath, 'r') + in_vcf = open(in_vcf_filepath, "r") # medaka INFO fields that do not make sense after splitting of # multi-allelic records # DP will be overwritten with the value of DPSP because medaka tools @@ -48,8 +48,8 @@ # (https://github.com/nanoporetech/medaka/issues/192). # DPS, which is as unreliable as DP, gets skipped and the code # calculates the spanning reads equivalent DPSPS instead. - to_skip = {'SC', 'SR', 'AR', 'DP', 'DPSP', 'DPS'} - struct_meta_pat = re.compile('##(.+)=<ID=([^,]+)(,.+)?>') + to_skip = {"SC", "SR", "AR", "DP", "DPSP", "DPS"} + struct_meta_pat = re.compile("##(.+)=<ID=([^,]+)(,.+)?>") header_lines = [] contig_ids = set() contig_ids_simple = set() @@ -59,8 +59,8 @@ # - redundant contig information while True: line = in_vcf.readline() - if line[:2] != '##': - assert line.startswith('#CHROM') + if line[:2] != "##": + assert line.startswith("#CHROM") break if line in header_lines: # the annotate tool may generate lines already written by @@ -69,12 +69,12 @@ match = struct_meta_pat.match(line) if match: match_type, match_id, match_misc = match.groups() - if match_type == 'INFO': - if match_id == 'DPSP': - line = line.replace('DPSP', 'DP') + if match_type == "INFO": + if match_id == "DPSP": + line = line.replace("DPSP", "DP") elif match_id in to_skip: continue - elif match_type == 'contig': + elif match_type == "contig": contig_ids.add(match_id) if not match_misc: # the annotate tools writes its own contig info, @@ -87,7 +87,7 @@ # Lets check the above assumption about each ID-only contig line # having a more complete counterpart. assert not (contig_ids_simple - contig_ids) - header_lines.insert(1, '##convert_VCF_info_fields=0.2\n') + header_lines.insert(1, "##convert_VCF_info_fields=0.2\n") header_lines += [ '##INFO=<ID=DPSPS,Number=2,Type=Integer,Description="Depth of spanning reads by strand">\n', '##INFO=<ID=AF,Number=1,Type=Float,Description="Spanning Reads Allele Frequency">\n', @@ -96,47 +96,34 @@ '##INFO=<ID=SB,Number=1,Type=Integer,Description="Phred-scaled strand bias of spanning reads at this position">\n', '##INFO=<ID=DP4,Number=4,Type=Integer,Description="Counts for ref-forward bases, ref-reverse, alt-forward and alt-reverse bases in spanning reads">\n', '##INFO=<ID=AS,Number=4,Type=Integer,Description="Total alignment score to ref and alt allele of spanning reads by strand (ref fwd, ref rev, alt fwd, alt rev) aligned with parasail match 5, mismatch -4, open 5, extend 3">\n', - line + line, ] - with open(out_vcf_filepath, 'w') as out_vcf: + with open(out_vcf_filepath, "w") as out_vcf: out_vcf.writelines(header_lines) for line in in_vcf: - fields = line.split('\t') + fields = line.split("\t") info_dict = parseInfoField(fields[7]) - sr_list = [int(x) for x in info_dict["SR"].split(',')] - sc_list = [int(x) for x in info_dict["SC"].split(',')] + sr_list = [int(x) for x in info_dict["SR"].split(",")] + sc_list = [int(x) for x in info_dict["SC"].split(",")] if len(sr_list) != len(sc_list): - print( - 'WARNING - SR and SC are different lengths, ' - 'skipping variant' - ) + print("WARNING - SR and SC are different lengths, " "skipping variant") print(line.strip()) # Print the line for debugging purposes continue - variant_list = fields[4].split(',') - dpsp = int(info_dict['DPSP']) + variant_list = fields[4].split(",") + dpsp = int(info_dict["DPSP"]) ref_fwd, ref_rev = 0, 1 - dpspf, dpspr = (int(x) for x in info_dict['AR'].split(',')) + dpspf, dpspr = (int(x) for x in info_dict["AR"].split(",")) for i in range(0, len(sr_list), 2): dpspf += sr_list[i] dpspr += sr_list[i + 1] for j, i in enumerate(range(2, len(sr_list), 2)): - dp4 = ( - sr_list[ref_fwd], - sr_list[ref_rev], - sr_list[i], - sr_list[i + 1] - ) + dp4 = (sr_list[ref_fwd], sr_list[ref_rev], sr_list[i], sr_list[i + 1]) dp2x2 = [[dp4[0], dp4[1]], [dp4[2], dp4[3]]] _, p_val = scipy.stats.fisher_exact(dp2x2) sb = pval_to_phredqual(p_val) - as_ = ( - sc_list[ref_fwd], - sc_list[ref_rev], - sc_list[i], - sc_list[i + 1] - ) + as_ = (sc_list[ref_fwd], sc_list[ref_rev], sc_list[i], sc_list[i + 1]) info = [] for code in info_dict: @@ -145,31 +132,31 @@ val = info_dict[code] info.append("%s=%s" % (code, val)) - info.append('DP=%d' % dpsp) - info.append('DPSPS=%d,%d' % (dpspf, dpspr)) + info.append("DP=%d" % dpsp) + info.append("DPSPS=%d,%d" % (dpspf, dpspr)) if dpsp == 0: - info.append('AF=NaN') + info.append("AF=NaN") else: af = (dp4[2] + dp4[3]) / dpsp - info.append('AF=%.6f' % af) + info.append("AF=%.6f" % af) if dpspf == 0: - info.append('FAF=NaN') + info.append("FAF=NaN") else: faf = dp4[2] / dpspf - info.append('FAF=%.6f' % faf) + info.append("FAF=%.6f" % faf) if dpspr == 0: - info.append('RAF=NaN') + info.append("RAF=NaN") else: raf = dp4[3] / dpspr - info.append('RAF=%.6f' % raf) - info.append('SB=%d' % sb) - info.append('DP4=%d,%d,%d,%d' % dp4) - info.append('AS=%d,%d,%d,%d' % as_) - new_info = ';'.join(info) + info.append("RAF=%.6f" % raf) + info.append("SB=%d" % sb) + info.append("DP4=%d,%d,%d,%d" % dp4) + info.append("AS=%d,%d,%d,%d" % as_) + new_info = ";".join(info) fields[4] = variant_list[j] fields[7] = new_info - out_vcf.write('\t'.join(fields)) + out_vcf.write("\t".join(fields)) in_vcf.close()