Mercurial > repos > cpt > cpt_xmfa_to_table
comparison cpt_convert_xmfa/xmfa2tbl.py @ 0:06d8e28d0bd7 draft default tip
Uploaded
| author | cpt |
|---|---|
| date | Fri, 10 Jun 2022 08:49:43 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:06d8e28d0bd7 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 from Bio import SeqIO | |
| 3 import sys | |
| 4 from xmfa import parse_xmfa, percent_identity | |
| 5 import argparse | |
| 6 import logging | |
| 7 import itertools | |
| 8 from collections import Counter | |
| 9 | |
| 10 logging.basicConfig(level=logging.INFO) | |
| 11 log = logging.getLogger(__name__) | |
| 12 | |
| 13 | |
| 14 def _id_tn_dict(sequences): | |
| 15 """Figure out sequence IDs AND sequence lengths from fasta file | |
| 16 """ | |
| 17 label_convert = {} | |
| 18 if sequences is not None: | |
| 19 if len(sequences) == 1: | |
| 20 for i, record in enumerate(SeqIO.parse(sequences[0], "fasta")): | |
| 21 label_convert[str(i + 1)] = {"id": record.id, "len": len(record)} | |
| 22 else: | |
| 23 for i, sequence in enumerate(sequences): | |
| 24 for record in SeqIO.parse(sequence, "fasta"): | |
| 25 label_convert[str(i + 1)] = {"id": record.id, "len": len(record)} | |
| 26 continue | |
| 27 # check for repeated sequence IDs | |
| 28 id_dupes = Counter(list(x["id"] for x in label_convert.values())) | |
| 29 for dupe in id_dupes: | |
| 30 if id_dupes[dupe] > 1: | |
| 31 log.debug("Duplicate FASTA ID Found: %s\n" % (dupe)) | |
| 32 for xmfaid in label_convert.keys(): | |
| 33 # Change the duplicate labels to have the XMFA identifer | |
| 34 # as a prefix so not to break the table generation later | |
| 35 if label_convert[xmfaid]["id"] == dupe: | |
| 36 label_convert[xmfaid]["id"] = "%s_%s" % (xmfaid, dupe) | |
| 37 | |
| 38 return label_convert | |
| 39 | |
| 40 | |
| 41 def total_similarity(xmfa_file, sequences=None, dice=False): | |
| 42 if sequences is None: | |
| 43 raise Exception("Must provide a non-zero number of sequence files") | |
| 44 | |
| 45 label_convert = _id_tn_dict(sequences) | |
| 46 lcbs = parse_xmfa(xmfa_file) | |
| 47 | |
| 48 # make a matrix based on number of sequences | |
| 49 table = {} | |
| 50 | |
| 51 for lcb in lcbs: | |
| 52 # ignore LCBs containing only one sequence | |
| 53 if len(lcb) == 0: | |
| 54 continue | |
| 55 | |
| 56 # permutations based on num sequences to compare for current LCB | |
| 57 compare_seqs = list(itertools.permutations(range(0, len(lcb)), 2)) | |
| 58 for permutation in compare_seqs: | |
| 59 (i, j) = permutation | |
| 60 similarity = percent_identity(lcb[i]["seq"], lcb[j]["seq"]) | |
| 61 | |
| 62 i_name = label_convert[lcb[i]["id"]]["id"] | |
| 63 j_name = label_convert[lcb[j]["id"]]["id"] | |
| 64 # find length of sequence in LCB | |
| 65 length_seq_lcb = lcb[i]["end"] - (lcb[i]["start"] - 1) | |
| 66 # populate table with normalized similarity value based on length_seq_lcb | |
| 67 if (i_name, j_name) not in table: | |
| 68 table[(i_name, j_name)] = 0 | |
| 69 table[(i_name, j_name)] += length_seq_lcb * similarity | |
| 70 | |
| 71 # finalize total percent similarity by dividing by length of parent sequence | |
| 72 for i in label_convert.keys(): | |
| 73 for j in label_convert.keys(): | |
| 74 i_name = label_convert[i]["id"] | |
| 75 j_name = label_convert[j]["id"] | |
| 76 if (i_name, j_name) in table: | |
| 77 if dice: | |
| 78 table[(i_name, j_name)] = ( | |
| 79 2 | |
| 80 * table[(i_name, j_name)] | |
| 81 / (label_convert[i]["len"] + label_convert[j]["len"]) | |
| 82 ) | |
| 83 else: | |
| 84 table[(i_name, j_name)] = ( | |
| 85 table[(i_name, j_name)] / label_convert[i]["len"] | |
| 86 ) | |
| 87 else: | |
| 88 table[(i_name, j_name)] = 0 | |
| 89 | |
| 90 if i_name == j_name: | |
| 91 table[(i_name, j_name)] = 100 | |
| 92 | |
| 93 # print table | |
| 94 names = [] | |
| 95 table_keys = sorted(label_convert.keys()) | |
| 96 | |
| 97 for i in table_keys: | |
| 98 names.append(label_convert[i]["id"]) | |
| 99 | |
| 100 sys.stdout.write("\t" + "\t".join(names) + "\n") | |
| 101 for j in table_keys: | |
| 102 j_key = label_convert[j]["id"] | |
| 103 sys.stdout.write(j_key) | |
| 104 for i in table_keys: | |
| 105 i_key = label_convert[i]["id"] | |
| 106 sys.stdout.write("\t%0.2f" % table[(i_key, j_key)]) | |
| 107 sys.stdout.write("\n") | |
| 108 | |
| 109 | |
| 110 if __name__ == "__main__": | |
| 111 parser = argparse.ArgumentParser( | |
| 112 description="Convert XMFA alignments to gff3", prog="xmfa2gff3" | |
| 113 ) | |
| 114 parser.add_argument("xmfa_file", type=argparse.FileType("r"), help="XMFA File") | |
| 115 parser.add_argument( | |
| 116 "sequences", | |
| 117 type=argparse.FileType("r"), | |
| 118 nargs="+", | |
| 119 help="Fasta files (in same order) passed to parent for reconstructing proper IDs", | |
| 120 ) | |
| 121 parser.add_argument( | |
| 122 "--dice", action="store_true", help="Use dice method for calculating % identity" | |
| 123 ) | |
| 124 parser.add_argument("--version", action="version", version="%(prog)s 1.0") | |
| 125 | |
| 126 args = parser.parse_args() | |
| 127 | |
| 128 total_similarity(**vars(args)) |
