view cpt_related_genome_prot/relatedness_prot.py @ 0:ebcc87a27f9c draft default tip

Uploaded
author cpt
date Fri, 10 Jun 2022 08:46:28 +0000
parents
children
line wrap: on
line source

#!/usr/bin/env python
import sys
import argparse
import json
import logging
from Bio.Blast import NCBIXML

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()

def parse_blast(blast, isXML = False):
    res = []
    finalRes = []
    if isXML:
      for iter_num, blast_record in enumerate(NCBIXML.parse(blast), 1):
        for alignment in blast_record.alignments:
            tempID = alignment.hit_id[alignment.hit_id.find("gb|") + 3:]
            tempID = tempID[:tempID.find("|")]
            tempDesc = alignment.title
            while tempDesc.find("|") >= 0:
              tempDesc = tempDesc[tempDesc.find("|") + 1:]
            tempDesc = tempDesc.strip()
            tempID = tempID.strip()
            #for hsp in alignment.hsps:
            line = [str(blast_record.query)[:str(blast_record.query).find("[")].strip()]
            line.append(alignment.hit_id)
            line.append(tempDesc)
            line.append(alignment.accession)
            res.append(line)
      blast.seek(0)
      resInd = -1
      taxLine = blast.readline()
      while taxLine: 
        if "<Hit>" in taxLine:
          resInd += 1
          taxSlice = ""
        elif "<taxid>" in taxLine:
          taxSlice = taxLine[taxLine.find("<taxid>") + 7:taxLine.find("</taxid>")]
          finalRes.append(res[resInd])
          finalRes[-1].append(taxSlice)
        taxLine = blast.readline()
      return finalRes
    else:
      for line in blast:
        finalRes.append(line.strip("\n").split("\t"))
    return finalRes

def with_dice(blast):
    for data in blast:
        dice = 2 * int(data[14]) / (float(data[22]) + float(data[23]))
        yield data + [dice]


def filter_dice(blast, threshold=0.5):
    for data in blast:
        if data[-1] > threshold:
            yield data


def split_identifiers_nucl(_, ident):
    if "<>" in ident:
        idents = ident.split("<>")
    else:
        idents = [ident]
    return idents


def split_identifiers_prot(_, ident):
    if "<>" in ident:
        idents = ident.split("<>")
    else:
        idents = [ident]
    return [
        x[x.index("[") + 1 : x.rindex("]")]
        for x in idents
        # MULTISPECIES: recombination-associated protein RdgC [Enterobacteriaceae]<>RecName: Full=Recombination-associated protein RdgC<>putative exonuclease, RdgC [Enterobacter sp. 638]
        if "[" in x and "]" in x
    ]


def split_identifiers_phage(par, ident):
    par = par.replace("lcl|", "")
    par = par[0 : par.index("_prot_")]
    return [par]


def important_only(blast, split_identifiers):
    for data in blast:
        yield [
            data[0],  # 01 Query Seq-id (ID of your sequence)
            data[1],  # 13 All subject Seq-id(s), separated by a ';'
            split_identifiers(
                data[1], data[2]
            ),  # 25 All subject title(s), separated by a '<>'
            data[3].split(";"),  # Extra: All Subject Accessions
            data[4].split(";"),  # Extra: All TaxIDs
        ]


def deform_scores(blast):
    for data in blast:
        for org in data[2]:
            yield [data[0], data[1], org, data[3], data[4]]


def expand_fields(blast):
    for data in blast:
        for x in range(0, len(data[4])):
            yield [data[0], data[1], data[2][x], data[3], int(data[4][x])]

def expand_taxIDs(blast, taxFilter):
    for data in blast:
        # if(len(data[4]) > 0):
        #  print(data[0])
        for ID in data[4]:
            if ID != "N/A":
              filterOut = False
              for tax in taxFilter:
                if str(ID).strip() == tax:
                  filterOut = True
              if not filterOut:
                yield [data[0], data[1], data[2], data[3], int(ID)]


def expand_titles(blast):
    for data in blast:
        for title in data[2]:
            yield [data[0], data[1], title, data[3], data[4]]


def filter_phage(blast, phageTaxLookup, phageSciNames):
    for data in blast:
        for x in range(0, len(phageTaxLookup)):
            if (data[4]) == phageTaxLookup[x]:
                yield [data[0], data[1], phageSciNames[x], data[3], data[4]]
                break


def remove_dupes(data):
    has_seen = {}
    for row in data:
        # qseqid, sseqid
        key = (row[0], row[4])
        # If we've seen the key before, we can exit
        if key in has_seen:
            continue

        # Otherwise, continue on
        has_seen[key] = True
        # Pretty simple
        yield row

def scoreMap(blast):
    c = {}
    m = {}
    for (qseq, subID, subTitle, access, ID) in blast:
        if (str(subTitle), ID) not in c:
            m[(str(subTitle), ID)] = access
            c[(str(subTitle), ID)] = 0

        c[(str(subTitle), ID)] += 1
    return c, m


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Top related genomes")
    parser.add_argument(
        "blast", type=argparse.FileType("r"), help="Blast 25 Column Results"
    )
    parser.add_argument("phagedb", type=argparse.FileType("r"))
    parser.add_argument("--access", action="store_true")
    parser.add_argument("--protein", action="store_true")
    parser.add_argument("--canonical", action="store_true")
    parser.add_argument("--noFilter", action="store_true")
    #parser.add_argument("--title", action="store_true") # Add when ready to update XML after semester
    parser.add_argument("--hits", type=int, default=5)
    parser.add_argument("--xmlMode", action="store_true")   
    parser.add_argument("--taxFilter", type=str) 

    args = parser.parse_args()

    phageDb = args.phagedb
    phageTaxLookup = []
    sciName = []
    line = phageDb.readline()
    
    taxList = []
    if args.taxFilter and args.taxFilter != "" :
      args.taxFilter = args.taxFilter.split(" ")
      for ind in args.taxFilter:
        taxList.append(ind.strip())

    while line:
        line = line.split("\t")
        phageTaxLookup.append(int(line[0]))
        line[1] = line[1].strip()
        if (line[1] == ""):
            line[1] = "Novel Genome"
        sciName.append(line[1])
        line = phageDb.readline()

    if args.protein:
        splitId = split_identifiers_prot
        # phageNameLookup = {k['source'].rstrip('.'): k['id'] for k in phageDb}
    elif args.canonical:
        splitId = split_identifiers_phage
        # phageNameLookup = {k['source'].rstrip('.'): k['id'] for k in phageDb}
    else:
        splitId = split_identifiers_nucl
        # phageNameLookup = {k['desc'].rstrip('.'): k['id'] for k in phageDb}

    data = parse_blast(args.blast, args.xmlMode)
    # data = with_dice(data)
    # data = filter_dice(data, threshold=0.0)
    data = important_only(data, splitId)
    
    data = expand_taxIDs(data, taxList)
    data = remove_dupes(data)
    if not args.noFilter:
        data = filter_phage(data, phageTaxLookup, sciName)
    listify = []
    for x in data:
        listify.append(x)
    #listify = greatest_taxID(listify)
       
    count_label = "Similar Unique Proteins"
    
    counts, accessions = scoreMap(listify)
    
    nameRec = listify[0][0]
    sys.stdout.write(
            "Top %d matches for BLASTp results of %s\n"
            % (args.hits, nameRec)
        )
    header = "# TaxID\t"
    #if args.title:
    header += "Name\t"
    if args.access:
        header += "Accessions\t"
    header += "Similar Unique Proteins\n"
    sys.stdout.write(header)

    for idx, ((name, ID), num) in enumerate(
            sorted(counts.items(), key=lambda item: -item[1])
        ):
        if idx > args.hits - 1:
            break
        line = str(ID) + "\t"
        #if args.title:
        line += str(name) + "\t"
        if args.access:
          line += str(accessions[(name, ID)][0]) + "\t"
        line += str(num) + "\n" 
        sys.stdout.write(line)