Mercurial > repos > cpt > cpt_shinefind
view shinefind.py @ 4:5004ddb62700 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:53:31 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python import re import sys import argparse import logging from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature from Bio import SeqIO from Bio.SeqRecord import SeqRecord from Bio.SeqFeature import FeatureLocation from gff3 import ( feature_lambda, feature_test_type, feature_test_true, feature_test_quals, get_id, ensure_location_in_bounds, ) logging.basicConfig(level=logging.INFO) log = logging.getLogger() class NaiveSDCaller(object): # TODO May make switch for different sequence sets SD_SEQUENCES = ( "AGGAGGT", "GGAGGT", "AGGAGG", "GGGGGG", "AGGAG", "GAGGT", "GGAGG", "GGGGG", "AGGT", "GGGT", "GAGG", "GGGG", "AGGA", "GGAG", "GGA", "GAG", "AGG", "GGT", "GGG", ) def __init__(self): self.sd_reg = [re.compile(x, re.IGNORECASE) for x in self.SD_SEQUENCES] def list_sds(self, sequence, sd_min=3, sd_max=17): hits = [] for regex in self.sd_reg: for match in regex.finditer(sequence): spacing = len(sequence) - len(match.group()) - match.start() if sd_max >= spacing + sd_min and spacing + sd_min >= sd_min: # if the spacing is within gap limits, add # (search space is [sd_max+7 .. sd_min] so actual gap is spacing+sd_min) # print('min %d max %d - adding SD with gap %d' % (sd_min, sd_max, spacing+sd_min)) hits.append( { "spacing": spacing, "hit": match.group(), "start": match.start(), "end": match.end(), "len": len(match.group()), } ) hits = sorted(hits, key=lambda x: (-x["len"], x["spacing"])) return hits @classmethod def highlight_sd(cls, sequence, start, end): return " ".join( [ sequence[0:start].lower(), sequence[start:end].upper(), sequence[end:].lower(), ] ) @classmethod def to_features( cls, hits, strand, parent_start, parent_end, feature_id=None, sd_min=3, sd_max=17, ): results = [] for idx, hit in enumerate(hits): # gene complement(124..486) # -1 491 501 0 5 5 # -1 491 501 0 4 5 # -1 491 501 1 4 5 # -1 491 501 2 3 5 # -1 491 501 1 3 5 # -1 491 501 0 3 5 qualifiers = { "source": "CPT_ShineFind", "ID": "%s.rbs-%s" % (feature_id, idx), } if strand > 0: start = parent_end - hit["spacing"] - hit["len"] end = parent_end - hit["spacing"] else: start = parent_start + hit["spacing"] end = parent_start + hit["spacing"] + hit["len"] # check that the END of the SD sequence is within the given min/max of parent start/end # gap is either the sd_start-cds_end (neg strand) or the sd_end-cds_start (pos strand) # minimum absolute value of these two will be the proper gap regardless of strand tmp = gffSeqFeature( FeatureLocation(min(start, end), max(start, end), strand=strand), # FeatureLocation(min(start, end), max(start, end), strand=strand), type="Shine_Dalgarno_sequence", qualifiers=qualifiers, ) results.append(tmp) return results def testFeatureUpstream(self, feature, record, sd_min=3, sd_max=17): # Strand information necessary to getting correct upstream sequence strand = feature.location.strand # n_bases_upstream (plus/minus 7 upstream to make the min/max define the possible gap position) if strand > 0: start = feature.location.start - sd_max - 7 end = feature.location.start - sd_min else: start = feature.location.end + sd_min end = feature.location.end + sd_max + 7 (start, end) = ensure_location_in_bounds( start=start, end=end, parent_length=len(record) ) # Create our temp feature used to obtain correct portion of # genome tmp = gffSeqFeature( FeatureLocation(min(start, end), max(start, end), strand=strand), type="domain", ) seq = str(tmp.extract(record.seq)) return self.list_sds(seq, sd_min, sd_max), start, end, seq def hasSd(self, feature, record, sd_min=3, sd_max=17): sds, start, end, seq = self.testFeatureUpstream( feature, record, sd_min=sd_min, sd_max=sd_max ) return len(sds) > 0 # Cycle through subfeatures, set feature's location to be equal # to the smallest start and largest end. # Remove pending bugfix for feature display in Apollo def fminmax(feature): fmin = None fmax = None for sf in feature_lambda([feature], feature_test_true, {}, subfeatures=True): if fmin is None: fmin = sf.location.start fmax = sf.location.end if sf.location.start < fmin: fmin = sf.location.start if sf.location.end > fmax: fmax = sf.location.end return fmin, fmax def fix_gene_boundaries(feature): # There is a bug in Apollo whereby we have created gene # features which are larger than expected, but we cannot see this. # We only see a perfect sized gene + SD together. # # So, we clamp the location of the gene feature to the # contained mRNAs. Will remove pending Apollo upgrade. fmin, fmax = fminmax(feature) if feature.location.strand > 0: feature.location = FeatureLocation(fmin, fmax, strand=1) else: feature.location = FeatureLocation(fmin, fmax, strand=-1) return feature def shinefind( fasta, gff3, gff3_output=None, table_output=None, lookahead_min=3, lookahead_max=17, top_only=False, add=False, ): table_output.write( "\t".join( [ "ID", "Name", "Terminus", "Terminus", "Strand", "Upstream Sequence", "SD", "Spacing", ] ) + "\n" ) sd_finder = NaiveSDCaller() # Load up sequence(s) for GFF3 data seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) # Parse GFF3 records for record in gffParse(gff3, base_dict=seq_dict): # Shinefind's gff3_output. gff3_output_record = SeqRecord(record.seq, record.id) # Filter out just coding sequences ignored_features = [] for x in record.features: # If feature X does NOT contain a CDS, add to ignored_features # list. This means if we have a top level gene feature with or # without a CDS subfeature, we're catch it appropriately here. if ( len( list( feature_lambda( [x], feature_test_type, {"type": "CDS"}, subfeatures=True ) ) ) == 0 ): ignored_features.append(x) # Loop over all gene features for gene in feature_lambda( record.features, feature_test_type, {"type": "gene"}, subfeatures=True ): # Get the CDS from this gene. feature = sorted( list( feature_lambda( gene.sub_features, feature_test_type, {"type": "CDS"}, subfeatures=True, ) ), key=lambda x: x.location.start, ) # If no CDSs are in this gene feature, then quit if len(feature) == 0: # We've already caught these above in our ignored_features # list, so we skip out on the rest of this for loop continue else: # Otherwise pull the first on the strand. feature = feature[0] # Three different ways RBSs can be stored that we expect. rbs_rbs = list( feature_lambda( gene.sub_features, feature_test_type, {"type": "RBS"}, subfeatures=False, ) ) rbs_sds = list( feature_lambda( gene.sub_features, feature_test_type, {"type": "Shine_Dalgarno_sequence"}, subfeatures=False, ) ) regulatory_elements = list( feature_lambda( gene.sub_features, feature_test_type, {"type": "regulatory"}, subfeatures=False, ) ) rbs_regulatory = list( feature_lambda( regulatory_elements, feature_test_quals, {"regulatory_class": ["ribosome_binding_site"]}, subfeatures=False, ) ) rbss = rbs_rbs + rbs_sds + rbs_regulatory # If someone has already annotated an RBS, we move to the next gene if len(rbss) > 0: log.debug("Has %s RBSs", len(rbss)) ignored_features.append(gene) continue sds, start, end, seq = sd_finder.testFeatureUpstream( feature, record, sd_min=lookahead_min, sd_max=lookahead_max ) feature_id = get_id(feature) sd_features = sd_finder.to_features( sds, feature.location.strand, start, end, feature_id=feature.id ) human_strand = "+" if feature.location.strand == 1 else "-" # http://book.pythontips.com/en/latest/for_-_else.html log.debug("Found %s SDs", len(sds)) for (sd, sd_feature) in zip(sds, sd_features): # If we only want the top feature, after the bulk of the # forloop executes once, we append the top feature, and fake a # break, because an actual break triggers the else: block table_output.write( "\t".join( map( str, [ feature.id, feature_id, feature.location.start, feature.location.end, human_strand, sd_finder.highlight_sd(seq, sd["start"], sd["end"]), sd["hit"], int(sd["spacing"]) + lookahead_min, ], ) ) + "\n" ) if add: # Append the top RBS to the gene feature gene.sub_features.append(sd_feature) # Pick out start/end locations for all sub_features locations = [x.location.start for x in gene.sub_features] + [ x.location.end for x in gene.sub_features ] # Update gene's start/end to be inclusive gene.location._start = min(locations) gene.location._end = max(locations) # Also register the feature with the separate GFF3 output sd_feature = fix_gene_boundaries(sd_feature) gff3_output_record.features.append(sd_feature) if top_only or sd == (sds[-1]): break else: table_output.write( "\t".join( map( str, [ feature.id, feature_id, feature.location.start, feature.location.end, human_strand, seq, None, -1, ], ) ) + "\n" ) record.annotations = {} gffWrite([record], sys.stdout) gff3_output_record.features = sorted( gff3_output_record.features, key=lambda x: x.location.start ) gff3_output_record.annotations = {} gffWrite([gff3_output_record], gff3_output) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Identify shine-dalgarno sequences") parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations") parser.add_argument( "--gff3_output", type=argparse.FileType("w"), help="GFF3 Output", default="shinefind.gff3", ) parser.add_argument( "--table_output", type=argparse.FileType("w"), help="Tabular Output", default="shinefind.tbl", ) parser.add_argument( "--lookahead_min", nargs="?", type=int, help="Number of bases upstream of CDSs to end search", default=3, ) parser.add_argument( "--lookahead_max", nargs="?", type=int, help="Number of bases upstream of CDSs to begin search", default=17, ) parser.add_argument("--top_only", action="store_true", help="Only report best hits") parser.add_argument( "--add", action="store_true", help='Function in "addition" mode whereby the ' + "RBSs are added directly to the gene model.", ) args = parser.parse_args() shinefind(**vars(args))