Mercurial > repos > cpt > cpt_xmfa_split
diff lcb_split.py @ 1:5d9bc33ec5d3 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:54:34 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lcb_split.py Mon Jun 05 02:54:34 2023 +0000 @@ -0,0 +1,94 @@ +#!/usr/bin/env python +import argparse +import copy +import logging +import xmfa +from itertools import groupby + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + + +def split_lcb(lcb, window_size=10, threshold=0.7): + # Transpose sequence + lines = [] + max_align_num = len(lcb[0]["seq"]) + for i in range(max_align_num): + lines.append([]) + for j in range(len(lcb)): + c = lcb[j]["seq"][i] + if c != "-": + lines[i].append(j) + + count_groups = [] + for i in range(0, len(lines), window_size): + current_lines = lines[i : i + window_size] + flat_list = [a for b in current_lines for a in b] + counts = [] + for i in range(len(lcb)): + value = float(flat_list.count(i)) / window_size + if value >= threshold: + counts.append(i) + count_groups.append(counts) + + # groups = [(next(j), len(list(j)) + 1) for i, j in ] + # [([4], 2), ([2, 3, 4, 5, 6], 2), ([0, 1, 2, 3, 4, 5, 6], 14), ([0, 3], 1)] + # This says for 2 window sizes, we emit a new LCB with just [0:10] and + # [10:20] for lcb #4, then one with all but 0/1 for 2, then all for 14. + new_lcbs = [] + position = 0 + for i, j in groupby(count_groups): + tmp = list(j) + count = len(tmp) + members = tmp[0] + local_members = [] + for member in members: + tmp_member = copy.deepcopy(lcb[member]) + tmp_member["seq"] = tmp_member["seq"][ + window_size * position : window_size * (position + count) + ] + tmp_member["start"] = tmp_member["start"] + (3 * window_size * position) + tmp_member["end"] = tmp_member["start"] + (3 * window_size * count) + local_members.append(tmp_member) + if len(local_members) > 0: + new_lcbs.append(local_members) + + position += count + return new_lcbs + + +def split_lcbs(lcbs, window_size=10, threshold=100): + new_lcbs = [] + for lcb in lcbs: + new_lcbs.extend(split_lcb(lcb, window_size=window_size, threshold=threshold)) + return new_lcbs + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Split XMFA alignments", prog="xmfa2smallerXmfa" + ) + parser.add_argument("xmfa_file", type=argparse.FileType("r"), help="XMFA File") + + parser.add_argument( + "--window_size", type=int, help="Window size for analysis", default=10 + ) + parser.add_argument( + "--threshold", + type=float, + help="All genomes must meet N percent similarity", + default=0.7, + ) + + args = parser.parse_args() + + # Write + xmfa.to_xmfa( + # Split + split_lcbs( + # Parse + xmfa.parse_xmfa(args.xmfa_file), + window_size=args.window_size, + threshold=args.threshold, + ) + )