Mercurial > repos > workflow4metabolomics > kmd_hmdb_data_plot
diff kmd_hmdb_interrogator.py @ 0:59c8bad5f6bc draft default tip
planemo upload for repository https://github.com/workflow4metabolomics/tools-metabolomics/blob/master/tools/kmd_hmdb_data_plot/ commit 7fa454b6a4268b89fe18043e8dd10f30a7b4c7ca
author | workflow4metabolomics |
---|---|
date | Tue, 29 Aug 2023 09:45:16 +0000 |
parents | |
children |
line wrap: on
line diff
#!/usr/bin/env python3
"""Command-line client for the KMD HMDB REST API.

Exposes two ``click`` sub-commands, ``compound`` and ``annotation``. Each one
queries the remote service through ``kmd_hmdb_api_client`` and writes the
requested columns to a delimited text file.
"""

import csv
import operator

import click

import kmd_hmdb_api_client.client
from kmd_hmdb_api_client.api.default import (
    api_annotation_get,
    api_compound_find,
    api_taxonomy_get,
)

__version__ = "1.0.0"


# NOTE(review): verify_ssl=False presumably turns off TLS certificate
# verification for every request made with this client. It is kept unchanged
# because the reason for it is not visible in this file -- confirm whether it
# is still needed.
kmd_hmdb_client = kmd_hmdb_api_client.client.Client(
    "https://kmd-hmdb-rest-api.metabolomics-chopin.e-metabohub.fr",
    verify_ssl=False,
    timeout=500,
)


def find_compound(*args, **kwargs):
    """Call ``api_compound_find.sync`` with the module-level client."""
    return api_compound_find.sync(*args, **kwargs, client=kmd_hmdb_client)


def get_taxonomy(*args, **kwargs):
    """Call ``api_taxonomy_get.sync`` with the module-level client."""
    return api_taxonomy_get.sync(*args, **kwargs, client=kmd_hmdb_client)


def get_annotation(*args, **kwargs):
    """Call ``api_annotation_get.sync`` with the module-level client."""
    return api_annotation_get.sync(*args, **kwargs, client=kmd_hmdb_client)


positive_adducts = [
    "M+H",
    "M+2H",
    "M+H+NH4",
    "M+H+Na",
    "M+H+K",
    "M+ACN+2H",
    "M+2Na",
    "M+H-2H2O",
    "M+H-H2O",
    "M+NH4",
    "M+Na",
    "M+CH3OH+H",
    "M+K",
    "M+ACN+H",
    "M+2Na-H",
    "M+IsoProp+H",
    "M+ACN+Na",
    "M+2K+H",
    "M+DMSO+H",
    "M+2ACN+H",
    "2M+H",
    "2M+NH4",
    "2M+Na",
    "2M+K",
]

negative_adducts = [
    "M-H",
    "M-2H",
    "M-H2O-H",
    "M+Cl",
    "M+FA-H",
    "M+Hac-H",
    "M-H+HCOONa",
    "M+Br",
    "M+TFA-H",
    "2M-H",
    "2M+FA-H",
    "2M+Hac-H",
]

adduct_choices = positive_adducts + negative_adducts

taxonomy_column_choices = [
    "class",
    "kingdom",
    "molecular_framework",
    "sub_class",
    "super_class",
    "id",
]

annotation_column_choices = [
    "adduct",
    "kendricks_mass",
    "kendricks_mass_defect",
    "monisotopic_molecular_weight",
    "nominal_mass",
    "polarity",
    "annotation_id",
]

compound_column_choices = [
    "database",
    "metabolite_name",
    "chemical_formula",
    "hmdb_id",
    "inchikey",
    "compound_id",
] + annotation_column_choices


@click.group()
def cli():
    """Root command group; the sub-commands below attach to it."""


@cli.command(help="")
@click.option(
    "--version",
    is_flag=True,
)
@click.option(
    "--mz-ratio",
    default=[303.05],
    show_default=True,
    multiple=True,
    help="Provide the mz-ratio."
)
@click.option(
    "--database",
    default=["farid"],
    show_default=True,
    multiple=True,
    help="Provide the database."
)
@click.option(
    "--mass-tolerance",
    default=10.5,
    show_default=True,
    help="Provide the mass-tolerance."
)
@click.option(
    "--adducts",
    default=["M+H"],
    type=click.Choice(adduct_choices),
    multiple=True,
    show_default=True,
    show_choices=False,
    help="Provide the adducts."
)
@click.option(
    "--columns",
    default=compound_column_choices[:],
    type=click.Choice(compound_column_choices),
    multiple=True,
    show_default=True,
    show_choices=False,
    help="Provide the outputed columns."
)
@click.option(
    "--output-path",
    help="Provide the output path."
)
def compound(*args, **kwargs):
    """Search compounds and write the selected columns as a TSV file.

    With ``--version`` the script version is printed and the process exits
    with status 0 before any request is made.

    Raises:
        ValueError: if a requested column is absent from the results, or if
            no output path was given.
    """
    if kwargs.pop("version"):
        print(__version__)
        # ``exit`` is injected by the ``site`` module and may be missing;
        # raising SystemExit directly has the same effect everywhere.
        raise SystemExit(0)

    adducts = kwargs.pop("adducts")
    polarity = get_polarity(adducts)

    other_kwargs, compound_kwargs = build_kwargs(
        adducts=adducts,
        polarity=polarity,
        **kwargs
    )
    columns = other_kwargs["columns"]
    result = find_compound(**compound_kwargs)
    # Only expand to one row per annotation when an annotation column is
    # actually requested.
    wants_annotations = any(
        column in columns for column in annotation_column_choices
    )
    result = explode_compounds(result, with_annotations=wants_annotations)
    check_columns_in_result(result, columns)
    output_csv_result(
        result,
        columns,
        other_kwargs.get("output_path"),
        delimiter="\t",
    )


def explode_compounds(result, with_annotations):
    """Flatten compound objects into a list of plain dict rows.

    Args:
        result: iterable of compound objects exposing ``database``,
            ``metabolite_name``, ``chemical_formula``, ``hmdb_id``,
            ``inchikey``, ``id`` and, when ``with_annotations`` is true,
            ``annotations``.
        with_annotations: when true, emit one row per (compound, annotation)
            pair carrying the annotation fields as well; a compound with no
            annotations then yields no row. When false, emit one row per
            compound with the compound fields only.

    Returns:
        A list of dicts keyed by the names in ``compound_column_choices``.
    """
    rows = []
    for cpd in result:
        base = {
            "database": cpd.database,
            "metabolite_name": cpd.metabolite_name,
            "chemical_formula": cpd.chemical_formula,
            "hmdb_id": cpd.hmdb_id,
            "inchikey": cpd.inchikey,
            "compound_id": cpd.id,
        }
        if not with_annotations:
            rows.append(base)
            continue
        for ann in cpd.annotations:
            rows.append({
                **base,
                "adduct": ann.name,
                "kendricks_mass": ann.kendricks_mass,
                "kendricks_mass_defect": ann.kendricks_mass_defect,
                "monisotopic_molecular_weight":
                    ann.monisotopic_molecular_weight,
                "nominal_mass": ann.nominal_mass,
                "polarity": ann.polarity,
                "annotation_id": ann.id,
            })
    return rows


@cli.command(help="")
@click.option(
    "--id",
    type=int,
    help="Provide the wanted annotation's id."
)
@click.option(
    "--columns",
    default=annotation_column_choices[:],
    type=click.Choice(annotation_column_choices),
    multiple=True,
    show_default=True,
    show_choices=False,
    help="Provide the outputed columns."
)
@click.option(
    "--output-path",
    help="Provide the output path."
)
def annotation(*args, **kwargs):
    """Fetch one annotation by id and write the selected columns as CSV.

    Raises:
        ValueError: if a requested column is absent from the result, or if
            no output path was given.
    """
    result = [get_annotation(id=kwargs.pop("id"))]
    columns = kwargs["columns"]
    check_columns_in_result(result, columns)
    output_csv_result(
        result,
        columns,
        kwargs.get("output_path")
    )


def get_polarity(adducts):
    """Return the polarity implied by ``adducts``.

    Returns ``"positive"`` if any adduct is in ``positive_adducts``, otherwise
    ``"negative"`` if any is in ``negative_adducts``, otherwise ``None``.
    A mix of positive and negative adducts therefore yields ``"positive"``.
    """
    # NOTE(review): the original file carried a commented-out variant that
    # collected both polarities into a list. Only the single-string form was
    # ever active, so that is what is kept here.
    if any(adduct in positive_adducts for adduct in adducts):
        return "positive"
    if any(adduct in negative_adducts for adduct in adducts):
        return "negative"
    return None


def build_kwargs(**kwargs):
    """Split CLI options into local options and API call arguments.

    ``database`` and ``polarity`` are renamed to ``database_list`` and
    ``polarity_list``. ``columns``, ``output_path`` and ``with_annotations``
    are pulled out because they are consumed locally rather than passed to
    the API call.

    Returns:
        A ``(other_kwargs, api_kwargs)`` tuple of dicts.
    """
    for original, replacement in (
        ("database", "database_list"),
        ("polarity", "polarity_list"),
    ):
        if original in kwargs:
            kwargs[replacement] = kwargs.pop(original)
    other_kwargs = {
        other_arg: kwargs.pop(other_arg)
        for other_arg in ("columns", "output_path", "with_annotations")
        if other_arg in kwargs
    }
    return other_kwargs, kwargs


def _as_row(item):
    """Return ``item`` as a dict, converting non-dict items via ``to_dict()``."""
    if isinstance(item, dict):
        return item
    return item.to_dict()


def check_columns_in_result(result, columns):
    """Ensure every requested column exists in the first result row.

    An empty ``result`` is accepted without any check.

    Raises:
        ValueError: naming the column(s) missing from the first row.
    """
    if not result:
        return
    keys = _as_row(result[0]).keys()
    missing = [
        column for column in columns
        if column not in keys
    ]
    if not missing:
        return
    if len(missing) == 1:
        raise ValueError(
            f"Could not find the column {missing[0]} in the results."
        )
    raise ValueError(
        "Could not find any of the columns "
        + ','.join(missing)
        + " in the results."
    )


def output_csv_result(result, columns, output_path, **csv_parameters):
    """Write ``result`` to ``output_path`` using ``csv.writer``.

    Args:
        result: rows to write (dicts, or objects exposing ``to_dict()``).
        columns: column names; used as the header and as the row keys.
        output_path: destination file path.
        **csv_parameters: forwarded to ``csv.writer`` (e.g. ``delimiter``).

    Raises:
        ValueError: if ``output_path`` is empty or ``None``.
    """
    if not output_path:
        raise ValueError("Missing output path. Cannot output CSV results.")
    # newline='' is what the csv module requires; the explicit encoding keeps
    # the output independent of the locale of the machine running the tool.
    with open(
        output_path, mode="w", newline='', encoding="utf-8"
    ) as output_file:
        writer = csv.writer(output_file, **csv_parameters)
        write_result(result, columns, writer)


def write_result(result, columns, writer):
    """Write a header row followed by one row per item of ``result``.

    Items that are not dicts are converted with ``to_dict()`` first, matching
    what ``check_columns_in_result`` validates against.
    """
    getters = [operator.itemgetter(column) for column in columns]
    writer.writerow(columns)
    writer.writerows(
        [getter(row) for getter in getters]
        for row in map(_as_row, result)
    )


if __name__ == "__main__":
    cli()