0
|
1 #!/usr/bin/env python
|
|
2 import sys
|
|
3 import argparse
|
|
4 import logging
|
|
5 import uuid
|
|
6 from CPT_GFFParser import gffParse, gffWrite
|
|
7 from Bio import SeqIO
|
|
8 from Bio.Seq import Seq
|
|
9 from Bio.SeqRecord import SeqRecord
|
|
10 from Bio.SeqFeature import FeatureLocation, CompoundLocation
|
|
11 from gff3 import feature_lambda, feature_test_type, get_id
|
|
12
|
|
13 logging.basicConfig(level=logging.INFO)
|
|
14 log = logging.getLogger(__name__)
|
|
15
|
|
16
|
|
17 def main(fasta, gff3, feature_filter=None, nodesc=False):
|
|
18 if feature_filter == "nice_cds":
|
|
19 from gff2gb import gff3_to_genbank as cpt_Gff2Gbk
|
|
20
|
|
21
|
|
22 for rec in cpt_Gff2Gbk(gff3, fasta, 11):
|
|
23 seenList = {}
|
|
24 if rec.seq[0] == "?":
|
|
25 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
|
|
26 exit(1)
|
|
27 for feat in sorted(rec.features, key=lambda x: x.location.start):
|
|
28 if feat.type != "CDS":
|
|
29 continue
|
|
30
|
|
31 ind = 0
|
|
32 if (
|
|
33 str(
|
|
34 feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-")
|
|
35 )
|
|
36 in seenList.keys()
|
|
37 ):
|
|
38 seenList[
|
|
39 str(
|
|
40 feat.qualifiers.get("locus_tag", get_id(feat)).replace(
|
|
41 " ", "-"
|
|
42 )
|
|
43 )
|
|
44 ] += 1
|
|
45 ind = seenList[
|
|
46 str(
|
|
47 feat.qualifiers.get("locus_tag", get_id(feat)).replace(
|
|
48 " ", "-"
|
|
49 )
|
|
50 )
|
|
51 ]
|
|
52 else:
|
|
53 seenList[
|
|
54 str(
|
|
55 feat.qualifiers.get("locus_tag", get_id(feat)).replace(
|
|
56 " ", "-"
|
|
57 )
|
|
58 )
|
|
59 ] = 1
|
|
60 append = ""
|
|
61 if ind != 0:
|
|
62 append = "_" + str(ind)
|
|
63
|
|
64 if nodesc:
|
|
65 description = ""
|
|
66 else:
|
|
67 feat.qualifiers["ID"] = [feat._ID]
|
|
68 product = feat.qualifiers.get("product", "")
|
|
69 description = "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format(
|
|
70 feat, product
|
|
71 )
|
|
72 yield [
|
|
73 SeqRecord(
|
|
74 feat.extract(rec).seq,
|
|
75 id=str(
|
|
76 feat.qualifiers.get("locus_tag", get_id(feat)).replace(
|
|
77 " ", "-"
|
|
78 )
|
|
79 )
|
|
80 + append,
|
|
81 description=description,
|
|
82 )
|
|
83 ]
|
|
84
|
|
85 elif feature_filter == "unique_cds":
|
|
86 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
|
|
87 seen_ids = {}
|
|
88
|
|
89 for rec in gffParse(gff3, base_dict=seq_dict):
|
|
90 noMatch = True
|
|
91 if "Alias" in rec.features[0].qualifiers.keys():
|
|
92 lColumn = rec.features[0].qualifiers["Alias"][0]
|
|
93 else:
|
|
94 lColumn = ""
|
|
95 for x in seq_dict:
|
|
96 if x == rec.id or x == lColumn:
|
|
97 noMatch = False
|
|
98 if noMatch:
|
|
99 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
|
|
100 exit(1)
|
|
101 newfeats = []
|
|
102 for feat in sorted(
|
|
103 feature_lambda(
|
|
104 rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False
|
|
105 ),
|
|
106 key=lambda f: f.location.start,
|
|
107 ):
|
|
108 nid = rec.id + "____" + feat.id
|
|
109 if nid in seen_ids:
|
|
110 nid = nid + "__" + uuid.uuid4().hex
|
|
111 feat.qualifiers["ID"] = [nid]
|
|
112 newfeats.append(feat)
|
|
113 seen_ids[nid] = True
|
|
114
|
|
115 if nodesc:
|
|
116 description = ""
|
|
117 else:
|
|
118 if feat.strand == -1:
|
|
119 important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)}
|
|
120 else:
|
|
121 important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)}
|
|
122 if "Name" in feat.qualifiers:
|
|
123 important_data["Name"] = feat.qualifiers.get("Name", [""])[0]
|
|
124
|
|
125 description = "[{}]".format(
|
|
126 ";".join(
|
|
127 [
|
|
128 "{key}={value}".format(key=k, value=v)
|
|
129 for (k, v) in important_data.items()
|
|
130 ]
|
|
131 )
|
|
132 )
|
|
133 #if feat.id == "CPT_Privateer_006.p01":
|
|
134 #print(feat)
|
|
135 #exit()
|
|
136
|
|
137 if isinstance(feat.location, CompoundLocation):
|
|
138 finSeq = ""
|
|
139 if feat.strand == -1:
|
|
140 for x in feat.location.parts:
|
|
141 finSeq += str((rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement())
|
|
142 else:
|
|
143 for x in feat.location.parts:
|
|
144 finSeq += str(rec.seq[feat.location.start + feat.phase: feat.location.end])
|
|
145 yield [
|
|
146 SeqRecord(
|
|
147 finSeq,
|
|
148 id=nid.replace(" ", "-"),
|
|
149 description=description,
|
|
150 )
|
|
151 ]
|
|
152 elif feat.strand == -1:
|
|
153 yield [
|
|
154 SeqRecord(
|
|
155 (rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement(),
|
|
156 id=nid.replace(" ", "-"),
|
|
157 description=description,
|
|
158 )
|
|
159 ]
|
|
160 else:
|
|
161 yield [
|
|
162 SeqRecord(
|
|
163 #feat.extract(rec).seq,
|
|
164 rec.seq[feat.location.start + feat.phase: feat.location.end],
|
|
165 id=nid.replace(" ", "-"),
|
|
166 description=description,
|
|
167 )
|
|
168 ]
|
|
169 rec.features = newfeats
|
|
170 rec.annotations = {}
|
|
171 #gffWrite([rec], sys.stdout)
|
|
172 else:
|
|
173 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
|
|
174
|
|
175 for rec in gffParse(gff3, base_dict=seq_dict):
|
|
176 noMatch = True
|
|
177 if "Alias" in rec.features[0].qualifiers.keys():
|
|
178 lColumn = rec.features[0].qualifiers["Alias"][0]
|
|
179 else:
|
|
180 lColumn = ""
|
|
181 for x in seq_dict:
|
|
182 if x == rec.id or x == lColumn:
|
|
183 noMatch = False
|
|
184 if noMatch:
|
|
185 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
|
|
186 exit(1)
|
|
187 for feat in sorted(
|
|
188 feature_lambda(
|
|
189 rec.features,
|
|
190 feature_test_type,
|
|
191 {"type": feature_filter},
|
|
192 subfeatures=True,
|
|
193 ),
|
|
194 key=lambda f: f.location.start,
|
|
195 ):
|
|
196 id = feat.id
|
|
197 if len(id) == 0:
|
|
198 id = get_id(feat)
|
|
199
|
|
200 if nodesc:
|
|
201 description = ""
|
|
202 else:
|
|
203 if feat.strand == -1:
|
|
204 important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)}
|
|
205 else:
|
|
206 important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)}
|
|
207 if "Name" in feat.qualifiers:
|
|
208 important_data["Name"] = feat.qualifiers.get("Name", [""])[0]
|
|
209
|
|
210 description = "[{}]".format(
|
|
211 ";".join(
|
|
212 [
|
|
213 "{key}={value}".format(key=k, value=v)
|
|
214 for (k, v) in important_data.items()
|
|
215 ]
|
|
216 )
|
|
217 )
|
|
218
|
|
219 if isinstance(feat.location, CompoundLocation):
|
|
220 finSeq = ""
|
|
221 if feat.strand == -1:
|
|
222 for x in feat.location.parts:
|
|
223 finSeq += str((rec.seq[x.start: x.end - feat.phase]).reverse_complement())
|
|
224 else:
|
|
225 for x in feat.location.parts:
|
|
226 finSeq += str(rec.seq[x.start + feat.phase: x.end])
|
|
227 yield [
|
|
228 SeqRecord(
|
|
229 Seq(finSeq),
|
|
230 id=id.replace(" ", "-"),
|
|
231 description=description,
|
|
232 )
|
|
233 ]
|
|
234
|
|
235 else:
|
|
236
|
|
237 if feat.strand == -1:
|
|
238 yield [
|
|
239 SeqRecord(
|
|
240 seq=Seq(str(rec.seq[feat.location.start: feat.location.end - feat.phase])).reverse_complement(),
|
|
241 id=id.replace(" ", "-"),
|
|
242 description=description,
|
|
243 )
|
|
244 ]
|
|
245 else:
|
|
246 yield [
|
|
247 SeqRecord(
|
|
248 #feat.extract(rec).seq,
|
|
249 seq=Seq(str(rec.seq[feat.location.start + feat.phase: feat.location.end])),
|
|
250 id=id.replace(" ", "-"),
|
|
251 description=description,
|
|
252 )
|
|
253 ]
|
|
254
|
|
255
|
|
256 if __name__ == "__main__":
|
|
257 parser = argparse.ArgumentParser(
|
|
258 description="Export corresponding sequence in genome from GFF3", epilog=""
|
|
259 )
|
|
260 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome")
|
|
261 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File")
|
|
262 parser.add_argument(
|
|
263 "--feature_filter", default=None, help="Filter for specific feature types"
|
|
264 )
|
|
265 parser.add_argument(
|
|
266 "--nodesc", action="store_true", help="Strip description field off"
|
|
267 )
|
|
268 args = parser.parse_args()
|
|
269 for seq in main(**vars(args)):
|
|
270 #if isinstance(seq, list):
|
|
271 # for x in seq:
|
|
272 # print(type(x.seq))
|
|
273 # SeqIO.write(x, sys.stdout, "fasta")
|
|
274 #else:
|
|
275 SeqIO.write(seq, sys.stdout, "fasta")
|