comparison split_imgt_file.py @ 92:cf8ad181628f draft

planemo upload commit 36be3b053802693392f935e6619ba3f2b1704e3c
author rhpvorderman
date Mon, 12 Dec 2022 12:32:44 +0000
parents
children
comparison
equal deleted inserted replaced
91:f387cc1580c6 92:cf8ad181628f
1 #!/usr/bin/env python3
2
3 """
4 Script to split IMGT file into several archives for each of the genes
5
6 Rather than creating each new archive individually this script will read
7 the input files only once and as such enormously shorten processing time.
8 """
9
10 import argparse
11 import io
12 import os
13 import tarfile
14 import tempfile
15 from typing import Iterator, List, Tuple
16
17
def merged_txt_to_match_dict(merged: str) -> dict:
    """
    Map every sequence ID of a merged.txt table to its best match.

    The file is read as tab-separated text whose first line is a header that
    must contain a "Sequence.ID" column. When a "best_match" column is
    present, rows whose best match contains "unmatched" are left out. When
    it is absent, every sequence is kept with an empty string as its match.
    Blank lines are ignored.

    :param merged: Path of the merged.txt file.
    :return: A dictionary of sequence ID -> best match.
    :raises ValueError: When the header has no "Sequence.ID" column.
    """
    with open(merged, "rt") as f:
        header = next(f).rstrip("\r\n")
        column_names = header.split("\t")
        # For the baseline result there is no best_match column
        if "best_match" in column_names:
            best_match_index = column_names.index("best_match")
        else:
            best_match_index = None
        sequence_id_index = column_names.index("Sequence.ID")
        match_dict = {}
        for line in f:
            # Remove the line ending only. A bare strip() would also eat
            # leading and trailing tabs (empty first/last columns) and
            # thereby shift the remaining values into the wrong columns.
            row = line.rstrip("\r\n")
            if not row:
                continue
            values = row.split("\t")
            sequence_id = values[sequence_id_index]
            if best_match_index is not None:
                best_match = values[best_match_index]
                if "unmatched" in best_match:
                    # For some reason the table has values such as:
                    # unmatched, IGA2
                    continue
            else:
                best_match = ""
            match_dict[sequence_id] = best_match
    return match_dict
41
42
def imgt_to_tables(imgt_file: str) -> Iterator[Tuple[str, io.TextIOWrapper]]:
    """
    Yield the tables stored in an IMGT archive one at a time.

    The archive is walked in storage order. "README.txt", members whose name
    starts with "11_" and members that are not regular files (for instance
    directories) are skipped.

    :param imgt_file: Path of the IMGT tar archive; compression is detected
                      by tarfile.
    :return: An iterator of (member name, text file object) pairs. Each file
             object is closed as soon as the consumer asks for the next
             pair or closes the generator, so it must be read before then.
    """
    print(f"opening IMGT file: {imgt_file}")
    with tarfile.open(imgt_file, "r") as archive:
        for member in archive:
            if member.name in {"README.txt"}:
                continue
            if member.name.startswith("11_"):
                continue
            f = archive.extractfile(member)
            if f is None:
                # extractfile() returns None for members without data, such
                # as directories; there is no table to read from those.
                continue
            # The with-block also closes the wrapper when the generator is
            # closed early or the consumer raises.
            with io.TextIOWrapper(f) as f_text:
                yield member.name, f_text
58
59
def _split_table(name: str, table: io.TextIOWrapper, match_dict: dict,
                 genes: List[str], gene_tarfiles: list) -> None:
    """
    Distribute the rows of one IMGT table over the per-gene archives.

    The rows are first written to one temporary file per gene; each temporary
    file is then added to the corresponding archive under the table's
    original name and removed again, also when an error occurs.
    """
    gene_files = []
    try:
        for _ in genes:
            fp, fname = tempfile.mkstemp()
            # The file pointer fp will be wrapped in a python file object
            # so we can ensure there remain no open files.
            gene_files.append((open(fp, mode="wt"), fname))
        header = next(table)
        header_number_of_tabs = header.count("\t")
        column_names = header.strip("\n").split("\t")
        fr1_columns = [index for index, column in enumerate(column_names)
                       if column.startswith("FR1")]
        sequence_id_index = column_names.index("Sequence ID")
        # Hoisted out of the row loop: the table name never changes.
        reset_fr1 = name.startswith("8_")
        for gene_file, _ in gene_files:
            gene_file.write(header)
        for line in table:
            # IMGT sometimes delivers half-empty rows. Pad them with tabs
            # so they have as many columns as the header.
            missing_tabs = header_number_of_tabs - line.count("\t")
            if missing_tabs:
                line = line.strip("\n") + missing_tabs * "\t" + "\n"
            values = line.strip("\n").split("\t")
            match = match_dict.get(values[sequence_id_index])
            if match is None:
                # Sequence is not in the merged file (or was unmatched).
                continue
            if reset_fr1:
                # change the FR1 columns to 0 in the "8_..." file
                for index in fr1_columns:
                    values[index] = "0"
                line = "\t".join(values) + "\n"
            for gene, (gene_file, _) in zip(genes, gene_files):
                # Substring test: the empty gene matches every sequence.
                if gene in match:
                    gene_file.write(line)
        for gene_tarfile, (gene_file, fname) in zip(gene_tarfiles,
                                                    gene_files):
            # Closing flushes all rows to disk before the file is archived.
            gene_file.close()
            gene_tarfile.add(fname, name)
    finally:
        for gene_file, fname in gene_files:
            gene_file.close()
            os.remove(fname)


def split_imgt(imgt_file: str, merged_file: str, outdir: str, genes: List[str],
               prefix: str):
    """
    This function creates a separate tar file for each of the gene matches
    based on the merged file. Unmatched genes are left out.

    Every table of the IMGT archive is read only once. A row is written to
    the archive of each gene whose name occurs in the best match of the
    row's sequence ID. The archives are closed and the temporary files are
    removed even when processing fails halfway.

    :param imgt_file: The original IMGT file
    :param merged_file: The merged data file generated by SHM&CSR pipeline
    :param outdir: The output directory. It is created when missing.
    :param genes: The genes to split out. An empty string selects all
                  identified genes.
    :param prefix: Prefix of the archive names: "<prefix>_<gene>.txz", or
                   "<prefix>.txz" for the empty gene.
    :return: None
    """
    match_dict = merged_txt_to_match_dict(merged_file)
    os.makedirs(outdir, exist_ok=True)
    gene_tarfiles = []
    try:
        for gene in genes:
            new_filename = f"{prefix}_{gene}.txz" if gene else f"{prefix}.txz"
            gene_tarfiles.append(
                tarfile.open(os.path.join(outdir, new_filename), mode="w:xz")
            )
        for name, table in imgt_to_tables(imgt_file):
            # Read each table one by one and per line select in which output
            # files it should go.
            _split_table(name, table, match_dict, genes, gene_tarfiles)
    finally:
        for gene_tarfile in gene_tarfiles:
            gene_tarfile.close()
123
124
def argument_parser() -> argparse.ArgumentParser:
    """
    Build the command line interface of this script.

    The positional arguments are the IMGT archive, the merged.txt file and
    one or more genes. ``--outdir`` defaults to the current directory, so
    leaving it out does not pass ``None`` on to ``os.makedirs``.
    ``--prefix`` has no default and is ``None`` when not given.

    :return: The configured argument parser.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("imgt_file", help="The original IMGT FILE")
    parser.add_argument("merged", help="merged.txt file")
    parser.add_argument("--outdir", default=".", help="output directory")
    parser.add_argument(
        "genes",
        nargs="+",
        help="The genes to split out. Use '-' for all identified genes.")
    parser.add_argument("--prefix", help="Prefix for the archives and "
                                         "directories")
    return parser
137
138
def main():
    """Parse the command line and split the IMGT archive accordingly."""
    cli = argument_parser().parse_args()
    # "-" on the command line stands for "all identified genes"; split_imgt
    # takes the empty string for that.
    selected = [gene if gene != "-" else "" for gene in cli.genes]
    split_imgt(
        imgt_file=cli.imgt_file,
        merged_file=cli.merged,
        outdir=cli.outdir,
        genes=selected,
        prefix=cli.prefix,
    )
144
145
# Run the command line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()