comparison qpcr_search.py @ 0:1f4836da4a14 draft default tip

planemo upload for repository https://github.com/bvalot/galaxy commit d57c24d4b2c0c741d572af9ca0d09f8b82689640
author bvalot
date Tue, 14 Jun 2022 08:52:22 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:1f4836da4a14
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 """Search primers/probe on database and return result"""
5
6 import argparse
7 import os
8 import subprocess
9 import sys
10 import tempfile
11
12 from Bio import SeqIO
13
14 gasst_exe = "Gassst"
15 nucleo = {
16 "R": ("A", "G"),
17 "Y": ("C", "T"),
18 "S": ("G", "C"),
19 "W": ("A", "T"),
20 "K": ("G", "T"),
21 "M": ("A", "C"),
22 "B": ("C", "G", "T"),
23 "D": ("A", "G", "T"),
24 "H": ("A", "C", "T"),
25 "V": ("A", "C", "G"),
26 "N": ("A", "T", "C", "G"),
27 }
28
29 desc = "Search primer/probe on database and return position and errors."
30 command = argparse.ArgumentParser(
31 prog="primer_search.py",
32 description=desc,
33 usage="%(prog)s [options] forward probe reverse database",
34 )
35 command.add_argument(
36 "-o",
37 "--out",
38 nargs="?",
39 type=argparse.FileType("w"),
40 default=sys.stdout,
41 help="Return amplicon position on file, default=stdout",
42 )
43 command.add_argument(
44 "-d",
45 "--debug",
46 nargs="?",
47 type=argparse.FileType("w"),
48 default=None,
49 help="Conserve a copy of primer search, default No",
50 )
51 command.add_argument(
52 "-e",
53 "--error",
54 nargs="?",
55 type=int,
56 default=1,
57 help="Maximun error allowed on match, default=1",
58 )
59 command.add_argument(
60 "-m",
61 "--min",
62 nargs="?",
63 type=int,
64 default=50,
65 help="Min len amplicon size, default=50",
66 )
67 command.add_argument(
68 "-M",
69 "--max",
70 nargs="?",
71 type=int,
72 default=200,
73 help="Max len amplicon size, default=200",
74 )
75 command.add_argument(
76 "-t", "--taxo", action="store_true", help="Parse description to report species only"
77 )
78 command.add_argument("forward", type=str, help="Forward primer sequence")
79 command.add_argument("probe", type=str, help="Probe sequence")
80 command.add_argument("reverse", type=str, help="Reverse primer sequence")
81 command.add_argument(
82 "database", type=argparse.FileType("r"), help="Database to search on fasta"
83 )
84 command.add_argument("-v", "--version", action="version", version="%(prog)s 0.3.0")
85
86
87 class GassstResult:
88 """A simple Gassst class containing result
89 rev : True indicate if reverse primers, else forward
90 """
91
92 def __init__(self, line):
93 h = line.strip().split()
94 if len(h) != 9:
95 raise Exception("Line seems not correspond to gassst result\n" + line)
96 self.primer = h[0]
97 # self.rev = h[0] != "Forward"
98 self.match = h[1]
99 self.strand = int(h[2])
100 self.start = int(h[3])
101 self.error = int(h[5])
102 self.stop = self.start + len(h[7])
103
104 def __repr__(self):
105 return (
106 self.match
107 + " : "
108 + str(self.start)
109 + " in strand:"
110 + str(self.strand)
111 + " in primer: "
112 + self.primer
113 + " with error: "
114 + str(self.error)
115 )
116
117 def __eq__(self, other):
118 return (
119 isinstance(other, GassstResult)
120 and self.primer == other.primer
121 and self.match == other.match
122 and self.strand == other.strand
123 and self.start == other.start
124 )
125
126 def __ne__(self, other):
127 return not self == other
128
129 def __lt__(self, other):
130 if self != other:
131 raise Exception("Only compare equal GassstResult")
132 return self.error < other.error
133
134
135 class Amplicon:
136 """A simple Amplicon class containing forward and reverse gassst result
137 Add probe including
138 """
139
140 def __init__(self, forw, reve):
141 if forw.strand == reve.strand:
142 raise Exception(
143 "Amplicon could be in inverse strand for reverse/forward primer\n"
144 + forw
145 + reve
146 )
147 self.forw = forw
148 self.prob = None
149 self.reve = reve
150 self.strand = forw.strand
151
152 def contain_probe(self, probe):
153 if (
154 min(probe.start, probe.stop) > self.get_start()
155 and max(probe.start, probe.stop) < self.get_stop()
156 ):
157 self.prob = probe
158 return True
159 return False
160
161 def get_start(self):
162 if self.strand == 0:
163 return self.forw.start
164 return self.reve.start
165
166 def get_stop(self):
167 if self.strand == 1:
168 return self.forw.stop
169 return self.reve.stop
170
171 def get_tab_data(self):
172 return map(
173 str,
174 [
175 self.get_start(),
176 self.get_stop(),
177 self.forw.error,
178 self.prob.error,
179 self.reve.error,
180 ],
181 )
182
183 def __len__(self):
184 return self.get_stop() - self.get_start()
185
186 def __repr__(self):
187 if self.prob is None:
188 return (
189 str(self.get_start())
190 + " : "
191 + str(self.get_stop())
192 + " in strand:"
193 + str(self.strand)
194 + "; error F:"
195 + str(self.forw.error)
196 + ",R:"
197 + str(self.reve.error)
198 )
199 return (
200 str(self.get_start())
201 + " : "
202 + str(self.get_stop())
203 + " in strand:"
204 + str(self.strand)
205 + "; error F:"
206 + str(self.forw.error)
207 + ",P:"
208 + str(self.prob.error)
209 + ",R:"
210 + str(self.reve.error)
211 )
212
213
214 def degenerate_primer(seq):
215 """Iterate all possible sequence
216 from degenerate primer
217 """
218 notdegene = True
219 for i, n in enumerate(seq.upper()):
220 if n in nucleo:
221 notdegene = False
222 for mod in nucleo.get(n):
223 seq_cut = list(seq)
224 seq_cut[i] = mod
225 for seq2 in degenerate_primer("".join(seq_cut)):
226 yield (seq2)
227 break
228 if notdegene:
229 yield (seq)
230
231
232 def write_primer_fasta(forward_de, probe_de, reverse_de):
233 primer_fasta = tempfile.NamedTemporaryFile(delete=False, mode="w+t")
234 for forward in degenerate_primer(forward_de):
235 primer_fasta.write(">Forward\n" + forward + "\n")
236 for probe in degenerate_primer(probe_de):
237 primer_fasta.write(">Probe\n" + probe + "\n")
238 for reverse in degenerate_primer(reverse_de):
239 primer_fasta.write(">Reverse\n" + reverse + "\n")
240 primer_fasta.close()
241 return primer_fasta.name
242
243
244 def performed_gassst(command):
245 proc = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=sys.stderr)
246 error = ""
247 for line in iter(proc.stderr.readline, b""):
248 error += line.decode()
249 if error != "":
250 sys.stdout.write("Error during processing Gassst\n")
251 raise Exception(error)
252
253
254 def read_result(f_res, f_debug):
255 result = {}
256 for line in iter(f_res.readline, ""):
257 if line[0] == "F" or line[0] == "R" or line[0] == "P":
258 if f_debug is not None:
259 f_debug.write(line)
260 res = GassstResult(line)
261 other_res = result.setdefault(res.match, {}).setdefault(res.primer, [])
262 notfound = True
263 for i, res2 in enumerate(other_res):
264 if res == res2:
265 notfound = False
266 if res < res2:
267 other_res[i] = res
268 # print res
269 break
270 if notfound:
271 other_res.append(res)
272 return result
273
274
275 def get_amplicons(result):
276 """Function to get amplicon on a sequence from forward / reverse result"""
277 for forw in result["Forward"]:
278 for reve in result["Reverse"]:
279 if forw.strand != reve.strand:
280 amp = Amplicon(forw, reve)
281 if amp.get_start() < amp.get_stop(): # must be side by side
282 for probe in result["Probe"]:
283 if amp.contain_probe(probe):
284 yield amp
285 break
286 del amp
287 else:
288 del amp
289
290
291 if __name__ == "__main__":
292 """Performed job on execution script"""
293 args = command.parse_args()
294 primer_fasta = write_primer_fasta(args.forward, args.probe, args.reverse)
295 output_result = tempfile.NamedTemporaryFile(delete=False, mode="w+")
296 identity = int(
297 100 - args.error / float(max(len(args.forward), len(args.reverse))) * 100
298 )
299
300 command = [
301 gasst_exe,
302 "-p",
303 str(identity),
304 "-g",
305 "0",
306 "-w",
307 "6",
308 "-h",
309 "0",
310 "-i",
311 primer_fasta,
312 "-d",
313 args.database.name,
314 "-o",
315 output_result.name,
316 ]
317 performed_gassst(command)
318 result = read_result(output_result, args.debug)
319
320 # load amplicons
321 sys.stderr.write("Search amplicons : ")
322 amplicons = {}
323 for seq in result.keys():
324 if len(result[seq]) == 3:
325 # print "Search amplicon on seq: " + seq
326 for amplicon in get_amplicons(result[seq]):
327 if len(amplicon) > args.min and len(amplicon) < args.max:
328 amplicons.setdefault(seq, []).append(amplicon)
329 else:
330 del amplicon
331 # print "Find amplicons : " + str(len(amplicons[seq]))
332 sys.stderr.write(str(sum(map(len, amplicons.values()))) + "\n\n")
333
334 # print amplicons
335
336 sys.stderr.write("Write result : ")
337 sep = "\t"
338 args.out.write(
339 "Id\tAmpli number\tDescription\tStart\tStop\tError forward\tError probe\tError reverse\n"
340 )
341 for record in SeqIO.parse(args.database, "fasta"):
342 if record.id not in amplicons:
343 continue
344 for i, amplicon in enumerate(amplicons.get(record.id)):
345 desc = record.description.lstrip(record.id)
346 if args.taxo:
347 desc = record.description.split(";")[-1].strip()
348 args.out.write(
349 record.id
350 + sep
351 + str(i + 1)
352 + sep
353 + desc
354 + sep
355 + sep.join(amplicon.get_tab_data())
356 + "\n"
357 )
358 sys.stderr.write("OK\n")
359 # delete tempfile
360 os.unlink(primer_fasta)
361 os.unlink(output_result.name)