comparison cpt_phageqc_annotation/ @ 0:c3140b08d703 draft default tip

author cpt
date Fri, 17 Jun 2022 13:00:50 +0000 (2022-06-17)
equal deleted inserted replaced
-1:000000000000 0:c3140b08d703
1 #!/usr/bin/env python
2 import re
3 import sys
4 import argparse
5 import logging
6 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature
7 from Bio import SeqIO
8 from Bio.SeqRecord import SeqRecord
9 from Bio.SeqFeature import FeatureLocation
10 from gff3 import (
11 feature_lambda,
12 feature_test_type,
13 feature_test_true,
14 feature_test_quals,
15 get_id,
16 ensure_location_in_bounds,
17 )
19 logging.basicConfig(level=logging.INFO)
20 log = logging.getLogger()
23 class NaiveSDCaller(object):
25 # TODO May make switch for different sequence sets
28 "GGAGGT",
29 "AGGAGG",
30 "GGGGGG",
31 "AGGAG",
32 "GAGGT",
33 "GGAGG",
34 "GGGGG",
35 "AGGT",
36 "GGGT",
37 "GAGG",
38 "GGGG",
39 "AGGA",
40 "GGAG",
41 "GGA",
42 "GAG",
43 "AGG",
44 "GGT",
45 "GGG",
46 )
48 def __init__(self):
49 self.sd_reg = [re.compile(x, re.IGNORECASE) for x in self.SD_SEQUENCES]
51 def list_sds(self, sequence, sd_min=3, sd_max=17):
52 hits = []
53 for regex in self.sd_reg:
54 for match in regex.finditer(sequence):
55 spacing = len(sequence) - len( - match.start()
56 if sd_max >= spacing+sd_min and spacing+sd_min >= sd_min:
57 #if the spacing is within gap limits, add
58 #(search space is [sd_max+7 .. sd_min] so actual gap is spacing+sd_min)
59 #print('min %d max %d - adding SD with gap %d' % (sd_min, sd_max, spacing+sd_min))
60 hits.append(
61 {
62 "spacing": spacing,
63 "hit":,
64 "start": match.start(),
65 "end": match.end(),
66 "len": len(,
67 }
68 )
69 hits = sorted(hits, key= lambda x: (-x['len'],x['spacing']))
70 return hits
72 @classmethod
73 def highlight_sd(cls, sequence, start, end):
74 return " ".join(
75 [
76 sequence[0:start].lower(),
77 sequence[start:end].upper(),
78 sequence[end:].lower(),
79 ]
80 )
82 @classmethod
83 def to_features(cls, hits, strand, parent_start, parent_end, feature_id=None, sd_min=3, sd_max=17):
84 results = []
85 for idx, hit in enumerate(hits):
86 # gene complement(124..486)
87 # -1 491 501 0 5 5
88 # -1 491 501 0 4 5
89 # -1 491 501 1 4 5
90 # -1 491 501 2 3 5
91 # -1 491 501 1 3 5
92 # -1 491 501 0 3 5
94 qualifiers = {
95 "source": "CPT_ShineFind",
96 "ID": "%s.rbs-%s" % (feature_id, idx),
97 }
99 if strand > 0:
100 start = parent_end - hit["spacing"] - hit["len"]
101 end = parent_end - hit["spacing"]
102 else:
103 start = parent_start + hit["spacing"]
104 end = parent_start + hit["spacing"] + hit["len"]
105 # check that the END of the SD sequence is within the given min/max of parent start/end
107 # gap is either the sd_start-cds_end (neg strand) or the sd_end-cds_start (pos strand)
108 # minimum absolute value of these two will be the proper gap regardless of strand
109 tmp = gffSeqFeature(
110 FeatureLocation(min(start, end), max(start, end), strand=strand),
111 #FeatureLocation(min(start, end), max(start, end), strand=strand),
112 type="Shine_Dalgarno_sequence",
113 qualifiers=qualifiers,
114 )
115 results.append(tmp)
116 return results
118 def testFeatureUpstream(self, feature, record, sd_min=3, sd_max=17):
119 # Strand information necessary to getting correct upstream sequence
120 strand = feature.location.strand
122 # n_bases_upstream (plus/minus 7 upstream to make the min/max define the possible gap position)
123 if strand > 0:
124 start = feature.location.start - sd_max - 7
125 end = feature.location.start - sd_min
126 else:
127 start = feature.location.end + sd_min
128 end = feature.location.end + sd_max + 7
130 (start, end) = ensure_location_in_bounds(
131 start=start, end=end, parent_length=len(record)
132 )
134 # Create our temp feature used to obtain correct portion of
135 # genome
136 tmp = gffSeqFeature(FeatureLocation(min(start, end), max(start, end), strand=strand), type="domain")
137 seq = str(tmp.extract(record.seq))
138 return self.list_sds(seq, sd_min, sd_max), start, end, seq
140 def hasSd(self, feature, record, sd_min=3, sd_max=17):
141 sds, start, end, seq = self.testFeatureUpstream(
142 feature, record, sd_min=sd_min, sd_max=sd_max
143 )
144 return len(sds) > 0
147 # Cycle through subfeatures, set feature's location to be equal
148 # to the smallest start and largest end.
149 # Remove pending bugfix for feature display in Apollo
150 def fminmax(feature):
151 fmin = None
152 fmax = None
153 for sf in feature_lambda([feature], feature_test_true, {}, subfeatures=True):
154 if fmin is None:
155 fmin = sf.location.start
156 fmax = sf.location.end
157 if sf.location.start < fmin:
158 fmin = sf.location.start
159 if sf.location.end > fmax:
160 fmax = sf.location.end
161 return fmin, fmax
164 def fix_gene_boundaries(feature):
165 # There is a bug in Apollo whereby we have created gene
166 # features which are larger than expected, but we cannot see this.
167 # We only see a perfect sized gene + SD together.
168 #
169 # So, we clamp the location of the gene feature to the
170 # contained mRNAs. Will remove pending Apollo upgrade.
171 fmin, fmax = fminmax(feature)
172 if feature.location.strand > 0:
173 feature.location = FeatureLocation(fmin, fmax, strand=1)
174 else:
175 feature.location = FeatureLocation(fmin, fmax, strand=-1)
176 return feature
178 def shinefind(
179 fasta,
180 gff3,
181 gff3_output=None,
182 table_output=None,
183 lookahead_min=3,
184 lookahead_max=17,
185 top_only=False,
186 add=False,
187 ):
188 table_output.write(
189 "\t".join(
190 [
191 "ID",
192 "Name",
193 "Terminus",
194 "Terminus",
195 "Strand",
196 "Upstream Sequence",
197 "SD",
198 "Spacing",
199 ]
200 )
201 + "\n"
202 )
204 sd_finder = NaiveSDCaller()
205 # Load up sequence(s) for GFF3 data
206 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
207 # Parse GFF3 records
208 for record in gffParse(gff3, base_dict=seq_dict):
209 # Shinefind's gff3_output.
210 gff3_output_record = SeqRecord(record.seq,
211 # Filter out just coding sequences
212 ignored_features = []
213 for x in record.features:
214 # If feature X does NOT contain a CDS, add to ignored_features
215 # list. This means if we have a top level gene feature with or
216 # without a CDS subfeature, we're catch it appropriately here.
217 if (
218 len(
219 list(
220 feature_lambda(
221 [x], feature_test_type, {"type": "CDS"}, subfeatures=True
222 )
223 )
224 )
225 == 0
226 ):
227 ignored_features.append(x)
229 # Loop over all gene features
230 for gene in feature_lambda(
231 record.features, feature_test_type, {"type": "gene"}, subfeatures=True
232 ):
234 # Get the CDS from this gene.
235 feature = sorted(
236 list(
237 feature_lambda(
238 gene.sub_features,
239 feature_test_type,
240 {"type": "CDS"},
241 subfeatures=True,
242 )
243 ),
244 key=lambda x: x.location.start,
245 )
246 # If no CDSs are in this gene feature, then quit
247 if len(feature) == 0:
248 # We've already caught these above in our ignored_features
249 # list, so we skip out on the rest of this for loop
250 continue
251 else:
252 # Otherwise pull the first on the strand.
253 feature = feature[0]
255 # Three different ways RBSs can be stored that we expect.
256 rbs_rbs = list(
257 feature_lambda(
258 gene.sub_features,
259 feature_test_type,
260 {"type": "RBS"},
261 subfeatures=False,
262 )
263 )
264 rbs_sds = list(
265 feature_lambda(
266 gene.sub_features,
267 feature_test_type,
268 {"type": "Shine_Dalgarno_sequence"},
269 subfeatures=False,
270 )
271 )
272 regulatory_elements = list(
273 feature_lambda(
274 gene.sub_features,
275 feature_test_type,
276 {"type": "regulatory"},
277 subfeatures=False,
278 )
279 )
280 rbs_regulatory = list(
281 feature_lambda(
282 regulatory_elements,
283 feature_test_quals,
284 {"regulatory_class": ["ribosome_binding_site"]},
285 subfeatures=False,
286 )
287 )
288 rbss = rbs_rbs + rbs_sds + rbs_regulatory
290 # If someone has already annotated an RBS, we move to the next gene
291 if len(rbss) > 0:
292 log.debug("Has %s RBSs", len(rbss))
293 ignored_features.append(gene)
294 continue
296 sds, start, end, seq = sd_finder.testFeatureUpstream(
297 feature, record, sd_min=lookahead_min, sd_max=lookahead_max
298 )
300 feature_id = get_id(feature)
301 sd_features = sd_finder.to_features(
302 sds, feature.location.strand, start, end,
303 )
305 human_strand = "+" if feature.location.strand == 1 else "-"
307 #
308 log.debug("Found %s SDs", len(sds))
309 for (sd, sd_feature) in zip(sds, sd_features):
310 # If we only want the top feature, after the bulk of the
311 # forloop executes once, we append the top feature, and fake a
312 # break, because an actual break triggers the else: block
313 table_output.write(
314 "\t".join(
315 map(
316 str,
317 [
319 feature_id,
320 feature.location.start,
321 feature.location.end,
322 human_strand,
323 sd_finder.highlight_sd(seq, sd["start"], sd["end"]),
324 sd["hit"],
325 int(sd["spacing"]) + lookahead_min,
326 ],
327 )
328 )
329 + "\n"
330 )
332 if add:
333 # Append the top RBS to the gene feature
334 gene.sub_features.append(sd_feature)
335 # Pick out start/end locations for all sub_features
336 locations = [x.location.start for x in gene.sub_features] + [
337 x.location.end for x in gene.sub_features
338 ]
339 # Update gene's start/end to be inclusive
340 gene.location._start = min(locations)
341 gene.location._end = max(locations)
342 # Also register the feature with the separate GFF3 output
343 sd_feature = fix_gene_boundaries(sd_feature)
344 gff3_output_record.features.append(sd_feature)
346 if top_only or sd == (sds[-1]):
347 break
348 else:
349 table_output.write(
350 "\t".join(
351 map(
352 str,
353 [
355 feature_id,
356 feature.location.start,
357 feature.location.end,
358 human_strand,
359 seq,
360 None,
361 -1,
362 ],
363 )
364 )
365 + "\n"
366 )
368 record.annotations = {}
369 gffWrite([record], sys.stdout)
371 gff3_output_record.features = sorted(
372 gff3_output_record.features, key=lambda x: x.location.start
373 )
374 gff3_output_record.annotations = {}
375 gffWrite([gff3_output_record], gff3_output)
378 if __name__ == "__main__":
379 parser = argparse.ArgumentParser(description="Identify shine-dalgarno sequences")
380 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome")
381 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations")
383 parser.add_argument(
384 "--gff3_output",
385 type=argparse.FileType("w"),
386 help="GFF3 Output",
387 default="shinefind.gff3",
388 )
389 parser.add_argument(
390 "--table_output",
391 type=argparse.FileType("w"),
392 help="Tabular Output",
393 default="shinefind.tbl",
394 )
396 parser.add_argument(
397 "--lookahead_min",
398 nargs="?",
399 type=int,
400 help="Number of bases upstream of CDSs to end search",
401 default=3,
402 )
403 parser.add_argument(
404 "--lookahead_max",
405 nargs="?",
406 type=int,
407 help="Number of bases upstream of CDSs to begin search",
408 default=17,
409 )
411 parser.add_argument("--top_only", action="store_true", help="Only report best hits")
412 parser.add_argument(
413 "--add",
414 action="store_true",
415 help='Function in "addition" mode whereby the '
416 + "RBSs are added directly to the gene model.",
417 )
419 args = parser.parse_args()
420 shinefind(**vars(args))