Mercurial > repos > bvalot > in_sillico_pcr
comparison primer_search.py @ 0:1f4836da4a14 draft default tip
planemo upload for repository https://github.com/bvalot/galaxy commit d57c24d4b2c0c741d572af9ca0d09f8b82689640
| author | bvalot |
|---|---|
| date | Tue, 14 Jun 2022 08:52:22 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:1f4836da4a14 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 # -*- coding: utf-8 -*- | |
| 3 | |
| 4 """Search primers on database and return result""" | |
| 5 | |
| 6 import argparse | |
| 7 import os | |
| 8 import subprocess | |
| 9 import sys | |
| 10 import tempfile | |
| 11 | |
| 12 from Bio import SeqIO | |
| 13 | |
| 14 gasst_exe = "Gassst" | |
| 15 nucleo = { | |
| 16 "R": ("A", "G"), | |
| 17 "Y": ("C", "T"), | |
| 18 "S": ("G", "C"), | |
| 19 "W": ("A", "T"), | |
| 20 "K": ("G", "T"), | |
| 21 "M": ("A", "C"), | |
| 22 "B": ("C", "G", "T"), | |
| 23 "D": ("A", "G", "T"), | |
| 24 "H": ("A", "C", "T"), | |
| 25 "V": ("A", "C", "G"), | |
| 26 "N": ("A", "T", "C", "G"), | |
| 27 } | |
| 28 | |
| 29 desc = "Search primer on database and return amplicons." | |
| 30 command = argparse.ArgumentParser( | |
| 31 prog="primer_search.py", | |
| 32 description=desc, | |
| 33 usage="%(prog)s [options] forward reverse database", | |
| 34 ) | |
| 35 command.add_argument( | |
| 36 "-o", | |
| 37 "--out", | |
| 38 nargs="?", | |
| 39 type=argparse.FileType("w"), | |
| 40 default=sys.stdout, | |
| 41 help="Return amplicon on fasta file, default=stdout", | |
| 42 ) | |
| 43 command.add_argument( | |
| 44 "-d", | |
| 45 "--debug", | |
| 46 nargs="?", | |
| 47 type=argparse.FileType("w+"), | |
| 48 default=None, | |
| 49 help="Conserve a copy of primer search, default No", | |
| 50 ) | |
| 51 command.add_argument( | |
| 52 "-e", | |
| 53 "--error", | |
| 54 nargs="?", | |
| 55 type=int, | |
| 56 default=1, | |
| 57 help="Maximun error allowed on match, default=1", | |
| 58 ) | |
| 59 command.add_argument( | |
| 60 "-m", | |
| 61 "--min", | |
| 62 nargs="?", | |
| 63 type=int, | |
| 64 default=100, | |
| 65 help="Min len amplicon size, default=100", | |
| 66 ) | |
| 67 command.add_argument( | |
| 68 "-M", | |
| 69 "--max", | |
| 70 nargs="?", | |
| 71 type=int, | |
| 72 default=1500, | |
| 73 help="Max len amplicon size, default=1500", | |
| 74 ) | |
| 75 command.add_argument( | |
| 76 "-k", | |
| 77 "--keep", | |
| 78 action="store_true", | |
| 79 help="Keep description instead of report PCR position", | |
| 80 ) | |
| 81 command.add_argument( | |
| 82 "-r", "--remove", action="store_true", help="Remove primer from reported amplicons" | |
| 83 ) | |
| 84 command.add_argument("forward", type=str, help="Forward primer sequence") | |
| 85 command.add_argument("reverse", type=str, help="Reverse primer sequence") | |
| 86 command.add_argument( | |
| 87 "database", type=argparse.FileType("r"), help="Database to search on fasta" | |
| 88 ) | |
| 89 command.add_argument("-v", "--version", action="version", version="%(prog)s 0.3.0") | |
| 90 | |
| 91 | |
| 92 class GassstResult: | |
| 93 """A simple Gassst class containing result | |
| 94 rev : True indicate if reverse primers, else forward | |
| 95 """ | |
| 96 | |
| 97 def __init__(self, line): | |
| 98 h = line.strip().split("\t") | |
| 99 if len(h) != 9: | |
| 100 raise Exception("Line seems not correspond to gassst result\n" + line) | |
| 101 self.primer = h[0] | |
| 102 self.rev = h[0] != "Forward" | |
| 103 self.match = h[1] | |
| 104 self.strand = int(h[2]) | |
| 105 self.start = int(h[3]) | |
| 106 self.error = int(h[5]) | |
| 107 self.stop = self.start + len(h[7]) | |
| 108 | |
| 109 def __repr__(self): | |
| 110 return ( | |
| 111 self.match | |
| 112 + " : " | |
| 113 + str(self.start) | |
| 114 + " in strand:" | |
| 115 + str(self.strand) | |
| 116 + " in primer: " | |
| 117 + self.primer | |
| 118 + " with error: " | |
| 119 + str(self.error) | |
| 120 ) | |
| 121 | |
| 122 def __eq__(self, other): | |
| 123 return ( | |
| 124 isinstance(other, GassstResult) | |
| 125 and self.primer == other.primer | |
| 126 and self.rev == other.rev | |
| 127 and self.match == other.match | |
| 128 and self.strand == other.strand | |
| 129 and self.start == other.start | |
| 130 ) | |
| 131 | |
| 132 def __ne__(self, other): | |
| 133 return not self == other | |
| 134 | |
| 135 def __lt__(self, other): | |
| 136 if self != other: | |
| 137 raise Exception("Only compare equal GassstResult") | |
| 138 return self.error < other.error | |
| 139 | |
| 140 | |
| 141 class Amplicon: | |
| 142 """A simple Amplicon class containing forward and reverse gassst result""" | |
| 143 | |
| 144 def __init__(self, forw, reve): | |
| 145 if forw.strand == reve.strand: | |
| 146 raise Exception( | |
| 147 "Amplicon could be in inverse strand for reverse/forwar primer\n" | |
| 148 + forw | |
| 149 + reve | |
| 150 ) | |
| 151 self.forw = forw | |
| 152 self.reve = reve | |
| 153 self.strand = forw.strand | |
| 154 | |
| 155 def get_start(self): | |
| 156 if self.strand == 0: | |
| 157 return self.forw.start | |
| 158 return self.reve.start | |
| 159 | |
| 160 def get_stop(self): | |
| 161 if self.strand == 1: | |
| 162 return self.forw.stop | |
| 163 return self.reve.stop | |
| 164 | |
| 165 def __len__(self): | |
| 166 return self.get_stop() - self.get_start() | |
| 167 | |
| 168 def __repr__(self): | |
| 169 return ( | |
| 170 str(self.get_start()) | |
| 171 + " : " | |
| 172 + str(self.get_stop()) | |
| 173 + " in strand:" | |
| 174 + str(self.strand) | |
| 175 + "; error F:" | |
| 176 + str(self.forw.error) | |
| 177 + ",R:" | |
| 178 + str(self.reve.error) | |
| 179 ) | |
| 180 | |
| 181 | |
| 182 def degenerate_primer(seq): | |
| 183 """Iterate all possible sequence | |
| 184 from degenerate primer | |
| 185 """ | |
| 186 notdegene = True | |
| 187 for i, n in enumerate(seq.upper()): | |
| 188 if n in nucleo: | |
| 189 notdegene = False | |
| 190 for mod in nucleo.get(n): | |
| 191 seq_cut = list(seq) | |
| 192 seq_cut[i] = mod | |
| 193 for seq2 in degenerate_primer("".join(seq_cut)): | |
| 194 yield (seq2) | |
| 195 break | |
| 196 if notdegene: | |
| 197 yield (seq) | |
| 198 | |
| 199 | |
| 200 def write_primer_fasta(forward_de, reverse_de): | |
| 201 primer_fasta = tempfile.NamedTemporaryFile(delete=False, mode="w+t") | |
| 202 for forward in degenerate_primer(forward_de): | |
| 203 primer_fasta.write(">Forward\n" + forward + "\n") | |
| 204 for reverse in degenerate_primer(reverse_de): | |
| 205 primer_fasta.write(">Reverse\n" + reverse + "\n") | |
| 206 primer_fasta.close() | |
| 207 return primer_fasta.name | |
| 208 | |
| 209 | |
| 210 def performed_gassst(command): | |
| 211 proc = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=sys.stderr) | |
| 212 error = "" | |
| 213 for line in iter(proc.stderr.readline, b""): | |
| 214 error += line.decode() | |
| 215 if error != "": | |
| 216 sys.stdout.write("Error during processing Gassst\n") | |
| 217 raise Exception(error) | |
| 218 | |
| 219 | |
| 220 def read_result(f_res, f_debug): | |
| 221 result = {} | |
| 222 for line in iter(f_res.readline, ""): | |
| 223 if line[0] == "F" or line[0] == "R": | |
| 224 if f_debug is not None: | |
| 225 f_debug.write(line) | |
| 226 res = GassstResult(line) | |
| 227 other_res = result.setdefault(res.match, {}).setdefault(res.primer, []) | |
| 228 notfound = True | |
| 229 for i, res2 in enumerate(other_res): | |
| 230 if res == res2: | |
| 231 notfound = False | |
| 232 if res < res2: | |
| 233 other_res[i] = res | |
| 234 # print res | |
| 235 break | |
| 236 if notfound: | |
| 237 other_res.append(res) | |
| 238 return result | |
| 239 | |
| 240 | |
| 241 def get_amplicons(result): | |
| 242 """Function to get amplicon on a sequence from forward / reverse result""" | |
| 243 for forw in result["Forward"]: | |
| 244 for reve in result["Reverse"]: | |
| 245 if forw.strand != reve.strand: | |
| 246 amp = Amplicon(forw, reve) | |
| 247 if amp.get_start() < amp.get_stop(): # must be side by side | |
| 248 yield amp | |
| 249 else: | |
| 250 del amp | |
| 251 | |
| 252 | |
| 253 if __name__ == "__main__": | |
| 254 """Performed job on execution script""" | |
| 255 args = command.parse_args() | |
| 256 primer_fasta = write_primer_fasta(args.forward, args.reverse) | |
| 257 output_result = tempfile.NamedTemporaryFile(delete=False, mode="w+") | |
| 258 identity = int( | |
| 259 100 - args.error / float(max(len(args.forward), len(args.reverse))) * 100 | |
| 260 ) | |
| 261 | |
| 262 command = [ | |
| 263 gasst_exe, | |
| 264 "-p", | |
| 265 str(identity), | |
| 266 "-g", | |
| 267 "0", | |
| 268 "-w", | |
| 269 "6", | |
| 270 "-h", | |
| 271 "0", | |
| 272 "-i", | |
| 273 primer_fasta, | |
| 274 "-d", | |
| 275 args.database.name, | |
| 276 "-o", | |
| 277 output_result.name, | |
| 278 ] | |
| 279 performed_gassst(command) | |
| 280 result = read_result(output_result, args.debug) | |
| 281 | |
| 282 # load amplicons | |
| 283 sys.stderr.write("Search amplicons : ") | |
| 284 amplicons = {} | |
| 285 for seq in result.keys(): | |
| 286 if len(result[seq]) == 2: | |
| 287 # print "Search amplicon on seq: " + seq | |
| 288 for amplicon in get_amplicons(result[seq]): | |
| 289 if len(amplicon) > args.min and len(amplicon) < args.max: | |
| 290 amplicons.setdefault(seq, []).append(amplicon) | |
| 291 else: | |
| 292 del amplicon | |
| 293 # print "Find amplicons : " + str(len(amplicons[seq])) | |
| 294 sys.stderr.write(str(sum(map(len, amplicons.values()))) + "\n\n") | |
| 295 | |
| 296 # print amplicons | |
| 297 | |
| 298 sys.stderr.write("Write result : ") | |
| 299 for record in SeqIO.parse(args.database, "fasta"): | |
| 300 if record.id not in amplicons: | |
| 301 continue | |
| 302 for i, amplicon in enumerate(amplicons.get(record.id)): | |
| 303 sub = record[amplicon.get_start() - 1: amplicon.get_stop() - 1] | |
| 304 if amplicon.strand == 1: | |
| 305 sub = sub.reverse_complement() | |
| 306 if args.remove: | |
| 307 sub = sub[len(args.forward): len(sub) - len(args.reverse)] | |
| 308 if args.keep: | |
| 309 sub.id = record.id | |
| 310 sub.description = record.description | |
| 311 else: | |
| 312 sub.id = record.id + "_ampli_" + str(i) | |
| 313 sub.description = str(amplicon) | |
| 314 SeqIO.write(sub, args.out, "fasta") | |
| 315 | |
| 316 sys.stderr.write("OK\n") | |
| 317 # delete tempfile | |
| 318 os.unlink(primer_fasta) | |
| 319 os.unlink(output_result.name) |
