comparison adjacent_features.py @ 1:e29c36ee61e0 draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:42:35 +0000
#!/usr/bin/env python
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.Data import CodonTable
from Bio.SeqRecord import SeqRecord
from Bio.SeqFeature import SeqFeature, FeatureLocation
import argparse
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()


def extract_features(
    genbankFiles=None,
    fastaFiles=None,
    upOut=None,
    downOut=None,
    genesOnly=False,
    cdsOnly=True,
    forceSeqID=False,
    forward=1,
    behind=1,
    outProt=True,
    tTable=11,
    fTable=11,
):
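    """Find features in the GenBank records that match each FASTA record and
    write the neighbouring features to the upstream (5') and downstream (3')
    FASTA outputs.

    `behind` and `forward` set how many qualifying features are reported on
    the 5' and 3' sides of each hit; `tTable` and `fTable` select the
    translation tables for the GenBank features and the FASTA queries
    (0 disables translation and compares nucleotide sequences directly).
    """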

    genList = []
    fastaList = []

    for fileX in genbankFiles:
        opener = SeqIO.parse(fileX, "genbank")
        # To turn the generator into objects (or else we end up with a list of generators)
        for openRec in opener:
            genList.append(openRec)

    for fileX in fastaFiles:
        opener = SeqIO.parse(fileX, "fasta")
        for openRec in opener:  # Technically flattens multifastas too
            fastaList.append(openRec)

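    # Compare every FASTA record against every feature of every GenBank record.
    # Unless fTable is 0, the FASTA query is translated so it can be matched
    # against the translated feature sequence computed below.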
    for seqMatch in fastaList:
        longOut = seqMatch.description
        protID = seqMatch.id
        if fTable != 0:
            fSeq = seqMatch.seq.translate(table=fTable, cds=False)
        else:
            fSeq = seqMatch.seq

        for gbk in genList:
            sourceOut = gbk.id
            num = -1
            for feat in gbk.features:
                num += 1

                if (genesOnly and feat.type != "gene") or (
                    cdsOnly and feat.type != "CDS"
                ):
                    continue

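                # codon_start > 1 means translation starts within the feature, so
                # trim (codon_start - 1) bases from the 5' end before translating.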
                if "codon_start" in feat.qualifiers:
                    offset = int(feat.qualifiers["codon_start"][0]) - 1
                else:
                    offset = 0

                if feat.location.strand == -1:
                    # On the reverse strand the 5' end is at the high coordinate
                    temp = gbk.seq[feat.location.start : feat.location.end - offset]
                    temp = temp.reverse_complement()
                else:
                    temp = gbk.seq[feat.location.start + offset : feat.location.end]

                if tTable != 0:
                    try:
                        gSeq = temp.translate(table=tTable, cds=True)
                    except CodonTable.TranslationError as cte:
                        # log.info("Translation issue at %s", cte)
                        gSeq = temp.translate(table=tTable, cds=False)
                else:
                    gSeq = temp

                if "protein_id" not in feat.qualifiers:
                    # Junk value so the comparison below still works with genesOnly
                    feat.qualifiers["protein_id"] = ["++++++++"]

                if (gSeq == fSeq) and (
                    protID == feat.qualifiers["protein_id"][0] or not forceSeqID
                ):
                    goBack = num - 1
                    goAhead = num + 1
                    numBack = behind
                    numAhead = forward
                    backList = []
                    aheadList = []

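                    # Walk outward from the matched feature, collecting up to
                    # `behind` qualifying features before it and `forward` after it.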
                    while numBack != 0 and goBack >= 0:
                        if (genesOnly and gbk.features[goBack].type != "gene") or (
                            cdsOnly and gbk.features[goBack].type != "CDS"
                        ):
                            goBack -= 1
                            continue
                        backList.append(gbk.features[goBack])
                        numBack -= 1
                        goBack -= 1

                    while numAhead != 0 and goAhead < len(gbk.features):
                        if (genesOnly and gbk.features[goAhead].type != "gene") or (
                            cdsOnly and gbk.features[goAhead].type != "CDS"
                        ):
                            goAhead += 1
                            continue
                        aheadList.append(gbk.features[goAhead])
                        numAhead -= 1
                        goAhead += 1

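                    # backList was collected nearest-first, so restore genome order.
                    # On the reverse strand the 5'/3' relationship is flipped, so the
                    # upstream and downstream lists are swapped.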
                    backList.reverse()
                    if feat.location.strand == -1:
                        aheadList, backList = backList, aheadList

                    for item in backList:
                        addition = ""
                        header = ""
                        if "product" in item.qualifiers:
                            addition = " -" + str(item.qualifiers["product"][0]) + "-"
                        if "protein_id" in item.qualifiers:
                            header = (
                                ">"
                                + (item.qualifiers["protein_id"][0])
                                + addition
                                + " (5' of "
                                + longOut
                                + " found within "
                                + sourceOut
                                + ")\n"
                            )
                        else:
                            header = (
                                ">"
                                + (item.qualifiers["locus_tag"][0])
                                + addition
                                + " (5' of "
                                + longOut
                                + " found within "
                                + sourceOut
                                + ")\n"
                            )
                        if outProt:
                            if "translation" in item.qualifiers:
                                upOut.write(header)
                                upOut.write(
                                    str(item.qualifiers["translation"][0]) + "\n\n"
                                )
                            else:
                                modS = 0
                                modE = 0
                                if "codon_start" in item.qualifiers:
                                    if item.location.strand > 0:
                                        modS = (
                                            int(item.qualifiers["codon_start"][0]) - 1
                                        )
                                    else:
                                        modE = (
                                            int(item.qualifiers["codon_start"][0]) - 1
                                        )

                                seqHold = gbk.seq[
                                    item.location.start
                                    + modS : item.location.end
                                    - modE
                                ]
                                if item.location.strand == -1:
                                    seqHold = seqHold.reverse_complement()
                                if cdsOnly:
                                    try:
                                        finalSeq = ""
                                        if tTable != 0:
                                            finalSeq = (
                                                str(
                                                    seqHold.translate(
                                                        table=tTable, cds=True
                                                    )
                                                )
                                                + "\n\n"
                                            )
                                        else:
                                            finalSeq = str(seqHold) + "\n\n"
                                        # upOut.write(header)
                                        # upOut.write(finalSeq)
                                    except Exception as bdct:
                                        log.warning(
                                            "ERROR %s %s",
                                            item.qualifiers["locus_tag"][0],
                                            bdct,
                                        )
                                        finalSeq = ""
                                        if tTable != 0:
                                            finalSeq = (
                                                str(
                                                    seqHold.translate(
                                                        table=tTable, cds=False
                                                    )
                                                )
                                                + "\n\n"
                                            )
                                        else:
                                            finalSeq = str(seqHold) + "\n\n"
                                        header = (
                                            ">"
                                            + (item.qualifiers["locus_tag"][0])
                                            + addition
                                            + " [INCOMPLETE] (5' of "
                                            + longOut
                                            + " found within "
                                            + sourceOut
                                            + ")\n"
                                        )
                                    upOut.write(header)
                                    upOut.write(finalSeq)
                                else:
                                    if tTable != 0:
                                        upOut.write(header)
                                        upOut.write(
                                            str(
                                                seqHold.translate(
                                                    table=tTable, cds=False
                                                )
                                            )
                                            + "\n\n"
                                        )
                                    else:
                                        upOut.write(header)
                                        upOut.write(str(seqHold) + "\n\n")
                        else:
                            upOut.write(header)
                            upOut.write(
                                str(gbk.seq[item.location.start : item.location.end])
                                + "\n\n"
                            )

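                    # Same procedure for the features 3' of the hit; these are
                    # written to the downstream output.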
                    for item in aheadList:
                        addition = ""
                        header = ""
                        if "product" in item.qualifiers:
                            addition = " -" + str(item.qualifiers["product"][0]) + "-"
                        if "protein_id" in item.qualifiers:
                            header = (
                                ">"
                                + (item.qualifiers["protein_id"][0])
                                + addition
                                + " (3' of "
                                + longOut
                                + " found within "
                                + sourceOut
                                + ")\n"
                            )
                        else:
                            header = (
                                ">"
                                + (item.qualifiers["locus_tag"][0])
                                + addition
                                + " (3' of "
                                + longOut
                                + " found within "
                                + sourceOut
                                + ")\n"
                            )
                        if outProt:
                            if "translation" in item.qualifiers:
                                downOut.write(header)
                                downOut.write(
                                    str(item.qualifiers["translation"][0]) + "\n\n"
                                )
                            else:
                                modS = 0
                                modE = 0
                                if "codon_start" in item.qualifiers:
                                    if item.location.strand > 0:
                                        modS = (
                                            int(item.qualifiers["codon_start"][0]) - 1
                                        )
                                    else:
                                        modE = (
                                            int(item.qualifiers["codon_start"][0]) - 1
                                        )

                                seqHold = gbk.seq[
                                    item.location.start
                                    + modS : item.location.end
                                    - modE
                                ]
                                if item.location.strand == -1:
                                    seqHold = seqHold.reverse_complement()
                                if cdsOnly:
                                    try:
                                        finalSeq = ""
                                        if tTable != 0:
                                            finalSeq = (
                                                str(
                                                    seqHold.translate(
                                                        table=tTable, cds=True
                                                    )
                                                )
                                                + "\n\n"
                                            )
                                        else:
                                            finalSeq = str(seqHold) + "\n\n"
                                        # downOut.write(header)
                                        # downOut.write(finalSeq)
                                    except Exception as bdct:
                                        log.warning(
                                            "ERROR %s %s",
                                            item.qualifiers["locus_tag"][0],
                                            bdct,
                                        )
                                        finalSeq = ""
                                        if tTable != 0:
                                            finalSeq = (
                                                str(
                                                    seqHold.translate(
                                                        table=tTable, cds=False
                                                    )
                                                )
                                                + "\n\n"
                                            )
                                        else:
                                            finalSeq = str(seqHold) + "\n\n"
                                        header = (
                                            ">"
                                            + (item.qualifiers["locus_tag"][0])
                                            + addition
                                            + " [INCOMPLETE] (3' of "
                                            + longOut
                                            + " found within "
                                            + sourceOut
                                            + ")\n"
                                        )
                                    downOut.write(header)
                                    downOut.write(finalSeq)
                                else:
                                    if tTable != 0:
                                        downOut.write(header)
                                        downOut.write(
                                            str(
                                                seqHold.translate(
                                                    table=tTable, cds=False
                                                )
                                            )
                                            + "\n\n"
                                        )
                                    else:
                                        downOut.write(header)
                                        downOut.write(str(seqHold) + "\n\n")
                        else:
                            downOut.write(header)
                            downOut.write(
                                str(gbk.seq[item.location.start : item.location.end])
                                + "\n\n"
                            )
                    # print(longOut)

    return


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Export features neighbouring a sequence match from a Genbank file",
        epilog="",
    )
    parser.add_argument(
        "-genbankFiles", nargs="+", type=argparse.FileType("r"), help="Genbank file"
    )
    parser.add_argument(
        "-fastaFiles",
        nargs="+",
        type=argparse.FileType("r"),
        help="Fasta file to match against",
    )
    parser.add_argument(
        "-tTable",
        type=int,
        default=11,
        help="Translation table for the Genbank features (0 = compare nucleotides)",
        choices=range(0, 23),
    )
    parser.add_argument(
        "-fTable",
        type=int,
        default=11,
        help="Translation table for the Fasta sequences (0 = compare nucleotides)",
        choices=range(0, 23),
    )
    parser.add_argument(
        "-upOut",
        type=argparse.FileType("w"),
        help="Upstream Fasta output",
        default="test-data/upOut.fa",
    )
    parser.add_argument(
        "-downOut",
        type=argparse.FileType("w"),
        help="Downstream Fasta output",
        default="test-data/downOut.fa",
    )
    parser.add_argument(
        "--genesOnly",
        action="store_true",
        help="Search and return only Gene type features",
    )
    parser.add_argument(
        "--cdsOnly",
        action="store_true",
        help="Search and return only CDS type features",
    )
    parser.add_argument(
        "--forceSeqID",
        action="store_true",
        help="Require the Fasta record ID to match the feature's protein_id",
    )
    parser.add_argument(
        "--outProt", action="store_true", help="Output the translated sequence"
    )
    parser.add_argument(
        "--forward",
        type=int,
        default=1,
        help="Number of features downstream (3') of the hit to return",
    )
    parser.add_argument(
        "--behind",
        type=int,
        default=1,
        help="Number of features upstream (5') of the hit to return",
    )
    args = parser.parse_args()
    extract_features(**vars(args))
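
# Example invocation (hypothetical file names):
#   python adjacent_features.py -genbankFiles phage.gbk -fastaFiles matched_cds.fa \
#       --cdsOnly --outProt -upOut upstream.fa -downOut downstream.fa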