Mercurial > repos > davidvanzessen > shm_csr
comparison sequence_overview.py @ 92:cf8ad181628f draft
planemo upload commit 36be3b053802693392f935e6619ba3f2b1704e3c
| author | rhpvorderman | 
|---|---|
| date | Mon, 12 Dec 2022 12:32:44 +0000 | 
| parents | |
| children | 8fcf31272f6e | 
   comparison
  equal
  deleted
  inserted
  replaced
| 91:f387cc1580c6 | 92:cf8ad181628f | 
|---|---|
| 1 #!/usr/bin/env/python3 | |
| 2 | |
| 3 """Create a HTML sequence overview""" | |
| 4 | |
| 5 import argparse | |
| 6 import os | |
| 7 import typing | |
| 8 from collections import defaultdict | |
| 9 from pathlib import Path | |
| 10 from typing import Dict, Iterable, List | |
| 11 | |
| 12 | |
| 13 class SequenceTableRow(typing.NamedTuple): | |
| 14 sequence_id: str | |
| 15 sequence: str | |
| 16 best_match: str | |
| 17 functionality: str | |
| 18 | |
| 19 | |
| 20 class SequenceStats: | |
| 21 __slots__ = ("counts", "table_rows") | |
| 22 | |
| 23 def __init__(self): | |
| 24 self.counts: Dict[str, int] = { | |
| 25 "IGA1": 0, | |
| 26 "IGA2": 0, | |
| 27 "IGE": 0, | |
| 28 "IGG1": 0, | |
| 29 "IGG2": 0, | |
| 30 "IGG3": 0, | |
| 31 "IGG4": 0, | |
| 32 "IGM": 0, | |
| 33 "unmatched": 0} | |
| 34 self.table_rows: List[SequenceTableRow] = [] | |
| 35 | |
| 36 | |
| 37 def get_sequence_stats(before_unique: str, | |
| 38 sequence_columns: List[str]): | |
| 39 sequence_statistics = defaultdict(SequenceStats) | |
| 40 with open(before_unique, "rt") as table: | |
| 41 header = next(table) | |
| 42 header_columns = header.strip("\n").split("\t") | |
| 43 for line in table: | |
| 44 values = line.strip("\n").split("\t") | |
| 45 row_dict = dict(zip(header_columns, values)) | |
| 46 sequence = " ".join(row_dict[column] for column in sequence_columns) | |
| 47 best_match = row_dict["best_match"] | |
| 48 original_match = best_match | |
| 49 if best_match.startswith("unmatched"): | |
| 50 best_match = "unmatched" | |
| 51 sequence_statistics[sequence].counts[best_match] += 1 | |
| 52 functionality = row_dict["Functionality"] | |
| 53 sequence_statistics[sequence].table_rows.append( | |
| 54 SequenceTableRow(row_dict["Sequence.ID"], sequence, | |
| 55 original_match, functionality)) | |
| 56 return sequence_statistics | |
| 57 | |
| 58 | |
| 59 def get_background_color(value: str): | |
| 60 if value in ("TRUE", "T"): | |
| 61 return "#eafaf1" | |
| 62 elif value in ("FALSE", "F"): | |
| 63 return "#f9ebea" | |
| 64 try: | |
| 65 flt = float(value) | |
| 66 except ValueError: | |
| 67 return "white" | |
| 68 if flt > 0: | |
| 69 return "#eaecee" | |
| 70 return "white" | |
| 71 | |
| 72 | |
| 73 def td(val): | |
| 74 return f"<td bgcolor='{get_background_color(val)}'>{val}</td>" | |
| 75 | |
| 76 | |
| 77 def tr(val: Iterable[str]): | |
| 78 return f"<tr>{''.join(td(v) for v in val)}</tr>\n" | |
| 79 | |
| 80 | |
| 81 def make_link(link, val): | |
| 82 return f"<a href='{link}'>{val}</a>" | |
| 83 | |
| 84 | |
| 85 def tbl(df: Iterable[Iterable[str]]): | |
| 86 return f"<table border='1'>{''.join(tr(v) for v in df)}</table>\n" | |
| 87 | |
| 88 | |
| 89 def to_bool_str(cond): | |
| 90 return "TRUE" if cond else "FALSE" | |
| 91 | |
| 92 | |
| 93 def sequence_overview(before_unique: str, | |
| 94 outdir: str, | |
| 95 empty_region_filter: str): | |
| 96 os.makedirs(outdir, exist_ok=True) | |
| 97 sequence_columns = [ | |
| 98 "FR1.IMGT.seq", "CDR1.IMGT.seq", "FR2.IMGT.seq", "CDR2.IMGT.seq", | |
| 99 "FR3.IMGT.seq", "CDR3.IMGT.seq"] | |
| 100 if empty_region_filter == "leader": | |
| 101 sequence_columns = sequence_columns | |
| 102 elif empty_region_filter == "FR1": | |
| 103 sequence_columns = sequence_columns[1:] | |
| 104 elif empty_region_filter == "CDR1": | |
| 105 sequence_columns = sequence_columns[2:] | |
| 106 elif empty_region_filter == "FR2": | |
| 107 sequence_columns = sequence_columns[3:] | |
| 108 else: | |
| 109 raise ValueError(f"Unknown region filter: {empty_region_filter}") | |
| 110 main_html_file = os.path.join(outdir, "index.html") | |
| 111 by_id_file = os.path.join(outdir, "by_id.html") | |
| 112 with open(main_html_file, "wt") as main_html, open(by_id_file, "wt") as by_id: | |
| 113 main_html.write("<center><img src='data:image/png;base64," | |
| 114 "iVBORw0KGgoAAAANSUhEUgAAAA8AAAAPCAYAAAA71pVKAAAAzElEQ" | |
| 115 "VQoka2TwQ2CQBBFpwTshw4ImW8ogJMlUIMmhNCDxgasAi50oSXA8X" | |
| 116 "lAjCG7aqKTzGX/vsnM31mzR0gk7tTudO5MEizpzvQ4ryUSe408J3X" | |
| 117 "n+grE0p1rnpOamVmWsZG4rS+dzzAMsN8Hi9yyjI1JNGtxu4VxBJgL" | |
| 118 "RLpoTKIPiW0LlwtUVRTubW2OBGUJu92cZRmdfbKQMAw8o+vi5v0fL" | |
| 119 "orZ7Y9waGYJjsf38DJz0O1PsEQffOcv4Sa6YYfDDJ5Obzbsp93+5Vf" | |
| 120 "dATueO1fdLdI0AAAAAElFTkSuQmCC'" | |
| 121 "> Please note that this tab is based on all " | |
| 122 "sequences before filter unique sequences and the " | |
| 123 "remove duplicates based on filters are applied. In " | |
| 124 "this table only sequences occuring more than once " | |
| 125 "are included. </center>") | |
| 126 main_html.write("<table border='1' class='pure-table pure-table-striped'>") | |
| 127 main_html.write(f"<caption>" | |
| 128 f"{'+'.join(column.split('.')[0] for column in sequence_columns)} sequences " | |
| 129 f"that show up more than once</caption>") | |
| 130 main_html.write("<tr>") | |
| 131 main_html.write("<th>Sequence</th><th>Functionality</th><th>IGA1</th>" | |
| 132 "<th>IGA2</th><th>IGG1</th><th>IGG2</th><th>IGG3</th>" | |
| 133 "<th>IGG4</th><th>IGM</th><th>IGE</th><th>UN</th>") | |
| 134 main_html.write("<th>total IGA</th><th>total IGG</th><th>total IGM</th>" | |
| 135 "<th>total IGE</th><th>number of subclasses</th>" | |
| 136 "<th>present in both IGA and IGG</th>" | |
| 137 "<th>present in IGA, IGG and IGM</th>" | |
| 138 "<th>present in IGA, IGG and IGE</th>" | |
| 139 "<th>present in IGA, IGG, IGM and IGE</th>" | |
| 140 "<th>IGA1+IGA2</th>") | |
| 141 main_html.write("<th>IGG1+IGG2</th><th>IGG1+IGG3</th>" | |
| 142 "<th>IGG1+IGG4</th><th>IGG2+IGG3</th>" | |
| 143 "<th>IGG2+IGG4</th><th>IGG3+IGG4</th>") | |
| 144 main_html.write("<th>IGG1+IGG2+IGG3</th><th>IGG2+IGG3+IGG4</th>" | |
| 145 "<th>IGG1+IGG2+IGG4</th><th>IGG1+IGG3+IGG4</th>" | |
| 146 "<th>IGG1+IGG2+IGG3+IGG4</th>") | |
| 147 main_html.write("</tr>\n") | |
| 148 sequence_stats = get_sequence_stats(before_unique, sequence_columns) | |
| 149 sorted_sequences = sorted(sequence_stats.keys()) | |
| 150 | |
| 151 single_sequences = 0 # sequence only found once, skipped | |
| 152 in_multiple = 0 # same sequence across multiple subclasses | |
| 153 multiple_in_one = 0 # same sequence multiple times in one subclass | |
| 154 unmatched = 0 # all the sequences are unmatched | |
| 155 some_unmatched = 0 # one or more sequences in a clone are unmatched | |
| 156 matched = 0 # should be the same als matched sequences | |
| 157 | |
| 158 for i, sequence in enumerate(sorted_sequences, start=1): | |
| 159 sequence_stat: SequenceStats = sequence_stats[sequence] | |
| 160 count_dict = sequence_stat.counts | |
| 161 class_sum = sum(count_dict.values()) | |
| 162 if class_sum == 1: | |
| 163 single_sequences += 1 | |
| 164 continue | |
| 165 if count_dict["unmatched"] == class_sum: | |
| 166 unmatched += 1 | |
| 167 continue | |
| 168 in_classes = sum(1 for key, value in count_dict.items() | |
| 169 if value > 0 and key != "unmatched") | |
| 170 matched += in_classes | |
| 171 if any(value == class_sum for value in count_dict.values()): | |
| 172 multiple_in_one += 1 | |
| 173 elif count_dict["unmatched"] > 0: | |
| 174 some_unmatched += 1 | |
| 175 else: | |
| 176 in_multiple += 1 | |
| 177 # Use a dict so we can preserve the order and get all the unique | |
| 178 # items. With a set the order is not preserved. | |
| 179 functionality_dict = {row.functionality: None | |
| 180 for row in sequence_stat.table_rows} | |
| 181 functionality = ",".join(functionality_dict.keys()) | |
| 182 links: Dict[str, str] = {} | |
| 183 for key, value in count_dict.items(): | |
| 184 name_key = "un" if key == "unmatched" else key | |
| 185 html_file = f"{name_key}_{i}.html" | |
| 186 links[key] = html_file | |
| 187 if value > 0: | |
| 188 rows = [row for row in sequence_stat.table_rows | |
| 189 # Startswith to also get unmatched columns | |
| 190 if row.best_match.startswith(key)] | |
| 191 Path(outdir, html_file).write_text(tbl(rows)) | |
| 192 for row in rows: | |
| 193 by_id.write(make_link(html_file, row.sequence_id) + "<br />\n") | |
| 194 iga_count = count_dict["IGA1"] + count_dict["IGA2"] | |
| 195 igg_count = count_dict["IGG1"] + count_dict["IGG2"] + \ | |
| 196 count_dict["IGG3"] + count_dict["IGG4"] | |
| 197 | |
| 198 contained_classes = set(key for key, value | |
| 199 in count_dict.items() if value > 0) | |
| 200 if iga_count: | |
| 201 contained_classes.add("IGA") | |
| 202 if igg_count: | |
| 203 contained_classes.add("IGG") | |
| 204 main_row = [ | |
| 205 sequence, functionality, | |
| 206 make_link(links["IGA1"], count_dict["IGA1"]), | |
| 207 make_link(links["IGA2"], count_dict["IGA2"]), | |
| 208 make_link(links["IGG1"], count_dict["IGG1"]), | |
| 209 make_link(links["IGG2"], count_dict["IGG2"]), | |
| 210 make_link(links["IGG3"], count_dict["IGG3"]), | |
| 211 make_link(links["IGG4"], count_dict["IGG4"]), | |
| 212 make_link(links["IGM"], count_dict["IGM"]), | |
| 213 make_link(links["IGE"], count_dict["IGE"]), | |
| 214 make_link(links["unmatched"], count_dict["unmatched"]), | |
| 215 iga_count, | |
| 216 igg_count, | |
| 217 count_dict["IGM"], | |
| 218 count_dict["IGE"], | |
| 219 in_classes, | |
| 220 to_bool_str({"IGA", "IGG"}.issubset(contained_classes)), | |
| 221 to_bool_str({"IGA", "IGG", "IGM"}.issubset(contained_classes)), | |
| 222 to_bool_str({"IGA", "IGG", "IGE"}.issubset(contained_classes)), | |
| 223 to_bool_str({"IGA", "IGG", "IGM", "IGE"}.issubset(contained_classes)), | |
| 224 to_bool_str({"IGA1", "IGA2"}.issubset(contained_classes)), | |
| 225 to_bool_str({"IGG1", "IGG2"}.issubset(contained_classes)), | |
| 226 to_bool_str({"IGG1", "IGG3"}.issubset(contained_classes)), | |
| 227 to_bool_str({"IGG1", "IGG4"}.issubset(contained_classes)), | |
| 228 to_bool_str({"IGG2", "IGG3"}.issubset(contained_classes)), | |
| 229 to_bool_str({"IGG2", "IGG4"}.issubset(contained_classes)), | |
| 230 to_bool_str({"IGG3", "IGG4"}.issubset(contained_classes)), | |
| 231 to_bool_str({"IGG1", "IGG2", "IGG3"}.issubset(contained_classes)), | |
| 232 to_bool_str({"IGG2", "IGG3", "IGG4"}.issubset(contained_classes)), | |
| 233 to_bool_str({"IGG1", "IGG2", "IGG4"}.issubset(contained_classes)), | |
| 234 to_bool_str({"IGG1", "IGG3", "IGG4"}.issubset(contained_classes)), | |
| 235 to_bool_str({"IGG1", "IGG2", "IGG3", "IGG4"}.issubset(contained_classes)), | |
| 236 ] | |
| 237 main_html.write(tr(main_row)) | |
| 238 main_html.write("</table>") | |
| 239 | |
| 240 | |
| 241 def argument_parser() -> argparse.ArgumentParser: | |
| 242 parser = argparse.ArgumentParser() | |
| 243 parser.add_argument("--before-unique", help="File with the overview before unique filters") | |
| 244 parser.add_argument("--outdir", help="Output directory") | |
| 245 parser.add_argument("--empty-region-filter") | |
| 246 return parser | |
| 247 | |
| 248 | |
| 249 def main(): | |
| 250 args = argument_parser().parse_args() | |
| 251 sequence_overview(args.before_unique, | |
| 252 args.outdir, | |
| 253 args.empty_region_filter) | |
| 254 | |
| 255 | |
| 256 if __name__ == "__main__": | |
| 257 main() | 
