Mercurial > repos > bvalot > in_sillico_pcr
view qpcr_search.py @ 0:1f4836da4a14 draft default tip
planemo upload for repository https://github.com/bvalot/galaxy commit d57c24d4b2c0c741d572af9ca0d09f8b82689640
author | bvalot |
---|---|
date | Tue, 14 Jun 2022 08:52:22 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Search primers/probe on database and return result""" import argparse import os import subprocess import sys import tempfile from Bio import SeqIO gasst_exe = "Gassst" nucleo = { "R": ("A", "G"), "Y": ("C", "T"), "S": ("G", "C"), "W": ("A", "T"), "K": ("G", "T"), "M": ("A", "C"), "B": ("C", "G", "T"), "D": ("A", "G", "T"), "H": ("A", "C", "T"), "V": ("A", "C", "G"), "N": ("A", "T", "C", "G"), } desc = "Search primer/probe on database and return position and errors." command = argparse.ArgumentParser( prog="primer_search.py", description=desc, usage="%(prog)s [options] forward probe reverse database", ) command.add_argument( "-o", "--out", nargs="?", type=argparse.FileType("w"), default=sys.stdout, help="Return amplicon position on file, default=stdout", ) command.add_argument( "-d", "--debug", nargs="?", type=argparse.FileType("w"), default=None, help="Conserve a copy of primer search, default No", ) command.add_argument( "-e", "--error", nargs="?", type=int, default=1, help="Maximun error allowed on match, default=1", ) command.add_argument( "-m", "--min", nargs="?", type=int, default=50, help="Min len amplicon size, default=50", ) command.add_argument( "-M", "--max", nargs="?", type=int, default=200, help="Max len amplicon size, default=200", ) command.add_argument( "-t", "--taxo", action="store_true", help="Parse description to report species only" ) command.add_argument("forward", type=str, help="Forward primer sequence") command.add_argument("probe", type=str, help="Probe sequence") command.add_argument("reverse", type=str, help="Reverse primer sequence") command.add_argument( "database", type=argparse.FileType("r"), help="Database to search on fasta" ) command.add_argument("-v", "--version", action="version", version="%(prog)s 0.3.0") class GassstResult: """A simple Gassst class containing result rev : True indicate if reverse primers, else forward """ def __init__(self, line): h = line.strip().split() if len(h) != 9: raise Exception("Line seems not correspond to gassst result\n" + line) self.primer = h[0] # self.rev = h[0] != "Forward" self.match = h[1] self.strand = int(h[2]) self.start = int(h[3]) self.error = int(h[5]) self.stop = self.start + len(h[7]) def __repr__(self): return ( self.match + " : " + str(self.start) + " in strand:" + str(self.strand) + " in primer: " + self.primer + " with error: " + str(self.error) ) def __eq__(self, other): return ( isinstance(other, GassstResult) and self.primer == other.primer and self.match == other.match and self.strand == other.strand and self.start == other.start ) def __ne__(self, other): return not self == other def __lt__(self, other): if self != other: raise Exception("Only compare equal GassstResult") return self.error < other.error class Amplicon: """A simple Amplicon class containing forward and reverse gassst result Add probe including """ def __init__(self, forw, reve): if forw.strand == reve.strand: raise Exception( "Amplicon could be in inverse strand for reverse/forward primer\n" + forw + reve ) self.forw = forw self.prob = None self.reve = reve self.strand = forw.strand def contain_probe(self, probe): if ( min(probe.start, probe.stop) > self.get_start() and max(probe.start, probe.stop) < self.get_stop() ): self.prob = probe return True return False def get_start(self): if self.strand == 0: return self.forw.start return self.reve.start def get_stop(self): if self.strand == 1: return self.forw.stop return self.reve.stop def get_tab_data(self): return map( str, [ self.get_start(), self.get_stop(), self.forw.error, self.prob.error, self.reve.error, ], ) def __len__(self): return self.get_stop() - self.get_start() def __repr__(self): if self.prob is None: return ( str(self.get_start()) + " : " + str(self.get_stop()) + " in strand:" + str(self.strand) + "; error F:" + str(self.forw.error) + ",R:" + str(self.reve.error) ) return ( str(self.get_start()) + " : " + str(self.get_stop()) + " in strand:" + str(self.strand) + "; error F:" + str(self.forw.error) + ",P:" + str(self.prob.error) + ",R:" + str(self.reve.error) ) def degenerate_primer(seq): """Iterate all possible sequence from degenerate primer """ notdegene = True for i, n in enumerate(seq.upper()): if n in nucleo: notdegene = False for mod in nucleo.get(n): seq_cut = list(seq) seq_cut[i] = mod for seq2 in degenerate_primer("".join(seq_cut)): yield (seq2) break if notdegene: yield (seq) def write_primer_fasta(forward_de, probe_de, reverse_de): primer_fasta = tempfile.NamedTemporaryFile(delete=False, mode="w+t") for forward in degenerate_primer(forward_de): primer_fasta.write(">Forward\n" + forward + "\n") for probe in degenerate_primer(probe_de): primer_fasta.write(">Probe\n" + probe + "\n") for reverse in degenerate_primer(reverse_de): primer_fasta.write(">Reverse\n" + reverse + "\n") primer_fasta.close() return primer_fasta.name def performed_gassst(command): proc = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=sys.stderr) error = "" for line in iter(proc.stderr.readline, b""): error += line.decode() if error != "": sys.stdout.write("Error during processing Gassst\n") raise Exception(error) def read_result(f_res, f_debug): result = {} for line in iter(f_res.readline, ""): if line[0] == "F" or line[0] == "R" or line[0] == "P": if f_debug is not None: f_debug.write(line) res = GassstResult(line) other_res = result.setdefault(res.match, {}).setdefault(res.primer, []) notfound = True for i, res2 in enumerate(other_res): if res == res2: notfound = False if res < res2: other_res[i] = res # print res break if notfound: other_res.append(res) return result def get_amplicons(result): """Function to get amplicon on a sequence from forward / reverse result""" for forw in result["Forward"]: for reve in result["Reverse"]: if forw.strand != reve.strand: amp = Amplicon(forw, reve) if amp.get_start() < amp.get_stop(): # must be side by side for probe in result["Probe"]: if amp.contain_probe(probe): yield amp break del amp else: del amp if __name__ == "__main__": """Performed job on execution script""" args = command.parse_args() primer_fasta = write_primer_fasta(args.forward, args.probe, args.reverse) output_result = tempfile.NamedTemporaryFile(delete=False, mode="w+") identity = int( 100 - args.error / float(max(len(args.forward), len(args.reverse))) * 100 ) command = [ gasst_exe, "-p", str(identity), "-g", "0", "-w", "6", "-h", "0", "-i", primer_fasta, "-d", args.database.name, "-o", output_result.name, ] performed_gassst(command) result = read_result(output_result, args.debug) # load amplicons sys.stderr.write("Search amplicons : ") amplicons = {} for seq in result.keys(): if len(result[seq]) == 3: # print "Search amplicon on seq: " + seq for amplicon in get_amplicons(result[seq]): if len(amplicon) > args.min and len(amplicon) < args.max: amplicons.setdefault(seq, []).append(amplicon) else: del amplicon # print "Find amplicons : " + str(len(amplicons[seq])) sys.stderr.write(str(sum(map(len, amplicons.values()))) + "\n\n") # print amplicons sys.stderr.write("Write result : ") sep = "\t" args.out.write( "Id\tAmpli number\tDescription\tStart\tStop\tError forward\tError probe\tError reverse\n" ) for record in SeqIO.parse(args.database, "fasta"): if record.id not in amplicons: continue for i, amplicon in enumerate(amplicons.get(record.id)): desc = record.description.lstrip(record.id) if args.taxo: desc = record.description.split(";")[-1].strip() args.out.write( record.id + sep + str(i + 1) + sep + desc + sep + sep.join(amplicon.get_tab_data()) + "\n" ) sys.stderr.write("OK\n") # delete tempfile os.unlink(primer_fasta) os.unlink(output_result.name)