comparison primer_search.py @ 0:1f4836da4a14 draft default tip

planemo upload for repository https://github.com/bvalot/galaxy commit d57c24d4b2c0c741d572af9ca0d09f8b82689640
author bvalot
date Tue, 14 Jun 2022 08:52:22 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:1f4836da4a14
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 """Search primers on database and return result"""
5
6 import argparse
7 import os
8 import subprocess
9 import sys
10 import tempfile
11
12 from Bio import SeqIO
13
14 gasst_exe = "Gassst"
15 nucleo = {
16 "R": ("A", "G"),
17 "Y": ("C", "T"),
18 "S": ("G", "C"),
19 "W": ("A", "T"),
20 "K": ("G", "T"),
21 "M": ("A", "C"),
22 "B": ("C", "G", "T"),
23 "D": ("A", "G", "T"),
24 "H": ("A", "C", "T"),
25 "V": ("A", "C", "G"),
26 "N": ("A", "T", "C", "G"),
27 }
28
29 desc = "Search primer on database and return amplicons."
30 command = argparse.ArgumentParser(
31 prog="primer_search.py",
32 description=desc,
33 usage="%(prog)s [options] forward reverse database",
34 )
35 command.add_argument(
36 "-o",
37 "--out",
38 nargs="?",
39 type=argparse.FileType("w"),
40 default=sys.stdout,
41 help="Return amplicon on fasta file, default=stdout",
42 )
43 command.add_argument(
44 "-d",
45 "--debug",
46 nargs="?",
47 type=argparse.FileType("w+"),
48 default=None,
49 help="Conserve a copy of primer search, default No",
50 )
51 command.add_argument(
52 "-e",
53 "--error",
54 nargs="?",
55 type=int,
56 default=1,
57 help="Maximun error allowed on match, default=1",
58 )
59 command.add_argument(
60 "-m",
61 "--min",
62 nargs="?",
63 type=int,
64 default=100,
65 help="Min len amplicon size, default=100",
66 )
67 command.add_argument(
68 "-M",
69 "--max",
70 nargs="?",
71 type=int,
72 default=1500,
73 help="Max len amplicon size, default=1500",
74 )
75 command.add_argument(
76 "-k",
77 "--keep",
78 action="store_true",
79 help="Keep description instead of report PCR position",
80 )
81 command.add_argument(
82 "-r", "--remove", action="store_true", help="Remove primer from reported amplicons"
83 )
84 command.add_argument("forward", type=str, help="Forward primer sequence")
85 command.add_argument("reverse", type=str, help="Reverse primer sequence")
86 command.add_argument(
87 "database", type=argparse.FileType("r"), help="Database to search on fasta"
88 )
89 command.add_argument("-v", "--version", action="version", version="%(prog)s 0.3.0")
90
91
92 class GassstResult:
93 """A simple Gassst class containing result
94 rev : True indicate if reverse primers, else forward
95 """
96
97 def __init__(self, line):
98 h = line.strip().split("\t")
99 if len(h) != 9:
100 raise Exception("Line seems not correspond to gassst result\n" + line)
101 self.primer = h[0]
102 self.rev = h[0] != "Forward"
103 self.match = h[1]
104 self.strand = int(h[2])
105 self.start = int(h[3])
106 self.error = int(h[5])
107 self.stop = self.start + len(h[7])
108
109 def __repr__(self):
110 return (
111 self.match
112 + " : "
113 + str(self.start)
114 + " in strand:"
115 + str(self.strand)
116 + " in primer: "
117 + self.primer
118 + " with error: "
119 + str(self.error)
120 )
121
122 def __eq__(self, other):
123 return (
124 isinstance(other, GassstResult)
125 and self.primer == other.primer
126 and self.rev == other.rev
127 and self.match == other.match
128 and self.strand == other.strand
129 and self.start == other.start
130 )
131
132 def __ne__(self, other):
133 return not self == other
134
135 def __lt__(self, other):
136 if self != other:
137 raise Exception("Only compare equal GassstResult")
138 return self.error < other.error
139
140
141 class Amplicon:
142 """A simple Amplicon class containing forward and reverse gassst result"""
143
144 def __init__(self, forw, reve):
145 if forw.strand == reve.strand:
146 raise Exception(
147 "Amplicon could be in inverse strand for reverse/forwar primer\n"
148 + forw
149 + reve
150 )
151 self.forw = forw
152 self.reve = reve
153 self.strand = forw.strand
154
155 def get_start(self):
156 if self.strand == 0:
157 return self.forw.start
158 return self.reve.start
159
160 def get_stop(self):
161 if self.strand == 1:
162 return self.forw.stop
163 return self.reve.stop
164
165 def __len__(self):
166 return self.get_stop() - self.get_start()
167
168 def __repr__(self):
169 return (
170 str(self.get_start())
171 + " : "
172 + str(self.get_stop())
173 + " in strand:"
174 + str(self.strand)
175 + "; error F:"
176 + str(self.forw.error)
177 + ",R:"
178 + str(self.reve.error)
179 )
180
181
182 def degenerate_primer(seq):
183 """Iterate all possible sequence
184 from degenerate primer
185 """
186 notdegene = True
187 for i, n in enumerate(seq.upper()):
188 if n in nucleo:
189 notdegene = False
190 for mod in nucleo.get(n):
191 seq_cut = list(seq)
192 seq_cut[i] = mod
193 for seq2 in degenerate_primer("".join(seq_cut)):
194 yield (seq2)
195 break
196 if notdegene:
197 yield (seq)
198
199
200 def write_primer_fasta(forward_de, reverse_de):
201 primer_fasta = tempfile.NamedTemporaryFile(delete=False, mode="w+t")
202 for forward in degenerate_primer(forward_de):
203 primer_fasta.write(">Forward\n" + forward + "\n")
204 for reverse in degenerate_primer(reverse_de):
205 primer_fasta.write(">Reverse\n" + reverse + "\n")
206 primer_fasta.close()
207 return primer_fasta.name
208
209
210 def performed_gassst(command):
211 proc = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=sys.stderr)
212 error = ""
213 for line in iter(proc.stderr.readline, b""):
214 error += line.decode()
215 if error != "":
216 sys.stdout.write("Error during processing Gassst\n")
217 raise Exception(error)
218
219
220 def read_result(f_res, f_debug):
221 result = {}
222 for line in iter(f_res.readline, ""):
223 if line[0] == "F" or line[0] == "R":
224 if f_debug is not None:
225 f_debug.write(line)
226 res = GassstResult(line)
227 other_res = result.setdefault(res.match, {}).setdefault(res.primer, [])
228 notfound = True
229 for i, res2 in enumerate(other_res):
230 if res == res2:
231 notfound = False
232 if res < res2:
233 other_res[i] = res
234 # print res
235 break
236 if notfound:
237 other_res.append(res)
238 return result
239
240
241 def get_amplicons(result):
242 """Function to get amplicon on a sequence from forward / reverse result"""
243 for forw in result["Forward"]:
244 for reve in result["Reverse"]:
245 if forw.strand != reve.strand:
246 amp = Amplicon(forw, reve)
247 if amp.get_start() < amp.get_stop(): # must be side by side
248 yield amp
249 else:
250 del amp
251
252
253 if __name__ == "__main__":
254 """Performed job on execution script"""
255 args = command.parse_args()
256 primer_fasta = write_primer_fasta(args.forward, args.reverse)
257 output_result = tempfile.NamedTemporaryFile(delete=False, mode="w+")
258 identity = int(
259 100 - args.error / float(max(len(args.forward), len(args.reverse))) * 100
260 )
261
262 command = [
263 gasst_exe,
264 "-p",
265 str(identity),
266 "-g",
267 "0",
268 "-w",
269 "6",
270 "-h",
271 "0",
272 "-i",
273 primer_fasta,
274 "-d",
275 args.database.name,
276 "-o",
277 output_result.name,
278 ]
279 performed_gassst(command)
280 result = read_result(output_result, args.debug)
281
282 # load amplicons
283 sys.stderr.write("Search amplicons : ")
284 amplicons = {}
285 for seq in result.keys():
286 if len(result[seq]) == 2:
287 # print "Search amplicon on seq: " + seq
288 for amplicon in get_amplicons(result[seq]):
289 if len(amplicon) > args.min and len(amplicon) < args.max:
290 amplicons.setdefault(seq, []).append(amplicon)
291 else:
292 del amplicon
293 # print "Find amplicons : " + str(len(amplicons[seq]))
294 sys.stderr.write(str(sum(map(len, amplicons.values()))) + "\n\n")
295
296 # print amplicons
297
298 sys.stderr.write("Write result : ")
299 for record in SeqIO.parse(args.database, "fasta"):
300 if record.id not in amplicons:
301 continue
302 for i, amplicon in enumerate(amplicons.get(record.id)):
303 sub = record[amplicon.get_start() - 1: amplicon.get_stop() - 1]
304 if amplicon.strand == 1:
305 sub = sub.reverse_complement()
306 if args.remove:
307 sub = sub[len(args.forward): len(sub) - len(args.reverse)]
308 if args.keep:
309 sub.id = record.id
310 sub.description = record.description
311 else:
312 sub.id = record.id + "_ampli_" + str(i)
313 sub.description = str(amplicon)
314 SeqIO.write(sub, args.out, "fasta")
315
316 sys.stderr.write("OK\n")
317 # delete tempfile
318 os.unlink(primer_fasta)
319 os.unlink(output_result.name)