Mercurial > repos > davidvanzessen > shm_csr
view split_imgt_file.py @ 98:d714f5ea83d7 draft default tip
planemo upload commit 1a01065a084a817382872154f779b94090a35ebf
author | rhpvorderman |
---|---|
date | Wed, 10 Jan 2024 12:32:47 +0000 |
parents | cf8ad181628f |
children |
line wrap: on
line source
#!/usr/bin/env python3 """ Script to split IMGT file into several archives for each of the genes Rather than creating each new archive individually this script will read the input files only once and as such enormously shorten processing time. """ import argparse import io import os import tarfile import tempfile from typing import Iterator, List, Tuple def merged_txt_to_match_dict(merged: str): with open(merged, "rt") as f: header = next(f).strip("\n") column_names = header.split("\t") # For the baseline result there is no best_match column if "best_match" in column_names: best_match_index = column_names.index("best_match") else: best_match_index = None sequence_id_index = column_names.index("Sequence.ID") match_dict = {} for line in f: values = line.strip().split("\t") sequence_id = values[sequence_id_index] if best_match_index is not None: best_match = values[best_match_index] if "unmatched" in best_match: # For some reason the table has values such as: unmatched, IGA2 continue else: best_match = "" match_dict[sequence_id] = best_match return match_dict def imgt_to_tables(imgt_file: str) -> Iterator[Tuple[str, io.TextIOWrapper]]: print(f"opening IMGT file: {imgt_file}") with tarfile.open(imgt_file, "r") as archive: while True: member = archive.next() if member is None: return if member.name in {"README.txt"}: continue if member.name.startswith("11_"): continue f = archive.extractfile(member) f_text = io.TextIOWrapper(f) yield member.name, f_text f_text.close() def split_imgt(imgt_file: str, merged_file: str, outdir: str, genes: List[str], prefix: str): """ This function creates a separate tar file for each of the gene matches based on the merged file. Unmatched genes are left out. :param imgt_file: The original IMGT file :param merged_file: The merged data file generated by SHM&CSR pipeline :param outdir: The output directory. :param genes: The genes to split out. Use '-' for all identified genes. :return: """ match_dict = merged_txt_to_match_dict(merged_file) gene_tarfiles = [] os.makedirs(outdir, exist_ok=True) for gene in genes: new_filename = f"{prefix}_{gene}.txz" if gene else f"{prefix}.txz" gene_tarfiles.append( tarfile.open(os.path.join(outdir, new_filename), mode="w:xz") ) for name, table in imgt_to_tables(imgt_file): # Read each table one by one and per line select in which output # files it should go. gene_files = [] for gene in genes: fp, fname = tempfile.mkstemp() # The file pointer fp will be wrapped in a python file object # so we can ensure there remain no open files. f = open(fp, mode="wt") gene_files.append((gene, f, fname)) header = next(table) header_number_of_tabs = header.count('\t') column_names = header.strip("\n").split("\t") fr1_columns = [index for index, column in enumerate(column_names) if column.startswith("FR1")] sequence_id_index = column_names.index("Sequence ID") for _, gene_file, _ in gene_files: gene_file.write(header) for line in table: # IMGT sometimes delivers half-empty rows. row_number_of_tabs = line.count("\t") missing_tabs = header_number_of_tabs - row_number_of_tabs if missing_tabs: line = line.strip("\n") + missing_tabs * "\t" + "\n" values = line.strip("\n").split("\t") sequence_id = values[sequence_id_index] match = match_dict.get(sequence_id) if match is None: continue if name.startswith("8_"): # change the FR1 columns to 0 in the "8_..." file for index in fr1_columns: values[index] = "0" line = "\t".join(values) + "\n" for gene, gene_file, _ in gene_files: if gene in match: gene_file.write(line) for gene_tarfile, (_, gene_file, fname) in zip(gene_tarfiles, gene_files): gene_file.flush() gene_tarfile.add(fname, name) gene_file.close() os.remove(fname) for gene_tarfile in gene_tarfiles: gene_tarfile.close() def argument_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser() parser.add_argument("imgt_file", help="The original IMGT FILE") parser.add_argument("merged", help="merged.txt file") parser.add_argument("--outdir", help="output directory") parser.add_argument( "genes", nargs="+", help="The genes to split out. Use '-' for all identified genes.") parser.add_argument("--prefix", help="Prefix for the archives and " "directories") return parser def main(): args = argument_parser().parse_args() genes = ["" if gene == "-" else gene for gene in args.genes] split_imgt(args.imgt_file, args.merged, args.outdir, genes, args.prefix) if __name__ == "__main__": main()