diff searchFile.py @ 1:6e3a843b6304 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author:    cpt
date:      Mon, 05 Jun 2023 02:53:18 +0000
parents:   (none)
children:  (none)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/searchFile.py	Mon Jun 05 02:53:18 2023 +0000
@@ -0,0 +1,450 @@

##### User input file(s) (BLAST XML, GFF3, FASTA, and/or GenBank) are searched for user-designated terms

import argparse
import os
import re

import explodeJSON as ej  # local helper module bundled with this tool
import gffutils  # THIS IS A REQUIREMENT
from Bio import SeqIO
from Bio.Blast import NCBIXML

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))


####### TERM FUNCTIONS
def dbaseTerms(terms, galaxy=True):
    """Index into the JSON term database and retrieve all desired terms"""
    db_path = os.path.join(SCRIPT_DIR, "data/lysis-family-v1.0.3.json")
    db = ej.explodeJSON(db_path)
    db = db.readJSON()
    dbase_terms = []
    if terms:
        for term in terms:
            index_list = term.split(",")
            for t in index_list:
                if t != "None":
                    dbase_terms.extend(db[t])
                else:
                    dbase_terms = []
        return dbase_terms


def userTerms(file, text):
    """Collect search terms supplied by the user as a file and/or free text"""
    user_terms = []
    if file:
        user_terms.extend(open(file.name).read().splitlines())
    if text:
        # Galaxy encodes newlines inside text parameters as "__cn__"
        if re.search("__cn__", str(text[0])):
            user_terms.extend(text[0].split("__cn__"))
        else:
            user_terms.extend(text)
    return user_terms


def glueTerms(dbase_terms, user_terms):
    """Glue dbaseTerms and userTerms together into the eventual query list"""
    glued = []
    if dbase_terms:
        glued.extend(dbase_terms)
    if user_terms:
        glued.extend(user_terms)
    return glued
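
# A minimal sketch of the term-assembly flow (the database keys below are
# hypothetical; the real keys live in data/lysis-family-v1.0.3.json):
#
#   dbase_terms = dbaseTerms(terms=["holin,endolysin"])              # db["holin"] + db["endolysin"]
#   user_terms = userTerms(file=None, text=["amidase__cn__spanin"])  # ["amidase", "spanin"]
#   glued = glueTerms(dbase_terms, user_terms)                       # one flat list of query terms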

####### FILE FUNCTIONS
def glueFiles(gff, gbk, fa, blast):
    """Flatten the nested argparse file lists into one list per file type"""
    gffs = []
    gbks = []
    fas = []
    blasts = []
    if gff:
        for gff_file in gff:
            gffs.extend(gff_file)
    if gbk:
        for gbk_file in gbk:
            gbks.extend(gbk_file)
    if fa:
        for fa_file in fa:
            fas.extend(fa_file)
    if blast:
        for blast_file in blast:
            blasts.extend(blast_file)
    return [gffs, gbks, fas, blasts]


######## PARSE FILE FUNCTIONS
def readGFF3(files, search_list):
    """Search GFF3 file(s) and collect features whose text matches a query term"""
    if not files:
        return
    gff3_matches = []
    for idx, file in enumerate(files):
        print("Parsing - " + file.name)
        # each input gets its own sqlite db so multiple files do not collide
        dbfn = "file.db" if idx == 0 else str(idx) + "_file.db"
        gffutils.create_db(file.name, dbfn=dbfn, force=True, keep_order=False)
        db = gffutils.FeatureDB(dbfn)
        for feature in db.all_features():
            gff3_matches.extend(searchInput(str(feature), search_list=search_list))
        gff3_matches = list(set(gff3_matches))  # make sure we don't fluff the list
    gff3_matches.sort()
    return gff3_matches


def readGBK(files, search_list):
    """Search GenBank feature qualifiers (product, note, dbxref) for query terms"""
    if not files:
        return
    gbk_matches = []
    for file in files:
        print("Parsing - " + file.name)
        record = SeqIO.read(file.name, "genbank")
        for feature in record.features:
            # .get() so a feature missing one qualifier is still checked against
            # the others; the original try/except KeyError skipped it entirely
            if (
                searchInput(str(feature.qualifiers.get("product", "")), search_list=search_list)
                or searchInput(str(feature.qualifiers.get("note", "")), search_list=search_list)
                or searchInput(str(feature.qualifiers.get("dbxref", "")), search_list=search_list)
            ):
                gbk_matches.append(str(feature))
        gbk_matches = list(set(gbk_matches))
    gbk_matches.sort()
    return gbk_matches


def readFASTA(files, search_list):
    """Search FASTA record descriptions for query terms"""
    if not files:
        return
    fa_matches = []
    for file in files:
        print("Parsing - " + file.name)
        for record in SeqIO.parse(file.name, "fasta"):
            fa_matches.extend(searchInput(record.description, search_list=search_list))
        fa_matches = list(set(fa_matches))
    fa_matches.sort()
    return fa_matches


def readBLAST(files, search_list):
    """Search BLAST XML description lines for query terms"""
    if not files:
        return
    blast_matches = []
    for file in files:
        print("Parsing - " + file.name)
        with open(file.name) as xml_handle:
            for blast_record in NCBIXML.parse(xml_handle):
                for desc in blast_record.descriptions:
                    # a description line can pack several hits separated by ">";
                    # the original's second branch omitted this inner loop, so
                    # each_ret was referenced without being defined
                    for each_ret in prettifyXML(str(desc)):
                        blast_matches.extend(
                            searchInput(
                                each_ret,
                                search_list=search_list,
                                blast=True,
                                q_id=blast_record.query,
                            )
                        )
        blast_matches = list(set(blast_matches))
    blast_matches.sort()
    return blast_matches


######## SEARCH FILE FUNCTIONS
def searchInput(input, search_list, blast=False, q_id=None):
    """Return unique inputs (or annotated BLAST hits) that match a search term"""
    output = []
    for search_term in search_list:
        if re.search(re.escape(search_term), input):
            if blast:
                output.append(
                    "QueryID: " + str(q_id)
                    + "\nSearchQuery: " + search_term
                    + "\nMatch: " + input + "\n"
                )
            else:
                output.append(input)
    return list(set(output))


######## prettify-XML function
def prettifyXML(input):
    """Split a BLAST XML description string on ">" into its individual hits"""
    return input.split(">")
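
# Worked example of searchInput semantics (strings are illustrative): terms go
# through re.escape(), so matching is a literal, case-sensitive substring
# search rather than a regex:
#
#   searchInput("putative holin class II", ["holin"])   # -> ["putative holin class II"]
#   searchInput("putative Holin class II", ["holin"])   # -> [] (case differs)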
file""" + + with open(outName.name, "w+") as out_file: + if gffs: + + out_file.writelines( + "\n==================== GFF3 Term Hits ====================\n\n" + ) + for gff_hits in gffs: + out_file.writelines(gff_hits + "\n") + else: + gffs = [] + if gbks: + out_file.writelines( + "\n==================== GBK Term Hits ====================\n\n" + ) + for gbk_hits in gbks: + out_file.writelines(gbk_hits + "\n") + else: + gbks = [] + if fas: + + out_file.writelines( + "\n==================== FASTA Term Hits ====================\n\n" + ) + for fa_hits in fas: + out_file.writelines(fa_hits + "\n") + else: + fas = [] + if blasts: + + out_file.writelines( + "\n==================== BLAST Term Hits ====================\n\n" + ) + for blast_hits in blasts: + out_file.writelines(blast_hits + "\n") + else: + blasts = [] + if len(gffs) or len(gbks) or len(fas) or len(blasts): + print("Terms Found") + else: + out_file.writelines("No query matches, try again with new terms!") + print("No query matches, try again with new terms!") + + +def write_gff3(gffs, outName="proxHits.gff3"): + """writes output to gff3 file for prox2lysis pipeline""" + + with open(outName.name, "w+") as out_file: + out_file.writelines("##gff-version 3\n") + if gffs: + for gff_hits in gffs: + out_file.writelines(gff_hits + "\n") + else: + # raise Exception("No terms were found from query set") + out_file.writelines("##No terms were found from query set\n") + + +if __name__ == "__main__": + print(os.getcwd()) + parser = argparse.ArgumentParser( + description="Uses a selection of terms to query an input file for matching cases" + ) + parser.add_argument( + "--dbaseTerms", nargs="*", help="dbase terms to search" + ) # will be a select option, based on KEY within the JSON dbase + parser.add_argument( + "--custom_txt", + nargs="*", + help="custom user input terms, if using Galaxy, terms will be __cn__ sep, otherwise by space", + ) + parser.add_argument( + "--custom_file", + type=argparse.FileType("r"), + help="custom new line separated search term file", + ) + parser.add_argument( + "--gff3_files", + type=argparse.FileType("r"), + nargs="*", + action="append", + help="GFF3 File(s), if multiple files, use another flag", + ) + parser.add_argument( + "--gbk_files", + type=argparse.FileType("r"), + nargs="*", + action="append", + help="GBK File(s), if multiple files, use another flag", + ) + parser.add_argument( + "--fa_files", + type=argparse.FileType("r"), + nargs="*", + action="append", + help="FASTA File(s), if multiple files, use another flag", + ) + parser.add_argument( + "--blast_files", + type=argparse.FileType("r"), + nargs="*", + action="append", + help="BLAST.xml File(s), if multiple files, use another flag", + ) + parser.add_argument( + "--output", type=argparse.FileType("w+"), default="termHits.txt" + ) + parser.add_argument( + "--prox", action="store_true", help="Use when running the prox2lysis pipeline" + ) + args = parser.parse_args() + + ############ STEP I + ##### Determine user's terms to query + dbase_terms = dbaseTerms(terms=args.dbaseTerms, galaxy=True) + user_terms = userTerms(file=args.custom_file, text=args.custom_txt) + glued_terms = glueTerms(dbase_terms=dbase_terms, user_terms=user_terms) + + ############ STEP II + ##### Create list with matches + files = glueFiles( + gff=args.gff3_files, + gbk=args.gbk_files, + fa=args.fa_files, + blast=args.blast_files, + ) + gffs = readGFF3(files=files[0], search_list=glued_terms) + gbks = readGBK(files=files[1], search_list=glued_terms) + fas = readFASTA(files=files[2], 

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Uses a selection of terms to query input file(s) for matching cases"
    )
    parser.add_argument(
        "--dbaseTerms",
        nargs="*",
        help="database terms to search; each is a key into the JSON term database",
    )
    parser.add_argument(
        "--custom_txt",
        nargs="*",
        help="custom user input terms; __cn__-separated in Galaxy, otherwise space-separated",
    )
    parser.add_argument(
        "--custom_file",
        type=argparse.FileType("r"),
        help="custom newline-separated search term file",
    )
    parser.add_argument(
        "--gff3_files",
        type=argparse.FileType("r"),
        nargs="*",
        action="append",
        help="GFF3 file(s); repeat the flag for multiple files",
    )
    parser.add_argument(
        "--gbk_files",
        type=argparse.FileType("r"),
        nargs="*",
        action="append",
        help="GenBank file(s); repeat the flag for multiple files",
    )
    parser.add_argument(
        "--fa_files",
        type=argparse.FileType("r"),
        nargs="*",
        action="append",
        help="FASTA file(s); repeat the flag for multiple files",
    )
    parser.add_argument(
        "--blast_files",
        type=argparse.FileType("r"),
        nargs="*",
        action="append",
        help="BLAST XML file(s); repeat the flag for multiple files",
    )
    parser.add_argument(
        "--output", type=argparse.FileType("w+"), default="termHits.txt"
    )
    parser.add_argument(
        "--prox", action="store_true", help="Use when running the prox2lysis pipeline"
    )
    args = parser.parse_args()

    ############ STEP I
    ##### Determine the user's terms to query
    dbase_terms = dbaseTerms(terms=args.dbaseTerms, galaxy=True)
    user_terms = userTerms(file=args.custom_file, text=args.custom_txt)
    glued_terms = glueTerms(dbase_terms=dbase_terms, user_terms=user_terms)

    ############ STEP II
    ##### Build a list of matches per file type
    files = glueFiles(
        gff=args.gff3_files,
        gbk=args.gbk_files,
        fa=args.fa_files,
        blast=args.blast_files,
    )
    gffs = readGFF3(files=files[0], search_list=glued_terms)
    gbks = readGBK(files=files[1], search_list=glued_terms)
    fas = readFASTA(files=files[2], search_list=glued_terms)
    blasts = readBLAST(files=files[3], search_list=glued_terms)

    ############ STEP III
    ##### Output results to a text file or gff3
    if args.prox:
        write_gff3(gffs, outName=args.output)
    else:
        writeResults(gffs, gbks, fas, blasts, outName=args.output)
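
# A hedged example invocation (file names and the "endolysin" key are
# illustrative, not taken from the repository):
#
#   python searchFile.py --dbaseTerms endolysin \
#       --custom_txt holin \
#       --gbk_files phage.gbk \
#       --output termHits.txt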