Mercurial > repos > davidvanzessen > shm_csr
comparison sequence_overview.py @ 92:cf8ad181628f draft
planemo upload commit 36be3b053802693392f935e6619ba3f2b1704e3c
author | rhpvorderman |
---|---|
date | Mon, 12 Dec 2022 12:32:44 +0000 |
parents | |
children | 8fcf31272f6e |
comparison
equal
deleted
inserted
replaced
91:f387cc1580c6 | 92:cf8ad181628f |
---|---|
1 #!/usr/bin/env/python3 | |
2 | |
3 """Create a HTML sequence overview""" | |
4 | |
5 import argparse | |
6 import os | |
7 import typing | |
8 from collections import defaultdict | |
9 from pathlib import Path | |
10 from typing import Dict, Iterable, List | |
11 | |
12 | |
13 class SequenceTableRow(typing.NamedTuple): | |
14 sequence_id: str | |
15 sequence: str | |
16 best_match: str | |
17 functionality: str | |
18 | |
19 | |
20 class SequenceStats: | |
21 __slots__ = ("counts", "table_rows") | |
22 | |
23 def __init__(self): | |
24 self.counts: Dict[str, int] = { | |
25 "IGA1": 0, | |
26 "IGA2": 0, | |
27 "IGE": 0, | |
28 "IGG1": 0, | |
29 "IGG2": 0, | |
30 "IGG3": 0, | |
31 "IGG4": 0, | |
32 "IGM": 0, | |
33 "unmatched": 0} | |
34 self.table_rows: List[SequenceTableRow] = [] | |
35 | |
36 | |
37 def get_sequence_stats(before_unique: str, | |
38 sequence_columns: List[str]): | |
39 sequence_statistics = defaultdict(SequenceStats) | |
40 with open(before_unique, "rt") as table: | |
41 header = next(table) | |
42 header_columns = header.strip("\n").split("\t") | |
43 for line in table: | |
44 values = line.strip("\n").split("\t") | |
45 row_dict = dict(zip(header_columns, values)) | |
46 sequence = " ".join(row_dict[column] for column in sequence_columns) | |
47 best_match = row_dict["best_match"] | |
48 original_match = best_match | |
49 if best_match.startswith("unmatched"): | |
50 best_match = "unmatched" | |
51 sequence_statistics[sequence].counts[best_match] += 1 | |
52 functionality = row_dict["Functionality"] | |
53 sequence_statistics[sequence].table_rows.append( | |
54 SequenceTableRow(row_dict["Sequence.ID"], sequence, | |
55 original_match, functionality)) | |
56 return sequence_statistics | |
57 | |
58 | |
59 def get_background_color(value: str): | |
60 if value in ("TRUE", "T"): | |
61 return "#eafaf1" | |
62 elif value in ("FALSE", "F"): | |
63 return "#f9ebea" | |
64 try: | |
65 flt = float(value) | |
66 except ValueError: | |
67 return "white" | |
68 if flt > 0: | |
69 return "#eaecee" | |
70 return "white" | |
71 | |
72 | |
73 def td(val): | |
74 return f"<td bgcolor='{get_background_color(val)}'>{val}</td>" | |
75 | |
76 | |
77 def tr(val: Iterable[str]): | |
78 return f"<tr>{''.join(td(v) for v in val)}</tr>\n" | |
79 | |
80 | |
81 def make_link(link, val): | |
82 return f"<a href='{link}'>{val}</a>" | |
83 | |
84 | |
85 def tbl(df: Iterable[Iterable[str]]): | |
86 return f"<table border='1'>{''.join(tr(v) for v in df)}</table>\n" | |
87 | |
88 | |
89 def to_bool_str(cond): | |
90 return "TRUE" if cond else "FALSE" | |
91 | |
92 | |
93 def sequence_overview(before_unique: str, | |
94 outdir: str, | |
95 empty_region_filter: str): | |
96 os.makedirs(outdir, exist_ok=True) | |
97 sequence_columns = [ | |
98 "FR1.IMGT.seq", "CDR1.IMGT.seq", "FR2.IMGT.seq", "CDR2.IMGT.seq", | |
99 "FR3.IMGT.seq", "CDR3.IMGT.seq"] | |
100 if empty_region_filter == "leader": | |
101 sequence_columns = sequence_columns | |
102 elif empty_region_filter == "FR1": | |
103 sequence_columns = sequence_columns[1:] | |
104 elif empty_region_filter == "CDR1": | |
105 sequence_columns = sequence_columns[2:] | |
106 elif empty_region_filter == "FR2": | |
107 sequence_columns = sequence_columns[3:] | |
108 else: | |
109 raise ValueError(f"Unknown region filter: {empty_region_filter}") | |
110 main_html_file = os.path.join(outdir, "index.html") | |
111 by_id_file = os.path.join(outdir, "by_id.html") | |
112 with open(main_html_file, "wt") as main_html, open(by_id_file, "wt") as by_id: | |
113 main_html.write("<center><img src='data:image/png;base64," | |
114 "iVBORw0KGgoAAAANSUhEUgAAAA8AAAAPCAYAAAA71pVKAAAAzElEQ" | |
115 "VQoka2TwQ2CQBBFpwTshw4ImW8ogJMlUIMmhNCDxgasAi50oSXA8X" | |
116 "lAjCG7aqKTzGX/vsnM31mzR0gk7tTudO5MEizpzvQ4ryUSe408J3X" | |
117 "n+grE0p1rnpOamVmWsZG4rS+dzzAMsN8Hi9yyjI1JNGtxu4VxBJgL" | |
118 "RLpoTKIPiW0LlwtUVRTubW2OBGUJu92cZRmdfbKQMAw8o+vi5v0fL" | |
119 "orZ7Y9waGYJjsf38DJz0O1PsEQffOcv4Sa6YYfDDJ5Obzbsp93+5Vf" | |
120 "dATueO1fdLdI0AAAAAElFTkSuQmCC'" | |
121 "> Please note that this tab is based on all " | |
122 "sequences before filter unique sequences and the " | |
123 "remove duplicates based on filters are applied. In " | |
124 "this table only sequences occuring more than once " | |
125 "are included. </center>") | |
126 main_html.write("<table border='1' class='pure-table pure-table-striped'>") | |
127 main_html.write(f"<caption>" | |
128 f"{'+'.join(column.split('.')[0] for column in sequence_columns)} sequences " | |
129 f"that show up more than once</caption>") | |
130 main_html.write("<tr>") | |
131 main_html.write("<th>Sequence</th><th>Functionality</th><th>IGA1</th>" | |
132 "<th>IGA2</th><th>IGG1</th><th>IGG2</th><th>IGG3</th>" | |
133 "<th>IGG4</th><th>IGM</th><th>IGE</th><th>UN</th>") | |
134 main_html.write("<th>total IGA</th><th>total IGG</th><th>total IGM</th>" | |
135 "<th>total IGE</th><th>number of subclasses</th>" | |
136 "<th>present in both IGA and IGG</th>" | |
137 "<th>present in IGA, IGG and IGM</th>" | |
138 "<th>present in IGA, IGG and IGE</th>" | |
139 "<th>present in IGA, IGG, IGM and IGE</th>" | |
140 "<th>IGA1+IGA2</th>") | |
141 main_html.write("<th>IGG1+IGG2</th><th>IGG1+IGG3</th>" | |
142 "<th>IGG1+IGG4</th><th>IGG2+IGG3</th>" | |
143 "<th>IGG2+IGG4</th><th>IGG3+IGG4</th>") | |
144 main_html.write("<th>IGG1+IGG2+IGG3</th><th>IGG2+IGG3+IGG4</th>" | |
145 "<th>IGG1+IGG2+IGG4</th><th>IGG1+IGG3+IGG4</th>" | |
146 "<th>IGG1+IGG2+IGG3+IGG4</th>") | |
147 main_html.write("</tr>\n") | |
148 sequence_stats = get_sequence_stats(before_unique, sequence_columns) | |
149 sorted_sequences = sorted(sequence_stats.keys()) | |
150 | |
151 single_sequences = 0 # sequence only found once, skipped | |
152 in_multiple = 0 # same sequence across multiple subclasses | |
153 multiple_in_one = 0 # same sequence multiple times in one subclass | |
154 unmatched = 0 # all the sequences are unmatched | |
155 some_unmatched = 0 # one or more sequences in a clone are unmatched | |
156 matched = 0 # should be the same als matched sequences | |
157 | |
158 for i, sequence in enumerate(sorted_sequences, start=1): | |
159 sequence_stat: SequenceStats = sequence_stats[sequence] | |
160 count_dict = sequence_stat.counts | |
161 class_sum = sum(count_dict.values()) | |
162 if class_sum == 1: | |
163 single_sequences += 1 | |
164 continue | |
165 if count_dict["unmatched"] == class_sum: | |
166 unmatched += 1 | |
167 continue | |
168 in_classes = sum(1 for key, value in count_dict.items() | |
169 if value > 0 and key != "unmatched") | |
170 matched += in_classes | |
171 if any(value == class_sum for value in count_dict.values()): | |
172 multiple_in_one += 1 | |
173 elif count_dict["unmatched"] > 0: | |
174 some_unmatched += 1 | |
175 else: | |
176 in_multiple += 1 | |
177 # Use a dict so we can preserve the order and get all the unique | |
178 # items. With a set the order is not preserved. | |
179 functionality_dict = {row.functionality: None | |
180 for row in sequence_stat.table_rows} | |
181 functionality = ",".join(functionality_dict.keys()) | |
182 links: Dict[str, str] = {} | |
183 for key, value in count_dict.items(): | |
184 name_key = "un" if key == "unmatched" else key | |
185 html_file = f"{name_key}_{i}.html" | |
186 links[key] = html_file | |
187 if value > 0: | |
188 rows = [row for row in sequence_stat.table_rows | |
189 # Startswith to also get unmatched columns | |
190 if row.best_match.startswith(key)] | |
191 Path(outdir, html_file).write_text(tbl(rows)) | |
192 for row in rows: | |
193 by_id.write(make_link(html_file, row.sequence_id) + "<br />\n") | |
194 iga_count = count_dict["IGA1"] + count_dict["IGA2"] | |
195 igg_count = count_dict["IGG1"] + count_dict["IGG2"] + \ | |
196 count_dict["IGG3"] + count_dict["IGG4"] | |
197 | |
198 contained_classes = set(key for key, value | |
199 in count_dict.items() if value > 0) | |
200 if iga_count: | |
201 contained_classes.add("IGA") | |
202 if igg_count: | |
203 contained_classes.add("IGG") | |
204 main_row = [ | |
205 sequence, functionality, | |
206 make_link(links["IGA1"], count_dict["IGA1"]), | |
207 make_link(links["IGA2"], count_dict["IGA2"]), | |
208 make_link(links["IGG1"], count_dict["IGG1"]), | |
209 make_link(links["IGG2"], count_dict["IGG2"]), | |
210 make_link(links["IGG3"], count_dict["IGG3"]), | |
211 make_link(links["IGG4"], count_dict["IGG4"]), | |
212 make_link(links["IGM"], count_dict["IGM"]), | |
213 make_link(links["IGE"], count_dict["IGE"]), | |
214 make_link(links["unmatched"], count_dict["unmatched"]), | |
215 iga_count, | |
216 igg_count, | |
217 count_dict["IGM"], | |
218 count_dict["IGE"], | |
219 in_classes, | |
220 to_bool_str({"IGA", "IGG"}.issubset(contained_classes)), | |
221 to_bool_str({"IGA", "IGG", "IGM"}.issubset(contained_classes)), | |
222 to_bool_str({"IGA", "IGG", "IGE"}.issubset(contained_classes)), | |
223 to_bool_str({"IGA", "IGG", "IGM", "IGE"}.issubset(contained_classes)), | |
224 to_bool_str({"IGA1", "IGA2"}.issubset(contained_classes)), | |
225 to_bool_str({"IGG1", "IGG2"}.issubset(contained_classes)), | |
226 to_bool_str({"IGG1", "IGG3"}.issubset(contained_classes)), | |
227 to_bool_str({"IGG1", "IGG4"}.issubset(contained_classes)), | |
228 to_bool_str({"IGG2", "IGG3"}.issubset(contained_classes)), | |
229 to_bool_str({"IGG2", "IGG4"}.issubset(contained_classes)), | |
230 to_bool_str({"IGG3", "IGG4"}.issubset(contained_classes)), | |
231 to_bool_str({"IGG1", "IGG2", "IGG3"}.issubset(contained_classes)), | |
232 to_bool_str({"IGG2", "IGG3", "IGG4"}.issubset(contained_classes)), | |
233 to_bool_str({"IGG1", "IGG2", "IGG4"}.issubset(contained_classes)), | |
234 to_bool_str({"IGG1", "IGG3", "IGG4"}.issubset(contained_classes)), | |
235 to_bool_str({"IGG1", "IGG2", "IGG3", "IGG4"}.issubset(contained_classes)), | |
236 ] | |
237 main_html.write(tr(main_row)) | |
238 main_html.write("</table>") | |
239 | |
240 | |
241 def argument_parser() -> argparse.ArgumentParser: | |
242 parser = argparse.ArgumentParser() | |
243 parser.add_argument("--before-unique", help="File with the overview before unique filters") | |
244 parser.add_argument("--outdir", help="Output directory") | |
245 parser.add_argument("--empty-region-filter") | |
246 return parser | |
247 | |
248 | |
249 def main(): | |
250 args = argument_parser().parse_args() | |
251 sequence_overview(args.before_unique, | |
252 args.outdir, | |
253 args.empty_region_filter) | |
254 | |
255 | |
256 if __name__ == "__main__": | |
257 main() |