comparison cpt_phageqc_annotation/cpt.py @ 0:c3140b08d703 draft default tip

Uploaded
author cpt
date Fri, 17 Jun 2022 13:00:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:c3140b08d703
1 #!/usr/bin/env python
2 import regex as re
3 from Bio.Seq import Seq, reverse_complement, translate
4 from Bio.SeqRecord import SeqRecord
5 from Bio import SeqIO
6 from Bio.Data import CodonTable
7 import logging
8
9 logging.basicConfig()
10 log = logging.getLogger()
11
12 PHAGE_IN_MIDDLE = re.compile("^(?P<host>.*)\s*phage (?P<phage>.*)$")
13 BACTERIOPHAGE_IN_MIDDLE = re.compile("^(?P<host>.*)\s*bacteriophage (?P<phage>.*)$")
14 STARTS_WITH_PHAGE = re.compile(
15 "^(bacterio|vibrio|Bacterio|Vibrio|)?[Pp]hage (?P<phage>.*)$"
16 )
17 NEW_STYLE_NAMES = re.compile("(?P<phage>v[A-Z]_[A-Z][a-z]{2}_.*)")
18
19
20 def phage_name_parser(name):
21 host = None
22 phage = None
23 name = name.replace(", complete genome.", "")
24 name = name.replace(", complete genome", "")
25
26 m = BACTERIOPHAGE_IN_MIDDLE.match(name)
27 if m:
28 host = m.group("host")
29 phage = m.group("phage")
30 return (host, phage)
31
32 m = PHAGE_IN_MIDDLE.match(name)
33 if m:
34 host = m.group("host")
35 phage = m.group("phage")
36 return (host, phage)
37
38 m = STARTS_WITH_PHAGE.match(name)
39 if m:
40 phage = m.group("phage")
41 return (host, phage)
42
43 m = NEW_STYLE_NAMES.match(name)
44 if m:
45 phage = m.group("phage")
46 return (host, phage)
47
48 return (host, phage)
49
50
51 class OrfFinder(object):
52 def __init__(self, table, ftype, ends, min_len, strand):
53 self.table = table
54 self.table_obj = CodonTable.ambiguous_generic_by_id[table]
55 self.ends = ends
56 self.ftype = ftype
57 self.min_len = min_len
58 self.starts = sorted(self.table_obj.start_codons)
59 self.stops = sorted(self.table_obj.stop_codons)
60 self.re_starts = re.compile("|".join(self.starts))
61 self.re_stops = re.compile("|".join(self.stops))
62 self.strand = strand
63
64 def locate(self, fasta_file, out_nuc, out_prot, out_bed, out_gff3):
65 seq_format = "fasta"
66 log.debug("Genetic code table %i" % self.table)
67 log.debug("Minimum length %i aa" % self.min_len)
68
69 out_count = 0
70
71 out_gff3.write("##gff-version 3\n")
72
73 for idx, record in enumerate(SeqIO.parse(fasta_file, seq_format)):
74 for i, (f_start, f_end, f_strand, n, t) in enumerate(
75 self.get_all_peptides(str(record.seq).upper())
76 ):
77 out_count += 1
78
79 descr = "length %i aa, %i bp, from %s..%s[%s] of %s" % (
80 len(t),
81 len(n),
82 f_start,
83 f_end,
84 f_strand,
85 record.description,
86 )
87 fid = record.id + "|%s%i" % (self.ftype, i + 1)
88
89 r = SeqRecord(Seq(n), id=fid, name="", description=descr)
90 t = SeqRecord(Seq(t), id=fid, name="", description=descr)
91
92 SeqIO.write(r, out_nuc, "fasta")
93 SeqIO.write(t, out_prot, "fasta")
94
95 nice_strand = "+" if f_strand == +1 else "-"
96
97 out_bed.write(
98 "\t".join(
99 map(str, [record.id, f_start, f_end, fid, 0, nice_strand])
100 )
101 + "\n"
102 )
103
104 out_gff3.write(
105 "\t".join(
106 map(
107 str,
108 [
109 record.id,
110 "getOrfsOrCds",
111 "CDS",
112 f_start + 1,
113 f_end,
114 ".",
115 nice_strand,
116 0,
117 "ID=%s.%s.%s" % (self.ftype, idx, i + 1),
118 ],
119 )
120 )
121 + "\n"
122 )
123 log.info("Found %i %ss", out_count, self.ftype)
124
125 def start_chop_and_trans(self, s, strict=True):
126 """Returns offset, trimmed nuc, protein."""
127 if strict:
128 assert s[-3:] in self.stops, s
129 assert len(s) % 3 == 0
130 for match in self.re_starts.finditer(s, overlapped=True):
131 # Must check the start is in frame
132 start = match.start()
133 if start % 3 == 0:
134 n = s[start:]
135 assert len(n) % 3 == 0, "%s is len %i" % (n, len(n))
136 if strict:
137 t = translate(n, self.table)
138 else:
139 # Use when missing stop codon,
140 t = "M" + translate(n[3:], self.table, to_stop=True)
141 yield start, n, t # Edited by CPT to be a generator
142
143 def break_up_frame(self, s):
144 """Returns offset, nuc, protein."""
145 start = 0
146 for match in self.re_stops.finditer(s, overlapped=True):
147 index = match.start() + 3
148 if index % 3 != 0:
149 continue
150 n = s[start:index]
151 for (offset, n, t) in self.start_chop_and_trans(n):
152 if n and len(t) >= self.min_len:
153 yield start + offset, n, t
154 start = index
155
156 def putative_genes_in_sequence(self, nuc_seq):
157 """Returns start, end, strand, nucleotides, protein.
158 Co-ordinates are Python style zero-based.
159 """
160 nuc_seq = nuc_seq.upper()
161 # TODO - Refactor to use a generator function (in start order)
162 # rather than making a list and sorting?
163 answer = []
164 full_len = len(nuc_seq)
165
166 for frame in range(0, 3):
167 for offset, n, t in self.break_up_frame(nuc_seq[frame:]):
168 start = frame + offset # zero based
169 answer.append((start, start + len(n), +1, n, t))
170
171 rc = reverse_complement(nuc_seq)
172 for frame in range(0, 3):
173 for offset, n, t in self.break_up_frame(rc[frame:]):
174 start = full_len - frame - offset # zero based
175 answer.append((start, start - len(n), -1, n, t))
176 answer.sort()
177 return answer
178
179 def get_all_peptides(self, nuc_seq):
180 """Returns start, end, strand, nucleotides, protein.
181
182 Co-ordinates are Python style zero-based.
183 """
184 # Refactored into generator by CPT
185 full_len = len(nuc_seq)
186 if self.strand != "reverse":
187 for frame in range(0, 3):
188 for offset, n, t in self.break_up_frame(nuc_seq[frame:]):
189 start = frame + offset # zero based
190 yield (start, start + len(n), +1, n, t)
191 if self.strand != "forward":
192 rc = reverse_complement(nuc_seq)
193 for frame in range(0, 3):
194 for offset, n, t in self.break_up_frame(rc[frame:]):
195 start = full_len - frame - offset # zero based
196 yield (start - len(n), start, -1, n, t)
197
198
199 class MGAFinder(object):
200 def __init__(self, table, ftype, ends, min_len):
201 self.table = table
202 self.table_obj = CodonTable.ambiguous_generic_by_id[table]
203 self.ends = ends
204 self.ftype = ftype
205 self.min_len = min_len
206 self.starts = sorted(self.table_obj.start_codons)
207 self.stops = sorted(self.table_obj.stop_codons)
208 self.re_starts = re.compile("|".join(self.starts))
209 self.re_stops = re.compile("|".join(self.stops))
210
211 def locate(self, fasta_file, out_nuc, out_prot, out_bed, out_gff3):
212 seq_format = "fasta"
213 log.debug("Genetic code table %i" % self.table)
214 log.debug("Minimum length %i aa" % self.min_len)
215
216 out_count = 0
217
218 out_gff3.write("##gff-version 3\n")
219
220 for idx, record in enumerate(SeqIO.parse(fasta_file, seq_format)):
221 for i, (f_start, f_end, f_strand, n, t) in enumerate(
222 self.get_all_peptides(str(record.seq).upper())
223 ):
224 out_count += 1
225
226 descr = "length %i aa, %i bp, from %s..%s[%s] of %s" % (
227 len(t),
228 len(n),
229 f_start,
230 f_end,
231 f_strand,
232 record.description,
233 )
234 fid = record.id + "|%s%i" % (self.ftype, i + 1)
235
236 r = SeqRecord(Seq(n), id=fid, name="", description=descr)
237 t = SeqRecord(Seq(t), id=fid, name="", description=descr)
238
239 SeqIO.write(r, out_nuc, "fasta")
240 SeqIO.write(t, out_prot, "fasta")
241
242 nice_strand = "+" if f_strand == +1 else "-"
243
244 out_bed.write(
245 "\t".join(
246 map(str, [record.id, f_start, f_end, fid, 0, nice_strand])
247 )
248 + "\n"
249 )
250
251 out_gff3.write(
252 "\t".join(
253 map(
254 str,
255 [
256 record.id,
257 "getOrfsOrCds",
258 "CDS",
259 f_start + 1,
260 f_end,
261 ".",
262 nice_strand,
263 0,
264 "ID=%s.%s.%s" % (self.ftype, idx, i + 1),
265 ],
266 )
267 )
268 + "\n"
269 )
270 log.info("Found %i %ss", out_count, self.ftype)
271
272 def start_chop_and_trans(self, s, strict=True):
273 """Returns offset, trimmed nuc, protein."""
274 if strict:
275 assert s[-3:] in self.stops, s
276 assert len(s) % 3 == 0
277 for match in self.re_starts.finditer(s, overlapped=True):
278 # Must check the start is in frame
279 start = match.start()
280 if start % 3 == 0:
281 n = s[start:]
282 assert len(n) % 3 == 0, "%s is len %i" % (n, len(n))
283 if strict:
284 t = translate(n, self.table)
285 else:
286 # Use when missing stop codon,
287 t = "M" + translate(n[3:], self.table, to_stop=True)
288 yield start, n, t
289
290 def break_up_frame(self, s):
291 """Returns offset, nuc, protein."""
292 start = 0
293 for match in self.re_stops.finditer(s, overlapped=True):
294 index = match.start() + 3
295 if index % 3 != 0:
296 continue
297 n = s[start:index]
298 for (offset, n, t) in self.start_chop_and_trans(n):
299 if n and len(t) >= self.min_len:
300 yield start + offset, n, t
301 start = index
302
303 def putative_genes_in_sequence(self, nuc_seq):
304 """Returns start, end, strand, nucleotides, protein.
305 Co-ordinates are Python style zero-based.
306 """
307 nuc_seq = nuc_seq.upper()
308 # TODO - Refactor to use a generator function (in start order)
309 # rather than making a list and sorting?
310 answer = []
311 full_len = len(nuc_seq)
312
313 for frame in range(0, 3):
314 for offset, n, t in self.break_up_frame(nuc_seq[frame:]):
315 start = frame + offset # zero based
316 answer.append((start, start + len(n), +1, n, t))
317
318 rc = reverse_complement(nuc_seq)
319 for frame in range(0, 3):
320 for offset, n, t in self.break_up_frame(rc[frame:]):
321 start = full_len - frame - offset # zero based
322 answer.append((start, start - len(n), -1, n, t))
323 answer.sort()
324 return answer
325
326 def get_all_peptides(self, nuc_seq):
327 """Returns start, end, strand, nucleotides, protein.
328
329 Co-ordinates are Python style zero-based.
330 """
331 # Refactored into generator by CPT
332
333 full_len = len(nuc_seq)
334 for frame in range(0, 3):
335 for offset, n, t in self.break_up_frame(nuc_seq[frame:]):
336 start = frame + offset # zero based
337 yield (start, start + len(n), +1, n, t)
338 rc = reverse_complement(nuc_seq)
339 for frame in range(0, 3):
340 for offset, n, t in self.break_up_frame(rc[frame:]):
341 start = full_len - frame - offset # zero based
342 yield (start - len(n), start, -1, n, t)