Mercurial > repos > earlhaminst > ete
view ete_lineage_generator.py @ 14:d40b9a7debe5 draft default tip
planemo upload for repository https://github.com/TGAC/earlham-galaxytools/tree/master/tools/ete commit 71b9c1035f713be174bfcf5ecb20804495f39258
author | earlhaminst |
---|---|
date | Thu, 07 Mar 2024 19:39:30 +0000 |
parents | 2db72467da51 |
children |
line wrap: on
line source
import optparse import sys from ete3 import NCBITaxa # - compared to gi2taxonomy the root is excluded, since # the value is always "root", i.e. useless information # - additional levels that appear in the ncbi taxdb have # been added # (order from https://en.wikipedia.org/wiki/Taxonomic_rank#All_ranks) # TODO the full list of ranks could be derived from the input DB LONG_RANKS = [u"superkingdom", u"kingdom", u"subkingdom", u"superphylum", u"phylum", u"subphylum", u"superclass", u"class", u"subclass", "infraclass", u"cohort", u"superorder", u"order", u"suborder", u"infraorder", u"parvorder", u"superfamily", u"family", u"subfamily", u"tribe", u"subtribe", u"genus", u"subgenus", u"species group", u"species subgroup", u"species", u"subspecies", u"varietas", "forma"] SHORT_RANKS = [u"kingdom", u"phylum", u"class", u"order", u"family", u"genus", u"species"] def process_taxid(ncbi, taxid, ranks, RANK_IDX, lower=False): """ process one taxid: - get lineage (as list of taxids, ranks, and names) - reverse the lineage if lower ranks are to be used for filling - fill the ranks with the data from the lineage ncbi: ete NCBITaxa object taxid: a taxid (int) ranks: list of ranks (should be initialized with "NA" x number of levels of interest) RANK_IDX: mapping from rank names to indices (distance to root/leaf?) lower: use lower taxa for filling "NA"s """ try: lineage_taxids = ncbi.get_lineage(taxid) except ValueError: sys.stderr.write("[%s] could not determine lineage!\n" % taxid) return if lineage_taxids is None: sys.stderr.write("[%s] could not determine lineage!\n" % taxid) return lineage_ranks = ncbi.get_rank(lineage_taxids) lineage_names = ncbi.get_taxid_translator(lineage_taxids, try_synonyms=True) if lower: lineage_taxids.reverse() for parent_taxid in lineage_taxids: parent_rank = lineage_ranks[parent_taxid] if parent_rank not in RANK_IDX: continue parent_rank_index = RANK_IDX[parent_rank] if ranks[parent_rank_index] != "NA": continue ranks[parent_rank_index] = lineage_names[parent_taxid] # get command line options parser = optparse.OptionParser() parser.add_option('-s', '--species', dest="input_species_filename", help='Species/taxid list in text format one species in each line') parser.add_option('-d', '--database', dest="database", default=None, help='ETE sqlite data base to use (default: ~/.etetoolkit/taxa.sqlite)') parser.add_option('-o', '--output', dest="output", help='output file name (default: stdout)') parser.add_option('-f', dest="full", action="store_true", default=False, help='Show all available (named) taxonomic ranks (default: only primary levels)') parser.add_option('-c', dest="compress", action="store_true", default=False, help='Fill unnamed ranks with super/sub ranks (see -l)') parser.add_option('-l', dest="lower", action="store_true", default=False, help='Prefer lower levels when compressed') parser.add_option('-r', '--rank', dest='ranks', action="append", help='include rank - multiple ones can be specified') parser.add_option('-i', '--includeid', dest="addid", action="store_true", default=False, help='add taxid column') options, args = parser.parse_args() # check command line options if options.input_species_filename is None: parser.error("-s option must be specified, Species list in text format one species in each line") if options.full and options.ranks: parser.error("-f and -r can not be used at the same time") if options.ranks: for r in options.ranks: if r not in LONG_RANKS: parser.error("unknown rank %s" % r) # setup output if not options.output: # if filename is not given of = sys.stdout else: of = open(options.output, "w") # load NCBI taxonomy DB ncbi = NCBITaxa(dbfile=options.database) # get list of ranks that are of interest if options.ranks: RANKS = [] for r in LONG_RANKS: if r in options.ranks: RANKS.append(r) else: if options.full: RANKS = LONG_RANKS else: RANKS = SHORT_RANKS RANK_IDX = {item: index for index, item in enumerate(RANKS)} COMP_RANK_IDX = RANK_IDX if options.compress: for ir in range(len(RANKS)): for ilr in range(len(LONG_RANKS)): if RANKS[ir] in LONG_RANKS[ilr]: COMP_RANK_IDX[LONG_RANKS[ilr]] = ir # write header of.write("# query") if options.addid: of.write("\ttaxid") of.write("\t%s\n" % ("\t".join(RANKS))) # get and write data with open(options.input_species_filename) as f: for line in f.readlines(): line = line.strip().replace('_', ' ') if line == "": continue try: taxid = int(line) except ValueError: # TODO: one could use fuzzy name lookup (i.e. accept typos in the species names), # but then a pysqlite version that supports this is needed (needs to be enabled # during compilation) name2tax = ncbi.get_name_translator([line]) if line in name2tax: taxid = name2tax[line][0] else: sys.stderr.write("[%s] could not be translated into a taxid!\n" % line) continue ranks = ["NA"] * len(RANKS) process_taxid(ncbi, taxid, ranks, RANK_IDX) if options.compress: process_taxid(ncbi, taxid, ranks, COMP_RANK_IDX, options.lower) of.write("%s" % line) if options.addid: of.write("\t%d" % taxid) of.write("\t%s\n" % "\t".join(ranks)) of.close()