Mercurial > repos > cpt > cpt_prep_for_apollo
comparison cpt_gff_apollo_prep/gff3_prep_for_apollo.py @ 0:eb0c42719156 draft
Uploaded
author | cpt |
---|---|
date | Fri, 13 May 2022 04:55:55 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:eb0c42719156 |
---|---|
1 #!/usr/bin/env python | |
2 import sys | |
3 import logging | |
4 import argparse | |
5 import copy | |
6 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature | |
7 from gff3 import feature_lambda, feature_test_type | |
8 from Bio.SeqFeature import FeatureLocation | |
9 | |
10 logging.basicConfig(level=logging.INFO) | |
11 log = logging.getLogger(__name__) | |
12 | |
13 ALLOWED_FEATURES = [ | |
14 "mRNA", | |
15 "exon", | |
16 "transposable_element", | |
17 "tRNA", | |
18 "transcript", | |
19 "terminator", | |
20 "Shine_Dalgarno_Sequence", | |
21 "pseudogene", | |
22 "stop_codon_read_through", | |
23 "repeat_region", | |
24 "CDS", | |
25 "gene", | |
26 "rRNA", | |
27 "ncRNA", | |
28 "snRNA", | |
29 "snoRNA", | |
30 "miRNA", | |
31 ] | |
32 | |
33 SPECIAL_REMOVED_FEATURES = ["gene_component_region", "sequence_difference"] | |
34 | |
35 | |
36 | |
37 def add_exons(features): | |
38 for gene in feature_lambda( | |
39 features, feature_test_type, {"type": "gene"}, subfeatures=True | |
40 ): | |
41 clean_gene = copy.deepcopy(gene) | |
42 exon_start = None | |
43 exon_end = None | |
44 exon_strand = None | |
45 cds_list = [] | |
46 | |
47 #for mRNA in gene.sub_features: | |
48 # for x in mRNA.sub_features: | |
49 # x.qualifiers["Parent"] = [gene.id] | |
50 # gene.sub_features.append(x) | |
51 | |
52 for exon in feature_lambda(gene.sub_features, feature_test_type, {"type": "exon"}, subfeatures=False,recurse=False): | |
53 #if the gene contains an exon, skip. | |
54 continue | |
55 hasMRNA = False | |
56 for x in gene.sub_features: | |
57 if x.type == "mRNA": | |
58 hasMRNA = True | |
59 mRNA = x | |
60 """ | |
61 if not hasMRNA: | |
62 mRNA = gffSeqFeature( | |
63 location=FeatureLocation(gene.location.start, gene.location.end, gene.location.strand), | |
64 type="mRNA", | |
65 source = "cpt.prepApollo", | |
66 qualifiers={ | |
67 "ID": ["%s.mRNA" % clean_gene.qualifiers["ID"][0]], | |
68 "Parent": clean_gene.qualifiers["ID"], | |
69 }, | |
70 sub_features=gene.sub_features, | |
71 strand=exon_strand | |
72 ) | |
73 for x in mRNA.sub_features: | |
74 x.qualifiers["Parent"] = mRNA["ID"] | |
75 clean_gene.sub_features = [mRNA] | |
76 else: | |
77 for x in clean_gene.sub_features: | |
78 if x.type != "mRNA": | |
79 x.qualifiers["Parent"] = [mRNA.id] """ | |
80 | |
81 # check for CDS child features of the gene, do not go a further step (this should skip any CDS children of exon child features) | |
82 for cds in feature_lambda( | |
83 gene.sub_features, | |
84 feature_test_type, | |
85 {"type": "CDS"}, | |
86 subfeatures=False, | |
87 recurse=False, | |
88 ): | |
89 # check all CDS features for min/max boundaries | |
90 if exon_start is None: | |
91 exon_start = cds.location.start | |
92 exon_strand = cds.location.strand | |
93 if exon_end is None: | |
94 exon_end = cds.location.end | |
95 exon_start = min(exon_start, cds.location.start) | |
96 exon_end = max(exon_end, cds.location.end) | |
97 cds_list.append(cds) | |
98 if cds_list: | |
99 # we found a CDS to adopt | |
100 new_exon = gffSeqFeature( | |
101 location=FeatureLocation(exon_start, exon_end), | |
102 type="exon", | |
103 source = "cpt.prepApollo", | |
104 qualifiers={ | |
105 "ID": ["%s.exon" % clean_gene.qualifiers["ID"][0]], | |
106 "Parent": [clean_gene.id], | |
107 "ApolloExon": ["True"], | |
108 }, | |
109 sub_features=[], | |
110 strand=exon_strand | |
111 ) | |
112 for cds in cds_list: | |
113 cds.qualifiers["Parent"] = new_exon.qualifiers["ID"] | |
114 new_exon.sub_features.append(cds) | |
115 #gene.sub_features.append(new_exon) | |
116 # get all the other children of gene that AREN'T a CDS including the new exon | |
117 clean_gene.sub_features.append(copy.deepcopy(new_exon)) | |
118 #clean_gene.sub_features.append(gffSeqFeature(location=FeatureLocation(exon_start, exon_end, exon_strand), type="exon", source = "cpt.prepApollo", qualifiers={"ID": ["%s.exon" % clean_gene.qualifiers["ID"][0]], "Parent": clean_gene.qualifiers["ID"]}, sub_features=[], strand=exon_strand)) | |
119 """ | |
120 for sf in feature_lambda( | |
121 gene.sub_features, | |
122 feature_test_type, | |
123 {"type": "CDS"}, | |
124 subfeatures=True, | |
125 recurse=False, | |
126 invert=True, | |
127 ): | |
128 child = copy.deepcopy(sf) | |
129 child.qualifiers["Parent"] = new_exon.qualifiers["ID"] | |
130 clean_gene.sub_features.append(child) | |
131 """ | |
132 # add them to the new Exon feature | |
133 # return the cleaned gene with new exon | |
134 yield clean_gene | |
135 | |
136 def process_features(features): | |
137 # change RBS to 'Shine_Dalgarno_sequence' | |
138 for rbs in feature_lambda(features, feature_test_type, {'type': "RBS"}): | |
139 rbs.type = "Shine_Dalgarno_sequence" | |
140 | |
141 # Filter top level features | |
142 for feature in feature_lambda(features, feature_test_type, {"types": ALLOWED_FEATURES}, subfeatures=True): | |
143 cleaned_subfeatures = [] | |
144 for sf in feature.sub_features: | |
145 if sf.type in SPECIAL_REMOVED_FEATURES: | |
146 # 'gene_component_region' is uncaught by feature_test_type as it contains `gene` | |
147 continue | |
148 else: | |
149 cleaned_subfeatures.append(sf) | |
150 feature.sub_features = copy.deepcopy(cleaned_subfeatures) | |
151 yield feature | |
152 | |
153 def gff_filter(gff3): | |
154 for rec in gffParse(gff3): | |
155 cleaned_features = sorted(list(process_features(rec.features)), key=lambda x: x.location.start) | |
156 rec.features = sorted(list(add_exons(cleaned_features)), key=lambda x: x.location.start) | |
157 rec.annotations = {} | |
158 gffWrite([rec], sys.stdout) | |
159 | |
160 | |
161 if __name__ == "__main__": | |
162 parser = argparse.ArgumentParser( | |
163 description="add parent exon features to CDSs for Apollo" | |
164 ) | |
165 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations") | |
166 args = parser.parse_args() | |
167 gff_filter(**vars(args)) |