Mercurial > repos > cpt > cpt_phageqc_annotations
diff cpt_phageqc_annotation/shinefind.py @ 0:c3140b08d703 draft default tip
Uploaded
author | cpt |
---|---|
date | Fri, 17 Jun 2022 13:00:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cpt_phageqc_annotation/shinefind.py Fri Jun 17 13:00:50 2022 +0000 @@ -0,0 +1,420 @@ +#!/usr/bin/env python +import re +import sys +import argparse +import logging +from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature +from Bio import SeqIO +from Bio.SeqRecord import SeqRecord +from Bio.SeqFeature import FeatureLocation +from gff3 import ( + feature_lambda, + feature_test_type, + feature_test_true, + feature_test_quals, + get_id, + ensure_location_in_bounds, +) + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger() + + +class NaiveSDCaller(object): + + # TODO May make switch for different sequence sets + SD_SEQUENCES = ( + "AGGAGGT", + "GGAGGT", + "AGGAGG", + "GGGGGG", + "AGGAG", + "GAGGT", + "GGAGG", + "GGGGG", + "AGGT", + "GGGT", + "GAGG", + "GGGG", + "AGGA", + "GGAG", + "GGA", + "GAG", + "AGG", + "GGT", + "GGG", + ) + + def __init__(self): + self.sd_reg = [re.compile(x, re.IGNORECASE) for x in self.SD_SEQUENCES] + + def list_sds(self, sequence, sd_min=3, sd_max=17): + hits = [] + for regex in self.sd_reg: + for match in regex.finditer(sequence): + spacing = len(sequence) - len(match.group()) - match.start() + if sd_max >= spacing+sd_min and spacing+sd_min >= sd_min: + #if the spacing is within gap limits, add + #(search space is [sd_max+7 .. sd_min] so actual gap is spacing+sd_min) + #print('min %d max %d - adding SD with gap %d' % (sd_min, sd_max, spacing+sd_min)) + hits.append( + { + "spacing": spacing, + "hit": match.group(), + "start": match.start(), + "end": match.end(), + "len": len(match.group()), + } + ) + hits = sorted(hits, key= lambda x: (-x['len'],x['spacing'])) + return hits + + @classmethod + def highlight_sd(cls, sequence, start, end): + return " ".join( + [ + sequence[0:start].lower(), + sequence[start:end].upper(), + sequence[end:].lower(), + ] + ) + + @classmethod + def to_features(cls, hits, strand, parent_start, parent_end, feature_id=None, sd_min=3, sd_max=17): + results = [] + for idx, hit in enumerate(hits): + # gene complement(124..486) + # -1 491 501 0 5 5 + # -1 491 501 0 4 5 + # -1 491 501 1 4 5 + # -1 491 501 2 3 5 + # -1 491 501 1 3 5 + # -1 491 501 0 3 5 + + qualifiers = { + "source": "CPT_ShineFind", + "ID": "%s.rbs-%s" % (feature_id, idx), + } + + if strand > 0: + start = parent_end - hit["spacing"] - hit["len"] + end = parent_end - hit["spacing"] + else: + start = parent_start + hit["spacing"] + end = parent_start + hit["spacing"] + hit["len"] + # check that the END of the SD sequence is within the given min/max of parent start/end + + # gap is either the sd_start-cds_end (neg strand) or the sd_end-cds_start (pos strand) + # minimum absolute value of these two will be the proper gap regardless of strand + tmp = gffSeqFeature( + FeatureLocation(min(start, end), max(start, end), strand=strand), + #FeatureLocation(min(start, end), max(start, end), strand=strand), + type="Shine_Dalgarno_sequence", + qualifiers=qualifiers, + ) + results.append(tmp) + return results + + def testFeatureUpstream(self, feature, record, sd_min=3, sd_max=17): + # Strand information necessary to getting correct upstream sequence + strand = feature.location.strand + + # n_bases_upstream (plus/minus 7 upstream to make the min/max define the possible gap position) + if strand > 0: + start = feature.location.start - sd_max - 7 + end = feature.location.start - sd_min + else: + start = feature.location.end + sd_min + end = feature.location.end + sd_max + 7 + + (start, end) = ensure_location_in_bounds( + start=start, end=end, parent_length=len(record) + ) + + # Create our temp feature used to obtain correct portion of + # genome + tmp = gffSeqFeature(FeatureLocation(min(start, end), max(start, end), strand=strand), type="domain") + seq = str(tmp.extract(record.seq)) + return self.list_sds(seq, sd_min, sd_max), start, end, seq + + def hasSd(self, feature, record, sd_min=3, sd_max=17): + sds, start, end, seq = self.testFeatureUpstream( + feature, record, sd_min=sd_min, sd_max=sd_max + ) + return len(sds) > 0 + + +# Cycle through subfeatures, set feature's location to be equal +# to the smallest start and largest end. +# Remove pending bugfix for feature display in Apollo +def fminmax(feature): + fmin = None + fmax = None + for sf in feature_lambda([feature], feature_test_true, {}, subfeatures=True): + if fmin is None: + fmin = sf.location.start + fmax = sf.location.end + if sf.location.start < fmin: + fmin = sf.location.start + if sf.location.end > fmax: + fmax = sf.location.end + return fmin, fmax + + +def fix_gene_boundaries(feature): + # There is a bug in Apollo whereby we have created gene + # features which are larger than expected, but we cannot see this. + # We only see a perfect sized gene + SD together. + # + # So, we clamp the location of the gene feature to the + # contained mRNAs. Will remove pending Apollo upgrade. + fmin, fmax = fminmax(feature) + if feature.location.strand > 0: + feature.location = FeatureLocation(fmin, fmax, strand=1) + else: + feature.location = FeatureLocation(fmin, fmax, strand=-1) + return feature + +def shinefind( + fasta, + gff3, + gff3_output=None, + table_output=None, + lookahead_min=3, + lookahead_max=17, + top_only=False, + add=False, +): + table_output.write( + "\t".join( + [ + "ID", + "Name", + "Terminus", + "Terminus", + "Strand", + "Upstream Sequence", + "SD", + "Spacing", + ] + ) + + "\n" + ) + + sd_finder = NaiveSDCaller() + # Load up sequence(s) for GFF3 data + seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) + # Parse GFF3 records + for record in gffParse(gff3, base_dict=seq_dict): + # Shinefind's gff3_output. + gff3_output_record = SeqRecord(record.seq, record.id) + # Filter out just coding sequences + ignored_features = [] + for x in record.features: + # If feature X does NOT contain a CDS, add to ignored_features + # list. This means if we have a top level gene feature with or + # without a CDS subfeature, we're catch it appropriately here. + if ( + len( + list( + feature_lambda( + [x], feature_test_type, {"type": "CDS"}, subfeatures=True + ) + ) + ) + == 0 + ): + ignored_features.append(x) + + # Loop over all gene features + for gene in feature_lambda( + record.features, feature_test_type, {"type": "gene"}, subfeatures=True + ): + + # Get the CDS from this gene. + feature = sorted( + list( + feature_lambda( + gene.sub_features, + feature_test_type, + {"type": "CDS"}, + subfeatures=True, + ) + ), + key=lambda x: x.location.start, + ) + # If no CDSs are in this gene feature, then quit + if len(feature) == 0: + # We've already caught these above in our ignored_features + # list, so we skip out on the rest of this for loop + continue + else: + # Otherwise pull the first on the strand. + feature = feature[0] + + # Three different ways RBSs can be stored that we expect. + rbs_rbs = list( + feature_lambda( + gene.sub_features, + feature_test_type, + {"type": "RBS"}, + subfeatures=False, + ) + ) + rbs_sds = list( + feature_lambda( + gene.sub_features, + feature_test_type, + {"type": "Shine_Dalgarno_sequence"}, + subfeatures=False, + ) + ) + regulatory_elements = list( + feature_lambda( + gene.sub_features, + feature_test_type, + {"type": "regulatory"}, + subfeatures=False, + ) + ) + rbs_regulatory = list( + feature_lambda( + regulatory_elements, + feature_test_quals, + {"regulatory_class": ["ribosome_binding_site"]}, + subfeatures=False, + ) + ) + rbss = rbs_rbs + rbs_sds + rbs_regulatory + + # If someone has already annotated an RBS, we move to the next gene + if len(rbss) > 0: + log.debug("Has %s RBSs", len(rbss)) + ignored_features.append(gene) + continue + + sds, start, end, seq = sd_finder.testFeatureUpstream( + feature, record, sd_min=lookahead_min, sd_max=lookahead_max + ) + + feature_id = get_id(feature) + sd_features = sd_finder.to_features( + sds, feature.location.strand, start, end, feature_id=feature.id + ) + + human_strand = "+" if feature.location.strand == 1 else "-" + + # http://book.pythontips.com/en/latest/for_-_else.html + log.debug("Found %s SDs", len(sds)) + for (sd, sd_feature) in zip(sds, sd_features): + # If we only want the top feature, after the bulk of the + # forloop executes once, we append the top feature, and fake a + # break, because an actual break triggers the else: block + table_output.write( + "\t".join( + map( + str, + [ + feature.id, + feature_id, + feature.location.start, + feature.location.end, + human_strand, + sd_finder.highlight_sd(seq, sd["start"], sd["end"]), + sd["hit"], + int(sd["spacing"]) + lookahead_min, + ], + ) + ) + + "\n" + ) + + if add: + # Append the top RBS to the gene feature + gene.sub_features.append(sd_feature) + # Pick out start/end locations for all sub_features + locations = [x.location.start for x in gene.sub_features] + [ + x.location.end for x in gene.sub_features + ] + # Update gene's start/end to be inclusive + gene.location._start = min(locations) + gene.location._end = max(locations) + # Also register the feature with the separate GFF3 output + sd_feature = fix_gene_boundaries(sd_feature) + gff3_output_record.features.append(sd_feature) + + if top_only or sd == (sds[-1]): + break + else: + table_output.write( + "\t".join( + map( + str, + [ + feature.id, + feature_id, + feature.location.start, + feature.location.end, + human_strand, + seq, + None, + -1, + ], + ) + ) + + "\n" + ) + + record.annotations = {} + gffWrite([record], sys.stdout) + + gff3_output_record.features = sorted( + gff3_output_record.features, key=lambda x: x.location.start + ) + gff3_output_record.annotations = {} + gffWrite([gff3_output_record], gff3_output) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Identify shine-dalgarno sequences") + parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") + parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations") + + parser.add_argument( + "--gff3_output", + type=argparse.FileType("w"), + help="GFF3 Output", + default="shinefind.gff3", + ) + parser.add_argument( + "--table_output", + type=argparse.FileType("w"), + help="Tabular Output", + default="shinefind.tbl", + ) + + parser.add_argument( + "--lookahead_min", + nargs="?", + type=int, + help="Number of bases upstream of CDSs to end search", + default=3, + ) + parser.add_argument( + "--lookahead_max", + nargs="?", + type=int, + help="Number of bases upstream of CDSs to begin search", + default=17, + ) + + parser.add_argument("--top_only", action="store_true", help="Only report best hits") + parser.add_argument( + "--add", + action="store_true", + help='Function in "addition" mode whereby the ' + + "RBSs are added directly to the gene model.", + ) + + args = parser.parse_args() + shinefind(**vars(args))