0
|
1 #!/usr/bin/env python
|
|
2 import sys
|
|
3 import logging
|
|
4 import argparse
|
|
5 import copy
|
|
6 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature
|
|
7 from gff3 import feature_lambda, feature_test_type
|
|
8 from Bio.SeqFeature import FeatureLocation
|
|
9
|
|
10 logging.basicConfig(level=logging.INFO)
|
|
11 log = logging.getLogger(__name__)
|
|
12
|
|
13 ALLOWED_FEATURES = [
|
|
14 "mRNA",
|
|
15 "exon",
|
|
16 "transposable_element",
|
|
17 "tRNA",
|
|
18 "transcript",
|
|
19 "terminator",
|
|
20 "Shine_Dalgarno_Sequence",
|
|
21 "pseudogene",
|
|
22 "stop_codon_read_through",
|
|
23 "repeat_region",
|
|
24 "CDS",
|
|
25 "gene",
|
|
26 "rRNA",
|
|
27 "ncRNA",
|
|
28 "snRNA",
|
|
29 "snoRNA",
|
|
30 "miRNA",
|
|
31 ]
|
|
32
|
|
33 SPECIAL_REMOVED_FEATURES = ["gene_component_region", "sequence_difference"]
|
|
34
|
|
35
|
|
36
|
|
37 def add_exons(features):
|
|
38 for gene in feature_lambda(
|
|
39 features, feature_test_type, {"type": "gene"}, subfeatures=True
|
|
40 ):
|
|
41 clean_gene = copy.deepcopy(gene)
|
|
42 exon_start = None
|
|
43 exon_end = None
|
|
44 exon_strand = None
|
|
45 cds_list = []
|
|
46
|
|
47 #for mRNA in gene.sub_features:
|
|
48 # for x in mRNA.sub_features:
|
|
49 # x.qualifiers["Parent"] = [gene.id]
|
|
50 # gene.sub_features.append(x)
|
|
51
|
|
52 for exon in feature_lambda(gene.sub_features, feature_test_type, {"type": "exon"}, subfeatures=False,recurse=False):
|
|
53 #if the gene contains an exon, skip.
|
|
54 continue
|
|
55 hasMRNA = False
|
|
56 for x in gene.sub_features:
|
|
57 if x.type == "mRNA":
|
|
58 hasMRNA = True
|
|
59 mRNA = x
|
|
60 """
|
|
61 if not hasMRNA:
|
|
62 mRNA = gffSeqFeature(
|
|
63 location=FeatureLocation(gene.location.start, gene.location.end, gene.location.strand),
|
|
64 type="mRNA",
|
|
65 source = "cpt.prepApollo",
|
|
66 qualifiers={
|
|
67 "ID": ["%s.mRNA" % clean_gene.qualifiers["ID"][0]],
|
|
68 "Parent": clean_gene.qualifiers["ID"],
|
|
69 },
|
|
70 sub_features=gene.sub_features,
|
|
71 strand=exon_strand
|
|
72 )
|
|
73 for x in mRNA.sub_features:
|
|
74 x.qualifiers["Parent"] = mRNA["ID"]
|
|
75 clean_gene.sub_features = [mRNA]
|
|
76 else:
|
|
77 for x in clean_gene.sub_features:
|
|
78 if x.type != "mRNA":
|
|
79 x.qualifiers["Parent"] = [mRNA.id] """
|
|
80
|
|
81 # check for CDS child features of the gene, do not go a further step (this should skip any CDS children of exon child features)
|
|
82 for cds in feature_lambda(
|
|
83 gene.sub_features,
|
|
84 feature_test_type,
|
|
85 {"type": "CDS"},
|
|
86 subfeatures=False,
|
|
87 recurse=False,
|
|
88 ):
|
|
89 # check all CDS features for min/max boundaries
|
|
90 if exon_start is None:
|
|
91 exon_start = cds.location.start
|
|
92 exon_strand = cds.location.strand
|
|
93 if exon_end is None:
|
|
94 exon_end = cds.location.end
|
|
95 exon_start = min(exon_start, cds.location.start)
|
|
96 exon_end = max(exon_end, cds.location.end)
|
|
97 cds_list.append(cds)
|
|
98 if cds_list:
|
|
99 # we found a CDS to adopt
|
|
100 new_exon = gffSeqFeature(
|
|
101 location=FeatureLocation(exon_start, exon_end),
|
|
102 type="exon",
|
|
103 source = "cpt.prepApollo",
|
|
104 qualifiers={
|
|
105 "ID": ["%s.exon" % clean_gene.qualifiers["ID"][0]],
|
|
106 "Parent": [clean_gene.id],
|
|
107 "ApolloExon": ["True"],
|
|
108 },
|
|
109 sub_features=[],
|
|
110 strand=exon_strand
|
|
111 )
|
|
112 for cds in cds_list:
|
|
113 cds.qualifiers["Parent"] = new_exon.qualifiers["ID"]
|
|
114 new_exon.sub_features.append(cds)
|
|
115 #gene.sub_features.append(new_exon)
|
|
116 # get all the other children of gene that AREN'T a CDS including the new exon
|
|
117 clean_gene.sub_features.append(copy.deepcopy(new_exon))
|
|
118 #clean_gene.sub_features.append(gffSeqFeature(location=FeatureLocation(exon_start, exon_end, exon_strand), type="exon", source = "cpt.prepApollo", qualifiers={"ID": ["%s.exon" % clean_gene.qualifiers["ID"][0]], "Parent": clean_gene.qualifiers["ID"]}, sub_features=[], strand=exon_strand))
|
|
119 """
|
|
120 for sf in feature_lambda(
|
|
121 gene.sub_features,
|
|
122 feature_test_type,
|
|
123 {"type": "CDS"},
|
|
124 subfeatures=True,
|
|
125 recurse=False,
|
|
126 invert=True,
|
|
127 ):
|
|
128 child = copy.deepcopy(sf)
|
|
129 child.qualifiers["Parent"] = new_exon.qualifiers["ID"]
|
|
130 clean_gene.sub_features.append(child)
|
|
131 """
|
|
132 # add them to the new Exon feature
|
|
133 # return the cleaned gene with new exon
|
|
134 yield clean_gene
|
|
135
|
|
136 def process_features(features):
|
|
137 # change RBS to 'Shine_Dalgarno_sequence'
|
|
138 for rbs in feature_lambda(features, feature_test_type, {'type': "RBS"}):
|
|
139 rbs.type = "Shine_Dalgarno_sequence"
|
|
140
|
|
141 # Filter top level features
|
|
142 for feature in feature_lambda(features, feature_test_type, {"types": ALLOWED_FEATURES}, subfeatures=True):
|
|
143 cleaned_subfeatures = []
|
|
144 for sf in feature.sub_features:
|
|
145 if sf.type in SPECIAL_REMOVED_FEATURES:
|
|
146 # 'gene_component_region' is uncaught by feature_test_type as it contains `gene`
|
|
147 continue
|
|
148 else:
|
|
149 cleaned_subfeatures.append(sf)
|
|
150 feature.sub_features = copy.deepcopy(cleaned_subfeatures)
|
|
151 yield feature
|
|
152
|
|
153 def gff_filter(gff3):
|
|
154 for rec in gffParse(gff3):
|
|
155 cleaned_features = sorted(list(process_features(rec.features)), key=lambda x: x.location.start)
|
|
156 rec.features = sorted(list(add_exons(cleaned_features)), key=lambda x: x.location.start)
|
|
157 rec.annotations = {}
|
|
158 gffWrite([rec], sys.stdout)
|
|
159
|
|
160
|
|
161 if __name__ == "__main__":
|
|
162 parser = argparse.ArgumentParser(
|
|
163 description="add parent exon features to CDSs for Apollo"
|
|
164 )
|
|
165 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations")
|
|
166 args = parser.parse_args()
|
|
167 gff_filter(**vars(args))
|