Mercurial > repos > cpt > cpt_shinefind
comparison shinefind.py @ 4:5004ddb62700 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:53:31 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
3:4d4a4b603d33 | 4:5004ddb62700 |
---|---|
1 #!/usr/bin/env python | |
2 import re | |
3 import sys | |
4 import argparse | |
5 import logging | |
6 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature | |
7 from Bio import SeqIO | |
8 from Bio.SeqRecord import SeqRecord | |
9 from Bio.SeqFeature import FeatureLocation | |
10 from gff3 import ( | |
11 feature_lambda, | |
12 feature_test_type, | |
13 feature_test_true, | |
14 feature_test_quals, | |
15 get_id, | |
16 ensure_location_in_bounds, | |
17 ) | |
18 | |
19 logging.basicConfig(level=logging.INFO) | |
20 log = logging.getLogger() | |
21 | |
22 | |
23 class NaiveSDCaller(object): | |
24 | |
25 # TODO May make switch for different sequence sets | |
26 SD_SEQUENCES = ( | |
27 "AGGAGGT", | |
28 "GGAGGT", | |
29 "AGGAGG", | |
30 "GGGGGG", | |
31 "AGGAG", | |
32 "GAGGT", | |
33 "GGAGG", | |
34 "GGGGG", | |
35 "AGGT", | |
36 "GGGT", | |
37 "GAGG", | |
38 "GGGG", | |
39 "AGGA", | |
40 "GGAG", | |
41 "GGA", | |
42 "GAG", | |
43 "AGG", | |
44 "GGT", | |
45 "GGG", | |
46 ) | |
47 | |
48 def __init__(self): | |
49 self.sd_reg = [re.compile(x, re.IGNORECASE) for x in self.SD_SEQUENCES] | |
50 | |
51 def list_sds(self, sequence, sd_min=3, sd_max=17): | |
52 hits = [] | |
53 for regex in self.sd_reg: | |
54 for match in regex.finditer(sequence): | |
55 spacing = len(sequence) - len(match.group()) - match.start() | |
56 if sd_max >= spacing + sd_min and spacing + sd_min >= sd_min: | |
57 # if the spacing is within gap limits, add | |
58 # (search space is [sd_max+7 .. sd_min] so actual gap is spacing+sd_min) | |
59 # print('min %d max %d - adding SD with gap %d' % (sd_min, sd_max, spacing+sd_min)) | |
60 hits.append( | |
61 { | |
62 "spacing": spacing, | |
63 "hit": match.group(), | |
64 "start": match.start(), | |
65 "end": match.end(), | |
66 "len": len(match.group()), | |
67 } | |
68 ) | |
69 hits = sorted(hits, key=lambda x: (-x["len"], x["spacing"])) | |
70 return hits | |
71 | |
72 @classmethod | |
73 def highlight_sd(cls, sequence, start, end): | |
74 return " ".join( | |
75 [ | |
76 sequence[0:start].lower(), | |
77 sequence[start:end].upper(), | |
78 sequence[end:].lower(), | |
79 ] | |
80 ) | |
81 | |
82 @classmethod | |
83 def to_features( | |
84 cls, | |
85 hits, | |
86 strand, | |
87 parent_start, | |
88 parent_end, | |
89 feature_id=None, | |
90 sd_min=3, | |
91 sd_max=17, | |
92 ): | |
93 results = [] | |
94 for idx, hit in enumerate(hits): | |
95 # gene complement(124..486) | |
96 # -1 491 501 0 5 5 | |
97 # -1 491 501 0 4 5 | |
98 # -1 491 501 1 4 5 | |
99 # -1 491 501 2 3 5 | |
100 # -1 491 501 1 3 5 | |
101 # -1 491 501 0 3 5 | |
102 | |
103 qualifiers = { | |
104 "source": "CPT_ShineFind", | |
105 "ID": "%s.rbs-%s" % (feature_id, idx), | |
106 } | |
107 | |
108 if strand > 0: | |
109 start = parent_end - hit["spacing"] - hit["len"] | |
110 end = parent_end - hit["spacing"] | |
111 else: | |
112 start = parent_start + hit["spacing"] | |
113 end = parent_start + hit["spacing"] + hit["len"] | |
114 # check that the END of the SD sequence is within the given min/max of parent start/end | |
115 | |
116 # gap is either the sd_start-cds_end (neg strand) or the sd_end-cds_start (pos strand) | |
117 # minimum absolute value of these two will be the proper gap regardless of strand | |
118 tmp = gffSeqFeature( | |
119 FeatureLocation(min(start, end), max(start, end), strand=strand), | |
120 # FeatureLocation(min(start, end), max(start, end), strand=strand), | |
121 type="Shine_Dalgarno_sequence", | |
122 qualifiers=qualifiers, | |
123 ) | |
124 results.append(tmp) | |
125 return results | |
126 | |
127 def testFeatureUpstream(self, feature, record, sd_min=3, sd_max=17): | |
128 # Strand information necessary to getting correct upstream sequence | |
129 strand = feature.location.strand | |
130 | |
131 # n_bases_upstream (plus/minus 7 upstream to make the min/max define the possible gap position) | |
132 if strand > 0: | |
133 start = feature.location.start - sd_max - 7 | |
134 end = feature.location.start - sd_min | |
135 else: | |
136 start = feature.location.end + sd_min | |
137 end = feature.location.end + sd_max + 7 | |
138 | |
139 (start, end) = ensure_location_in_bounds( | |
140 start=start, end=end, parent_length=len(record) | |
141 ) | |
142 | |
143 # Create our temp feature used to obtain correct portion of | |
144 # genome | |
145 tmp = gffSeqFeature( | |
146 FeatureLocation(min(start, end), max(start, end), strand=strand), | |
147 type="domain", | |
148 ) | |
149 seq = str(tmp.extract(record.seq)) | |
150 return self.list_sds(seq, sd_min, sd_max), start, end, seq | |
151 | |
152 def hasSd(self, feature, record, sd_min=3, sd_max=17): | |
153 sds, start, end, seq = self.testFeatureUpstream( | |
154 feature, record, sd_min=sd_min, sd_max=sd_max | |
155 ) | |
156 return len(sds) > 0 | |
157 | |
158 | |
159 # Cycle through subfeatures, set feature's location to be equal | |
160 # to the smallest start and largest end. | |
161 # Remove pending bugfix for feature display in Apollo | |
162 def fminmax(feature): | |
163 fmin = None | |
164 fmax = None | |
165 for sf in feature_lambda([feature], feature_test_true, {}, subfeatures=True): | |
166 if fmin is None: | |
167 fmin = sf.location.start | |
168 fmax = sf.location.end | |
169 if sf.location.start < fmin: | |
170 fmin = sf.location.start | |
171 if sf.location.end > fmax: | |
172 fmax = sf.location.end | |
173 return fmin, fmax | |
174 | |
175 | |
176 def fix_gene_boundaries(feature): | |
177 # There is a bug in Apollo whereby we have created gene | |
178 # features which are larger than expected, but we cannot see this. | |
179 # We only see a perfect sized gene + SD together. | |
180 # | |
181 # So, we clamp the location of the gene feature to the | |
182 # contained mRNAs. Will remove pending Apollo upgrade. | |
183 fmin, fmax = fminmax(feature) | |
184 if feature.location.strand > 0: | |
185 feature.location = FeatureLocation(fmin, fmax, strand=1) | |
186 else: | |
187 feature.location = FeatureLocation(fmin, fmax, strand=-1) | |
188 return feature | |
189 | |
190 | |
191 def shinefind( | |
192 fasta, | |
193 gff3, | |
194 gff3_output=None, | |
195 table_output=None, | |
196 lookahead_min=3, | |
197 lookahead_max=17, | |
198 top_only=False, | |
199 add=False, | |
200 ): | |
201 table_output.write( | |
202 "\t".join( | |
203 [ | |
204 "ID", | |
205 "Name", | |
206 "Terminus", | |
207 "Terminus", | |
208 "Strand", | |
209 "Upstream Sequence", | |
210 "SD", | |
211 "Spacing", | |
212 ] | |
213 ) | |
214 + "\n" | |
215 ) | |
216 | |
217 sd_finder = NaiveSDCaller() | |
218 # Load up sequence(s) for GFF3 data | |
219 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) | |
220 # Parse GFF3 records | |
221 for record in gffParse(gff3, base_dict=seq_dict): | |
222 # Shinefind's gff3_output. | |
223 gff3_output_record = SeqRecord(record.seq, record.id) | |
224 # Filter out just coding sequences | |
225 ignored_features = [] | |
226 for x in record.features: | |
227 # If feature X does NOT contain a CDS, add to ignored_features | |
228 # list. This means if we have a top level gene feature with or | |
229 # without a CDS subfeature, we're catch it appropriately here. | |
230 if ( | |
231 len( | |
232 list( | |
233 feature_lambda( | |
234 [x], feature_test_type, {"type": "CDS"}, subfeatures=True | |
235 ) | |
236 ) | |
237 ) | |
238 == 0 | |
239 ): | |
240 ignored_features.append(x) | |
241 | |
242 # Loop over all gene features | |
243 for gene in feature_lambda( | |
244 record.features, feature_test_type, {"type": "gene"}, subfeatures=True | |
245 ): | |
246 | |
247 # Get the CDS from this gene. | |
248 feature = sorted( | |
249 list( | |
250 feature_lambda( | |
251 gene.sub_features, | |
252 feature_test_type, | |
253 {"type": "CDS"}, | |
254 subfeatures=True, | |
255 ) | |
256 ), | |
257 key=lambda x: x.location.start, | |
258 ) | |
259 # If no CDSs are in this gene feature, then quit | |
260 if len(feature) == 0: | |
261 # We've already caught these above in our ignored_features | |
262 # list, so we skip out on the rest of this for loop | |
263 continue | |
264 else: | |
265 # Otherwise pull the first on the strand. | |
266 feature = feature[0] | |
267 | |
268 # Three different ways RBSs can be stored that we expect. | |
269 rbs_rbs = list( | |
270 feature_lambda( | |
271 gene.sub_features, | |
272 feature_test_type, | |
273 {"type": "RBS"}, | |
274 subfeatures=False, | |
275 ) | |
276 ) | |
277 rbs_sds = list( | |
278 feature_lambda( | |
279 gene.sub_features, | |
280 feature_test_type, | |
281 {"type": "Shine_Dalgarno_sequence"}, | |
282 subfeatures=False, | |
283 ) | |
284 ) | |
285 regulatory_elements = list( | |
286 feature_lambda( | |
287 gene.sub_features, | |
288 feature_test_type, | |
289 {"type": "regulatory"}, | |
290 subfeatures=False, | |
291 ) | |
292 ) | |
293 rbs_regulatory = list( | |
294 feature_lambda( | |
295 regulatory_elements, | |
296 feature_test_quals, | |
297 {"regulatory_class": ["ribosome_binding_site"]}, | |
298 subfeatures=False, | |
299 ) | |
300 ) | |
301 rbss = rbs_rbs + rbs_sds + rbs_regulatory | |
302 | |
303 # If someone has already annotated an RBS, we move to the next gene | |
304 if len(rbss) > 0: | |
305 log.debug("Has %s RBSs", len(rbss)) | |
306 ignored_features.append(gene) | |
307 continue | |
308 | |
309 sds, start, end, seq = sd_finder.testFeatureUpstream( | |
310 feature, record, sd_min=lookahead_min, sd_max=lookahead_max | |
311 ) | |
312 | |
313 feature_id = get_id(feature) | |
314 sd_features = sd_finder.to_features( | |
315 sds, feature.location.strand, start, end, feature_id=feature.id | |
316 ) | |
317 | |
318 human_strand = "+" if feature.location.strand == 1 else "-" | |
319 | |
320 # http://book.pythontips.com/en/latest/for_-_else.html | |
321 log.debug("Found %s SDs", len(sds)) | |
322 for (sd, sd_feature) in zip(sds, sd_features): | |
323 # If we only want the top feature, after the bulk of the | |
324 # forloop executes once, we append the top feature, and fake a | |
325 # break, because an actual break triggers the else: block | |
326 table_output.write( | |
327 "\t".join( | |
328 map( | |
329 str, | |
330 [ | |
331 feature.id, | |
332 feature_id, | |
333 feature.location.start, | |
334 feature.location.end, | |
335 human_strand, | |
336 sd_finder.highlight_sd(seq, sd["start"], sd["end"]), | |
337 sd["hit"], | |
338 int(sd["spacing"]) + lookahead_min, | |
339 ], | |
340 ) | |
341 ) | |
342 + "\n" | |
343 ) | |
344 | |
345 if add: | |
346 # Append the top RBS to the gene feature | |
347 gene.sub_features.append(sd_feature) | |
348 # Pick out start/end locations for all sub_features | |
349 locations = [x.location.start for x in gene.sub_features] + [ | |
350 x.location.end for x in gene.sub_features | |
351 ] | |
352 # Update gene's start/end to be inclusive | |
353 gene.location._start = min(locations) | |
354 gene.location._end = max(locations) | |
355 # Also register the feature with the separate GFF3 output | |
356 sd_feature = fix_gene_boundaries(sd_feature) | |
357 gff3_output_record.features.append(sd_feature) | |
358 | |
359 if top_only or sd == (sds[-1]): | |
360 break | |
361 else: | |
362 table_output.write( | |
363 "\t".join( | |
364 map( | |
365 str, | |
366 [ | |
367 feature.id, | |
368 feature_id, | |
369 feature.location.start, | |
370 feature.location.end, | |
371 human_strand, | |
372 seq, | |
373 None, | |
374 -1, | |
375 ], | |
376 ) | |
377 ) | |
378 + "\n" | |
379 ) | |
380 | |
381 record.annotations = {} | |
382 gffWrite([record], sys.stdout) | |
383 | |
384 gff3_output_record.features = sorted( | |
385 gff3_output_record.features, key=lambda x: x.location.start | |
386 ) | |
387 gff3_output_record.annotations = {} | |
388 gffWrite([gff3_output_record], gff3_output) | |
389 | |
390 | |
391 if __name__ == "__main__": | |
392 parser = argparse.ArgumentParser(description="Identify shine-dalgarno sequences") | |
393 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") | |
394 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations") | |
395 | |
396 parser.add_argument( | |
397 "--gff3_output", | |
398 type=argparse.FileType("w"), | |
399 help="GFF3 Output", | |
400 default="shinefind.gff3", | |
401 ) | |
402 parser.add_argument( | |
403 "--table_output", | |
404 type=argparse.FileType("w"), | |
405 help="Tabular Output", | |
406 default="shinefind.tbl", | |
407 ) | |
408 | |
409 parser.add_argument( | |
410 "--lookahead_min", | |
411 nargs="?", | |
412 type=int, | |
413 help="Number of bases upstream of CDSs to end search", | |
414 default=3, | |
415 ) | |
416 parser.add_argument( | |
417 "--lookahead_max", | |
418 nargs="?", | |
419 type=int, | |
420 help="Number of bases upstream of CDSs to begin search", | |
421 default=17, | |
422 ) | |
423 | |
424 parser.add_argument("--top_only", action="store_true", help="Only report best hits") | |
425 parser.add_argument( | |
426 "--add", | |
427 action="store_true", | |
428 help='Function in "addition" mode whereby the ' | |
429 + "RBSs are added directly to the gene model.", | |
430 ) | |
431 | |
432 args = parser.parse_args() | |
433 shinefind(**vars(args)) |