Mercurial > repos > cpt > cpt_compare_gbk
diff cpt_gbk_compare/gbk_compare.py @ 0:fc603e665d75 draft default tip
Uploaded
author | cpt |
---|---|
date | Tue, 21 Jun 2022 19:46:32 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cpt_gbk_compare/gbk_compare.py Tue Jun 21 19:46:32 2022 +0000 @@ -0,0 +1,245 @@ +#!/usr/bin/env python3 +""" +Copyright 2019 Ryan Wick (rrwick@gmail.com) +https://github.com/rrwick/Compare-annotations/ +This program is free software: you can redistribute it and/or modify it under the terms of the GNU +General Public License as published by the Free Software Foundation, either version 3 of the +License, or (at your option) any later version. +This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without +even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. +You should have received a copy of the GNU General Public License along with this program. If not, +see <https://www.gnu.org/licenses/>. +""" + +import argparse +from Bio import SeqIO +from Bio import pairwise2 +from Bio.pairwise2 import format_alignment +import itertools +import sys + + +def addArr(arrA, arrB): + res = [] + for x in range(0, min(len(arrA), len(arrB))): + res.append(arrA[x] + arrB[x]) + return res + +def get_arguments(): + parser = argparse.ArgumentParser(description='Compare GenBank annotations', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument('annotation_1', type=str, + help='First annotated genome in Genbank format') + parser.add_argument('annotation_2', type=str, + help='Second annotated genome in Genbank format') + + + parser.add_argument('--match_identity_threshold', type=float, default=0.7, + help='Two genes must have at least this identity to be considerd the same (0.0 to 1.0)') + parser.add_argument('--allowed_skipped_genes', type=int, default=10, + help='This many missing genes are allowed when aligning the annotations') + parser.add_argument("--addNotes", action="store_true", help="Add Note fields") + + parser.add_argument( + "-sumOut", type=argparse.FileType("w"), help="Summary out file" + ) + args = parser.parse_args() + return args + + +def main(): + args = get_arguments() + + # Load in the CDS features from the two assemblies. + old = SeqIO.parse(args.annotation_1, 'genbank') + new = SeqIO.parse(args.annotation_2, 'genbank') + old_record = next(old) + new_record = next(new) + old_features, new_features = [], [] + for f in old_record.features: + if f.type == 'CDS': + old_features.append(f) + for f in new_record.features: + if f.type == 'CDS': + new_features.append(f) + + args.sumOut.write('Features in First Genbank\'s assembly:\t' + str(len(old_features)) + "\n") + args.sumOut.write('Features in Second Genbank\'s assembly:\t' + str(len(new_features)) + "\n\n") + + # Align the features to each other. + offsets = sorted(list(itertools.product(range(args.allowed_skipped_genes + 1), + range(args.allowed_skipped_genes + 1))), + key=lambda x: x[0]+x[1]) + old_i, new_i = 0, 0 + exactRec = 0 + inexactRec = [0, 0 ,0] + hypoRec = [0, 0, 0] + newCount = 0 + oldCount = 0 + if args.addNotes: + print("First Record CDS Product\tSimilarity\tSecond Record CDS Product\tPercent Identity\tLength Difference\tFirst Gbk's CDS Location\tSecond Gbk's CDS Location\tHypothetical Status\tFirst Record's Notes\tSecond Record's Notes\n") + else: + print("First Record CDS Product\tSimilarity\tSecond Record CDS Product\tPercent Identity\tLength Difference\tFirst Gbk's CDS Location\tSecond Gbk's CDS Location\tHypothetical Status\n") + while True: + if old_i >= len(old_features) and new_i >= len(new_features): + break + + for old_offset, new_offset in offsets: + try: + old_feature = old_features[old_i + old_offset] + except IndexError: + old_feature = None + try: + new_feature = new_features[new_i + new_offset] + except IndexError: + new_feature = None + try: + match, identity, length_diff = compare_features(old_feature, new_feature, old_record, new_record, args.match_identity_threshold) + except TypeError: + break + if match: + for j in range(old_offset): + print_in_old_not_new(old_features[old_i + j]) + oldCount += 1 + for j in range(new_offset): + print_in_new_not_old(new_features[new_i + j]) + newCount += 1 + if identity == 1.0: + exactRec += 1 + res1, res2 = print_match(old_features[old_i + old_offset], new_features[new_i + new_offset], identity, length_diff, args.addNotes) + inexactRec = addArr(inexactRec, res1) + hypoRec = addArr(hypoRec, res2) + old_i += old_offset + new_i += new_offset + break + else: + sys.stderr.write("Exceeded allowed number of skipped genes (" + str(args.allowed_skipped_genes) + "), unable to maintain alignment and continue comparison.\n") + exit(2) + + if old_feature is None and new_feature is None: + break + + old_i += 1 + new_i += 1 + + args.sumOut.write('Exact Match:\t' + str(exactRec) + "\n\n") + + args.sumOut.write('Inexact Match:\t' + str(inexactRec[0] + inexactRec[1] + inexactRec[2]) + "\n") + args.sumOut.write(' Same length:\t' + str(inexactRec[0]) + "\n") + args.sumOut.write(' Second Gbk Seq longer:\t' + str(inexactRec[2]) + "\n") + args.sumOut.write(' First Gbk Seq longer:\t' + str(inexactRec[1]) + "\n\n") + + args.sumOut.write('In Second Gbk but not in first:\t' + str(newCount) + "\n") + args.sumOut.write('In First Gbk but not in second:\t' + str(oldCount) + "\n\n") + + args.sumOut.write('Hypothetical Annotation Change:\t' + str(hypoRec[1] + hypoRec[2]) + "\n") + args.sumOut.write('Hypothetical:\t' + str(hypoRec[0] + hypoRec[2]) + "\n") + + +def print_match(f1, f2, identity, length_diff, outNotes): + #print('', flush=True) + line = f1.qualifiers['product'][0] + "\t" + matchArr = [0, 0, 0] + hypoArr = [0, 0, 0] + if identity == 1.0: +# print('Exact match') + line += 'Exact match\t' + f2.qualifiers['product'][0] + "\t100.0\tSame Length\t" + else: +# print('Inexact match (' + '%.2f' % (identity * 100.0) + '% ID, ', end='') + line += 'Inexact match\t' + f2.qualifiers['product'][0] + "\t%.2f\t" % (identity * 100.0) + if length_diff == 0: +# print('same length)') + line +="Same Length\t" + matchArr[0] += 1 + elif length_diff > 0: +# print('old seq longer)') + line +="First Gbk Seq Longer\t" + matchArr[1] += 1 + elif length_diff < 0: +# print('new seq longer)') + line +="Second Gbk Seq Longer\t" + matchArr[2] += 1 +# print(' old: ', end='') +# print_feature_one_line(f1) + line += print_feature_one_line(f1) + "\t" +# print(' new: ', end='') +# print_feature_one_line(f2) + line += print_feature_one_line(f2) + "\t" + p1 = f1.qualifiers['product'][0].lower() + p2 = f2.qualifiers['product'][0].lower() + if 'hypothetical' in p1 and 'hypothetical' in p2: +# print(' still hypothetical') + line += "Hypothetical\t" + hypoArr[0] += 1 + elif 'hypothetical' in p1 and 'hypothetical' not in p2: +# print(' no longer hypothetical') + line += "No Longer Hypothetical\t" + hypoArr[1] += 1 + elif 'hypothetical' not in p1 and 'hypothetical' in p2: +# print(' became hypothetical') + line += "Became Hypothetical\t" + hypoArr[2] += 1 + else: + line += "'Hypothetical' not in second nor first Gbk's product tag" + + if outNotes: + line += "\t" + if "note" in f1.qualifiers.keys(): + for x in f1.qualifiers["note"]: + line += x + line += "\t" + else: + line += "N/A\t" + if "note" in f2.qualifiers.keys(): + for x in f2.qualifiers["note"]: + line += x + else: + line += "N/A" + + print(line) + return matchArr, hypoArr + +def print_in_old_not_new(f): # rename file outputs + line = f.qualifiers['product'][0] + "\tIn First Gbk but not Second\tN/A\t0.00\t" + str(f.location.end - f.location.start) + "\t" + print_feature_one_line(f) + "\tN/A\tN/A" +# print('') +# print('In old but not in new:') +# print(' ', end='') +# print_feature_one_line(f) + print(line) + + +def print_in_new_not_old(f): # rename file outputs + line = "N/A\tIn Second Gbk but not First\t" + f.qualifiers['product'][0] + "\t0.00\t" + str(f.location.end - f.location.start) + "\tN/A\t" + print_feature_one_line(f) + "\tN/A" + #print('') + #print('In new but not in old:') + #print(' ', end='') + #print_feature_one_line(f) + print(line) + + +def print_feature_one_line(f): + #f_str = f.qualifiers['product'][0] + f_str = "" + strand = '+' if f.location.strand == 1 else '-' + f_str += '(' + str(f.location.start) + '-' + str(f.location.end) + ' ' + strand + ', ' + f_str += str(f.location.end - f.location.start) + ' bp)' + return(f_str) + + +def compare_features(f1, f2, r1, r2, match_identity_threshold): + if f1 is None or f2 is None: + return False + + s1 = f1.extract(r1).seq + s2 = f2.extract(r2).seq + score = pairwise2.align.globalms(s1, s2, 1, 0, 0, 0, score_only=True) + identity = score / max(len(s1), len(s2)) + match = identity >= match_identity_threshold + length_diff = len(s1) - len(s2) + return match, identity, length_diff + + +if __name__ == '__main__': + main()