Mercurial > repos > cpt > cpt_xmfa_split
view lcb_split.py @ 1:5d9bc33ec5d3 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:54:34 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python import argparse import copy import logging import xmfa from itertools import groupby logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) def split_lcb(lcb, window_size=10, threshold=0.7): # Transpose sequence lines = [] max_align_num = len(lcb[0]["seq"]) for i in range(max_align_num): lines.append([]) for j in range(len(lcb)): c = lcb[j]["seq"][i] if c != "-": lines[i].append(j) count_groups = [] for i in range(0, len(lines), window_size): current_lines = lines[i : i + window_size] flat_list = [a for b in current_lines for a in b] counts = [] for i in range(len(lcb)): value = float(flat_list.count(i)) / window_size if value >= threshold: counts.append(i) count_groups.append(counts) # groups = [(next(j), len(list(j)) + 1) for i, j in ] # [([4], 2), ([2, 3, 4, 5, 6], 2), ([0, 1, 2, 3, 4, 5, 6], 14), ([0, 3], 1)] # This says for 2 window sizes, we emit a new LCB with just [0:10] and # [10:20] for lcb #4, then one with all but 0/1 for 2, then all for 14. new_lcbs = [] position = 0 for i, j in groupby(count_groups): tmp = list(j) count = len(tmp) members = tmp[0] local_members = [] for member in members: tmp_member = copy.deepcopy(lcb[member]) tmp_member["seq"] = tmp_member["seq"][ window_size * position : window_size * (position + count) ] tmp_member["start"] = tmp_member["start"] + (3 * window_size * position) tmp_member["end"] = tmp_member["start"] + (3 * window_size * count) local_members.append(tmp_member) if len(local_members) > 0: new_lcbs.append(local_members) position += count return new_lcbs def split_lcbs(lcbs, window_size=10, threshold=100): new_lcbs = [] for lcb in lcbs: new_lcbs.extend(split_lcb(lcb, window_size=window_size, threshold=threshold)) return new_lcbs if __name__ == "__main__": parser = argparse.ArgumentParser( description="Split XMFA alignments", prog="xmfa2smallerXmfa" ) parser.add_argument("xmfa_file", type=argparse.FileType("r"), help="XMFA File") parser.add_argument( "--window_size", type=int, help="Window size for analysis", default=10 ) parser.add_argument( "--threshold", type=float, help="All genomes must meet N percent similarity", default=0.7, ) args = parser.parse_args() # Write xmfa.to_xmfa( # Split split_lcbs( # Parse xmfa.parse_xmfa(args.xmfa_file), window_size=args.window_size, threshold=args.threshold, ) )