Mercurial > repos > cpt > cpt_require_shine
diff cpt_req_sd/gff3_require_sd.py @ 0:ae2791f3108f draft
Uploaded
author | cpt |
---|---|
date | Fri, 13 May 2022 05:35:34 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cpt_req_sd/gff3_require_sd.py Fri May 13 05:35:34 2022 +0000 @@ -0,0 +1,71 @@ +#!/usr/bin/env python +import sys +import logging +import argparse +from Bio import SeqIO +from Bio.SeqFeature import FeatureLocation +from CPT_GFFParser import gffParse, gffWrite +from gff3 import feature_lambda, feature_test_type +from shinefind import NaiveSDCaller + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + + +def require_shinefind(gff3, fasta): + sd_finder = NaiveSDCaller() + # Load up sequence(s) for GFF3 data + seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) + # Parse GFF3 records + for record in gffParse(gff3, base_dict=seq_dict): + # Reopen + genes = list( + feature_lambda( + record.features, feature_test_type, {"type": "gene"}, subfeatures=True + ) + ) + good_genes = [] + for gene in genes: + cdss = sorted( + list( + feature_lambda( + gene.sub_features, + feature_test_type, + {"type": "CDS"}, + subfeatures=False, + ) + ), + key=lambda x: x.location.start, + ) + if len(cdss) == 0: + continue + + cds = cdss[0] + + sds, start, end, seq = sd_finder.testFeatureUpstream( + cds, record, sd_min=5, sd_max=15 + ) + if len(sds) >= 1: + sd_features = sd_finder.to_features( + sds, gene.location.strand, start, end, feature_id=gene.id + ) + gene.sub_features.append(sd_features[0]) + if gene.location.start > sd_features[0].location.start: + gene.location = FeatureLocation(int(sd_features[0].location.start), int(gene.location.end), gene.location.strand) + if gene.location.end < sd_features[0].location.end: + gene.location = FeatureLocation(int(gene.location.start), int(sd_features[0].location.end), gene.location.strand) + good_genes.append(gene) + + record.features = good_genes + yield record + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Identify shine-dalgarno sequences") + parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") + parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations") + args = parser.parse_args() + + for rec in require_shinefind(**vars(args)): + rec.annotations = {} + gffWrite([rec], sys.stdout)