comparison shinefind.py @ 4:5004ddb62700 draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:53:31 +0000
parents
children
comparison
equal deleted inserted replaced
3:4d4a4b603d33 4:5004ddb62700
1 #!/usr/bin/env python
2 import re
3 import sys
4 import argparse
5 import logging
6 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature
7 from Bio import SeqIO
8 from Bio.SeqRecord import SeqRecord
9 from Bio.SeqFeature import FeatureLocation
10 from gff3 import (
11 feature_lambda,
12 feature_test_type,
13 feature_test_true,
14 feature_test_quals,
15 get_id,
16 ensure_location_in_bounds,
17 )
18
19 logging.basicConfig(level=logging.INFO)
20 log = logging.getLogger()
21
22
23 class NaiveSDCaller(object):
24
25 # TODO May make switch for different sequence sets
26 SD_SEQUENCES = (
27 "AGGAGGT",
28 "GGAGGT",
29 "AGGAGG",
30 "GGGGGG",
31 "AGGAG",
32 "GAGGT",
33 "GGAGG",
34 "GGGGG",
35 "AGGT",
36 "GGGT",
37 "GAGG",
38 "GGGG",
39 "AGGA",
40 "GGAG",
41 "GGA",
42 "GAG",
43 "AGG",
44 "GGT",
45 "GGG",
46 )
47
48 def __init__(self):
49 self.sd_reg = [re.compile(x, re.IGNORECASE) for x in self.SD_SEQUENCES]
50
51 def list_sds(self, sequence, sd_min=3, sd_max=17):
52 hits = []
53 for regex in self.sd_reg:
54 for match in regex.finditer(sequence):
55 spacing = len(sequence) - len(match.group()) - match.start()
56 if sd_max >= spacing + sd_min and spacing + sd_min >= sd_min:
57 # if the spacing is within gap limits, add
58 # (search space is [sd_max+7 .. sd_min] so actual gap is spacing+sd_min)
59 # print('min %d max %d - adding SD with gap %d' % (sd_min, sd_max, spacing+sd_min))
60 hits.append(
61 {
62 "spacing": spacing,
63 "hit": match.group(),
64 "start": match.start(),
65 "end": match.end(),
66 "len": len(match.group()),
67 }
68 )
69 hits = sorted(hits, key=lambda x: (-x["len"], x["spacing"]))
70 return hits
71
72 @classmethod
73 def highlight_sd(cls, sequence, start, end):
74 return " ".join(
75 [
76 sequence[0:start].lower(),
77 sequence[start:end].upper(),
78 sequence[end:].lower(),
79 ]
80 )
81
82 @classmethod
83 def to_features(
84 cls,
85 hits,
86 strand,
87 parent_start,
88 parent_end,
89 feature_id=None,
90 sd_min=3,
91 sd_max=17,
92 ):
93 results = []
94 for idx, hit in enumerate(hits):
95 # gene complement(124..486)
96 # -1 491 501 0 5 5
97 # -1 491 501 0 4 5
98 # -1 491 501 1 4 5
99 # -1 491 501 2 3 5
100 # -1 491 501 1 3 5
101 # -1 491 501 0 3 5
102
103 qualifiers = {
104 "source": "CPT_ShineFind",
105 "ID": "%s.rbs-%s" % (feature_id, idx),
106 }
107
108 if strand > 0:
109 start = parent_end - hit["spacing"] - hit["len"]
110 end = parent_end - hit["spacing"]
111 else:
112 start = parent_start + hit["spacing"]
113 end = parent_start + hit["spacing"] + hit["len"]
114 # check that the END of the SD sequence is within the given min/max of parent start/end
115
116 # gap is either the sd_start-cds_end (neg strand) or the sd_end-cds_start (pos strand)
117 # minimum absolute value of these two will be the proper gap regardless of strand
118 tmp = gffSeqFeature(
119 FeatureLocation(min(start, end), max(start, end), strand=strand),
120 # FeatureLocation(min(start, end), max(start, end), strand=strand),
121 type="Shine_Dalgarno_sequence",
122 qualifiers=qualifiers,
123 )
124 results.append(tmp)
125 return results
126
127 def testFeatureUpstream(self, feature, record, sd_min=3, sd_max=17):
128 # Strand information necessary to getting correct upstream sequence
129 strand = feature.location.strand
130
131 # n_bases_upstream (plus/minus 7 upstream to make the min/max define the possible gap position)
132 if strand > 0:
133 start = feature.location.start - sd_max - 7
134 end = feature.location.start - sd_min
135 else:
136 start = feature.location.end + sd_min
137 end = feature.location.end + sd_max + 7
138
139 (start, end) = ensure_location_in_bounds(
140 start=start, end=end, parent_length=len(record)
141 )
142
143 # Create our temp feature used to obtain correct portion of
144 # genome
145 tmp = gffSeqFeature(
146 FeatureLocation(min(start, end), max(start, end), strand=strand),
147 type="domain",
148 )
149 seq = str(tmp.extract(record.seq))
150 return self.list_sds(seq, sd_min, sd_max), start, end, seq
151
152 def hasSd(self, feature, record, sd_min=3, sd_max=17):
153 sds, start, end, seq = self.testFeatureUpstream(
154 feature, record, sd_min=sd_min, sd_max=sd_max
155 )
156 return len(sds) > 0
157
158
159 # Cycle through subfeatures, set feature's location to be equal
160 # to the smallest start and largest end.
161 # Remove pending bugfix for feature display in Apollo
162 def fminmax(feature):
163 fmin = None
164 fmax = None
165 for sf in feature_lambda([feature], feature_test_true, {}, subfeatures=True):
166 if fmin is None:
167 fmin = sf.location.start
168 fmax = sf.location.end
169 if sf.location.start < fmin:
170 fmin = sf.location.start
171 if sf.location.end > fmax:
172 fmax = sf.location.end
173 return fmin, fmax
174
175
176 def fix_gene_boundaries(feature):
177 # There is a bug in Apollo whereby we have created gene
178 # features which are larger than expected, but we cannot see this.
179 # We only see a perfect sized gene + SD together.
180 #
181 # So, we clamp the location of the gene feature to the
182 # contained mRNAs. Will remove pending Apollo upgrade.
183 fmin, fmax = fminmax(feature)
184 if feature.location.strand > 0:
185 feature.location = FeatureLocation(fmin, fmax, strand=1)
186 else:
187 feature.location = FeatureLocation(fmin, fmax, strand=-1)
188 return feature
189
190
191 def shinefind(
192 fasta,
193 gff3,
194 gff3_output=None,
195 table_output=None,
196 lookahead_min=3,
197 lookahead_max=17,
198 top_only=False,
199 add=False,
200 ):
201 table_output.write(
202 "\t".join(
203 [
204 "ID",
205 "Name",
206 "Terminus",
207 "Terminus",
208 "Strand",
209 "Upstream Sequence",
210 "SD",
211 "Spacing",
212 ]
213 )
214 + "\n"
215 )
216
217 sd_finder = NaiveSDCaller()
218 # Load up sequence(s) for GFF3 data
219 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
220 # Parse GFF3 records
221 for record in gffParse(gff3, base_dict=seq_dict):
222 # Shinefind's gff3_output.
223 gff3_output_record = SeqRecord(record.seq, record.id)
224 # Filter out just coding sequences
225 ignored_features = []
226 for x in record.features:
227 # If feature X does NOT contain a CDS, add to ignored_features
228 # list. This means if we have a top level gene feature with or
229 # without a CDS subfeature, we're catch it appropriately here.
230 if (
231 len(
232 list(
233 feature_lambda(
234 [x], feature_test_type, {"type": "CDS"}, subfeatures=True
235 )
236 )
237 )
238 == 0
239 ):
240 ignored_features.append(x)
241
242 # Loop over all gene features
243 for gene in feature_lambda(
244 record.features, feature_test_type, {"type": "gene"}, subfeatures=True
245 ):
246
247 # Get the CDS from this gene.
248 feature = sorted(
249 list(
250 feature_lambda(
251 gene.sub_features,
252 feature_test_type,
253 {"type": "CDS"},
254 subfeatures=True,
255 )
256 ),
257 key=lambda x: x.location.start,
258 )
259 # If no CDSs are in this gene feature, then quit
260 if len(feature) == 0:
261 # We've already caught these above in our ignored_features
262 # list, so we skip out on the rest of this for loop
263 continue
264 else:
265 # Otherwise pull the first on the strand.
266 feature = feature[0]
267
268 # Three different ways RBSs can be stored that we expect.
269 rbs_rbs = list(
270 feature_lambda(
271 gene.sub_features,
272 feature_test_type,
273 {"type": "RBS"},
274 subfeatures=False,
275 )
276 )
277 rbs_sds = list(
278 feature_lambda(
279 gene.sub_features,
280 feature_test_type,
281 {"type": "Shine_Dalgarno_sequence"},
282 subfeatures=False,
283 )
284 )
285 regulatory_elements = list(
286 feature_lambda(
287 gene.sub_features,
288 feature_test_type,
289 {"type": "regulatory"},
290 subfeatures=False,
291 )
292 )
293 rbs_regulatory = list(
294 feature_lambda(
295 regulatory_elements,
296 feature_test_quals,
297 {"regulatory_class": ["ribosome_binding_site"]},
298 subfeatures=False,
299 )
300 )
301 rbss = rbs_rbs + rbs_sds + rbs_regulatory
302
303 # If someone has already annotated an RBS, we move to the next gene
304 if len(rbss) > 0:
305 log.debug("Has %s RBSs", len(rbss))
306 ignored_features.append(gene)
307 continue
308
309 sds, start, end, seq = sd_finder.testFeatureUpstream(
310 feature, record, sd_min=lookahead_min, sd_max=lookahead_max
311 )
312
313 feature_id = get_id(feature)
314 sd_features = sd_finder.to_features(
315 sds, feature.location.strand, start, end, feature_id=feature.id
316 )
317
318 human_strand = "+" if feature.location.strand == 1 else "-"
319
320 # http://book.pythontips.com/en/latest/for_-_else.html
321 log.debug("Found %s SDs", len(sds))
322 for (sd, sd_feature) in zip(sds, sd_features):
323 # If we only want the top feature, after the bulk of the
324 # forloop executes once, we append the top feature, and fake a
325 # break, because an actual break triggers the else: block
326 table_output.write(
327 "\t".join(
328 map(
329 str,
330 [
331 feature.id,
332 feature_id,
333 feature.location.start,
334 feature.location.end,
335 human_strand,
336 sd_finder.highlight_sd(seq, sd["start"], sd["end"]),
337 sd["hit"],
338 int(sd["spacing"]) + lookahead_min,
339 ],
340 )
341 )
342 + "\n"
343 )
344
345 if add:
346 # Append the top RBS to the gene feature
347 gene.sub_features.append(sd_feature)
348 # Pick out start/end locations for all sub_features
349 locations = [x.location.start for x in gene.sub_features] + [
350 x.location.end for x in gene.sub_features
351 ]
352 # Update gene's start/end to be inclusive
353 gene.location._start = min(locations)
354 gene.location._end = max(locations)
355 # Also register the feature with the separate GFF3 output
356 sd_feature = fix_gene_boundaries(sd_feature)
357 gff3_output_record.features.append(sd_feature)
358
359 if top_only or sd == (sds[-1]):
360 break
361 else:
362 table_output.write(
363 "\t".join(
364 map(
365 str,
366 [
367 feature.id,
368 feature_id,
369 feature.location.start,
370 feature.location.end,
371 human_strand,
372 seq,
373 None,
374 -1,
375 ],
376 )
377 )
378 + "\n"
379 )
380
381 record.annotations = {}
382 gffWrite([record], sys.stdout)
383
384 gff3_output_record.features = sorted(
385 gff3_output_record.features, key=lambda x: x.location.start
386 )
387 gff3_output_record.annotations = {}
388 gffWrite([gff3_output_record], gff3_output)
389
390
391 if __name__ == "__main__":
392 parser = argparse.ArgumentParser(description="Identify shine-dalgarno sequences")
393 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome")
394 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations")
395
396 parser.add_argument(
397 "--gff3_output",
398 type=argparse.FileType("w"),
399 help="GFF3 Output",
400 default="shinefind.gff3",
401 )
402 parser.add_argument(
403 "--table_output",
404 type=argparse.FileType("w"),
405 help="Tabular Output",
406 default="shinefind.tbl",
407 )
408
409 parser.add_argument(
410 "--lookahead_min",
411 nargs="?",
412 type=int,
413 help="Number of bases upstream of CDSs to end search",
414 default=3,
415 )
416 parser.add_argument(
417 "--lookahead_max",
418 nargs="?",
419 type=int,
420 help="Number of bases upstream of CDSs to begin search",
421 default=17,
422 )
423
424 parser.add_argument("--top_only", action="store_true", help="Only report best hits")
425 parser.add_argument(
426 "--add",
427 action="store_true",
428 help='Function in "addition" mode whereby the '
429 + "RBSs are added directly to the gene model.",
430 )
431
432 args = parser.parse_args()
433 shinefind(**vars(args))