Mercurial > repos > nedias > blast_tool
diff blast_tool.py @ 0:45fbe538fa01 draft
Uploaded
author | nedias |
---|---|
date | Wed, 12 Oct 2016 18:11:41 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/blast_tool.py Wed Oct 12 18:11:41 2016 -0400 @@ -0,0 +1,176 @@ +""" + Util class of BLAST API + Use Biopython's BLAST interface to access BLAST API + BLAST call may take more than 5 min to complete. + + Author Nedias Sept, 2016 + +""" + + +# BLAST Web api, use to calling BLAST using url +from Bio.Blast import NCBIWWW +# BLAST XML util, use to read data from BLAST's response +from Bio.Blast import NCBIXML + + +# Get command and parameter from entry and call corresponding function +def exec_tool(options): + if options.format and options.format == "fasta": + blast_call(options.input, options.output, options.program, options.database) + + +# Call BLAST protein API +# input: 1.in_file: an fasta format file with polypeptide data +# 2.out_file: an XML format file with BLAST result +# 3.program: define which program of BLAST is used +# 4.database: define which database of BLAST is used +# output: the searching result file from BLAST in XML format +# return: execute status code(developing) +def blast_call(in_file, out_file, program, database): + + # Open input file(read only) + input_file = open(in_file, "rU") + # Create an output file(create or override) + output_file = open(out_file, "w+") + + # Read all data from the input fasta file + fasta_string = input_file.read() + + # Call BLAST to search similar protein data + result_handle = NCBIWWW.qblast(program, database, fasta_string) + + # Read result form responded data and save them in output file + result = result_handle.read() + output_file.write(result) + + return 0 + + +# Call BLAST protein API (text version) +# input: 1.in_text: polypeptide data text +# 2.program: define which program of BLAST is used +# 3.database: define which database of BLAST is used +# return: the searching result from BLAST in StringIO format +def blast_call_text(in_text, program, database): + + # Call BLAST to search similar protein data + result_handle = NCBIWWW.qblast(program, database, in_text) + + return result_handle + + +# Abstract data from BLAST responded XML +# input: the searching result file from BLAST in XML format +# return: an dictionary include all data from BLAST result. +# the structure of the dictionary is: +# dict -- hits: Integer, number of total hits during searching +# -- alignments: List, alignments for all polypeptide chains(there could be many chains in a single file) +# | (TODO:May consider change to dictionary and use the tag of every polypeptide chain as entry) +# | +# -- hsps: List, all high-scoring pairs(alignment) for one polypeptide chain +# | +# hps: Dictionary, contains all data of the high-scoring pair +def read_blast_result(blast_xml): + + # Open the BLAST result + result_file = open(blast_xml) + # Read all result using biopython + blast_record = NCBIXML.read(result_file) + # E-value limit, every data with a less-than 96% e-value will be filtered + E_VALUE_THRESH = 0.04 + + # Initialize the result dictionary + result_dict = dict() + + # Store the hits number + result_dict['hits': len(blast_record.alignments)] + # Initialize the alignment list + aligns = list() + # Scan through all alignments + for alignment in blast_record.alignments: + # Initialize the high-score pairs list + hsps = list() + # Scan through all hsps + for hsp in alignment.hsps: + # filter all hsp has less e-value than E_VALUE_THRESH + if hsp.expect < E_VALUE_THRESH: + # Initialize high-score pair dictionary + hsp_dict = dict() + # Store all data obtained from BLAST into the dictionary + hsp_dict['seq'] = alignment.title + hsp_dict['len'] = alignment.length + hsp_dict['eval'] = hsp.expect + # Identity is calculated as the percentage of identities in the alignment + hsp_dict['ident'] = int(round(hsp.identities * float(100) / alignment.length)) + hsp_dict['query'] = hsp.query + hsp_dict['match'] = hsp.match + hsp_dict['sbjct'] = hsp.sbjct + hsps.append(hsp_dict) + # Store the hsps into the alignment data + aligns.append(hsps) + + # Store alignment data into the dictionary + result_dict['alignments', aligns] + return result_dict + + +# Abstract data from BLAST responded XML (text version) +# input: the searching result file from BLAST in StringIO +# return: an dictionary include all data from BLAST result. +def read_blast_result_text(blast_handle): + + blast_record = NCBIXML.read(blast_handle) + + E_VALUE_THRESH = 0.04 + + result_dict = dict() + + result_dict['hits'] = len(blast_record.alignments) + aligns = list() + for alignment in blast_record.alignments: + hsps = list() + for hsp in alignment.hsps: + + if hsp.expect < E_VALUE_THRESH: + hsp_dict = dict() + hsp_dict['seq'] = alignment.title + hsp_dict['len'] = alignment.length + hsp_dict['eval'] = hsp.expect + hsp_dict['ident'] = int(round(hsp.identities * float(100) / alignment.length)) + hsp_dict['query'] = hsp.query + hsp_dict['match'] = hsp.match + hsp_dict['sbjct'] = hsp.sbjct + hsps.append(hsp_dict) + aligns.append(hsps) + + result_dict['alignments'] = aligns + return result_dict + + +# Sample of usage (text version) +def text(): + result = blast_call_text('MTSQTTINKPGPGDGSPAGSAPAKGWRGHPWVTLVTVAVGVMMVALDGTIVAIANPAIQD\ + DLDASLADVQWITNAYFLALAVALITAGKLGDRFGHRQTFLIGVAGFAASSGAIGLSGSI\ + AAVIVFRVFQGLFGALLMPAALGLLRATFPAEKLNMAIGIWGMVIGASTAGGPILGGVLV\ + EHVNWQSVFFINVPVGIVAVVLGVMILLDHRAANAPRSFDVVGIVLLSASMFALVWALIK\ + APEWGWGSGQTWVYIGGSVVGFVLFSVWETKVKEPLIPLAMFRSVPLSAGVVLMVLMAIA\ + FMGGLFFVTFYLQNVHGMSPVDAGLHLLPLTGMMIVASPLAGAMITKVGPRIPLAGGMVC\ + TAVAMFGISTLETDTGSGLMSIWFGLLGLGLAPVMVGATEVIVGNAPMELSGVAGGLQQA\ + AMQIGGSLGTAVLGAVMASKVDSDLAGNWKDAGLPELTPQQADQASEAVRVGVPPVAPGT\ + PAEVAGKITDVAHDTFISGMSLASLVAAGVAVVAVFVAFLTKRGENAEAGAGVGHI' + ) + re_dict = read_blast_result_text(result) + print(re_dict['hits']) + for alignments in re_dict['alignments']: + for hsp in alignments: + print(hsp['seq']) + print(hsp['len']) + print(hsp['eval']) + print(hsp['ident']) + print(hsp['query'][0:75] + "...") + print(hsp['match'][0:75] + "...") + print(hsp['sbjct'][0:75] + "...") + + +