view cpt_gbk_compare/ @ 0:cae700761678 draft

author cpt
date Fri, 01 Jul 2022 13:47:46 +0000
line wrap: on
line source

#!/usr/bin/env python3
Copyright 2019 Ryan Wick (
This program is free software: you can redistribute it and/or modify it under the terms of the GNU
General Public License as published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License along with this program.  If not,
see <>.

import argparse
from Bio import SeqIO
from Bio import pairwise2
from Bio.pairwise2 import format_alignment
import itertools
import sys

def addArr(arrA, arrB):
    res = []
    for x in range(0, min(len(arrA), len(arrB))):
      res.append(arrA[x] + arrB[x])
    return res

def get_arguments():
    parser = argparse.ArgumentParser(description='Compare GenBank annotations',

    parser.add_argument('annotation_1', type=str,
                        help='First annotated genome in Genbank format')
    parser.add_argument('annotation_2', type=str,
                        help='Second annotated genome in Genbank format')

    parser.add_argument('--match_identity_threshold', type=float, default=0.7,
                        help='Two genes must have at least this identity to be considerd the same (0.0 to 1.0)')
    parser.add_argument('--allowed_skipped_genes', type=int, default=10,
                        help='This many missing genes are allowed when aligning the annotations')
    parser.add_argument("--addNotes", action="store_true", help="Add Note fields")

        "-sumOut", type=argparse.FileType("w"), help="Summary out file"
    args = parser.parse_args()
    return args

def main():
    args = get_arguments()

    # Load in the CDS features from the two assemblies.
    old = SeqIO.parse(args.annotation_1, 'genbank')
    new = SeqIO.parse(args.annotation_2, 'genbank')
    old_record = next(old)
    new_record = next(new)
    old_features, new_features = [], []
    for f in old_record.features:
        if f.type == 'CDS':
    for f in new_record.features:
        if f.type == 'CDS':

    args.sumOut.write('Features in First Genbank\'s assembly:\t' + str(len(old_features)) + "\n")
    args.sumOut.write('Features in Second Genbank\'s assembly:\t' + str(len(new_features)) + "\n\n")

    # Align the features to each other.
    offsets = sorted(list(itertools.product(range(args.allowed_skipped_genes + 1),
                                            range(args.allowed_skipped_genes + 1))),
                     key=lambda x: x[0]+x[1])
    old_i, new_i = 0, 0
    exactRec = 0
    inexactRec = [0, 0 ,0]
    hypoRec = [0, 0, 0]
    newCount = 0
    oldCount = 0
    if args.addNotes:
      print("First Record CDS Product\tSimilarity\tSecond Record CDS Product\tPercent Identity\tLength Difference\tFirst Gbk's CDS Location\tSecond Gbk's CDS Location\tHypothetical Status\tFirst Record's Notes\tSecond Record's Notes\n")
      print("First Record CDS Product\tSimilarity\tSecond Record CDS Product\tPercent Identity\tLength Difference\tFirst Gbk's CDS Location\tSecond Gbk's CDS Location\tHypothetical Status\n")
    while True:
        if old_i >= len(old_features) and new_i >= len(new_features):

        for old_offset, new_offset in offsets:
                old_feature = old_features[old_i + old_offset]
            except IndexError:
                old_feature = None
                new_feature = new_features[new_i + new_offset]
            except IndexError:
                new_feature = None
                match, identity, length_diff = compare_features(old_feature, new_feature, old_record, new_record, args.match_identity_threshold)
            except TypeError:
            if match:
                for j in range(old_offset):
                    print_in_old_not_new(old_features[old_i + j])
                    oldCount += 1
                for j in range(new_offset):
                    print_in_new_not_old(new_features[new_i + j])
                    newCount += 1
                if identity == 1.0:
                  exactRec += 1
                res1, res2 = print_match(old_features[old_i + old_offset], new_features[new_i + new_offset], identity, length_diff, args.addNotes)
                inexactRec = addArr(inexactRec, res1)
                hypoRec = addArr(hypoRec, res2)
                old_i += old_offset
                new_i += new_offset
            sys.stderr.write("Exceeded allowed number of skipped genes (" + str(args.allowed_skipped_genes) + "), unable to maintain alignment and continue comparison.\n")

        if old_feature is None and new_feature is None:

        old_i += 1
        new_i += 1

    args.sumOut.write('Exact Match:\t' + str(exactRec) + "\n\n")

    args.sumOut.write('Inexact Match:\t' + str(inexactRec[0] + inexactRec[1] + inexactRec[2]) + "\n")
    args.sumOut.write('  Same length:\t' + str(inexactRec[0]) + "\n")
    args.sumOut.write('  Second Gbk Seq longer:\t' + str(inexactRec[2]) + "\n")
    args.sumOut.write('  First Gbk Seq longer:\t' + str(inexactRec[1]) + "\n\n")
    args.sumOut.write('In Second Gbk but not in first:\t' + str(newCount) + "\n")
    args.sumOut.write('In First Gbk but not in second:\t' + str(oldCount) + "\n\n")

    args.sumOut.write('Hypothetical Annotation Change:\t' + str(hypoRec[1] + hypoRec[2]) + "\n")
    args.sumOut.write('Hypothetical:\t' + str(hypoRec[0] + hypoRec[2]) + "\n")

def print_match(f1, f2, identity, length_diff, outNotes):
    #print('', flush=True)
    line = f1.qualifiers['product'][0] + "\t"
    matchArr = [0, 0, 0]
    hypoArr = [0, 0, 0]
    if identity == 1.0:
#        print('Exact match')
        line += 'Exact match\t' + f2.qualifiers['product'][0] + "\t100.0\tSame Length\t"
#        print('Inexact match (' + '%.2f' % (identity * 100.0) + '% ID, ', end='')
        line += 'Inexact match\t' + f2.qualifiers['product'][0] + "\t%.2f\t" % (identity * 100.0)
        if length_diff == 0:
#            print('same length)')
            line +="Same Length\t"
            matchArr[0] += 1
        elif length_diff > 0:
#            print('old seq longer)')
            line +="First Gbk Seq Longer\t"
            matchArr[1] += 1
        elif length_diff < 0:
#            print('new seq longer)')
            line +="Second Gbk Seq Longer\t"
            matchArr[2] += 1
#    print('  old: ', end='')
#    print_feature_one_line(f1)
    line += print_feature_one_line(f1) + "\t"
#    print('  new: ', end='')
#    print_feature_one_line(f2)
    line += print_feature_one_line(f2) + "\t"
    p1 = f1.qualifiers['product'][0].lower()
    p2 = f2.qualifiers['product'][0].lower()
    if 'hypothetical' in p1 and 'hypothetical' in p2:
#        print('  still hypothetical')
        line += "Hypothetical\t"
        hypoArr[0] += 1
    elif 'hypothetical' in p1 and 'hypothetical' not in p2:
#        print('  no longer hypothetical')
        line += "No Longer Hypothetical\t"
        hypoArr[1] += 1
    elif 'hypothetical' not in p1 and 'hypothetical' in p2:
#        print('  became hypothetical')
        line += "Became Hypothetical\t"
        hypoArr[2] += 1
        line += "'Hypothetical' not in second nor first Gbk's product tag"
    if outNotes:
      line += "\t"
      if "note" in f1.qualifiers.keys():
        for x in f1.qualifiers["note"]:
          line += x
        line += "\t"
        line += "N/A\t"
      if "note" in f2.qualifiers.keys():
        for x in f2.qualifiers["note"]:
          line += x
        line += "N/A"

    return matchArr, hypoArr

def print_in_old_not_new(f): # rename file outputs
    line = f.qualifiers['product'][0] + "\tIn First Gbk but not Second\tN/A\t0.00\t" + str(f.location.end - f.location.start) + "\t" + print_feature_one_line(f) + "\tN/A\tN/A"
#    print('')
#    print('In old but not in new:')
#    print('  ', end='')
#    print_feature_one_line(f)

def print_in_new_not_old(f): # rename file outputs
    line = "N/A\tIn Second Gbk but not First\t" + f.qualifiers['product'][0] + "\t0.00\t" + str(f.location.end - f.location.start) + "\tN/A\t" + print_feature_one_line(f) + "\tN/A"
    #print('In new but not in old:')
    #print('  ', end='')

def print_feature_one_line(f):
    #f_str = f.qualifiers['product'][0]
    f_str = ""
    strand = '+' if f.location.strand == 1 else '-'
    f_str += '(' + str(f.location.start) + '-' + str(f.location.end) + ' ' + strand + ', '
    f_str += str(f.location.end - f.location.start) + ' bp)'

def compare_features(f1, f2, r1, r2, match_identity_threshold):
    if f1 is None or f2 is None:
        return False

    s1 = f1.extract(r1).seq
    s2 = f2.extract(r2).seq
    score = pairwise2.align.globalms(s1, s2, 1, 0, 0, 0, score_only=True)
    identity = score / max(len(s1), len(s2))
    match = identity >= match_identity_threshold
    length_diff = len(s1) - len(s2)
    return match, identity, length_diff

if __name__ == '__main__':