Mercurial > repos > cpt > cpt_related_genomes_prot
diff cpt_related_genome_prot/relatedness_prot.py @ 0:ebcc87a27f9c draft default tip
Uploaded
author | cpt |
---|---|
date | Fri, 10 Jun 2022 08:46:28 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cpt_related_genome_prot/relatedness_prot.py Fri Jun 10 08:46:28 2022 +0000 @@ -0,0 +1,254 @@ +#!/usr/bin/env python +import sys +import argparse +import json +import logging +from Bio.Blast import NCBIXML + +logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger() + +def parse_blast(blast, isXML = False): + res = [] + finalRes = [] + if isXML: + for iter_num, blast_record in enumerate(NCBIXML.parse(blast), 1): + for alignment in blast_record.alignments: + tempID = alignment.hit_id[alignment.hit_id.find("gb|") + 3:] + tempID = tempID[:tempID.find("|")] + tempDesc = alignment.title + while tempDesc.find("|") >= 0: + tempDesc = tempDesc[tempDesc.find("|") + 1:] + tempDesc = tempDesc.strip() + tempID = tempID.strip() + #for hsp in alignment.hsps: + line = [str(blast_record.query)[:str(blast_record.query).find("[")].strip()] + line.append(alignment.hit_id) + line.append(tempDesc) + line.append(alignment.accession) + res.append(line) + blast.seek(0) + resInd = -1 + taxLine = blast.readline() + while taxLine: + if "<Hit>" in taxLine: + resInd += 1 + taxSlice = "" + elif "<taxid>" in taxLine: + taxSlice = taxLine[taxLine.find("<taxid>") + 7:taxLine.find("</taxid>")] + finalRes.append(res[resInd]) + finalRes[-1].append(taxSlice) + taxLine = blast.readline() + return finalRes + else: + for line in blast: + finalRes.append(line.strip("\n").split("\t")) + return finalRes + +def with_dice(blast): + for data in blast: + dice = 2 * int(data[14]) / (float(data[22]) + float(data[23])) + yield data + [dice] + + +def filter_dice(blast, threshold=0.5): + for data in blast: + if data[-1] > threshold: + yield data + + +def split_identifiers_nucl(_, ident): + if "<>" in ident: + idents = ident.split("<>") + else: + idents = [ident] + return idents + + +def split_identifiers_prot(_, ident): + if "<>" in ident: + idents = ident.split("<>") + else: + idents = [ident] + return [ + x[x.index("[") + 1 : x.rindex("]")] + for x in idents + # MULTISPECIES: recombination-associated protein RdgC [Enterobacteriaceae]<>RecName: Full=Recombination-associated protein RdgC<>putative exonuclease, RdgC [Enterobacter sp. 638] + if "[" in x and "]" in x + ] + + +def split_identifiers_phage(par, ident): + par = par.replace("lcl|", "") + par = par[0 : par.index("_prot_")] + return [par] + + +def important_only(blast, split_identifiers): + for data in blast: + yield [ + data[0], # 01 Query Seq-id (ID of your sequence) + data[1], # 13 All subject Seq-id(s), separated by a ';' + split_identifiers( + data[1], data[2] + ), # 25 All subject title(s), separated by a '<>' + data[3].split(";"), # Extra: All Subject Accessions + data[4].split(";"), # Extra: All TaxIDs + ] + + +def deform_scores(blast): + for data in blast: + for org in data[2]: + yield [data[0], data[1], org, data[3], data[4]] + + +def expand_fields(blast): + for data in blast: + for x in range(0, len(data[4])): + yield [data[0], data[1], data[2][x], data[3], int(data[4][x])] + +def expand_taxIDs(blast, taxFilter): + for data in blast: + # if(len(data[4]) > 0): + # print(data[0]) + for ID in data[4]: + if ID != "N/A": + filterOut = False + for tax in taxFilter: + if str(ID).strip() == tax: + filterOut = True + if not filterOut: + yield [data[0], data[1], data[2], data[3], int(ID)] + + +def expand_titles(blast): + for data in blast: + for title in data[2]: + yield [data[0], data[1], title, data[3], data[4]] + + +def filter_phage(blast, phageTaxLookup, phageSciNames): + for data in blast: + for x in range(0, len(phageTaxLookup)): + if (data[4]) == phageTaxLookup[x]: + yield [data[0], data[1], phageSciNames[x], data[3], data[4]] + break + + +def remove_dupes(data): + has_seen = {} + for row in data: + # qseqid, sseqid + key = (row[0], row[4]) + # If we've seen the key before, we can exit + if key in has_seen: + continue + + # Otherwise, continue on + has_seen[key] = True + # Pretty simple + yield row + +def scoreMap(blast): + c = {} + m = {} + for (qseq, subID, subTitle, access, ID) in blast: + if (str(subTitle), ID) not in c: + m[(str(subTitle), ID)] = access + c[(str(subTitle), ID)] = 0 + + c[(str(subTitle), ID)] += 1 + return c, m + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Top related genomes") + parser.add_argument( + "blast", type=argparse.FileType("r"), help="Blast 25 Column Results" + ) + parser.add_argument("phagedb", type=argparse.FileType("r")) + parser.add_argument("--access", action="store_true") + parser.add_argument("--protein", action="store_true") + parser.add_argument("--canonical", action="store_true") + parser.add_argument("--noFilter", action="store_true") + #parser.add_argument("--title", action="store_true") # Add when ready to update XML after semester + parser.add_argument("--hits", type=int, default=5) + parser.add_argument("--xmlMode", action="store_true") + parser.add_argument("--taxFilter", type=str) + + args = parser.parse_args() + + phageDb = args.phagedb + phageTaxLookup = [] + sciName = [] + line = phageDb.readline() + + taxList = [] + if args.taxFilter and args.taxFilter != "" : + args.taxFilter = args.taxFilter.split(" ") + for ind in args.taxFilter: + taxList.append(ind.strip()) + + while line: + line = line.split("\t") + phageTaxLookup.append(int(line[0])) + line[1] = line[1].strip() + if (line[1] == ""): + line[1] = "Novel Genome" + sciName.append(line[1]) + line = phageDb.readline() + + if args.protein: + splitId = split_identifiers_prot + # phageNameLookup = {k['source'].rstrip('.'): k['id'] for k in phageDb} + elif args.canonical: + splitId = split_identifiers_phage + # phageNameLookup = {k['source'].rstrip('.'): k['id'] for k in phageDb} + else: + splitId = split_identifiers_nucl + # phageNameLookup = {k['desc'].rstrip('.'): k['id'] for k in phageDb} + + data = parse_blast(args.blast, args.xmlMode) + # data = with_dice(data) + # data = filter_dice(data, threshold=0.0) + data = important_only(data, splitId) + + data = expand_taxIDs(data, taxList) + data = remove_dupes(data) + if not args.noFilter: + data = filter_phage(data, phageTaxLookup, sciName) + listify = [] + for x in data: + listify.append(x) + #listify = greatest_taxID(listify) + + count_label = "Similar Unique Proteins" + + counts, accessions = scoreMap(listify) + + nameRec = listify[0][0] + sys.stdout.write( + "Top %d matches for BLASTp results of %s\n" + % (args.hits, nameRec) + ) + header = "# TaxID\t" + #if args.title: + header += "Name\t" + if args.access: + header += "Accessions\t" + header += "Similar Unique Proteins\n" + sys.stdout.write(header) + + for idx, ((name, ID), num) in enumerate( + sorted(counts.items(), key=lambda item: -item[1]) + ): + if idx > args.hits - 1: + break + line = str(ID) + "\t" + #if args.title: + line += str(name) + "\t" + if args.access: + line += str(accessions[(name, ID)][0]) + "\t" + line += str(num) + "\n" + sys.stdout.write(line)