Mercurial > repos > bvalot > in_sillico_pcr
comparison qpcr_search.py @ 0:1f4836da4a14 draft default tip
planemo upload for repository https://github.com/bvalot/galaxy commit d57c24d4b2c0c741d572af9ca0d09f8b82689640
author | bvalot |
---|---|
date | Tue, 14 Jun 2022 08:52:22 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:1f4836da4a14 |
---|---|
1 #!/usr/bin/env python3 | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 """Search primers/probe on database and return result""" | |
5 | |
6 import argparse | |
7 import os | |
8 import subprocess | |
9 import sys | |
10 import tempfile | |
11 | |
12 from Bio import SeqIO | |
13 | |
14 gasst_exe = "Gassst" | |
15 nucleo = { | |
16 "R": ("A", "G"), | |
17 "Y": ("C", "T"), | |
18 "S": ("G", "C"), | |
19 "W": ("A", "T"), | |
20 "K": ("G", "T"), | |
21 "M": ("A", "C"), | |
22 "B": ("C", "G", "T"), | |
23 "D": ("A", "G", "T"), | |
24 "H": ("A", "C", "T"), | |
25 "V": ("A", "C", "G"), | |
26 "N": ("A", "T", "C", "G"), | |
27 } | |
28 | |
29 desc = "Search primer/probe on database and return position and errors." | |
30 command = argparse.ArgumentParser( | |
31 prog="primer_search.py", | |
32 description=desc, | |
33 usage="%(prog)s [options] forward probe reverse database", | |
34 ) | |
35 command.add_argument( | |
36 "-o", | |
37 "--out", | |
38 nargs="?", | |
39 type=argparse.FileType("w"), | |
40 default=sys.stdout, | |
41 help="Return amplicon position on file, default=stdout", | |
42 ) | |
43 command.add_argument( | |
44 "-d", | |
45 "--debug", | |
46 nargs="?", | |
47 type=argparse.FileType("w"), | |
48 default=None, | |
49 help="Conserve a copy of primer search, default No", | |
50 ) | |
51 command.add_argument( | |
52 "-e", | |
53 "--error", | |
54 nargs="?", | |
55 type=int, | |
56 default=1, | |
57 help="Maximun error allowed on match, default=1", | |
58 ) | |
59 command.add_argument( | |
60 "-m", | |
61 "--min", | |
62 nargs="?", | |
63 type=int, | |
64 default=50, | |
65 help="Min len amplicon size, default=50", | |
66 ) | |
67 command.add_argument( | |
68 "-M", | |
69 "--max", | |
70 nargs="?", | |
71 type=int, | |
72 default=200, | |
73 help="Max len amplicon size, default=200", | |
74 ) | |
75 command.add_argument( | |
76 "-t", "--taxo", action="store_true", help="Parse description to report species only" | |
77 ) | |
78 command.add_argument("forward", type=str, help="Forward primer sequence") | |
79 command.add_argument("probe", type=str, help="Probe sequence") | |
80 command.add_argument("reverse", type=str, help="Reverse primer sequence") | |
81 command.add_argument( | |
82 "database", type=argparse.FileType("r"), help="Database to search on fasta" | |
83 ) | |
84 command.add_argument("-v", "--version", action="version", version="%(prog)s 0.3.0") | |
85 | |
86 | |
87 class GassstResult: | |
88 """A simple Gassst class containing result | |
89 rev : True indicate if reverse primers, else forward | |
90 """ | |
91 | |
92 def __init__(self, line): | |
93 h = line.strip().split() | |
94 if len(h) != 9: | |
95 raise Exception("Line seems not correspond to gassst result\n" + line) | |
96 self.primer = h[0] | |
97 # self.rev = h[0] != "Forward" | |
98 self.match = h[1] | |
99 self.strand = int(h[2]) | |
100 self.start = int(h[3]) | |
101 self.error = int(h[5]) | |
102 self.stop = self.start + len(h[7]) | |
103 | |
104 def __repr__(self): | |
105 return ( | |
106 self.match | |
107 + " : " | |
108 + str(self.start) | |
109 + " in strand:" | |
110 + str(self.strand) | |
111 + " in primer: " | |
112 + self.primer | |
113 + " with error: " | |
114 + str(self.error) | |
115 ) | |
116 | |
117 def __eq__(self, other): | |
118 return ( | |
119 isinstance(other, GassstResult) | |
120 and self.primer == other.primer | |
121 and self.match == other.match | |
122 and self.strand == other.strand | |
123 and self.start == other.start | |
124 ) | |
125 | |
126 def __ne__(self, other): | |
127 return not self == other | |
128 | |
129 def __lt__(self, other): | |
130 if self != other: | |
131 raise Exception("Only compare equal GassstResult") | |
132 return self.error < other.error | |
133 | |
134 | |
135 class Amplicon: | |
136 """A simple Amplicon class containing forward and reverse gassst result | |
137 Add probe including | |
138 """ | |
139 | |
140 def __init__(self, forw, reve): | |
141 if forw.strand == reve.strand: | |
142 raise Exception( | |
143 "Amplicon could be in inverse strand for reverse/forward primer\n" | |
144 + forw | |
145 + reve | |
146 ) | |
147 self.forw = forw | |
148 self.prob = None | |
149 self.reve = reve | |
150 self.strand = forw.strand | |
151 | |
152 def contain_probe(self, probe): | |
153 if ( | |
154 min(probe.start, probe.stop) > self.get_start() | |
155 and max(probe.start, probe.stop) < self.get_stop() | |
156 ): | |
157 self.prob = probe | |
158 return True | |
159 return False | |
160 | |
161 def get_start(self): | |
162 if self.strand == 0: | |
163 return self.forw.start | |
164 return self.reve.start | |
165 | |
166 def get_stop(self): | |
167 if self.strand == 1: | |
168 return self.forw.stop | |
169 return self.reve.stop | |
170 | |
171 def get_tab_data(self): | |
172 return map( | |
173 str, | |
174 [ | |
175 self.get_start(), | |
176 self.get_stop(), | |
177 self.forw.error, | |
178 self.prob.error, | |
179 self.reve.error, | |
180 ], | |
181 ) | |
182 | |
183 def __len__(self): | |
184 return self.get_stop() - self.get_start() | |
185 | |
186 def __repr__(self): | |
187 if self.prob is None: | |
188 return ( | |
189 str(self.get_start()) | |
190 + " : " | |
191 + str(self.get_stop()) | |
192 + " in strand:" | |
193 + str(self.strand) | |
194 + "; error F:" | |
195 + str(self.forw.error) | |
196 + ",R:" | |
197 + str(self.reve.error) | |
198 ) | |
199 return ( | |
200 str(self.get_start()) | |
201 + " : " | |
202 + str(self.get_stop()) | |
203 + " in strand:" | |
204 + str(self.strand) | |
205 + "; error F:" | |
206 + str(self.forw.error) | |
207 + ",P:" | |
208 + str(self.prob.error) | |
209 + ",R:" | |
210 + str(self.reve.error) | |
211 ) | |
212 | |
213 | |
214 def degenerate_primer(seq): | |
215 """Iterate all possible sequence | |
216 from degenerate primer | |
217 """ | |
218 notdegene = True | |
219 for i, n in enumerate(seq.upper()): | |
220 if n in nucleo: | |
221 notdegene = False | |
222 for mod in nucleo.get(n): | |
223 seq_cut = list(seq) | |
224 seq_cut[i] = mod | |
225 for seq2 in degenerate_primer("".join(seq_cut)): | |
226 yield (seq2) | |
227 break | |
228 if notdegene: | |
229 yield (seq) | |
230 | |
231 | |
232 def write_primer_fasta(forward_de, probe_de, reverse_de): | |
233 primer_fasta = tempfile.NamedTemporaryFile(delete=False, mode="w+t") | |
234 for forward in degenerate_primer(forward_de): | |
235 primer_fasta.write(">Forward\n" + forward + "\n") | |
236 for probe in degenerate_primer(probe_de): | |
237 primer_fasta.write(">Probe\n" + probe + "\n") | |
238 for reverse in degenerate_primer(reverse_de): | |
239 primer_fasta.write(">Reverse\n" + reverse + "\n") | |
240 primer_fasta.close() | |
241 return primer_fasta.name | |
242 | |
243 | |
244 def performed_gassst(command): | |
245 proc = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=sys.stderr) | |
246 error = "" | |
247 for line in iter(proc.stderr.readline, b""): | |
248 error += line.decode() | |
249 if error != "": | |
250 sys.stdout.write("Error during processing Gassst\n") | |
251 raise Exception(error) | |
252 | |
253 | |
254 def read_result(f_res, f_debug): | |
255 result = {} | |
256 for line in iter(f_res.readline, ""): | |
257 if line[0] == "F" or line[0] == "R" or line[0] == "P": | |
258 if f_debug is not None: | |
259 f_debug.write(line) | |
260 res = GassstResult(line) | |
261 other_res = result.setdefault(res.match, {}).setdefault(res.primer, []) | |
262 notfound = True | |
263 for i, res2 in enumerate(other_res): | |
264 if res == res2: | |
265 notfound = False | |
266 if res < res2: | |
267 other_res[i] = res | |
268 # print res | |
269 break | |
270 if notfound: | |
271 other_res.append(res) | |
272 return result | |
273 | |
274 | |
275 def get_amplicons(result): | |
276 """Function to get amplicon on a sequence from forward / reverse result""" | |
277 for forw in result["Forward"]: | |
278 for reve in result["Reverse"]: | |
279 if forw.strand != reve.strand: | |
280 amp = Amplicon(forw, reve) | |
281 if amp.get_start() < amp.get_stop(): # must be side by side | |
282 for probe in result["Probe"]: | |
283 if amp.contain_probe(probe): | |
284 yield amp | |
285 break | |
286 del amp | |
287 else: | |
288 del amp | |
289 | |
290 | |
291 if __name__ == "__main__": | |
292 """Performed job on execution script""" | |
293 args = command.parse_args() | |
294 primer_fasta = write_primer_fasta(args.forward, args.probe, args.reverse) | |
295 output_result = tempfile.NamedTemporaryFile(delete=False, mode="w+") | |
296 identity = int( | |
297 100 - args.error / float(max(len(args.forward), len(args.reverse))) * 100 | |
298 ) | |
299 | |
300 command = [ | |
301 gasst_exe, | |
302 "-p", | |
303 str(identity), | |
304 "-g", | |
305 "0", | |
306 "-w", | |
307 "6", | |
308 "-h", | |
309 "0", | |
310 "-i", | |
311 primer_fasta, | |
312 "-d", | |
313 args.database.name, | |
314 "-o", | |
315 output_result.name, | |
316 ] | |
317 performed_gassst(command) | |
318 result = read_result(output_result, args.debug) | |
319 | |
320 # load amplicons | |
321 sys.stderr.write("Search amplicons : ") | |
322 amplicons = {} | |
323 for seq in result.keys(): | |
324 if len(result[seq]) == 3: | |
325 # print "Search amplicon on seq: " + seq | |
326 for amplicon in get_amplicons(result[seq]): | |
327 if len(amplicon) > args.min and len(amplicon) < args.max: | |
328 amplicons.setdefault(seq, []).append(amplicon) | |
329 else: | |
330 del amplicon | |
331 # print "Find amplicons : " + str(len(amplicons[seq])) | |
332 sys.stderr.write(str(sum(map(len, amplicons.values()))) + "\n\n") | |
333 | |
334 # print amplicons | |
335 | |
336 sys.stderr.write("Write result : ") | |
337 sep = "\t" | |
338 args.out.write( | |
339 "Id\tAmpli number\tDescription\tStart\tStop\tError forward\tError probe\tError reverse\n" | |
340 ) | |
341 for record in SeqIO.parse(args.database, "fasta"): | |
342 if record.id not in amplicons: | |
343 continue | |
344 for i, amplicon in enumerate(amplicons.get(record.id)): | |
345 desc = record.description.lstrip(record.id) | |
346 if args.taxo: | |
347 desc = record.description.split(";")[-1].strip() | |
348 args.out.write( | |
349 record.id | |
350 + sep | |
351 + str(i + 1) | |
352 + sep | |
353 + desc | |
354 + sep | |
355 + sep.join(amplicon.get_tab_data()) | |
356 + "\n" | |
357 ) | |
358 sys.stderr.write("OK\n") | |
359 # delete tempfile | |
360 os.unlink(primer_fasta) | |
361 os.unlink(output_result.name) |