Mercurial > repos > cpt > cpt_phageqc_annotations
diff cpt_phageqc_annotation/phage_annotation_validator.py @ 0:c3140b08d703 draft default tip
Uploaded
author | cpt |
---|---|
date | Fri, 17 Jun 2022 13:00:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cpt_phageqc_annotation/phage_annotation_validator.py Fri Jun 17 13:00:50 2022 +0000 @@ -0,0 +1,1254 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# vim: set fileencoding=utf-8 +import os +import sys +import json +import math +import numpy +import argparse +import itertools +import logging +from gff3 import ( + feature_lambda, + coding_genes, + genes, + get_gff3_id, + feature_test_location, + get_rbs_from, + nice_name, +) +from shinefind import NaiveSDCaller +from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature +from Bio import SeqIO +from Bio.SeqRecord import SeqRecord +from Bio.SeqFeature import SeqFeature, FeatureLocation +from jinja2 import Environment, FileSystemLoader +from cpt import MGAFinder + +logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger(name="pav") + +# Path to script, required because of Galaxy. +SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__)) +# Path to the HTML template for the report + +ENCOURAGEMENT = ( + (100, "Perfection itself!"), + (90, "Amazing!"), + (80, "Not too bad, a few minor things to fix..."), + (70, "Some issues to address"), + ( + 50, + """Issues detected! </p><p class="text-muted">Have you heard of the + <a href="https://cpt.tamu.edu">CPT</a>\'s Automated Phage Annotation + Pipeline?""", + ), + ( + 0, + """<b>MAJOR</b> issues detected! Please consider using the + <a href="https://cpt.tamu.edu">CPT</a>\'s Automated Phage Annotation Pipeline""", + ), +) + + +def gen_qc_feature(start, end, message, strand=0, id_src=None, type_src="gene"): + kwargs = {"qualifiers": {"note": [message]}} + kwargs["type"] = type_src + kwargs["strand"] = strand + kwargs["phase"]=0 + kwargs["score"]=0.0 + kwargs["source"]="feature" + if id_src is not None: + kwargs["id"] = id_src.id + kwargs["qualifiers"]["ID"] = [id_src.id] + kwargs["qualifiers"]["Name"] = id_src.qualifiers.get("Name", []) + + + if end >= start: + return gffSeqFeature(FeatureLocation(start, end, strand=strand), **kwargs) + else: + return gffSeqFeature(FeatureLocation(end, start, strand=strand), **kwargs) + + +def __ensure_location_in_bounds(start=0, end=0, parent_length=0): + # This prevents frameshift errors + while start < 0: + start += 3 + while end < 0: + end += 3 + while start > parent_length: + start -= 3 + while end > parent_length: + end -= 3 + return (start, end) + + +def missing_rbs(record, lookahead_min=5, lookahead_max=15): + """ + Identify gene features with missing RBSs + + This "looks ahead" 5-15 bases ahead of each gene feature, and checks if + there's an RBS feature in those bounds. + + The returned data is a set of genes with the RBS sequence in the __upstream + attribute, and a message in the __message attribute. + """ + results = [] + good = 0 + bad = 0 + qc_features = [] + sd_finder = NaiveSDCaller() + + any_rbss = False + + for gene in coding_genes(record.features): + # Check if there are RBSs, TODO: make this recursive. Each feature in + # gene.sub_features can also have sub_features. + rbss = get_rbs_from(gene) + # No RBS found + if len(rbss) == 0: + # Get the sequence lookahead_min to lookahead_max upstream + if gene.strand > 0: + start = gene.location.start - lookahead_max + end = gene.location.start - lookahead_min + else: + start = gene.location.end + lookahead_min + end = gene.location.end + lookahead_max + # We have to ensure the feature is ON the genome, otherwise we may + # be trying to access a location outside of the length of the + # genome, which would be bad. + (start, end) = __ensure_location_in_bounds( + start=start, end=end, parent_length=len(record) + ) + # Temporary feature to extract sequence + tmp = gffSeqFeature( + FeatureLocation(start, end, strand=gene.strand), type="domain" + ) + # Get the sequence + seq = str(tmp.extract(record.seq)) + # Set the default properties + gene.__upstream = seq.lower() + gene.__message = "No RBS annotated, None found" + + # Try and do an automated shinefind call + sds = sd_finder.list_sds(seq) + if len(sds) > 0: + sd = sds[0] + gene.__upstream = sd_finder.highlight_sd( + seq.lower(), sd["start"], sd["end"] + ) + gene.__message = "Unannotated but valid RBS" + + qc_features.append( + gen_qc_feature( + start, end, "Missing RBS", strand=gene.strand, id_src=gene, type_src="gene" + ) + ) + + bad += 1 + results.append(gene) + results[-1].location = FeatureLocation(results[-1].location.start + 1, results[-1].location.end, results[-1].location.strand) + else: + if len(rbss) > 1: + log.warn("%s RBSs found for gene %s", rbss[0].id, get_gff3_id(gene)) + any_rbss = True + # get first RBS/CDS + cds = list(genes(gene.sub_features, feature_type="CDS"))[0] + rbs = rbss[0] + + # Get the distance between the two + if gene.strand > 0: + distance = cds.location.start - rbs.location.end + else: + distance = rbs.location.start - cds.location.end + + # If the RBS is too far away, annotate that + if distance > lookahead_max: + gene.__message = "RBS too far away (%s nt)" % distance + + qc_features.append( + gen_qc_feature( + rbs.location.start, + rbs.location.end, + gene.__message, + strand=gene.strand, + id_src=gene, + type_src="gene" + ) + ) + + bad += 1 + results.append(gene) + results[-1].location = FeatureLocation(results[-1].location.start + 1, results[-1].location.end, results[-1].location.strand) + else: + good += 1 + + return good, bad, results, qc_features, any_rbss + + +# modified from get_orfs_or_cdss.py +# ----------------------------------------------------------- + + +def require_sd(data, record, chrom_start, sd_min, sd_max): + sd_finder = NaiveSDCaller() + for putative_gene in data: + if putative_gene[2] > 0: # strand + start = chrom_start + putative_gene[0] - sd_max + end = chrom_start + putative_gene[0] - sd_min + else: + start = chrom_start + putative_gene[1] + sd_min + end = chrom_start + putative_gene[1] + sd_max + + (start, end) = __ensure_location_in_bounds( + start=start, end=end, parent_length=len(record) + ) + tmp = gffSeqFeature( + FeatureLocation(start, end, strand=putative_gene[2]), type="domain" + ) + # Get the sequence + seq = str(tmp.extract(record.seq)) + sds = sd_finder.list_sds(seq) + if len(sds) > 0: + yield putative_gene + (start, end) + + +def excessive_gap( + record, + excess=50, + excess_divergent=200, + min_gene=30, + slop=30, + lookahead_min=5, + lookahead_max=15, +): + """ + Identify excessive gaps between gene features. + + Default "excessive" gap size is 10, but that should likely be larger. + """ + results = [] + good = 0 + bad = 0 + + contiguous_regions = [] + + sorted_genes = sorted( + genes(record.features), key=lambda feature: feature.location.start + ) + if len(sorted_genes) == 0: + log.warn("NO GENES FOUND") + return good, bad, results, [] + + current_gene = None + for gene in sorted_genes: + # If the gene's start is contiguous to the "current_gene", then we + # extend current_gene + for cds in genes(gene.sub_features, feature_type="CDS"): + if current_gene is None: + current_gene = [int(cds.location.start), int(cds.location.end)] + + if cds.location.start <= current_gene[1] + excess: + # Don't want to decrease size + if int(cds.location.end) >= current_gene[1]: + current_gene[1] = int(cds.location.end) + else: + # If it's discontiguous, we append the region and clear. + contiguous_regions.append(current_gene) + current_gene = [int(cds.location.start), int(cds.location.end)] + + # This generally expected that annotations would NOT continue unto the end + # of the genome, however that's a bug, and we can make it here with an + # empty contiguous_regions list + contiguous_regions.append(current_gene) + + for i in range(len(contiguous_regions) + 1): + if i == 0: + a = (1, 1) + b = contiguous_regions[i] + elif i >= len(contiguous_regions): + a = contiguous_regions[i - 1] + b = (len(record.seq), None) + else: + a = contiguous_regions[i - 1] + b = contiguous_regions[i] + + gap_size = abs(b[0] - a[1]) + + if gap_size > min(excess, excess_divergent): + a_feat_l = itertools.islice( + feature_lambda( + sorted_genes, + feature_test_location, + {"loc": a[1]}, + subfeatures=False, + ), + 1, + ) + b_feat_l = itertools.islice( + feature_lambda( + sorted_genes, + feature_test_location, + {"loc": b[0]}, + subfeatures=False, + ), + 1, + ) + + try: + a_feat = next(a_feat_l) + except StopIteration: + # Triggers on end of genome + a_feat = None + try: + b_feat = next(b_feat_l) + except StopIteration: + # Triggers on end of genome + b_feat = None + + result_obj = [ + a[1], + b[0], + None if not a_feat else a_feat.location.strand, + None if not b_feat else b_feat.location.strand, + ] + + if a_feat is None or b_feat is None: + if gap_size > excess_divergent: + results.append(result_obj) + else: + if ( + a_feat.location.strand == b_feat.location.strand + and gap_size > excess + ): + results.append(result_obj) + elif ( + a_feat.location.strand != b_feat.location.strand + and gap_size > excess_divergent + ): + results.append(result_obj) + + better_results = [] + qc_features = [] + of = MGAFinder(11, "CDS", "closed", min_gene) + # of = OrfFinder(11, 'CDS', 'closed', min_gene) + + for result_obj in results: + start = result_obj[0] + end = result_obj[1] + f = gen_qc_feature(start, end, "Excessive gap, %s bases" % abs(end - start), type_src="gene") + qc_features.append(f) + putative_genes = of.putative_genes_in_sequence( + str(record[start - slop : end + slop].seq) + ) + putative_genes = list( + require_sd(putative_genes, record, start, lookahead_min, lookahead_max) + ) + for putative_gene in putative_genes: + # (0, 33, 1, 'ATTATTTTATCAAAACGCTTTACAATCTTTTAG', 'MILSKRFTIF', 123123, 124324) + possible_gene_start = start + putative_gene[0] + possible_gene_end = start + putative_gene[1] + + if possible_gene_start <= possible_gene_end: + possible_cds = gffSeqFeature( + FeatureLocation( + possible_gene_start, possible_gene_end, strand=putative_gene[2] + ), + type="CDS", + ) + else: + possible_cds = gffSeqFeature( + FeatureLocation( + possible_gene_end, possible_gene_start, strand=putative_gene[2], + ), + type="CDS", + ) + + # Now we adjust our boundaries for the RBS that's required + # There are only two cases, the rbs is upstream of it, or downstream + if putative_gene[5] < possible_gene_start: + possible_gene_start = putative_gene[5] + else: + possible_gene_end = putative_gene[6] + + if putative_gene[5] <= putative_gene[6]: + possible_rbs = gffSeqFeature( + FeatureLocation( + putative_gene[5], putative_gene[6], strand=putative_gene[2] + ), + type="Shine_Dalgarno_sequence", + ) + else: + possible_rbs = gffSeqFeature( + FeatureLocation( + putative_gene[6], putative_gene[5], strand=putative_gene[2], + ), + type="Shine_Dalgarno_sequence", + ) + + if possible_gene_start <= possible_gene_end: + possible_gene = gffSeqFeature( + FeatureLocation( + possible_gene_start, possible_gene_end, strand=putative_gene[2] + ), + type="gene", + qualifiers={"note": ["Possible gene"]}, + ) + else: + possible_gene = gffSeqFeature( + FeatureLocation( + possible_gene_end, possible_gene_start, strand=putative_gene[2], + ), + type="gene", + qualifiers={"note": ["Possible gene"]}, + ) + possible_gene.sub_features = [possible_rbs, possible_cds] + qc_features.append(possible_gene) + + better_results.append(result_obj + [len(putative_genes)]) + + # Bad gaps are those with more than zero possible genes found + bad = len([x for x in better_results if x[2] > 0]) + # Generally taking "good" here as every possible gap in the genome + # Thus, good is TOTAL - gaps + good = len(sorted_genes) + 1 - bad + # and bad is just gaps + return good, bad, better_results, qc_features + + +def phi(x): + """Standard phi function used in calculation of normal distribution""" + return math.exp(-1 * math.pi * x * x) + + +def norm(x, mean=0, sd=1): + """ + Normal distribution. Given an x position, a mean, and a standard + deviation, calculate the "y" value. Useful for score scaling + + Modified to multiply by SD. This means even at sd=5, norm(x, mean) where x = mean => 1, rather than 1/5. + """ + return (1 / float(sd)) * phi(float(x - mean) / float(sd)) * sd + + +def coding_density(record, mean=92.5, sd=20): + """ + Find coding density in the genome + """ + feature_lengths = 0 + + for gene_a in coding_genes(record.features): + feature_lengths += sum( + [len(x) for x in genes(gene_a.sub_features, feature_type="CDS")] + ) + + avgFeatLen = float(feature_lengths) / float(len(record.seq)) + return int(norm(100 * avgFeatLen, mean=mean, sd=sd) * 100), int(100 * avgFeatLen) + + +def exact_coding_density(record, mean=92.5, sd=20): + """ + Find exact coding density in the genome + """ + data = numpy.zeros(len(record.seq)) + + for gene_a in coding_genes(record.features): + for cds in genes(gene_a.sub_features, feature_type="CDS"): + for i in range(cds.location.start, cds.location.end + 1): + data[i - 1] = 1 + + return float(sum(data)) / len(data) + + +def excessive_overlap(record, excess=15, excess_divergent=30): + """ + Find excessive overlaps in the genome, where excessive is defined as 15 + bases for same strand, and 30 for divergent translation. + + Does a product of all the top-level features in the genome, and calculates + gaps. + """ + results = [] + bad = 0 + qc_features = [] + + for (gene_a, gene_b) in itertools.combinations(coding_genes(record.features), 2): + # Get the CDS from the subfeature list. + # TODO: not recursive. + cds_a = [x for x in genes(gene_a.sub_features, feature_type="CDS")] + cds_b = [x for x in genes(gene_b.sub_features, feature_type="CDS")] + + if len(cds_a) == 0: + log.warn("Gene missing subfeatures; %s", get_gff3_id(gene_a)) + continue + + if len(cds_b) == 0: + log.warn("Gene missing subfeatures; %s", get_gff3_id(gene_b)) + continue + + cds_a = cds_a[0] + cds_b = cds_b[0] + + # Set of locations that are included in the CDS of A and the + # CDS of B + cas = set(range(cds_a.location.start, cds_a.location.end)) + cbs = set(range(cds_b.location.start, cds_b.location.end)) + + # Here we calculate the intersection between the two sets, and + # if it's larger than our excessive size, we know that they're + # overlapped + ix = cas.intersection(cbs) + + if (cds_a.location.strand == cds_b.location.strand and len(ix) >= excess) or ( + cds_a.location.strand != cds_b.location.strand + and len(ix) >= excess_divergent + ): + bad += float(len(ix)) / float(min(excess, excess_divergent)) + qc_features.append( + gen_qc_feature(min(ix), max(ix), "Excessive Overlap", id_src=gene_a, type_src="gene") + ) + results.append((gene_a, gene_b, min(ix), max(ix))) + + # Good isn't accurate here. It's a triangle number and just ugly, but we + # don't care enough to fix it. + good = len(list(coding_genes(record.features))) + good = int(good - bad) + if good < 0: + good = 0 + return good, int(bad), results, qc_features + + +def get_encouragement(score): + """Some text telling the user how they did + """ + for encouragement in ENCOURAGEMENT: + if score > encouragement[0]: + return encouragement[1] + return ENCOURAGEMENT[-1][1] + + +def genome_overview(record): + """Genome overview + """ + data = { + "genes": { + "count": 0, + "bases": len(record.seq), + "density": 0, # genes / kb + "avg_len": [], + "comp": {"A": 0, "C": 0, "G": 0, "T": 0}, + }, + "overall": { + "comp": { + "A": record.seq.count("A") + record.seq.count("a"), + "C": record.seq.count("C") + record.seq.count("c"), + "G": record.seq.count("G") + record.seq.count("g"), + "T": record.seq.count("T") + record.seq.count("t"), + }, + "gc": 0, + }, + } + gene_features = list(coding_genes(record.features)) + data["genes"]["count"] = len(gene_features) + + for feat in gene_features: + data["genes"]["comp"]["A"] += feat.extract(record).seq.count("A") + feat.extract(record).seq.count("a") + data["genes"]["comp"]["C"] += feat.extract(record).seq.count("C") + feat.extract(record).seq.count("c") + data["genes"]["comp"]["T"] += feat.extract(record).seq.count("T") + feat.extract(record).seq.count("t") + data["genes"]["comp"]["G"] += feat.extract(record).seq.count("G") + feat.extract(record).seq.count("g") + #data["genes"]["bases"] += len(feat) + data["genes"]["avg_len"].append(len(feat)) + + data["genes"]["avg_len"] = float(sum(data["genes"]["avg_len"])) / len(gene_features) + data["overall"]["gc"] = float( + data["overall"]["comp"]["G"] + data["overall"]["comp"]["C"] + ) / len(record.seq) + return data + + +def find_morons(record): + """Locate morons in the genome + + Don't even know why... + + TODO: remove? Idk. + """ + results = [] + good = 0 + bad = 0 + + gene_features = list(coding_genes(record.features)) + for i, gene in enumerate(gene_features): + two_left = gene_features[i - 2 : i] + two_right = gene_features[i + 1 : i + 1 + 2] + strands = [x.strand for x in two_left] + [x.strand for x in two_right] + anticon = [x for x in strands if x != gene.strand] + + if len(anticon) == 4: + has_rbs = [x.type == "Shine_Dalgarno_sequence" for x in gene.sub_features] + if any(has_rbs): + rbs = [ + x for x in gene.sub_features if x.type == "Shine_Dalgarno_sequence" + ][0] + rbs_msg = str(rbs.extract(record.seq)) + else: + rbs_msg = "No RBS Available" + results.append((gene, two_left, two_right, rbs_msg)) + bad += 1 + else: + good += 1 + return good, bad, results, [] + + +def bad_gene_model(record): + """Find features without product + """ + results = [] + good = 0 + bad = 0 + qc_features = [] + + for gene in coding_genes(record.features): + exons = [ + x for x in genes(gene.sub_features, feature_type="exon") if len(x) > 10 + ] + CDSs = [x for x in genes(gene.sub_features, feature_type="CDS")] + if len(exons) >= 1 and len(CDSs) >= 1: + if len(exons) != len(CDSs): + results.append( + ( + get_gff3_id(gene), + None, + None, + "Mismatched number of exons and CDSs in gff3 representation", + ) + ) + qc_features.append( + gen_qc_feature( + gene.location.start, + gene.location.end, + "Mismatched number of exons and CDSs in gff3 representation", + strand=gene.strand, + id_src=gene, + type_src="gene" + ) + ) + bad += 1 + else: + for (exon, cds) in zip( + sorted(exons, key=lambda x: x.location.start), + sorted(CDSs, key=lambda x: x.location.start), + ): + if len(exon) != len(cds): + results.append( + ( + get_gff3_id(gene), + exon, + cds, + "CDS does not extend to full length of gene", + ) + ) + qc_features.append( + gen_qc_feature( + exon.location.start, + exon.location.end, + "CDS does not extend to full length of gene", + strand=exon.strand, + id_src=gene, + type_src="CDS" + ) + ) + bad += 1 + else: + good += 1 + else: + log.warn("Could not handle %s, %s", exons, CDSs) + results.append( + ( + get_gff3_id(gene), + None, + None, + "{0} exons, {1} CDSs".format(len(exons), len(CDSs)), + ) + ) + + return good, len(results) + bad, results, qc_features + + +def weird_starts(record): + """Find features without product + """ + good = 0 + bad = 0 + qc_features = [] + results = [] + + overall = {} + for gene in coding_genes(record.features): + seq = [x for x in genes(gene.sub_features, feature_type="CDS")] + if len(seq) == 0: + log.warn("No CDS for gene %s", get_gff3_id(gene)) + continue + else: + seq = seq[0] + + seq_str = str(seq.extract(record.seq)) + start_codon = seq_str[0:3] + if len(seq_str) < 3: + sys.stderr.write("Fatal Error: CDS of length less than 3 at " + str(seq.location) + '\n') + exit(2) +# if len(seq_str) % 3 != 0: +# if len(seq_str) < 3: +# stop_codon = seq_str[-(len(seq_str))] +# else: +# stop_codon = seq_str[-3] +# +# log.warn("CDS at %s length is not a multiple of three (Length = %d)", get_gff3_id(gene), len(seq_str)) +# seq.__error = "Bad CDS Length" +# results.append(seq) +# qc_features.append( +# gen_qc_feature( +# s, e, "Bad Length", strand=seq.strand, id_src=gene +# ) +# ) +# bad += 1 +# seq.__start = start_codon +# seq.__stop = stop_codon +# continue + + stop_codon = seq_str[-3] + seq.__start = start_codon + seq.__stop = stop_codon + if start_codon not in overall: + overall[start_codon] = 1 + else: + overall[start_codon] += 1 + + if start_codon not in ("ATG", "TTG", "GTG"): + log.warn("Weird start codon (%s) on %s", start_codon, get_gff3_id(gene)) + seq.__error = "Unusual start codon %s" % start_codon + + s = 0 + e = 0 + if seq.strand > 0: + s = seq.location.start + e = seq.location.start + 3 + else: + s = seq.location.end + e = seq.location.end - 3 + + results.append(seq) + results[-1].location = FeatureLocation(results[-1].location.start + 1, results[-1].location.end, results[-1].location.strand) + qc_features.append( + gen_qc_feature( + s, e, "Weird start codon", strand=seq.strand, id_src=gene, type_src="gene" + ) + ) + bad += 1 + else: + good += 1 + + return good, bad, results, qc_features, overall + + +def missing_genes(record): + """Find features without product + """ + results = [] + good = 0 + bad = 0 + qc_features = [] + + for gene in coding_genes(record.features): + if gene.qualifiers.get("cpt_source", [None])[0] == "CPT_GENE_MODEL_CORRECTION": + results.append(gene) + bad += 1 + else: + good += 1 + + return good, bad, results, qc_features + + +def gene_model_correction_issues(record): + """Find features that have issues from the gene model correction step. + These have qualifiers beginning with CPT_GMS + """ + results = [] + good = 0 + bad = 0 + qc_features = [] + + # For each gene + for gene in coding_genes(record.features): + # Get the list of child CDSs + cdss = [x for x in genes(gene.sub_features, feature_type="CDS")] + # And our matching qualifiers + gene_data = [(k, v) for (k, v) in gene.qualifiers.items() if k == "cpt_gmc"] + # If there are problems with ONLY the parent, let's complain + local_results = [] + local_qc_features = [] + for x in gene_data: + if "Missing Locus Tag" in x[1]: + # Missing locus tag is an either or thing, if it hits here + # there shouldn't be anything else wrong with it. + + # Obviously missing so we remove it + gene.qualifiers["locus_tag"] = [""] + # Translation from bp_genbank2gff3.py + cdss[0].qualifiers["locus_tag"] = cdss[0].qualifiers["Name"] + # Append our results + local_results.append((gene, cdss[0], "Gene is missing a locus_tag")) + local_qc_features.append( + gen_qc_feature( + gene.location.start, + gene.location.end, + "Gene is missing a locus_tag", + strand=gene.strand, + type_src="gene" + ) + ) + + # We need to alert on any child issues as well. + for cds in cdss: + cds_data = [ + (k, v[0]) for (k, v) in cds.qualifiers.items() if k == "cpt_gmc" + ] + if len(gene_data) == 0 and len(cds_data) == 0: + # Alles gut + pass + else: + for _, problem in cds_data: + if problem == "BOTH Missing Locus Tag": + gene.qualifiers["locus_tag"] = [""] + cds.qualifiers["locus_tag"] = [""] + local_results.append( + (gene, cds, "Both gene and CDS are missing locus tags") + ) + local_qc_features.append( + gen_qc_feature( + cds.location.start, + cds.location.end, + "CDS is missing a locus_tag", + strand=cds.strand, + type_src="CDS" + ) + ) + local_qc_features.append( + gen_qc_feature( + gene.location.start, + gene.location.end, + "Gene is missing a locus_tag", + strand=gene.strand, + type_src="gene" + ) + ) + elif problem == "Different locus tag from associated gene.": + gene.qualifiers["locus_tag"] = gene.qualifiers["Name"] + cds.qualifiers["locus_tag"] = cds.qualifiers["cpt_gmc_locus"] + local_results.append( + (gene, cds, "Gene and CDS have differing locus tags") + ) + local_qc_features.append( + gen_qc_feature( + gene.location.start, + gene.location.end, + "Gene and CDS have differing locus tags", + strand=gene.strand, + type_src="gene" + ) + ) + elif problem == "Missing Locus Tag": + # Copy this over + gene.qualifiers["locus_tag"] = gene.qualifiers["Name"] + # This one is missing + cds.qualifiers["locus_tag"] = [""] + local_results.append((gene, cds, "CDS is missing a locus_tag")) + local_qc_features.append( + gen_qc_feature( + cds.location.start, + cds.location.end, + "CDS is missing a locus_tag", + strand=cds.strand, + type_src="CDS" + ) + ) + else: + log.warn("Cannot handle %s", problem) + + if len(local_results) > 0: + bad += 1 + else: + good += 1 + + qc_features.extend(local_qc_features) + results.extend(local_results) + return good, bad, results, qc_features + + +def missing_tags(record): + """Find features without product + """ + results = [] + good = 0 + bad = 0 + qc_features = [] + + for gene in coding_genes(record.features): + cds = [x for x in genes(gene.sub_features, feature_type="CDS")] + if len(cds) == 0: + log.warn("Gene missing CDS subfeature %s", get_gff3_id(gene)) + continue + + cds = cds[0] + + if "product" not in cds.qualifiers: + log.info("Missing product tag on %s", get_gff3_id(gene)) + qc_features.append( + gen_qc_feature( + cds.location.start, + cds.location.end, + "Missing product tag", + strand=cds.strand, + type_src="CDS" + ) + ) + results.append(cds) + bad += 1 + else: + good += 1 + + return good, bad, results, qc_features + + +def evaluate_and_report( + annotations, + genome, + gff3=None, + tbl=None, + sd_min=5, + sd_max=15, + min_gene_length=30, + excessive_gap_dist=50, + excessive_gap_divergent_dist=200, + excessive_overlap_dist=25, + excessive_overlap_divergent_dist=50, + reportTemplateName="phage_annotation_validator.html", +): + """ + Generate our HTML evaluation of the genome + """ + # Get features from GFF file + seq_dict = SeqIO.to_dict(SeqIO.parse(genome, "fasta")) + # Get the first GFF3 record + # TODO: support multiple GFF3 files. + mostFeat = 0 + for rec in list(gffParse(annotations, base_dict=seq_dict)): + if len(rec.features) > mostFeat: + mostFeat = len(rec.features) + record = rec + + gff3_qc_record = SeqRecord(record.id, id=record.id) + gff3_qc_record.features = [] + gff3_qc_features = [] + + log.info("Locating missing RBSs") + # mb_any = "did they annotate ANY rbss? if so, take off from score." + mb_good, mb_bad, mb_results, mb_annotations, mb_any = missing_rbs( + record, lookahead_min=sd_min, lookahead_max=sd_max + ) + gff3_qc_features += mb_annotations + + log.info("Locating excessive gaps") + eg_good, eg_bad, eg_results, eg_annotations = excessive_gap( + record, + excess=excessive_gap_dist, + excess_divergent=excessive_gap_divergent_dist, + min_gene=min_gene_length, + slop=excessive_overlap_dist, + lookahead_min=sd_min, + lookahead_max=sd_max, + ) + gff3_qc_features += eg_annotations + + log.info("Locating excessive overlaps") + eo_good, eo_bad, eo_results, eo_annotations = excessive_overlap( + record, + excess=excessive_overlap_dist, + excess_divergent=excessive_overlap_divergent_dist, + ) + gff3_qc_features += eo_annotations + + log.info("Locating morons") + mo_good, mo_bad, mo_results, mo_annotations = find_morons(record) + gff3_qc_features += mo_annotations + + log.info("Locating missing tags") + mt_good, mt_bad, mt_results, mt_annotations = missing_tags(record) + gff3_qc_features += mt_annotations + + log.info("Locating missing gene features") + mg_good, mg_bad, mg_results, mg_annotations = missing_genes(record) + gff3_qc_features += mg_annotations + + log.info("Determining coding density") + cd, cd_real = coding_density(record) + + log.info("Locating weird starts") + ws_good, ws_bad, ws_results, ws_annotations, ws_overall = weird_starts(record) + gff3_qc_features += ws_annotations + + log.info("Locating bad gene models") + gm_good, gm_bad, gm_results, gm_annotations = bad_gene_model(record) + if gm_good + gm_bad == 0: + gm_bad = 1 + + log.info("Locating more bad gene models") + gmc_good, gmc_bad, gmc_results, gmc_annotations = gene_model_correction_issues( + record + ) + if gmc_good + gmc_bad == 0: + gmc_bad = 1 + + good_scores = [eg_good, eo_good, mt_good, ws_good, gm_good, gmc_good] + bad_scores = [eg_bad, eo_bad, mt_bad, ws_bad, gm_bad, gmc_bad] + + # Only if they tried to annotate RBSs do we consider them. + if mb_any: + good_scores.append(mb_good) + bad_scores.append(mb_bad) + subscores = [] + + for (g, b) in zip(good_scores, bad_scores): + if g + b == 0: + s = 0 + else: + s = int(100 * float(g) / (float(b) + float(g))) + subscores.append(s) + subscores.append(cd) + + score = int(float(sum(subscores)) / float(len(subscores))) + + # This is data that will go into our HTML template + kwargs = { + "upstream_min": sd_min, + "upstream_max": sd_max, + "record_name": record.id, + "record_nice_name": nice_name(record), + "params": { + "sd_min": sd_min, + "sd_max": sd_max, + "min_gene_length": min_gene_length, + "excessive_gap_dist": excessive_gap_dist, + "excessive_gap_divergent_dist": excessive_gap_divergent_dist, + "excessive_overlap_dist": excessive_overlap_dist, + "excessive_overlap_divergent_dist": excessive_overlap_divergent_dist, + }, + "score": score, + "encouragement": get_encouragement(score), + "genome_overview": genome_overview(record), + "rbss_annotated": mb_any, + "missing_rbs": mb_results, + "missing_rbs_good": mb_good, + "missing_rbs_bad": mb_bad, + "missing_rbs_score": 0 + if mb_good + mb_bad == 0 + else (100 * mb_good / (mb_good + mb_bad)), + "excessive_gap": eg_results, + "excessive_gap_good": eg_good, + "excessive_gap_bad": eg_bad, + "excessive_gap_score": 0 + if eo_good + eo_bad == 0 + else (100 * eo_good / (eo_good + eo_bad)), + "excessive_overlap": eo_results, + "excessive_overlap_good": eo_good, + "excessive_overlap_bad": eo_bad, + "excessive_overlap_score": 0 + if eo_good + eo_bad == 0 + else (100 * eo_good / (eo_good + eo_bad)), + "morons": mo_results, + "morons_good": mo_good, + "morons_bad": mo_bad, + "morons_score": 0 + if mo_good + mo_bad == 0 + else (100 * mo_good / (mo_good + mo_bad)), + "missing_tags": mt_results, + "missing_tags_good": mt_good, + "missing_tags_bad": mt_bad, + "missing_tags_score": 0 + if mt_good + mt_bad == 0 + else (100 * mt_good / (mt_good + mt_bad)), + "missing_genes": mg_results, + "missing_genes_good": mg_good, + "missing_genes_bad": mg_bad, + "missing_genes_score": 0 + if mg_good + mg_bad == 0 + else (100 * mg_good / (mg_good + mg_bad)), + "weird_starts": ws_results, + "weird_starts_good": ws_good, + "weird_starts_bad": ws_bad, + "weird_starts_overall": ws_overall, + "weird_starts_overall_sorted_keys": sorted( + ws_overall, reverse=True, key=lambda x: ws_overall[x] + ), + "weird_starts_score": 0 + if ws_good + ws_bad == 0 + else (100 * ws_good / (ws_good + ws_bad)), + "gene_model": gm_results, + "gene_model_good": gm_good, + "gene_model_bad": gm_bad, + "gene_model_score": 0 + if gm_good + gm_bad == 0 + else (100 * gm_good / (gm_good + gm_bad)), + "gene_model_correction": gmc_results, + "gene_model_correction_good": gmc_good, + "gene_model_correction_bad": gmc_bad, + "gene_model_correction_score": 0 + if gmc_good + gmc_bad == 0 + else (100 * gmc_good / (gmc_good + gmc_bad)), + "coding_density": cd, + "coding_density_exact": exact_coding_density(record), + "coding_density_real": cd_real, + "coding_density_score": cd, + } + + with open(tbl, "w") as handle: + kw_subset = {} + for key in kwargs: + if ( + key in ("score", "record_name") + or "_good" in key + or "_bad" in key + or "_overall" in key + ): + kw_subset[key] = kwargs[key] + json.dump(kw_subset, handle) + + with open(gff3, "w") as handle: + gff3_qc_record.features = gff3_qc_features + gff3_qc_record.annotations = {} + gffWrite([gff3_qc_record], handle) + + def nice_strand(direction): + # It is somehow possible for whole gffSeqFeature objects to end up in here, apparently at the gene level + if "SeqFeature" in str(type(direction)): + direction = direction.location.strand + if direction > 0: + return "→"#.decode("utf-8") + else: + return "←"#.decode("utf-8") + + def nice_strand_tex(direction): + if "SeqFeature" in str(type(direction)): + direction = direction.location.strand + if direction > 0: + return "$\\rightarrow$" + else: + return "$\\leftarrow$" + + def texify(data): + return data.replace("_", "\\_").replace("$", "\\$") + + def length(data): + return len(data) + + def my_encode(data): + return str(data)#.encode("utf-8") + + def my_decode(data): + # For production + return str(data)#.decode("utf-8") + # For local testing. No, I do not understand. + return str(data)#.encode("utf-8")).decode("utf-8") + + env = Environment( + loader=FileSystemLoader(SCRIPT_PATH), trim_blocks=True, lstrip_blocks=True + ) + env.filters.update( + { + "nice_id": get_gff3_id, + "nice_strand": nice_strand, + "nice_strand_tex": nice_strand_tex, + "texify": texify, + "length": length, + "encode": my_encode, + "decode": my_decode, + } + ) + tpl = env.get_template(reportTemplateName) + return tpl.render(**kwargs)#.encode("utf-8") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="rebase gff3 features against parent locations", epilog="" + ) + parser.add_argument( + "annotations", type=argparse.FileType("r"), help="Parent GFF3 annotations" + ) + parser.add_argument("genome", type=argparse.FileType("r"), help="Genome Sequence") + parser.add_argument( + "--gff3", type=str, help="GFF3 Annotations", default="qc_annotations.gff3" + ) + parser.add_argument( + "--tbl", + type=str, + help="Table for noninteractive parsing", + default="qc_results.json", + ) + + parser.add_argument( + "--sd_min", + type=int, + help="Minimum distance from gene start for an SD to be", + default=5, + ) + parser.add_argument( + "--sd_max", + type=int, + help="Maximum distance from gene start for an SD to be", + default=15, + ) + + parser.add_argument( + "--min_gene_length", + type=int, + help="Minimum length for a putative gene call (AAs)", + default=30, + ) + + parser.add_argument( + "--excessive_overlap_dist", + type=int, + help="Excessive overlap for genes in same direction", + default=25, + ) + parser.add_argument( + "--excessive_overlap_divergent_dist", + type=int, + help="Excessive overlap for genes in diff directions", + default=50, + ) + + parser.add_argument( + "--excessive_gap_dist", + type=int, + help="Maximum distance between two genes", + default=40, + ) + parser.add_argument( + "--excessive_gap_divergent_dist", + type=int, + help="Maximum distance between two divergent genes", + default=200, + ) + + parser.add_argument( + "--reportTemplateName", + help="Report template file name", + default="phageqc_report_full.html", + ) + + args = parser.parse_args() + + sys.stdout.write(evaluate_and_report(**vars(args)))