Mercurial > repos > cpt > cpt_gbk_compare
view gbk_compare.py @ 2:b0ab633db780 draft default tip
planemo upload commit f33bdf952d796c5d7a240b132af3c4cbd102decc
author | cpt |
---|---|
date | Fri, 05 Jan 2024 05:51:32 +0000 |
parents | 1909729a1fd3 |
children |
line wrap: on
line source
#!/usr/bin/env python3 """ Copyright 2019 Ryan Wick (rrwick@gmail.com) https://github.com/rrwick/Compare-annotations/ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. """ import argparse from Bio import SeqIO from Bio import pairwise2 from Bio.pairwise2 import format_alignment import itertools import sys def addArr(arrA, arrB): res = [] for x in range(0, min(len(arrA), len(arrB))): res.append(arrA[x] + arrB[x]) return res def get_arguments(): parser = argparse.ArgumentParser( description="Compare GenBank annotations", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument( "annotation_1", type=str, help="First annotated genome in Genbank format" ) parser.add_argument( "annotation_2", type=str, help="Second annotated genome in Genbank format" ) parser.add_argument( "--match_identity_threshold", type=float, default=0.7, help="Two genes must have at least this identity to be considerd the same (0.0 to 1.0)", ) parser.add_argument( "--allowed_skipped_genes", type=int, default=10, help="This many missing genes are allowed when aligning the annotations", ) parser.add_argument("--addNotes", action="store_true", help="Add Note fields") parser.add_argument("-sumOut", type=argparse.FileType("w"), help="Summary out file") args = parser.parse_args() return args def main(): args = get_arguments() # Load in the CDS features from the two assemblies. old = SeqIO.parse(args.annotation_1, "genbank") new = SeqIO.parse(args.annotation_2, "genbank") old_record = next(old) new_record = next(new) old_features, new_features = [], [] for f in old_record.features: if f.type == "CDS": old_features.append(f) for f in new_record.features: if f.type == "CDS": new_features.append(f) args.sumOut.write( "Features in First Genbank's assembly:\t" + str(len(old_features)) + "\n" ) args.sumOut.write( "Features in Second Genbank's assembly:\t" + str(len(new_features)) + "\n\n" ) # Align the features to each other. offsets = sorted( list( itertools.product( range(args.allowed_skipped_genes + 1), range(args.allowed_skipped_genes + 1), ) ), key=lambda x: x[0] + x[1], ) old_i, new_i = 0, 0 exactRec = 0 inexactRec = [0, 0, 0] hypoRec = [0, 0, 0] newCount = 0 oldCount = 0 if args.addNotes: print( "First Record CDS Product\tSimilarity\tSecond Record CDS Product\tPercent Identity\tLength Difference\tFirst Gbk's CDS Location\tSecond Gbk's CDS Location\tHypothetical Status\tFirst Record's Notes\tSecond Record's Notes\n" ) else: print( "First Record CDS Product\tSimilarity\tSecond Record CDS Product\tPercent Identity\tLength Difference\tFirst Gbk's CDS Location\tSecond Gbk's CDS Location\tHypothetical Status\n" ) while True: if old_i >= len(old_features) and new_i >= len(new_features): break for old_offset, new_offset in offsets: try: old_feature = old_features[old_i + old_offset] except IndexError: old_feature = None try: new_feature = new_features[new_i + new_offset] except IndexError: new_feature = None try: match, identity, length_diff = compare_features( old_feature, new_feature, old_record, new_record, args.match_identity_threshold, ) except TypeError: break if match: for j in range(old_offset): print_in_old_not_new(old_features[old_i + j]) oldCount += 1 for j in range(new_offset): print_in_new_not_old(new_features[new_i + j]) newCount += 1 if identity == 1.0: exactRec += 1 res1, res2 = print_match( old_features[old_i + old_offset], new_features[new_i + new_offset], identity, length_diff, args.addNotes, ) inexactRec = addArr(inexactRec, res1) hypoRec = addArr(hypoRec, res2) old_i += old_offset new_i += new_offset break else: sys.stderr.write( "Exceeded allowed number of skipped genes (" + str(args.allowed_skipped_genes) + "), unable to maintain alignment and continue comparison.\n" ) exit(2) if old_feature is None and new_feature is None: break old_i += 1 new_i += 1 args.sumOut.write("Exact Match:\t" + str(exactRec) + "\n\n") args.sumOut.write( "Inexact Match:\t" + str(inexactRec[0] + inexactRec[1] + inexactRec[2]) + "\n" ) args.sumOut.write(" Same length:\t" + str(inexactRec[0]) + "\n") args.sumOut.write(" Second Gbk Seq longer:\t" + str(inexactRec[2]) + "\n") args.sumOut.write(" First Gbk Seq longer:\t" + str(inexactRec[1]) + "\n\n") args.sumOut.write("In Second Gbk but not in first:\t" + str(newCount) + "\n") args.sumOut.write("In First Gbk but not in second:\t" + str(oldCount) + "\n\n") args.sumOut.write( "Hypothetical Annotation Change:\t" + str(hypoRec[1] + hypoRec[2]) + "\n" ) args.sumOut.write("Hypothetical:\t" + str(hypoRec[0] + hypoRec[2]) + "\n") def print_match(f1, f2, identity, length_diff, outNotes): # print('', flush=True) line = f1.qualifiers["product"][0] + "\t" matchArr = [0, 0, 0] hypoArr = [0, 0, 0] if identity == 1.0: # print('Exact match') line += "Exact match\t" + f2.qualifiers["product"][0] + "\t100.0\tSame Length\t" else: # print('Inexact match (' + '%.2f' % (identity * 100.0) + '% ID, ', end='') line += ( "Inexact match\t" + f2.qualifiers["product"][0] + "\t%.2f\t" % (identity * 100.0) ) if length_diff == 0: # print('same length)') line += "Same Length\t" matchArr[0] += 1 elif length_diff > 0: # print('old seq longer)') line += "First Gbk Seq Longer\t" matchArr[1] += 1 elif length_diff < 0: # print('new seq longer)') line += "Second Gbk Seq Longer\t" matchArr[2] += 1 # print(' old: ', end='') # print_feature_one_line(f1) line += print_feature_one_line(f1) + "\t" # print(' new: ', end='') # print_feature_one_line(f2) line += print_feature_one_line(f2) + "\t" p1 = f1.qualifiers["product"][0].lower() p2 = f2.qualifiers["product"][0].lower() if "hypothetical" in p1 and "hypothetical" in p2: # print(' still hypothetical') line += "Hypothetical\t" hypoArr[0] += 1 elif "hypothetical" in p1 and "hypothetical" not in p2: # print(' no longer hypothetical') line += "No Longer Hypothetical\t" hypoArr[1] += 1 elif "hypothetical" not in p1 and "hypothetical" in p2: # print(' became hypothetical') line += "Became Hypothetical\t" hypoArr[2] += 1 else: line += "'Hypothetical' not in second nor first Gbk's product tag" if outNotes: line += "\t" if "note" in f1.qualifiers.keys(): for x in f1.qualifiers["note"]: line += x line += "\t" else: line += "N/A\t" if "note" in f2.qualifiers.keys(): for x in f2.qualifiers["note"]: line += x else: line += "N/A" print(line) return matchArr, hypoArr def print_in_old_not_new(f): # rename file outputs line = ( f.qualifiers["product"][0] + "\tIn First Gbk but not Second\tN/A\t0.00\t" + str(f.location.end - f.location.start) + "\t" + print_feature_one_line(f) + "\tN/A\tN/A" ) # print('') # print('In old but not in new:') # print(' ', end='') # print_feature_one_line(f) print(line) def print_in_new_not_old(f): # rename file outputs line = ( "N/A\tIn Second Gbk but not First\t" + f.qualifiers["product"][0] + "\t0.00\t" + str(f.location.end - f.location.start) + "\tN/A\t" + print_feature_one_line(f) + "\tN/A" ) # print('') # print('In new but not in old:') # print(' ', end='') # print_feature_one_line(f) print(line) def print_feature_one_line(f): # f_str = f.qualifiers['product'][0] f_str = "" strand = "+" if f.location.strand == 1 else "-" f_str += ( "(" + str(f.location.start) + "-" + str(f.location.end) + " " + strand + ", " ) f_str += str(f.location.end - f.location.start) + " bp)" return f_str def compare_features(f1, f2, r1, r2, match_identity_threshold): if f1 is None or f2 is None: return False s1 = f1.extract(r1).seq s2 = f2.extract(r2).seq score = pairwise2.align.globalms(s1, s2, 1, 0, 0, 0, score_only=True) identity = score / max(len(s1), len(s2)) match = identity >= match_identity_threshold length_diff = len(s1) - len(s2) return match, identity, length_diff if __name__ == "__main__": main()