Mercurial > repos > cpt > cpt_require_shine
comparison cpt_req_sd/gff3_require_sd.py @ 0:ae2791f3108f draft
Uploaded
author | cpt |
---|---|
date | Fri, 13 May 2022 05:35:34 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:ae2791f3108f |
---|---|
1 #!/usr/bin/env python | |
2 import sys | |
3 import logging | |
4 import argparse | |
5 from Bio import SeqIO | |
6 from Bio.SeqFeature import FeatureLocation | |
7 from CPT_GFFParser import gffParse, gffWrite | |
8 from gff3 import feature_lambda, feature_test_type | |
9 from shinefind import NaiveSDCaller | |
10 | |
11 logging.basicConfig(level=logging.INFO) | |
12 log = logging.getLogger(__name__) | |
13 | |
14 | |
15 def require_shinefind(gff3, fasta): | |
16 sd_finder = NaiveSDCaller() | |
17 # Load up sequence(s) for GFF3 data | |
18 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) | |
19 # Parse GFF3 records | |
20 for record in gffParse(gff3, base_dict=seq_dict): | |
21 # Reopen | |
22 genes = list( | |
23 feature_lambda( | |
24 record.features, feature_test_type, {"type": "gene"}, subfeatures=True | |
25 ) | |
26 ) | |
27 good_genes = [] | |
28 for gene in genes: | |
29 cdss = sorted( | |
30 list( | |
31 feature_lambda( | |
32 gene.sub_features, | |
33 feature_test_type, | |
34 {"type": "CDS"}, | |
35 subfeatures=False, | |
36 ) | |
37 ), | |
38 key=lambda x: x.location.start, | |
39 ) | |
40 if len(cdss) == 0: | |
41 continue | |
42 | |
43 cds = cdss[0] | |
44 | |
45 sds, start, end, seq = sd_finder.testFeatureUpstream( | |
46 cds, record, sd_min=5, sd_max=15 | |
47 ) | |
48 if len(sds) >= 1: | |
49 sd_features = sd_finder.to_features( | |
50 sds, gene.location.strand, start, end, feature_id=gene.id | |
51 ) | |
52 gene.sub_features.append(sd_features[0]) | |
53 if gene.location.start > sd_features[0].location.start: | |
54 gene.location = FeatureLocation(int(sd_features[0].location.start), int(gene.location.end), gene.location.strand) | |
55 if gene.location.end < sd_features[0].location.end: | |
56 gene.location = FeatureLocation(int(gene.location.start), int(sd_features[0].location.end), gene.location.strand) | |
57 good_genes.append(gene) | |
58 | |
59 record.features = good_genes | |
60 yield record | |
61 | |
62 | |
63 if __name__ == "__main__": | |
64 parser = argparse.ArgumentParser(description="Identify shine-dalgarno sequences") | |
65 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") | |
66 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations") | |
67 args = parser.parse_args() | |
68 | |
69 for rec in require_shinefind(**vars(args)): | |
70 rec.annotations = {} | |
71 gffWrite([rec], sys.stdout) |