0
|
1 #!/usr/bin/env python
|
|
2 import sys
|
|
3 import logging
|
|
4 import argparse
|
|
5 from Bio import SeqIO
|
|
6 from Bio.SeqFeature import FeatureLocation
|
|
7 from CPT_GFFParser import gffParse, gffWrite
|
|
8 from gff3 import feature_lambda, feature_test_type
|
|
9 from shinefind import NaiveSDCaller
|
|
10
|
|
11 logging.basicConfig(level=logging.INFO)
|
|
12 log = logging.getLogger(__name__)
|
|
13
|
|
14
|
|
15 def require_shinefind(gff3, fasta):
|
|
16 sd_finder = NaiveSDCaller()
|
|
17 # Load up sequence(s) for GFF3 data
|
|
18 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
|
|
19 # Parse GFF3 records
|
|
20 for record in gffParse(gff3, base_dict=seq_dict):
|
|
21 # Reopen
|
|
22 genes = list(
|
|
23 feature_lambda(
|
|
24 record.features, feature_test_type, {"type": "gene"}, subfeatures=True
|
|
25 )
|
|
26 )
|
|
27 good_genes = []
|
|
28 for gene in genes:
|
|
29 cdss = sorted(
|
|
30 list(
|
|
31 feature_lambda(
|
|
32 gene.sub_features,
|
|
33 feature_test_type,
|
|
34 {"type": "CDS"},
|
|
35 subfeatures=False,
|
|
36 )
|
|
37 ),
|
|
38 key=lambda x: x.location.start,
|
|
39 )
|
|
40 if len(cdss) == 0:
|
|
41 continue
|
|
42
|
|
43 cds = cdss[0]
|
|
44
|
|
45 sds, start, end, seq = sd_finder.testFeatureUpstream(
|
|
46 cds, record, sd_min=5, sd_max=15
|
|
47 )
|
|
48 if len(sds) >= 1:
|
|
49 sd_features = sd_finder.to_features(
|
|
50 sds, gene.location.strand, start, end, feature_id=gene.id
|
|
51 )
|
|
52 gene.sub_features.append(sd_features[0])
|
|
53 if gene.location.start > sd_features[0].location.start:
|
|
54 gene.location = FeatureLocation(int(sd_features[0].location.start), int(gene.location.end), gene.location.strand)
|
|
55 if gene.location.end < sd_features[0].location.end:
|
|
56 gene.location = FeatureLocation(int(gene.location.start), int(sd_features[0].location.end), gene.location.strand)
|
|
57 good_genes.append(gene)
|
|
58
|
|
59 record.features = good_genes
|
|
60 yield record
|
|
61
|
|
62
|
|
63 if __name__ == "__main__":
|
|
64 parser = argparse.ArgumentParser(description="Identify shine-dalgarno sequences")
|
|
65 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome")
|
|
66 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations")
|
|
67 args = parser.parse_args()
|
|
68
|
|
69 for rec in require_shinefind(**vars(args)):
|
|
70 rec.annotations = {}
|
|
71 gffWrite([rec], sys.stdout)
|