view rps2tsv.py @ 1:84d20d9c292d draft

planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/virAnnot commit 48a3a1fa033745898a131f9dca7132947683c02f
author iuc
date Sat, 18 May 2024 18:14:29 +0000
parents bbaa89f070f4
children fd7104249a3c
line wrap: on
line source

#!/usr/bin/env python3


# Name: rps2ecsv
# Author: Marie Lefebvre - INRAE
# Aims: Convert rpsblast xml output to csv and add taxonomy


import argparse
import json
import logging as log
from urllib import request
from urllib.error import HTTPError, URLError

from Bio.Blast import NCBIXML
from ete3 import NCBITaxa

ncbi = NCBITaxa()


def main():
    options = _set_options()
    _set_log_level(options.verbosity)
    hits = _read_xml(options)
    _write_tsv(options, hits)


def _read_xml(options):
    """
    Parse XML RPSblast results file
    """
    log.info("Read XML file " + options.xml_file)
    xml = open(options.xml_file, 'r')
    records = NCBIXML.parse(xml)
    xml_results = {}
    for blast_record in records:
        for aln in blast_record.alignments:
            for hit in aln.hsps:
                hsp = {}
                hit_evalue = hit.expect
                if hit_evalue > options.max_evalue:
                    continue
                hit_frame = hit.frame[0]  # frame
                hit_evalue = hit.expect  # evalue
                hit_startQ = hit.query_start
                hit_endQ = hit.query_end
            hsp["frame"] = hit_frame
            hsp["evalue"] = hit_evalue
            hsp["startQ"] = hit_startQ
            hsp["endQ"] = hit_endQ
            hsp["query_id"] = blast_record.query_id
            hsp["cdd_id"] = aln.hit_def.split(",")[0]
            hsp["hit_id"] = aln.hit_id
            hsp["query_length"] = blast_record.query_length  # length of the query
            hsp["description"] = aln.hit_def
            hsp["accession"] = aln.accession
            hsp["pfam_id"] = hsp["description"].split(",")[0].replace("pfam", "PF")
            log.info("Requeting Interpro for " + hsp["pfam_id"])
            url = "https://www.ebi.ac.uk/interpro/api/entry/pfam/" + hsp["pfam_id"] + "/taxonomy/uniprot/"
            req = request.Request(url)
            try:
                response = request.urlopen(req)
            except HTTPError as e:
                log.debug('Http error for interpro: ', e.code)
            except URLError as e:
                log.debug('Url error for interpro: ', e.reason)
            else:
                encoded_response = response.read()
                decoded_response = encoded_response.decode()
                payload = json.loads(decoded_response)
                kingdoms = []
                for item in payload["taxonomy_subset"]:
                    lineage_string = item["lineage"]
                    lineage = [int(i) for i in lineage_string]
                    translation = ncbi.get_taxid_translator(lineage)
                    names = list(translation.values())
                    taxonomy = names[1:]  # remove 'root' at the begining
                    kingdoms.append(taxonomy[0])
                frequency = {kingdom: kingdoms.count(kingdom) for kingdom in kingdoms}  # {'Pseudomonadota': 9, 'cellular organisms': 4}
                sorted_freq = dict(sorted(frequency.items(), key=lambda x: x[1], reverse=True))
                concat_freq = ";".join("{}({})".format(k, v) for k, v in sorted_freq.items())
                hsp["taxonomy"] = concat_freq
            xml_results[hsp["query_id"]] = hsp
    return xml_results


def _write_tsv(options, hits):
    """
    Write output
    """
    log.info("Write output file " + options.output)
    headers = "#query_id\tquery_length\tcdd_id\thit_id\tevalue\tstartQ\tendQ\tframe\tdescription\tsuperkingdom\n"
    f = open(options.output, "w+")
    f.write(headers)
    for h in hits:
        f.write(h + "\t" + str(hits[h]["query_length"]) + "\t")
        f.write(hits[h]["cdd_id"] + "\t" + hits[h]["hit_id"] + "\t" + str(hits[h]["evalue"]) + "\t")
        f.write(str(hits[h]["startQ"]) + "\t" + str(hits[h]["endQ"]) + "\t" + str(hits[h]["frame"]) + "\t")
        f.write(hits[h]["description"] + "\t" + hits[h]["taxonomy"])
        f.write("\n")
    f.close()


def _set_options():
    parser = argparse.ArgumentParser()
    parser.add_argument('-x', '--xml', help='XML files with results of blast', action='store', required=True, dest='xml_file')
    parser.add_argument('-e', '--max_evalue', help='Max evalue', action='store', type=float, default=0.0001, dest='max_evalue')
    parser.add_argument('-o', '--out', help='The output file (.tab).', action='store', type=str, default='./rps2tsv_output.tab', dest='output')
    parser.add_argument('-v', '--verbosity', help='Verbose level', action='store', type=int, choices=[1, 2, 3, 4], default=1)
    args = parser.parse_args()
    return args


def _set_log_level(verbosity):
    if verbosity == 1:
        log_format = '%(asctime)s %(levelname)-8s %(message)s'
        log.basicConfig(level=log.INFO, format=log_format)
    elif verbosity == 3:
        log_format = '%(filename)s:%(lineno)s - %(asctime)s %(levelname)-8s %(message)s'
        log.basicConfig(level=log.DEBUG, format=log_format)


if __name__ == "__main__":
    main()