Mercurial > repos > cpt > cpt_xmfa_split
comparison lcb_split.py @ 1:5d9bc33ec5d3 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:54:34 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:21d00cf83137 | 1:5d9bc33ec5d3 |
---|---|
1 #!/usr/bin/env python | |
2 import argparse | |
3 import copy | |
4 import logging | |
5 import xmfa | |
6 from itertools import groupby | |
7 | |
8 logging.basicConfig(level=logging.INFO) | |
9 log = logging.getLogger(__name__) | |
10 | |
11 | |
12 def split_lcb(lcb, window_size=10, threshold=0.7): | |
13 # Transpose sequence | |
14 lines = [] | |
15 max_align_num = len(lcb[0]["seq"]) | |
16 for i in range(max_align_num): | |
17 lines.append([]) | |
18 for j in range(len(lcb)): | |
19 c = lcb[j]["seq"][i] | |
20 if c != "-": | |
21 lines[i].append(j) | |
22 | |
23 count_groups = [] | |
24 for i in range(0, len(lines), window_size): | |
25 current_lines = lines[i : i + window_size] | |
26 flat_list = [a for b in current_lines for a in b] | |
27 counts = [] | |
28 for i in range(len(lcb)): | |
29 value = float(flat_list.count(i)) / window_size | |
30 if value >= threshold: | |
31 counts.append(i) | |
32 count_groups.append(counts) | |
33 | |
34 # groups = [(next(j), len(list(j)) + 1) for i, j in ] | |
35 # [([4], 2), ([2, 3, 4, 5, 6], 2), ([0, 1, 2, 3, 4, 5, 6], 14), ([0, 3], 1)] | |
36 # This says for 2 window sizes, we emit a new LCB with just [0:10] and | |
37 # [10:20] for lcb #4, then one with all but 0/1 for 2, then all for 14. | |
38 new_lcbs = [] | |
39 position = 0 | |
40 for i, j in groupby(count_groups): | |
41 tmp = list(j) | |
42 count = len(tmp) | |
43 members = tmp[0] | |
44 local_members = [] | |
45 for member in members: | |
46 tmp_member = copy.deepcopy(lcb[member]) | |
47 tmp_member["seq"] = tmp_member["seq"][ | |
48 window_size * position : window_size * (position + count) | |
49 ] | |
50 tmp_member["start"] = tmp_member["start"] + (3 * window_size * position) | |
51 tmp_member["end"] = tmp_member["start"] + (3 * window_size * count) | |
52 local_members.append(tmp_member) | |
53 if len(local_members) > 0: | |
54 new_lcbs.append(local_members) | |
55 | |
56 position += count | |
57 return new_lcbs | |
58 | |
59 | |
60 def split_lcbs(lcbs, window_size=10, threshold=100): | |
61 new_lcbs = [] | |
62 for lcb in lcbs: | |
63 new_lcbs.extend(split_lcb(lcb, window_size=window_size, threshold=threshold)) | |
64 return new_lcbs | |
65 | |
66 | |
67 if __name__ == "__main__": | |
68 parser = argparse.ArgumentParser( | |
69 description="Split XMFA alignments", prog="xmfa2smallerXmfa" | |
70 ) | |
71 parser.add_argument("xmfa_file", type=argparse.FileType("r"), help="XMFA File") | |
72 | |
73 parser.add_argument( | |
74 "--window_size", type=int, help="Window size for analysis", default=10 | |
75 ) | |
76 parser.add_argument( | |
77 "--threshold", | |
78 type=float, | |
79 help="All genomes must meet N percent similarity", | |
80 default=0.7, | |
81 ) | |
82 | |
83 args = parser.parse_args() | |
84 | |
85 # Write | |
86 xmfa.to_xmfa( | |
87 # Split | |
88 split_lcbs( | |
89 # Parse | |
90 xmfa.parse_xmfa(args.xmfa_file), | |
91 window_size=args.window_size, | |
92 threshold=args.threshold, | |
93 ) | |
94 ) |