# prophage_relatedness.py @ 1:7ba8b1f0fdf0 draft default tip
#
# planemo upload commit f8aa1f8dc7acaa1187d197da50a3eb21ad4b6dc9-dirty
# author: cpt
# date: Sun, 11 Aug 2024 22:26:57 +0000
# parents: 7a23dda2e932
# children: (none)

#!/usr/bin/env python
import argparse
from math import floor
from Bio.Blast import NCBIXML
import logging

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()


def parseXML(blastxml, outFile):  # Modified from intron_detection
    """Parse BLAST XML output into per-alignment lists of HSP dicts.

    For every query record, a small summary (query id, query length,
    number of alignments) is written to *outFile*; each alignment's
    HSPs are then collected into one inner list of dictionaries.

    Returns a list of lists: one inner list per alignment.
    """
    results = []
    for record_num, record in enumerate(NCBIXML.parse(blastxml), 1):
        # Per-record summary header.
        outFile.write("Query ID\tQuery Length\tTotal Number of Hits\n")
        outFile.write(
            "%s\t%d\t%d\n\n"
            % (record.query_id, record.query_length, len(record.alignments))
        )

        # Query name truncated at the first space (constant per record,
        # so hoisted out of the HSP loop).
        short_name = record.query
        if " " in short_name:
            short_name = short_name[0 : short_name.index(" ")]

        for hit_idx, alignment in enumerate(record.alignments, 1):
            accession = str(alignment.accession)
            hsp_dicts = []
            for hsp in alignment.hsps:
                # NOTE(review): the "- 1" on identities looks odd but is
                # preserved as-is; it mirrors the TSV parsing path.
                ident_frac = float(hsp.identities - 1) / (
                    hsp.query_end - hsp.query_start
                )
                hsp_dicts.append(
                    {
                        "gi_nos": accession,
                        "sbjct_length": alignment.length,
                        "query_length": record.query_length,
                        "sbjct_range": (hsp.sbjct_start, hsp.sbjct_end),
                        "query_range": (hsp.query_start, hsp.query_end),
                        "name": short_name,
                        "evalue": hsp.expect,
                        "identity": hsp.identities,
                        "identity_percent": ident_frac,
                        "hit_num": hit_idx,
                        "iter_num": record_num,
                        "match_id": alignment.title.partition(">")[0],
                        "align_len": hsp.align_length,
                    }
                )
            results.append(hsp_dicts)

    return results


def openTSV(blasttsv, outFile):  # Modified from intron_detection
    """Parse 25-column tabular BLAST results into per-alignment HSP dicts.

    Consecutive rows sharing a subject id (column 1) are grouped into one
    alignment; the result mirrors parseXML(): a list of lists, one inner
    list per alignment, one dict per HSP. A summary line (query id, query
    length, number of alignments) is written to *outFile*.

    Column layout assumed (0-based): 0 qseqid, 1 sseqid, 3 align length,
    6/7 qstart/qend, 8/9 sstart/send, 10 evalue, 12 accession,
    14 identities, 22 qlen, 23 slen, 24 subject title.

    Fixes vs. original: empty input no longer raises NameError, and blank
    lines are skipped instead of raising IndexError.
    """
    blast = []
    blast_gene = []
    activeAlign = ""
    numAlignments = 0
    hsp_num = 0
    data = None  # last parsed row; None means no data rows were seen
    for line in blasttsv:
        line = line.strip("\n")
        if not line.strip():
            continue  # skip blank lines rather than crash on short rows
        data = [field.strip() for field in line.split("\t")]
        if activeAlign == "":
            # Very first data row starts the first alignment.
            numAlignments += 1
            blast_gene = []
            hsp_num = 1
        elif activeAlign != data[1]:
            # New subject id: flush the finished alignment's HSPs.
            numAlignments += 1
            blast.append(blast_gene)
            blast_gene = []
            hsp_num = 1
        else:
            hsp_num += 1
        gi_nos = data[12]
        activeAlign = data[1]
        # NOTE(review): the "- 1" on identities looks odd but is preserved
        # as-is; it mirrors the XML parsing path.
        ident_frac = (float(data[14]) - 1) / (float(data[7]) - float(data[6]))
        nice_name = data[1]
        if " " in nice_name:
            nice_name = nice_name[0 : nice_name.index(" ")]
        blast_gene.append(
            {
                "gi_nos": gi_nos,
                "sbjct_length": int(data[23]),
                "query_length": int(data[22]),
                "sbjct_range": (int(data[8]), int(data[9])),
                "query_range": (int(data[6]), int(data[7])),
                "name": nice_name,
                "evalue": float(data[10]),
                "identity": int(data[14]),
                "identity_percent": ident_frac,
                "hit_num": numAlignments,
                "iter_num": hsp_num,
                "match_id": data[24].partition(">")[0],
                "align_len": int(data[3]),
            }
        )

    outFile.write("Query ID\tQuery Length\tTotal Number of Hits\n")
    if data is None:
        # No data rows at all: report nothing instead of crashing.
        return blast
    blast.append(blast_gene)
    outFile.write("%s\t%d\t%d\n\n" % (data[0], int(data[22]), numAlignments))

    return blast


def test_true(feature, **kwargs):
    """Predicate that accepts any feature (and any keyword args): always True."""
    return True

def superSets(inSets):
    """Filter *inSets* down to the sets that are not subsets of a kept set.

    Sorts *inSets* in place by length, longest first, then keeps each set
    only if it is not fully contained in an already-kept (longer) set.
    Membership is by element equality, so duplicate sets collapse to one.

    Returns the list of kept sets.
    """
    inSets.sort(key=len, reverse=True)
    kept = []
    for candidate in inSets:
        # Skip the candidate when every element already appears in some
        # kept set (i.e. the candidate is a subset of it).
        if any(all(item in parent for item in candidate) for parent in kept):
            continue
        kept.append(candidate)
    return kept


def disjointSets(inSets):
    """Greedily keep sets that share no element with any already-kept set.

    Sorts *inSets* in place by the subject start of each set's first HSP,
    then walks the sorted list, keeping a set only if none of its elements
    (compared by equality) appears in a previously kept set.

    Fix vs. original: an empty *inSets* now returns [] instead of raising
    IndexError on inSets[0].
    """
    if not inSets:
        return []
    inSets.sort(key=lambda s: s[0]["sbjct_range"][0])
    kept = [inSets[0]]
    for candidate in inSets[1:]:
        overlaps = any(
            elem in existing for elem in candidate for existing in kept
        )
        if not overlaps:
            kept.append(candidate)
    return kept


def compPhage(inRec, outFile, padding=1.2, cutoff=0.3, numReturn=20, isTSV=False):
    """Cluster BLAST HSPs by subject location and report the best clusters.

    inRec     -- open BLAST XML handle, or iterable of TSV lines if isTSV
    outFile   -- open handle the report is written to
    padding   -- multiplier on the query length giving the clustering
                 window along the subject sequence
    cutoff    -- minimum overall query-identity fraction for a cluster
                 to be reported
    numReturn -- maximum number of clusters reported
    isTSV     -- input is 25-column tabular BLAST instead of XML

    Fix vs. original: the "# HSPs in Cluster" column was len(cluster) - 1,
    over-counting by one (each cluster carries two trailing annotation
    entries, not one); it is now len(cluster) - 2. The unused maxID
    accumulator was also removed.
    """
    if isTSV:
        inRec = openTSV(inRec, outFile)
    else:
        inRec = parseXML(inRec, outFile)

    res = []
    for group in inRec:
        # Window on the subject within which HSPs are clustered together.
        window = floor(padding * float(group[0]["query_length"]))
        group = sorted(group, key=lambda h: h["sbjct_range"][0])
        hspGroups = []
        lastInd = len(res)

        # Seed a candidate cluster at every HSP and pull in every later
        # HSP that lies entirely inside [start, start + window].
        for i in range(0, len(group)):
            hspGroups.append([group[i]])
            startBound = group[i]["sbjct_range"][0]
            endBound = startBound + window
            for hsp in group[i + 1 :]:
                if (
                    hsp["sbjct_range"][0] >= startBound
                    and hsp["sbjct_range"][1] <= endBound
                ):
                    hspGroups[-1].append(hsp)

        # Drop candidates contained in a larger cluster, then keep only
        # mutually disjoint clusters.
        for cluster in disjointSets(superSets(hspGroups)):
            res.append(cluster)

        # Annotate each new cluster in place: append total aligned length
        # and (summed identities / query length). These two trailing
        # entries are consumed via cluster[-2] / cluster[-1] below.
        for cluster in res[lastInd:]:
            sumID = 0.0
            totAlign = 0
            for hsp in cluster:
                totAlign += hsp["align_len"]
                sumID += float(hsp["identity"])
            cluster.append(totAlign)
            cluster.append(sumID / float(cluster[0]["query_length"]))

    # Best clusters first: the last element is the identity fraction.
    res = sorted(res, key=lambda c: c[-1], reverse=True)

    outList = []
    for cluster in res:
        if len(outList) == numReturn or cluster[-1] < cutoff:
            break
        outList.append(cluster)

    #    Original request was that low scoring clusters would make it to the final results IF
    #    they were part of an Accession cluster that did have at least one high scoring member.

    outFile.write(
        "Accession Number\tCluster Start Location\tEnd Location\tSubject Cluster Length\t# HSPs in Cluster\tTotal Aligned Length\t% of Query Aligned\tOverall % Query Identity\tOverall % Subject Identity\tComplete Accession Info\n"
    )
    for cluster in outList:
        minStart = min(cluster[0]["sbjct_range"][0], cluster[0]["sbjct_range"][1])
        maxEnd = max(cluster[0]["sbjct_range"][0], cluster[0]["sbjct_range"][1])
        # Prefer the GenBank accession embedded in the match title when present.
        if "|gb|" in cluster[0]["match_id"]:
            startSlice = cluster[0]["match_id"].index("gb|") + 3
            endSlice = (cluster[0]["match_id"][startSlice:]).index("|")
            accOut = cluster[0]["match_id"][startSlice : startSlice + endSlice]
        else:
            accOut = cluster[0]["gi_nos"]
        # cluster[0:-2] skips the two trailing annotation entries.
        for hsp in cluster[0:-2]:
            minStart = min(minStart, hsp["sbjct_range"][0])
            maxEnd = max(maxEnd, hsp["sbjct_range"][1])
        outFile.write(
            accOut
            + "\t"
            + str(minStart)
            + "\t"
            + str(maxEnd)
            + "\t"
            + str(maxEnd - minStart + 1)
            + "\t"
            # BUGFIX: count only real HSPs (list length minus the two
            # annotation entries appended above).
            + str(len(cluster) - 2)
            + "\t"
            + str(cluster[-2])
            + (
                "\t%.3f"
                % (float(cluster[-2]) / float(cluster[0]["query_length"]) * 100.00)
            )
            + ("\t%.3f" % (cluster[-1] * 100.00))
            + ("\t%.3f" % (float(cluster[-2]) / float(maxEnd - minStart + 1) * 100.00))
            + "\t"
            + cluster[0]["match_id"]
            + "\n"
        )

    # accession start end number


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Intron detection")
    parser.add_argument(
        "inRec", type=argparse.FileType("r"), help="blast XML protein results"
    )
    parser.add_argument(
        "--outFile",
        type=argparse.FileType("w"),
        help="Output Error Log",
        default="./compPro.tsv",
    )
    parser.add_argument(
        "--padding",
        help="Gap minimum (Default -1, set to a negative number to allow overlap)",
        default=1.2,
        type=float,
    )
    parser.add_argument(
        "--cutoff",
        help="Gap minimum (Default -1, set to a negative number to allow overlap)",
        default=0.3,
        type=float,
    )
    parser.add_argument(
        "--numReturn",
        help="Gap maximum in genome (Default 10000)",
        default=20,
        type=int,
    )
    parser.add_argument(
        "--isTSV",
        help="Opening Blast TSV result",
        action="store_true",
    )
    args = parser.parse_args()

    compPhage(**vars(args))