adjacent_features.py @ 1:e29c36ee61e0 (draft), repository cpt/cpt_gbk_adjacent
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author: cpt
date: Mon, 05 Jun 2023 02:42:35 +0000
parent: 0:1311f97dccfa
#!/usr/bin/env python
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.Data import CodonTable
from Bio.SeqRecord import SeqRecord
from Bio.SeqFeature import SeqFeature, FeatureLocation
import argparse
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()


def extract_features(
    genbankFiles=None,
    fastaFiles=None,
    upOut=None,
    downOut=None,
    genesOnly=False,
    cdsOnly=True,
    forceSeqID=False,
    forward=1,
    behind=1,
    outProt=True,
    tTable=11,
    fTable=11,
):
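    """Find features adjacent to FASTA-matched features in GenBank records.

    Each record in fastaFiles is compared (translated with fTable unless
    fTable is 0) against the translation of each feature in genbankFiles,
    optionally restricted to gene or CDS features. For every match, the
    `behind` qualifying features 5' of the hit are written to upOut and the
    `forward` features 3' of it to downOut, as protein or nucleotide FASTA.
    """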

    genList = []
    fastaList = []

    for fileX in genbankFiles:
        opener = SeqIO.parse(fileX, "genbank")
        # To turn the generator into objects (or else we end up with a list of generators)
        for openRec in opener:
            genList.append(openRec)

    for fileX in fastaFiles:
        opener = SeqIO.parse(fileX, "fasta")
        for openRec in opener:  # Technically flattens multifastas too
            fastaList.append(openRec)

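    # Compare every FASTA record against every feature of every GenBank record.
    # fTable == 0 means the FASTA sequence is used as-is (assumed protein);
    # otherwise it is translated first.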
    for seqMatch in fastaList:
        longOut = seqMatch.description
        protID = seqMatch.id
        if fTable != 0:
            fSeq = seqMatch.seq.translate(table=fTable, cds=False)
        else:
            fSeq = seqMatch.seq

        for gbk in genList:
            sourceOut = gbk.id
            num = -1
            for feat in gbk.features:
                num += 1

                if (genesOnly and feat.type != "gene") or (
                    cdsOnly and feat.type != "CDS"
                ):
                    continue

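                # codon_start in the GenBank qualifiers records features that do
                # not begin on a codon boundary; the offset below adjusts the
                # slice taken for translation accordingly.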
                if "codon_start" in feat.qualifiers:
                    offset = 1 - int(feat.qualifiers["codon_start"][0])
                else:
                    offset = 0

                if feat.location.strand == -1:
                    temp = gbk.seq[feat.location.start : feat.location.end - offset]
                    temp = temp.reverse_complement()
                else:
                    temp = gbk.seq[feat.location.start + offset : feat.location.end]

                if tTable != 0:
                    try:
                        gSeq = temp.translate(table=tTable, cds=True)
                    except CodonTable.TranslationError as cte:
                        # log.info("Translation issue at %s", cte)
                        gSeq = temp.translate(table=tTable, cds=False)
                else:
                    gSeq = temp

                if "protein_id" not in feat.qualifiers:
                    feat.qualifiers["protein_id"] = [
                        "++++++++"
                    ]  # Junk value for genesOnly flag

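                # A hit: the (translated) feature sequence equals the (translated)
                # FASTA record and, when forceSeqID is set, the record ID also
                # matches the feature's protein_id.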
                if (gSeq == fSeq) and (
                    protID == feat.qualifiers["protein_id"][0] or not forceSeqID
                ):
                    goBack = num - 1
                    goAhead = num + 1
                    numBack = behind
                    numAhead = forward
                    backList = []
                    aheadList = []

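                    # Walk outward from the hit, collecting up to `behind` qualifying
                    # features before it and `forward` after it, skipping feature
                    # types excluded by the genesOnly/cdsOnly filters.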
                    while numBack != 0 and goBack >= 0:
                        if (genesOnly and gbk.features[goBack].type != "gene") or (
                            cdsOnly and gbk.features[goBack].type != "CDS"
                        ):
                            goBack -= 1
                            continue
                        backList.append(gbk.features[goBack])
                        numBack -= 1
                        goBack -= 1

                    while numAhead != 0 and goAhead < len(gbk.features):
                        if (genesOnly and gbk.features[goAhead].type != "gene") or (
                            cdsOnly and gbk.features[goAhead].type != "CDS"
                        ):
                            goAhead += 1
                            continue
                        aheadList.append(gbk.features[goAhead])
                        numAhead -= 1
                        goAhead += 1

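                    # Report neighbours relative to the hit's own orientation: for a
                    # minus-strand feature, the features ahead of it in genome
                    # coordinates are 5' of the gene, so the two lists are swapped.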
                    backList.reverse()
                    if feat.location.strand == -1:
                        tmpList = aheadList
                        aheadList = backList
                        backList = tmpList

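                    # Write the 5'-side neighbours to upOut: prefer the annotated
                    # /translation qualifier; otherwise translate (or emit) the
                    # underlying sequence, flagging failed CDS translations as
                    # [INCOMPLETE].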
                    for item in backList:
                        addition = ""
                        header = ""
                        if "product" in item.qualifiers:
                            addition = " -" + str(item.qualifiers["product"][0]) + "-"
                        if "protein_id" in item.qualifiers:
                            header = (
                                ">"
                                + (item.qualifiers["protein_id"][0])
                                + addition
                                + " (5' of "
                                + longOut
                                + " found within "
                                + sourceOut
                                + ")\n"
                            )
                        else:
                            header = (
                                ">"
                                + (item.qualifiers["locus_tag"][0])
                                + addition
                                + " (5' of "
                                + longOut
                                + " found within "
                                + sourceOut
                                + ")\n"
                            )
                        if outProt:
                            if "translation" in item.qualifiers:
                                upOut.write(header)
                                upOut.write(
                                    str(item.qualifiers["translation"][0]) + "\n\n"
                                )
                            else:
                                modS = 0
                                modE = 0
                                if "codon_start" in item.qualifiers:
                                    if item.location.strand > 0:
                                        modS = int(item.qualifiers["codon_start"][0]) - 1
                                    else:
                                        modE = int(item.qualifiers["codon_start"][0]) - 1

                                seqHold = gbk.seq[
                                    item.location.start + modS : item.location.end - modE
                                ]
                                if item.location.strand == -1:
                                    seqHold = seqHold.reverse_complement()
                                if cdsOnly:
                                    try:
                                        finalSeq = ""
                                        if tTable != 0:
                                            finalSeq = (
                                                str(seqHold.translate(table=tTable, cds=True)) + "\n\n"
                                            )
                                        else:
                                            finalSeq = str(seqHold) + "\n\n"
                                        # upOut.write(header)
                                        # upOut.write(finalSeq)
                                    except Exception as bdct:
                                        log.warning(
                                            "ERROR %s %s",
                                            item.qualifiers["locus_tag"][0],
                                            bdct,
                                        )
                                        finalSeq = ""
                                        if tTable != 0:
                                            finalSeq = (
                                                str(seqHold.translate(table=tTable, cds=False)) + "\n\n"
                                            )
                                        else:
                                            finalSeq = str(seqHold) + "\n\n"
                                        header = (
                                            ">"
                                            + (item.qualifiers["locus_tag"][0])
                                            + addition
                                            + " [INCOMPLETE] (5' of "
                                            + longOut
                                            + " found within "
                                            + sourceOut
                                            + ")\n"
                                        )
                                    upOut.write(header)
                                    upOut.write(finalSeq)
                                else:
                                    if tTable != 0:
                                        upOut.write(header)
                                        upOut.write(
                                            str(seqHold.translate(table=tTable, cds=False)) + "\n\n"
                                        )
                                    else:
                                        upOut.write(header)
                                        upOut.write(str(seqHold) + "\n\n")
                        else:
                            upOut.write(header)
                            upOut.write(
                                str(gbk.seq[item.location.start : item.location.end])
                                + "\n\n"
                            )

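                    # The 3'-side neighbours get the same treatment, written to downOut.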
                    for item in aheadList:
                        addition = ""
                        header = ""
                        if "product" in item.qualifiers:
                            addition = " -" + str(item.qualifiers["product"][0]) + "-"
                        if "protein_id" in item.qualifiers:
                            header = (
                                ">"
                                + (item.qualifiers["protein_id"][0])
                                + addition
                                + " (3' of "
                                + longOut
                                + " found within "
                                + sourceOut
                                + ")\n"
                            )
                        else:
                            header = (
                                ">"
                                + (item.qualifiers["locus_tag"][0])
                                + addition
                                + " (3' of "
                                + longOut
                                + " found within "
                                + sourceOut
                                + ")\n"
                            )
                        if outProt:
                            if "translation" in item.qualifiers:
                                downOut.write(header)
                                downOut.write(
                                    str(item.qualifiers["translation"][0]) + "\n\n"
                                )
                            else:
                                modS = 0
                                modE = 0
                                if "codon_start" in item.qualifiers:
                                    if item.location.strand > 0:
                                        modS = int(item.qualifiers["codon_start"][0]) - 1
                                    else:
                                        modE = int(item.qualifiers["codon_start"][0]) - 1

                                seqHold = gbk.seq[
                                    item.location.start + modS : item.location.end - modE
                                ]
                                if item.location.strand == -1:
                                    seqHold = seqHold.reverse_complement()
                                if cdsOnly:
                                    try:
                                        finalSeq = ""
                                        if tTable != 0:
                                            finalSeq = (
                                                str(seqHold.translate(table=tTable, cds=True)) + "\n\n"
                                            )
                                        else:
                                            finalSeq = str(seqHold) + "\n\n"
                                        # downOut.write(header)
                                        # downOut.write(finalSeq)
                                    except Exception as bdct:
                                        log.warning(
                                            "ERROR %s %s",
                                            item.qualifiers["locus_tag"][0],
                                            bdct,
                                        )
                                        finalSeq = ""
                                        if tTable != 0:
                                            finalSeq = (
                                                str(seqHold.translate(table=tTable, cds=False)) + "\n\n"
                                            )
                                        else:
                                            finalSeq = str(seqHold) + "\n\n"
                                        header = (
                                            ">"
                                            + (item.qualifiers["locus_tag"][0])
                                            + addition
                                            + " [INCOMPLETE] (3' of "
                                            + longOut
                                            + " found within "
                                            + sourceOut
                                            + ")\n"
                                        )
                                    downOut.write(header)
                                    downOut.write(finalSeq)
                                else:
                                    if tTable != 0:
                                        downOut.write(header)
                                        downOut.write(
                                            str(seqHold.translate(table=tTable, cds=False)) + "\n\n"
                                        )
                                    else:
                                        downOut.write(header)
                                        downOut.write(str(seqHold) + "\n\n")
                        else:
                            downOut.write(header)
                            downOut.write(
                                str(gbk.seq[item.location.start : item.location.end])
                                + "\n\n"
                            )
                    # print(longOut)

    return


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Export features adjacent to a matched feature from a GenBank file",
        epilog="",
    )
    parser.add_argument(
        "-genbankFiles", nargs="+", type=argparse.FileType("r"), help="GenBank file(s)"
    )
    parser.add_argument(
        "-fastaFiles",
        nargs="+",
        type=argparse.FileType("r"),
        help="FASTA file(s) to match against",
    )
    parser.add_argument(
        "-tTable",
        type=int,
        default=11,
        help="Translation table for the GenBank features (0 = do not translate)",
        choices=range(0, 23),
    )
    parser.add_argument(
        "-fTable",
        type=int,
        default=11,
        help="Translation table for the FASTA records (0 = already protein, do not translate)",
        choices=range(0, 23),
    )
    parser.add_argument(
        "-upOut",
        type=argparse.FileType("w"),
        help="Upstream (5') FASTA output",
        default="test-data/upOut.fa",
    )
    parser.add_argument(
        "-downOut",
        type=argparse.FileType("w"),
        help="Downstream (3') FASTA output",
        default="test-data/downOut.fa",
    )
    parser.add_argument(
        "--genesOnly",
        action="store_true",
        help="Search and return only gene type features",
    )
    parser.add_argument(
        "--cdsOnly",
        action="store_true",
        help="Search and return only CDS type features",
    )
    parser.add_argument(
        "--forceSeqID",
        action="store_true",
        help="Require the FASTA record ID to match the feature's protein_id",
    )
    parser.add_argument(
        "--outProt", action="store_true", help="Output the translated sequence"
    )
    parser.add_argument(
        "--forward",
        type=int,
        default=1,
        help="Number of features downstream (3') of the hit to return",
    )
    parser.add_argument(
        "--behind",
        type=int,
        default=1,
        help="Number of features upstream (5') of the hit to return",
    )
    args = parser.parse_args()
    extract_features(**vars(args))
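
# Example invocation (a sketch only; the data file names below are placeholders,
# not files shipped with the tool):
#
#   python adjacent_features.py \
#       -genbankFiles genome.gbk \
#       -fastaFiles query_protein.fa -fTable 0 \
#       --cdsOnly --outProt --behind 1 --forward 1 \
#       -upOut upstream.fa -downOut downstream.fa
#
# With -fTable 0 the query FASTA is taken as protein; each CDS in genome.gbk is
# translated with table 11 and compared against it, and for every match the CDS
# immediately 5' of the hit is written to upstream.fa and the CDS immediately
# 3' of it to downstream.fa.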