Mercurial > repos > artbio > lumpy_smoove
diff vcf2hrdetect.py @ 11:5a326a6fa105 draft default tip
planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/main/tools/lumpy_smoove commit 8b10e8fc832f8ca7c32479e20d5edbd62088a3aa
author | artbio |
---|---|
date | Fri, 17 Oct 2025 17:21:17 +0000 |
parents | 7dcf61950215 |
children |
line wrap: on
line diff
--- a/vcf2hrdetect.py Wed Jan 24 19:26:57 2024 +0000 +++ b/vcf2hrdetect.py Fri Oct 17 17:21:17 2025 +0000 @@ -1,41 +1,119 @@ +#!/usr/bin/env python +import argparse +import re import sys -handle = open(sys.argv[1], 'r') -vcfdict = dict() -tabdict = dict() -for line in handle: - if line[0] == "#": - continue - else: - tabline = line[:-1].split("\t") - vcfdict[tabline[2]] = tabline -for id in vcfdict.keys(): - if "_1" in id: - newid = id[:-2] - pointbreak = vcfdict[id][4] - if "]" in pointbreak: - coordbreak = pointbreak.split("]")[1].split(":")[1] - chrom = pointbreak.split("]")[1].split(":")[0] - elif "[" in pointbreak: - coordbreak = pointbreak.split("[")[1].split(":")[1] - chrom = pointbreak.split("[")[1].split(":")[0] - if vcfdict[id][0] == chrom: - tabdict[newid] = [chrom, vcfdict[id][1], chrom, coordbreak, "INV"] - else: - tabdict[newid] = [vcfdict[id][0], vcfdict[id][1], - chrom, coordbreak, "TRA"] -for id in list(vcfdict): - if "_" in id: - del vcfdict[id] -for id in vcfdict.keys(): # only sv that are not of type TRA or INV - chr1 = vcfdict[id][0] - chr2 = vcfdict[id][0] - pos1 = vcfdict[id][1] - pos2 = vcfdict[id][7].split("END=")[1].split(";")[0] - type = vcfdict[id][7].split("SVTYPE=")[1].split(";")[0] - tabdict[id] = [chr1, pos1, chr2, pos2, type] -out = open(sys.argv[2], 'w') -out.write("chr1\tpos1\tchr2\tpos2\ttype\n") -for key in tabdict: - line = "\t".join(tabdict[key]) + "\n" - out.write(line) + +def create_arg_parser(): + """Creates and returns the argument parser.""" + parser = argparse.ArgumentParser( + description=( + "Convert a VCF file from lumpy-smoove to a tabular format " + "compatible with the HRDetect pipeline." + ) + ) + parser.add_argument( + 'vcf_file', + help='Path to the input VCF file.' + ) + parser.add_argument( + 'output_file', + help='Path to the output tabular file.' + ) + return parser + + +def parse_breakend_alt(alt_field): + """ + Parses the ALT field for a breakend and returns chromosome and position. + + Args: + alt_field (str): The ALT field (column 5) of a VCF line. + + Returns: + tuple: A tuple containing (chromosome, position) or (None, None) + if parsing fails. + """ + # Search for patterns ]chr:pos] or [chr:pos[ + pattern = ( + r"\](?P<chrom1>[^:]+):(?P<pos1>\d+)\]|" + r"\[(?P<chrom2>[^:]+):(?P<pos2>\d+)\[" + ) + match = re.search(pattern, alt_field) + + if not match: + return None, None + + groups = match.groupdict() + chrom = groups['chrom1'] or groups['chrom2'] + pos = groups['pos1'] or groups['pos2'] + return chrom, pos + + +def process_vcf(vcf_path, output_path): + """ + Reads a VCF file, converts it, and writes the result to a tabular file. + + Args: + vcf_path (str): Path to the input VCF file. + output_path (str): Path to the output tabular file. + """ + header = ["chr1", "pos1", "chr2", "pos2", "type"] + try: + with open(vcf_path, 'r') as infile, open(output_path, 'w') as outfile: + outfile.write("\t".join(header) + "\n") + + for line in infile: + if line.startswith('#'): + continue + + fields = line.strip().split('\t') + if len(fields) < 8: + continue + + chrom1 = fields[0] + pos1 = fields[1] + info = fields[7] + + # Attempt to extract the structural variant type from the info + svtype_match = re.search(r'SVTYPE=([^;]+)', info) + if not svtype_match: + continue # Skip lines without SVTYPE tag + svtype = svtype_match.group(1) + + if svtype == "BND": # Breakend (INV or TRA) + alt_field = fields[4] + chrom2, pos2 = parse_breakend_alt(alt_field) + if not (chrom2 and pos2): + continue + event_type = "INV" if chrom1 == chrom2 else "TRA" + row = [chrom1, pos1, chrom2, pos2, event_type] + outfile.write("\t".join(row) + "\n") + + else: # Other SV types (DEL, DUP, etc.) + end_match = re.search(r'END=([^;]+)', info) + if not end_match: + continue + pos2 = end_match.group(1) + chrom2 = chrom1 + row = [chrom1, pos1, chrom2, pos2, svtype] + outfile.write("\t".join(row) + "\n") + + except FileNotFoundError: + print(f"Error: File '{vcf_path}' not found.", + file=sys.stderr) + sys.exit(1) + except IOError as e: + print(f"IO Error: {e}", file=sys.stderr) + sys.exit(1) + + +def main(): + """Main function of the script.""" + parser = create_arg_parser() + args = parser.parse_args() + process_vcf(args.vcf_file, args.output_file) + + +if __name__ == '__main__': + main()