diff qpcr_search.py @ 0:1f4836da4a14 draft default tip

planemo upload for repository https://github.com/bvalot/galaxy commit d57c24d4b2c0c741d572af9ca0d09f8b82689640
author bvalot
date Tue, 14 Jun 2022 08:52:22 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/qpcr_search.py	Tue Jun 14 08:52:22 2022 +0000
@@ -0,0 +1,361 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+"""Search primers/probe on database and return result"""
+
+import argparse
+import os
+import subprocess
+import sys
+import tempfile
+
+from Bio import SeqIO
+
+gasst_exe = "Gassst"
+nucleo = {
+    "R": ("A", "G"),
+    "Y": ("C", "T"),
+    "S": ("G", "C"),
+    "W": ("A", "T"),
+    "K": ("G", "T"),
+    "M": ("A", "C"),
+    "B": ("C", "G", "T"),
+    "D": ("A", "G", "T"),
+    "H": ("A", "C", "T"),
+    "V": ("A", "C", "G"),
+    "N": ("A", "T", "C", "G"),
+}
+
+desc = "Search primer/probe on database and return position and errors."
+command = argparse.ArgumentParser(
+    prog="primer_search.py",
+    description=desc,
+    usage="%(prog)s [options] forward probe reverse database",
+)
+command.add_argument(
+    "-o",
+    "--out",
+    nargs="?",
+    type=argparse.FileType("w"),
+    default=sys.stdout,
+    help="Return amplicon position on file, default=stdout",
+)
+command.add_argument(
+    "-d",
+    "--debug",
+    nargs="?",
+    type=argparse.FileType("w"),
+    default=None,
+    help="Conserve a copy of primer search, default No",
+)
+command.add_argument(
+    "-e",
+    "--error",
+    nargs="?",
+    type=int,
+    default=1,
+    help="Maximun error allowed on match, default=1",
+)
+command.add_argument(
+    "-m",
+    "--min",
+    nargs="?",
+    type=int,
+    default=50,
+    help="Min len amplicon size, default=50",
+)
+command.add_argument(
+    "-M",
+    "--max",
+    nargs="?",
+    type=int,
+    default=200,
+    help="Max len amplicon size, default=200",
+)
+command.add_argument(
+    "-t", "--taxo", action="store_true", help="Parse description to report species only"
+)
+command.add_argument("forward", type=str, help="Forward primer sequence")
+command.add_argument("probe", type=str, help="Probe sequence")
+command.add_argument("reverse", type=str, help="Reverse primer sequence")
+command.add_argument(
+    "database", type=argparse.FileType("r"), help="Database to search on fasta"
+)
+command.add_argument("-v", "--version", action="version", version="%(prog)s 0.3.0")
+
+
+class GassstResult:
+    """A simple Gassst class containing result
+    rev : True indicate if reverse primers, else forward
+    """
+
+    def __init__(self, line):
+        h = line.strip().split()
+        if len(h) != 9:
+            raise Exception("Line seems not correspond to gassst result\n" + line)
+        self.primer = h[0]
+        # self.rev = h[0] != "Forward"
+        self.match = h[1]
+        self.strand = int(h[2])
+        self.start = int(h[3])
+        self.error = int(h[5])
+        self.stop = self.start + len(h[7])
+
+    def __repr__(self):
+        return (
+            self.match
+            + " : "
+            + str(self.start)
+            + " in strand:"
+            + str(self.strand)
+            + " in primer: "
+            + self.primer
+            + " with error: "
+            + str(self.error)
+        )
+
+    def __eq__(self, other):
+        return (
+            isinstance(other, GassstResult)
+            and self.primer == other.primer
+            and self.match == other.match
+            and self.strand == other.strand
+            and self.start == other.start
+        )
+
+    def __ne__(self, other):
+        return not self == other
+
+    def __lt__(self, other):
+        if self != other:
+            raise Exception("Only compare equal GassstResult")
+        return self.error < other.error
+
+
+class Amplicon:
+    """A simple Amplicon class containing forward and reverse gassst result
+    Add probe including
+    """
+
+    def __init__(self, forw, reve):
+        if forw.strand == reve.strand:
+            raise Exception(
+                "Amplicon could be in inverse strand for reverse/forward primer\n"
+                + forw
+                + reve
+            )
+        self.forw = forw
+        self.prob = None
+        self.reve = reve
+        self.strand = forw.strand
+
+    def contain_probe(self, probe):
+        if (
+            min(probe.start, probe.stop) > self.get_start()
+            and max(probe.start, probe.stop) < self.get_stop()
+        ):
+            self.prob = probe
+            return True
+        return False
+
+    def get_start(self):
+        if self.strand == 0:
+            return self.forw.start
+        return self.reve.start
+
+    def get_stop(self):
+        if self.strand == 1:
+            return self.forw.stop
+        return self.reve.stop
+
+    def get_tab_data(self):
+        return map(
+            str,
+            [
+                self.get_start(),
+                self.get_stop(),
+                self.forw.error,
+                self.prob.error,
+                self.reve.error,
+            ],
+        )
+
+    def __len__(self):
+        return self.get_stop() - self.get_start()
+
+    def __repr__(self):
+        if self.prob is None:
+            return (
+                str(self.get_start())
+                + " : "
+                + str(self.get_stop())
+                + " in strand:"
+                + str(self.strand)
+                + "; error F:"
+                + str(self.forw.error)
+                + ",R:"
+                + str(self.reve.error)
+            )
+        return (
+            str(self.get_start())
+            + " : "
+            + str(self.get_stop())
+            + " in strand:"
+            + str(self.strand)
+            + "; error F:"
+            + str(self.forw.error)
+            + ",P:"
+            + str(self.prob.error)
+            + ",R:"
+            + str(self.reve.error)
+        )
+
+
+def degenerate_primer(seq):
+    """Iterate all possible sequence
+    from degenerate primer
+    """
+    notdegene = True
+    for i, n in enumerate(seq.upper()):
+        if n in nucleo:
+            notdegene = False
+            for mod in nucleo.get(n):
+                seq_cut = list(seq)
+                seq_cut[i] = mod
+                for seq2 in degenerate_primer("".join(seq_cut)):
+                    yield (seq2)
+            break
+    if notdegene:
+        yield (seq)
+
+
+def write_primer_fasta(forward_de, probe_de, reverse_de):
+    primer_fasta = tempfile.NamedTemporaryFile(delete=False, mode="w+t")
+    for forward in degenerate_primer(forward_de):
+        primer_fasta.write(">Forward\n" + forward + "\n")
+    for probe in degenerate_primer(probe_de):
+        primer_fasta.write(">Probe\n" + probe + "\n")
+    for reverse in degenerate_primer(reverse_de):
+        primer_fasta.write(">Reverse\n" + reverse + "\n")
+    primer_fasta.close()
+    return primer_fasta.name
+
+
+def performed_gassst(command):
+    proc = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=sys.stderr)
+    error = ""
+    for line in iter(proc.stderr.readline, b""):
+        error += line.decode()
+    if error != "":
+        sys.stdout.write("Error during processing Gassst\n")
+        raise Exception(error)
+
+
+def read_result(f_res, f_debug):
+    result = {}
+    for line in iter(f_res.readline, ""):
+        if line[0] == "F" or line[0] == "R" or line[0] == "P":
+            if f_debug is not None:
+                f_debug.write(line)
+            res = GassstResult(line)
+            other_res = result.setdefault(res.match, {}).setdefault(res.primer, [])
+            notfound = True
+            for i, res2 in enumerate(other_res):
+                if res == res2:
+                    notfound = False
+                    if res < res2:
+                        other_res[i] = res
+                        # print res
+                        break
+            if notfound:
+                other_res.append(res)
+    return result
+
+
+def get_amplicons(result):
+    """Function to get amplicon on a sequence from forward / reverse result"""
+    for forw in result["Forward"]:
+        for reve in result["Reverse"]:
+            if forw.strand != reve.strand:
+                amp = Amplicon(forw, reve)
+                if amp.get_start() < amp.get_stop():  # must be side by side
+                    for probe in result["Probe"]:
+                        if amp.contain_probe(probe):
+                            yield amp
+                            break
+                    del amp
+                else:
+                    del amp
+
+
+if __name__ == "__main__":
+    """Performed job on execution script"""
+    args = command.parse_args()
+    primer_fasta = write_primer_fasta(args.forward, args.probe, args.reverse)
+    output_result = tempfile.NamedTemporaryFile(delete=False, mode="w+")
+    identity = int(
+        100 - args.error / float(max(len(args.forward), len(args.reverse))) * 100
+    )
+
+    command = [
+        gasst_exe,
+        "-p",
+        str(identity),
+        "-g",
+        "0",
+        "-w",
+        "6",
+        "-h",
+        "0",
+        "-i",
+        primer_fasta,
+        "-d",
+        args.database.name,
+        "-o",
+        output_result.name,
+    ]
+    performed_gassst(command)
+    result = read_result(output_result, args.debug)
+
+    # load amplicons
+    sys.stderr.write("Search amplicons : ")
+    amplicons = {}
+    for seq in result.keys():
+        if len(result[seq]) == 3:
+            # print "Search amplicon on seq: " + seq
+            for amplicon in get_amplicons(result[seq]):
+                if len(amplicon) > args.min and len(amplicon) < args.max:
+                    amplicons.setdefault(seq, []).append(amplicon)
+                else:
+                    del amplicon
+            # print "Find amplicons : "  + str(len(amplicons[seq]))
+    sys.stderr.write(str(sum(map(len, amplicons.values()))) + "\n\n")
+
+    # print amplicons
+
+    sys.stderr.write("Write result : ")
+    sep = "\t"
+    args.out.write(
+        "Id\tAmpli number\tDescription\tStart\tStop\tError forward\tError probe\tError reverse\n"
+    )
+    for record in SeqIO.parse(args.database, "fasta"):
+        if record.id not in amplicons:
+            continue
+        for i, amplicon in enumerate(amplicons.get(record.id)):
+            desc = record.description.lstrip(record.id)
+            if args.taxo:
+                desc = record.description.split(";")[-1].strip()
+            args.out.write(
+                record.id
+                + sep
+                + str(i + 1)
+                + sep
+                + desc
+                + sep
+                + sep.join(amplicon.get_tab_data())
+                + "\n"
+            )
+    sys.stderr.write("OK\n")
+    # delete tempfile
+    os.unlink(primer_fasta)
+    os.unlink(output_result.name)