Mercurial > repos > iuc > virannot_otu
view rps2tsv.py @ 4:adcf06db3030 draft default tip
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/virAnnot commit 7036ce0e06b6dc64332b1a5642fc58928523c5c6
author | iuc |
---|---|
date | Tue, 13 May 2025 11:52:27 +0000 |
parents | 40fb54cc6628 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Name: rps2ecsv # Author: Marie Lefebvre - INRAE # Aims: Convert rpsblast xml output to csv and add taxonomy """Module which converts rpsblast xml output to tsv and add taxonomy""" import argparse import json import logging as log from urllib import request from urllib.error import HTTPError, URLError from Bio.Blast import NCBIXML from ete3 import NCBITaxa ncbi = NCBITaxa() def main(): """ Main function """ options = _set_options() _set_log_level(options.verbosity) hits = _read_xml(options) _write_tsv(options, hits) def _read_xml(options): """ Parse XML RPSblast results file """ log.info("Read XML file " + options.xml_file) xml = open(options.xml_file, 'r') records = NCBIXML.parse(xml) xml_results = {} for blast_record in records: for aln in blast_record.alignments: for hit in aln.hsps: hsp = {} hit_evalue = hit.expect if hit_evalue > options.max_evalue: continue hit_frame = hit.frame[0] # frame hit_evalue = hit.expect # evalue hit_startQ = hit.query_start hit_endQ = hit.query_end hit_identity = hit.identities hit_aln_length = hit.align_length pident = "%0.3f" % (100 * float(hit_identity) / float(hit_aln_length)) if float(pident) < 0.1: continue hsp["pident"] = pident hsp["frame"] = hit_frame hsp["evalue"] = hit_evalue hsp["startQ"] = hit_startQ hsp["endQ"] = hit_endQ hsp["query_id"] = blast_record.query hsp["cdd_id"] = aln.hit_def.split(",")[0] hsp["hit_id"] = aln.hit_id hsp["query_length"] = blast_record.query_length # length of the query hsp["description"] = aln.hit_def hsp["accession"] = aln.accession hsp["pfam_id"] = hsp["description"].split(",")[0].replace("pfam", "PF") log.info("Requeting Interpro for " + hsp["pfam_id"]) url = "https://www.ebi.ac.uk/interpro/api/taxonomy/uniprot/entry/pfam/" + hsp["pfam_id"] req = request.Request(url) try: response = request.urlopen(req) except HTTPError as e: log.debug('Http error for interpro: ', e.code) except URLError as e: log.debug('Url error for interpro: ', e.reason) else: encoded_response = response.read() decoded_response = encoded_response.decode() payload = json.loads(decoded_response) kingdoms = [] for item in payload["results"][:6]: if item["metadata"]["parent"] is not None: lineage_parent = item["metadata"]["parent"] translation = ncbi.get_taxid_translator([int(lineage_parent)]) names = list(translation.values()) if len(names) > 0: if names[0] == "root": taxonomy = names[1:] # remove 'root' at the begining else: taxonomy = names else: taxonomy = names if len(taxonomy) != 0: kingdoms.append(taxonomy[0]) # {'Pseudomonadota': 9, 'cellular organisms': 4} frequency = {kingdom: kingdoms.count(kingdom) for kingdom in kingdoms} sorted_freq = dict(sorted(frequency.items(), key=lambda x: x[1], reverse=True)) concat_freq = ";".join("{}({})".format(k, v) for k, v in sorted_freq.items()) hsp["taxonomy"] = concat_freq xml_results[hsp["query_id"]] = hsp return xml_results def _write_tsv(options, hits): """ Write output """ log.info("Write output file " + options.output) headers = "#query_id\tquery_length\tcdd_id\thit_id\tevalue\tstartQ\tendQ\tframe\tdescription\tsuperkingdom\tpident\n" f = open(options.output, "w+") f.write(headers) for h in hits: f.write(h + "\t" + str(hits[h]["query_length"]) + "\t") f.write(hits[h]["cdd_id"] + "\t" + hits[h]["hit_id"] + "\t" + str(hits[h]["evalue"]) + "\t") f.write(str(hits[h]["startQ"]) + "\t" + str(hits[h]["endQ"]) + "\t" + str(hits[h]["frame"]) + "\t") f.write(hits[h]["description"] + "\t" + hits[h]["taxonomy"] + "\t" + hits[h]["pident"]) f.write("\n") f.close() def _set_options(): """ Script parameters """ parser = argparse.ArgumentParser() parser.add_argument('-x', '--xml', help='XML files with results of blast', action='store', required=True, dest='xml_file') parser.add_argument('-e', '--max_evalue', help='Max evalue', action='store', type=float, default=0.0001, dest='max_evalue') parser.add_argument('-o', '--out', help='The output file (.tab).', action='store', type=str, default='./rps2tsv_output.tab', dest='output') parser.add_argument('-v', '--verbosity', help='Verbose level', action='store', type=int, choices=[1, 2, 3, 4], default=1) args = parser.parse_args() return args def _set_log_level(verbosity): """ Debbug """ if verbosity == 1: log_format = '%(asctime)s %(levelname)-8s %(message)s' log.basicConfig(level=log.INFO, format=log_format) elif verbosity == 3: log_format = '%(filename)s:%(lineno)s - %(asctime)s %(levelname)-8s %(message)s' log.basicConfig(level=log.DEBUG, format=log_format) if __name__ == "__main__": main()