Mercurial > repos > cpt > cpt_related_genomes_nuc
view cpt_related_genome_nuc/relatedness.py @ 0:5a5fe0a6f78d draft default tip
Uploaded
author | cpt |
---|---|
date | Fri, 10 Jun 2022 08:45:13 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python import sys import argparse import json import logging from Bio.Blast import NCBIXML logging.basicConfig(level=logging.DEBUG) log = logging.getLogger() def parse_blast(blast, isXML = False): res = [] finalRes = [] if isXML: for iter_num, blast_record in enumerate(NCBIXML.parse(blast), 1): for alignment in blast_record.alignments: tempID = alignment.hit_id[alignment.hit_id.find("gb|") + 3:] tempID = tempID[:tempID.find("|")] tempDesc = alignment.title while tempDesc.find("|") >= 0: tempDesc = tempDesc[tempDesc.find("|") + 1:] tempDesc = tempDesc.strip() tempID = tempID.strip() for hsp in alignment.hsps: line = [str(blast_record.query)] line.append(str(hsp.align_length)) line.append(str(hsp.identities)) line.append(str(blast_record.query_length)) line.append(str(alignment.length)) line.append(tempDesc) line.append(tempID) #line.append("0000000") #print(line) res.append(line) blast.seek(0) resInd = -1 taxLine = blast.readline() while taxLine: if "<Hit>" in taxLine: resInd += 1 taxSlice = "" elif "<taxid>" in taxLine: taxSlice = taxLine[taxLine.find("<taxid>") + 7:taxLine.find("</taxid>")] finalRes.append(res[resInd]) finalRes[-1].append(taxSlice) #print(finalRes[-1]) taxLine = blast.readline() return finalRes else: for line in blast: taxSplit = [] preTaxSplit = line.strip("\n").split("\t") for tax in preTaxSplit[-1].split(";"): shallowCopy = [] for x in range(len(preTaxSplit)): shallowCopy.append(preTaxSplit[x]) shallowCopy[-1] = tax res.append(shallowCopy) for line in res: for access in line[6].split(";"): shallowCopy = [] for x in range(len(line)): shallowCopy.append(line[x]) shallowCopy[6] = access finalRes.append(shallowCopy) # for x in finalRes: # print(x) # exit() return finalRes def add_dice(blast): res = [] for data in blast: dice = (2 * float(data[2])) / (float(data[3]) + float(data[4])) res.append(data + [dice]) return res def make_num(blast): res = [] for data in blast: res.append( [ data[0], int(data[1]), int(data[2]), int(data[3]), int(data[4]), data[5], data[6], (data[7]), ] ) return res def bundle_dice(blast): res = [] ind = 0 seen = {} for x in blast: if (x[6] + "_" + (x[7])) in seen.keys(): res[seen[(x[6] + "_" + (x[7]))]][1] += x[1] res[seen[(x[6] + "_" + (x[7]))]][2] += x[2] res[seen[(x[6] + "_" + (x[7]))]][8] += x[8] res[seen[(x[6] + "_" + (x[7]))]][9] += 1 # Num HSPs else: seen[(x[6] + "_" + (x[7]))] = ind shallowCopy = [] for i in range(len(x)): shallowCopy.append(x[i]) shallowCopy.append(1) # print(shallowCopy) res.append(shallowCopy) ind += 1 # print(seen) # stop = 0 # for i in res: # print(i) # stop += 1 # if stop > 7: # exit() return res """def bundle_dice(blast): res = [] ind = 0 seen = {} for x in blast: if ((x[0] + x[5])) in seen.keys(): res[seen[(x[0] + x[5])]][1] += x[1] res[seen[(x[0] + x[5])]][2] += x[2] res[seen[(x[0] + x[5])]][9] += 1 # Num HSPs res[seen[(x[0] + x[5])]][8] = ((res[seen[(x[0] + x[5])]][8] * (res[seen[(x[0] + x[5])]][9] - 1)) + x[8]) / res[seen[(x[0] + x[5])]][9] else: seen[(x[0] + x[5])] = ind res.append(x + [1]) ind += 1 return res """ def filter_dice(blast, threshold=0.5): for data in blast: if data[-1] > threshold: yield data def split_identifiers_nucl(_, ident): if "<>" in ident: idents = ident.split("<>") else: idents = [ident] return idents def split_identifiers_prot(_, ident): if "<>" in ident: idents = ident.split("<>") else: idents = [ident] return [ x[x.index("[") + 1 : x.rindex("]")] for x in idents # MULTISPECIES: recombination-associated protein RdgC [Enterobacteriaceae]<>RecName: Full=Recombination-associated protein RdgC<>putative exonuclease, RdgC [Enterobacter sp. 638] if "[" in x and "]" in x ] def split_identifiers_phage(par, ident): par = par.replace("lcl|", "") par = par[0 : par.index("_prot_")] return [par] def important_only(blast, split_identifiers): for data in blast: yield [ data[0], # 01 Query Seq-id (ID of your sequence) data[1], # 13 All subject Seq-id(s), separated by a ';' split_identifiers( data[1], data[2] ), # 25 All subject title(s), separated by a '<>' data[3].split(";"), # Extra: All Subject Accessions data[4].split(";"), # Extra: All TaxIDs ] def deform_scores(blast): for data in blast: for org in data[2]: yield [data[0], data[1], org, data[3], data[4]] def expand_taxIDs(blast): for data in blast: # if(len(data[4]) > 0): # print(data[0]) for ID in data[7]: yield [ data[0], data[1], data[2], data[3], data[4], data[5], data[6], int(ID), data[8], ] def expand_titles(blast): for data in blast: for title in data[5]: yield [ data[0], data[1], data[2], data[3], data[4], title, data[6], data[7], data[8], ] def filter_phage(blast, phageTaxLookup): res = [] for data in blast: if int(data[7]) in phageTaxLookup: res.append(data) return res def remove_dupes(data): has_seen = {} res = [] for row in data: # qseqid, sseqid key = (row[0], row[6], row[7]) # If we've seen the key before, we can exit if key in has_seen: continue # Otherwise, continue on has_seen[key] = True # Pretty simple res.append(row) return res def scoreMap(blast): c = {} m = {} for (qseq, subID, subTitle, access, ID) in blast: if (subTitle, ID) not in c: m[(subTitle, ID)] = access c[(subTitle, ID)] = 0 c[(subTitle, ID)] += 1 return c, m if __name__ == "__main__": parser = argparse.ArgumentParser(description="Top related genomes") parser.add_argument( "blast", type=argparse.FileType("r"), help="Blast 25 Column Results" ) parser.add_argument("phagedb", type=argparse.FileType("r")) parser.add_argument("--access", action="store_true") parser.add_argument("--protein", action="store_true") parser.add_argument("--canonical", action="store_true") parser.add_argument("--hits", type=int, default=5) parser.add_argument("--noFilter", action="store_true") parser.add_argument("--xmlMode", action="store_true") args = parser.parse_args() phageDb = args.phagedb phageTaxLookup = [] line = phageDb.readline() while line: line = line.split("\t") phageTaxLookup.append(int(line[0])) line = phageDb.readline() if args.protein: splitId = split_identifiers_prot # phageNameLookup = {k['source'].rstrip('.'): k['id'] for k in phageDb} elif args.canonical: splitId = split_identifiers_phage # phageNameLookup = {k['source'].rstrip('.'): k['id'] for k in phageDb} else: splitId = split_identifiers_nucl # phageNameLookup = {k['desc'].rstrip('.'): k['id'] for k in phageDb} data = [] # Reformatting to list rather than generator data = parse_blast(args.blast, args.xmlMode) nameRec = data[0][0] data = make_num(data) data = add_dice(data) data = bundle_dice(data) # data = filter_dice(data, threshold=0.0) # data = important_only(data, splitId) # data = expand_taxIDs(data) # data = deform_scores(data) if not args.noFilter: data = filter_phage(data, phageTaxLookup) # data = expand_titles(data) if args.protein or args.canonical: data = remove_dupes(data) # Probably obsolete, bundle dice should do this count_label = "Similar Unique Proteins" else: count_label = "Nucleotide Hits" # data = with_dice(data) data.sort(key=lambda data: -data[8]) # counts, accessions = scoreMap(data) if args.access: sys.stdout.write( "Top %d matches for BLASTn results of %s\t\t\t\t\t\t\n" % (args.hits, nameRec) ) sys.stdout.write( "TaxID\tName\tAccessions\tSubject Length\tNumber of HSPs\tTotal Aligned Length\tDice Score\n" ) ind = 0 for out in data: if ind >= args.hits: break ind += 1 sys.stdout.write( "%s\t%s\t%s\t%s\t%s\t%s\t%.4f\n" % (out[7], out[5], out[6], out[4], out[9], out[2], out[8]) ) else: sys.stdout.write( "Top %d matches for BLASTn results of %s\t\t\t\t\t\n" % (args.hits, data[0][0]) ) sys.stdout.write( "TaxID\tName\tSubject Length\tNumber of HSPs\tTotal Aligned Length\tDice Score\n" ) ind = 0 for out in data: if ind >= args.hits: break ind += 1 sys.stdout.write( "%s\t%s\t%s\t%s\t%s\t%.4f\n" % (out[7], out[5], out[4], out[9], out[1], out[8]) )