diff split_imgt_file.py @ 92:cf8ad181628f draft

planemo upload commit 36be3b053802693392f935e6619ba3f2b1704e3c
author rhpvorderman
date Mon, 12 Dec 2022 12:32:44 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/split_imgt_file.py	Mon Dec 12 12:32:44 2022 +0000
@@ -0,0 +1,147 @@
+#!/usr/bin/env python3
+
+"""
+Script to split IMGT file into several archives for each of the genes
+
+Rather than creating each new archive individually this script will read
+the input files only once and as such enormously shorten processing time.
+"""
+
+import argparse
+import io
+import os
+import tarfile
+import tempfile
+from typing import Iterator, List, Tuple
+
+
+def merged_txt_to_match_dict(merged: str):
+    with open(merged, "rt") as f:
+        header = next(f).strip("\n")
+        column_names = header.split("\t")
+        # For the baseline result there is no best_match column
+        if "best_match" in column_names:
+            best_match_index = column_names.index("best_match")
+        else:
+            best_match_index = None
+        sequence_id_index = column_names.index("Sequence.ID")
+        match_dict = {}
+        for line in f:
+            values = line.strip().split("\t")
+            sequence_id = values[sequence_id_index]
+            if best_match_index is not None:
+                best_match = values[best_match_index]
+                if "unmatched" in best_match:
+                    # For some reason the table has values such as: unmatched, IGA2
+                    continue
+            else:
+                best_match = ""
+            match_dict[sequence_id] = best_match
+    return match_dict
+
+
+def imgt_to_tables(imgt_file: str) -> Iterator[Tuple[str, io.TextIOWrapper]]:
+    print(f"opening IMGT file: {imgt_file}")
+    with tarfile.open(imgt_file, "r") as archive:
+        while True:
+            member = archive.next()
+            if member is None:
+                return
+            if member.name in {"README.txt"}:
+                continue
+            if member.name.startswith("11_"):
+                continue
+            f = archive.extractfile(member)
+            f_text = io.TextIOWrapper(f)
+            yield member.name, f_text
+            f_text.close()
+
+
+def split_imgt(imgt_file: str, merged_file: str, outdir: str, genes: List[str],
+               prefix: str):
+    """
+    This function creates a separate tar file for each of the gene matches
+    based on the merged file. Unmatched genes are left out.
+    :param imgt_file: The original IMGT file
+    :param merged_file: The merged data file generated by SHM&CSR pipeline
+    :param outdir: The output directory.
+    :param genes: The genes to split out. Use '-' for all identified genes.
+    :return:
+    """
+    match_dict = merged_txt_to_match_dict(merged_file)
+    gene_tarfiles = []
+    os.makedirs(outdir, exist_ok=True)
+    for gene in genes:
+        new_filename = f"{prefix}_{gene}.txz" if gene else f"{prefix}.txz"
+        gene_tarfiles.append(
+            tarfile.open(os.path.join(outdir, new_filename), mode="w:xz")
+        )
+    for name, table in imgt_to_tables(imgt_file):
+        # Read each table one by one and per line select in which output
+        # files it should go.
+        gene_files = []
+        for gene in genes:
+            fp, fname = tempfile.mkstemp()
+            # The file pointer fp will be wrapped in a python file object
+            # so we can ensure there remain no open files.
+            f = open(fp, mode="wt")
+            gene_files.append((gene, f, fname))
+        header = next(table)
+        header_number_of_tabs = header.count('\t')
+        column_names = header.strip("\n").split("\t")
+        fr1_columns = [index for index, column in enumerate(column_names)
+                       if column.startswith("FR1")]
+        sequence_id_index = column_names.index("Sequence ID")
+        for _, gene_file, _ in gene_files:
+            gene_file.write(header)
+        for line in table:
+            # IMGT sometimes delivers half-empty rows.
+            row_number_of_tabs = line.count("\t")
+            missing_tabs = header_number_of_tabs - row_number_of_tabs
+            if missing_tabs:
+                line = line.strip("\n") + missing_tabs * "\t" + "\n"
+            values = line.strip("\n").split("\t")
+            sequence_id = values[sequence_id_index]
+            match = match_dict.get(sequence_id)
+            if match is None:
+                continue
+            if name.startswith("8_"):
+                # change the FR1 columns to 0 in the "8_..." file
+                for index in fr1_columns:
+                    values[index] = "0"
+                line = "\t".join(values) + "\n"
+            for gene, gene_file, _ in gene_files:
+                if gene in match:
+                    gene_file.write(line)
+        for gene_tarfile, (_, gene_file, fname) in zip(gene_tarfiles, gene_files):
+            gene_file.flush()
+            gene_tarfile.add(fname, name)
+            gene_file.close()
+            os.remove(fname)
+    for gene_tarfile in gene_tarfiles:
+        gene_tarfile.close()
+
+
+def argument_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("imgt_file", help="The original IMGT FILE")
+    parser.add_argument("merged", help="merged.txt file")
+    parser.add_argument("--outdir", help="output directory")
+    parser.add_argument(
+        "genes",
+        nargs="+",
+        help="The genes to split out. Use '-' for all identified genes.")
+    parser.add_argument("--prefix", help="Prefix for the archives and "
+                                         "directories")
+    return parser
+
+
+def main():
+    args = argument_parser().parse_args()
+    genes = ["" if gene == "-" else gene for gene in args.genes]
+    split_imgt(args.imgt_file, args.merged, args.outdir, genes,
+               args.prefix)
+
+
+if __name__ == "__main__":
+    main()