Mercurial > repos > davidvanzessen > shm_csr
comparison split_imgt_file.py @ 92:cf8ad181628f draft
planemo upload commit 36be3b053802693392f935e6619ba3f2b1704e3c
author | rhpvorderman |
---|---|
date | Mon, 12 Dec 2022 12:32:44 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
91:f387cc1580c6 | 92:cf8ad181628f |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 """ | |
4 Script to split IMGT file into several archives for each of the genes | |
5 | |
6 Rather than creating each new archive individually this script will read | |
7 the input files only once and as such enormously shorten processing time. | |
8 """ | |
9 | |
10 import argparse | |
11 import io | |
12 import os | |
13 import tarfile | |
14 import tempfile | |
15 from typing import Iterator, List, Tuple | |
16 | |
17 | |
18 def merged_txt_to_match_dict(merged: str): | |
19 with open(merged, "rt") as f: | |
20 header = next(f).strip("\n") | |
21 column_names = header.split("\t") | |
22 # For the baseline result there is no best_match column | |
23 if "best_match" in column_names: | |
24 best_match_index = column_names.index("best_match") | |
25 else: | |
26 best_match_index = None | |
27 sequence_id_index = column_names.index("Sequence.ID") | |
28 match_dict = {} | |
29 for line in f: | |
30 values = line.strip().split("\t") | |
31 sequence_id = values[sequence_id_index] | |
32 if best_match_index is not None: | |
33 best_match = values[best_match_index] | |
34 if "unmatched" in best_match: | |
35 # For some reason the table has values such as: unmatched, IGA2 | |
36 continue | |
37 else: | |
38 best_match = "" | |
39 match_dict[sequence_id] = best_match | |
40 return match_dict | |
41 | |
42 | |
43 def imgt_to_tables(imgt_file: str) -> Iterator[Tuple[str, io.TextIOWrapper]]: | |
44 print(f"opening IMGT file: {imgt_file}") | |
45 with tarfile.open(imgt_file, "r") as archive: | |
46 while True: | |
47 member = archive.next() | |
48 if member is None: | |
49 return | |
50 if member.name in {"README.txt"}: | |
51 continue | |
52 if member.name.startswith("11_"): | |
53 continue | |
54 f = archive.extractfile(member) | |
55 f_text = io.TextIOWrapper(f) | |
56 yield member.name, f_text | |
57 f_text.close() | |
58 | |
59 | |
60 def split_imgt(imgt_file: str, merged_file: str, outdir: str, genes: List[str], | |
61 prefix: str): | |
62 """ | |
63 This function creates a separate tar file for each of the gene matches | |
64 based on the merged file. Unmatched genes are left out. | |
65 :param imgt_file: The original IMGT file | |
66 :param merged_file: The merged data file generated by SHM&CSR pipeline | |
67 :param outdir: The output directory. | |
68 :param genes: The genes to split out. Use '-' for all identified genes. | |
69 :return: | |
70 """ | |
71 match_dict = merged_txt_to_match_dict(merged_file) | |
72 gene_tarfiles = [] | |
73 os.makedirs(outdir, exist_ok=True) | |
74 for gene in genes: | |
75 new_filename = f"{prefix}_{gene}.txz" if gene else f"{prefix}.txz" | |
76 gene_tarfiles.append( | |
77 tarfile.open(os.path.join(outdir, new_filename), mode="w:xz") | |
78 ) | |
79 for name, table in imgt_to_tables(imgt_file): | |
80 # Read each table one by one and per line select in which output | |
81 # files it should go. | |
82 gene_files = [] | |
83 for gene in genes: | |
84 fp, fname = tempfile.mkstemp() | |
85 # The file pointer fp will be wrapped in a python file object | |
86 # so we can ensure there remain no open files. | |
87 f = open(fp, mode="wt") | |
88 gene_files.append((gene, f, fname)) | |
89 header = next(table) | |
90 header_number_of_tabs = header.count('\t') | |
91 column_names = header.strip("\n").split("\t") | |
92 fr1_columns = [index for index, column in enumerate(column_names) | |
93 if column.startswith("FR1")] | |
94 sequence_id_index = column_names.index("Sequence ID") | |
95 for _, gene_file, _ in gene_files: | |
96 gene_file.write(header) | |
97 for line in table: | |
98 # IMGT sometimes delivers half-empty rows. | |
99 row_number_of_tabs = line.count("\t") | |
100 missing_tabs = header_number_of_tabs - row_number_of_tabs | |
101 if missing_tabs: | |
102 line = line.strip("\n") + missing_tabs * "\t" + "\n" | |
103 values = line.strip("\n").split("\t") | |
104 sequence_id = values[sequence_id_index] | |
105 match = match_dict.get(sequence_id) | |
106 if match is None: | |
107 continue | |
108 if name.startswith("8_"): | |
109 # change the FR1 columns to 0 in the "8_..." file | |
110 for index in fr1_columns: | |
111 values[index] = "0" | |
112 line = "\t".join(values) + "\n" | |
113 for gene, gene_file, _ in gene_files: | |
114 if gene in match: | |
115 gene_file.write(line) | |
116 for gene_tarfile, (_, gene_file, fname) in zip(gene_tarfiles, gene_files): | |
117 gene_file.flush() | |
118 gene_tarfile.add(fname, name) | |
119 gene_file.close() | |
120 os.remove(fname) | |
121 for gene_tarfile in gene_tarfiles: | |
122 gene_tarfile.close() | |
123 | |
124 | |
125 def argument_parser() -> argparse.ArgumentParser: | |
126 parser = argparse.ArgumentParser() | |
127 parser.add_argument("imgt_file", help="The original IMGT FILE") | |
128 parser.add_argument("merged", help="merged.txt file") | |
129 parser.add_argument("--outdir", help="output directory") | |
130 parser.add_argument( | |
131 "genes", | |
132 nargs="+", | |
133 help="The genes to split out. Use '-' for all identified genes.") | |
134 parser.add_argument("--prefix", help="Prefix for the archives and " | |
135 "directories") | |
136 return parser | |
137 | |
138 | |
139 def main(): | |
140 args = argument_parser().parse_args() | |
141 genes = ["" if gene == "-" else gene for gene in args.genes] | |
142 split_imgt(args.imgt_file, args.merged, args.outdir, genes, | |
143 args.prefix) | |
144 | |
145 | |
146 if __name__ == "__main__": | |
147 main() |