diff vcf2hrdetect.py @ 11:5a326a6fa105 draft default tip

planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/main/tools/lumpy_smoove commit 8b10e8fc832f8ca7c32479e20d5edbd62088a3aa
author artbio
date Fri, 17 Oct 2025 17:21:17 +0000
parents 7dcf61950215
children
line wrap: on
line diff
--- a/vcf2hrdetect.py	Wed Jan 24 19:26:57 2024 +0000
+++ b/vcf2hrdetect.py	Fri Oct 17 17:21:17 2025 +0000
@@ -1,41 +1,119 @@
+#!/usr/bin/env python
+import argparse
+import re
 import sys
 
-handle = open(sys.argv[1], 'r')
-vcfdict = dict()
-tabdict = dict()
-for line in handle:
-    if line[0] == "#":
-        continue
-    else:
-        tabline = line[:-1].split("\t")
-        vcfdict[tabline[2]] = tabline
-for id in vcfdict.keys():
-    if "_1" in id:
-        newid = id[:-2]
-        pointbreak = vcfdict[id][4]
-        if "]" in pointbreak:
-            coordbreak = pointbreak.split("]")[1].split(":")[1]
-            chrom = pointbreak.split("]")[1].split(":")[0]
-        elif "[" in pointbreak:
-            coordbreak = pointbreak.split("[")[1].split(":")[1]
-            chrom = pointbreak.split("[")[1].split(":")[0]
-        if vcfdict[id][0] == chrom:
-            tabdict[newid] = [chrom, vcfdict[id][1], chrom, coordbreak, "INV"]
-        else:
-            tabdict[newid] = [vcfdict[id][0], vcfdict[id][1],
-                              chrom, coordbreak, "TRA"]
-for id in list(vcfdict):
-    if "_" in id:
-        del vcfdict[id]
-for id in vcfdict.keys():  # only sv that are not of type TRA or INV
-    chr1 = vcfdict[id][0]
-    chr2 = vcfdict[id][0]
-    pos1 = vcfdict[id][1]
-    pos2 = vcfdict[id][7].split("END=")[1].split(";")[0]
-    type = vcfdict[id][7].split("SVTYPE=")[1].split(";")[0]
-    tabdict[id] = [chr1, pos1, chr2, pos2, type]
-out = open(sys.argv[2], 'w')
-out.write("chr1\tpos1\tchr2\tpos2\ttype\n")
-for key in tabdict:
-    line = "\t".join(tabdict[key]) + "\n"
-    out.write(line)
+
+def create_arg_parser():
+    """Creates and returns the argument parser."""
+    parser = argparse.ArgumentParser(
+        description=(
+            "Convert a VCF file from lumpy-smoove to a tabular format "
+            "compatible with the HRDetect pipeline."
+        )
+    )
+    parser.add_argument(
+        'vcf_file',
+        help='Path to the input VCF file.'
+    )
+    parser.add_argument(
+        'output_file',
+        help='Path to the output tabular file.'
+    )
+    return parser
+
+
+def parse_breakend_alt(alt_field):
+    """
+    Parses the ALT field for a breakend and returns chromosome and position.
+
+    Args:
+        alt_field (str): The ALT field (column 5) of a VCF line.
+
+    Returns:
+        tuple: A tuple containing (chromosome, position) or (None, None)
+               if parsing fails.
+    """
+    # Search for patterns ]chr:pos] or [chr:pos[
+    pattern = (
+        r"\](?P<chrom1>[^:]+):(?P<pos1>\d+)\]|"
+        r"\[(?P<chrom2>[^:]+):(?P<pos2>\d+)\["
+    )
+    match = re.search(pattern, alt_field)
+
+    if not match:
+        return None, None
+
+    groups = match.groupdict()
+    chrom = groups['chrom1'] or groups['chrom2']
+    pos = groups['pos1'] or groups['pos2']
+    return chrom, pos
+
+
+def process_vcf(vcf_path, output_path):
+    """
+    Reads a VCF file, converts it, and writes the result to a tabular file.
+
+    Args:
+        vcf_path (str): Path to the input VCF file.
+        output_path (str): Path to the output tabular file.
+    """
+    header = ["chr1", "pos1", "chr2", "pos2", "type"]
+    try:
+        with open(vcf_path, 'r') as infile, open(output_path, 'w') as outfile:
+            outfile.write("\t".join(header) + "\n")
+
+            for line in infile:
+                if line.startswith('#'):
+                    continue
+
+                fields = line.strip().split('\t')
+                if len(fields) < 8:
+                    continue
+
+                chrom1 = fields[0]
+                pos1 = fields[1]
+                info = fields[7]
+
+                # Attempt to extract the structural variant type from the info
+                svtype_match = re.search(r'SVTYPE=([^;]+)', info)
+                if not svtype_match:
+                    continue  # Skip lines without SVTYPE tag
+                svtype = svtype_match.group(1)
+
+                if svtype == "BND":  # Breakend (INV or TRA)
+                    alt_field = fields[4]
+                    chrom2, pos2 = parse_breakend_alt(alt_field)
+                    if not (chrom2 and pos2):
+                        continue
+                    event_type = "INV" if chrom1 == chrom2 else "TRA"
+                    row = [chrom1, pos1, chrom2, pos2, event_type]
+                    outfile.write("\t".join(row) + "\n")
+
+                else:  # Other SV types (DEL, DUP, etc.)
+                    end_match = re.search(r'END=([^;]+)', info)
+                    if not end_match:
+                        continue
+                    pos2 = end_match.group(1)
+                    chrom2 = chrom1
+                    row = [chrom1, pos1, chrom2, pos2, svtype]
+                    outfile.write("\t".join(row) + "\n")
+
+    except FileNotFoundError:
+        print(f"Error: File '{vcf_path}' not found.",
+              file=sys.stderr)
+        sys.exit(1)
+    except IOError as e:
+        print(f"IO Error: {e}", file=sys.stderr)
+        sys.exit(1)
+
+
+def main():
+    """Main function of the script."""
+    parser = create_arg_parser()
+    args = parser.parse_args()
+    process_vcf(args.vcf_file, args.output_file)
+
+
+if __name__ == '__main__':
+    main()