comparison kmd_hmdb_interrogator.py @ 0:59c8bad5f6bc draft default tip

planemo upload for repository https://github.com/workflow4metabolomics/tools-metabolomics/blob/master/tools/kmd_hmdb_data_plot/ commit 7fa454b6a4268b89fe18043e8dd10f30a7b4c7ca
author workflow4metabolomics
date Tue, 29 Aug 2023 09:45:16 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:59c8bad5f6bc
1 #!/usr/bin/env python3
2
3 import csv
4 import operator
5
6 import click
7
8 import kmd_hmdb_api_client.client
9 from kmd_hmdb_api_client.api.default import (
10 api_annotation_get,
11 api_compound_find,
12 api_taxonomy_get,
13 )
14
__version__ = "1.0.0"


# Shared client for the KMD-HMDB REST API; the wrappers below hand it to the
# generated endpoint modules on every call.
# NOTE(review): verify_ssl=False presumably disables TLS certificate
# verification for every request made through this client. It is kept as-is
# because the server's certificate setup is not visible from this file --
# confirm whether verification can be turned on.
kmd_hmdb_client = kmd_hmdb_api_client.client.Client(
    "https://kmd-hmdb-rest-api.metabolomics-chopin.e-metabohub.fr",
    verify_ssl=False,
    timeout=500,
)


# The wrappers are real functions rather than lambdas bound to names, so they
# carry a meaningful __name__ in tracebacks and can hold a docstring.
def find_compound(*args, **kwargs):
    """Call ``api_compound_find.sync`` with the shared client.

    All positional and keyword arguments are forwarded unchanged.
    """
    return api_compound_find.sync(*args, **kwargs, client=kmd_hmdb_client)


def get_taxonomy(*args, **kwargs):
    """Call ``api_taxonomy_get.sync`` with the shared client.

    All positional and keyword arguments are forwarded unchanged.
    """
    return api_taxonomy_get.sync(*args, **kwargs, client=kmd_hmdb_client)


def get_annotation(*args, **kwargs):
    """Call ``api_annotation_get.sync`` with the shared client.

    All positional and keyword arguments are forwarded unchanged.
    """
    return api_annotation_get.sync(*args, **kwargs, client=kmd_hmdb_client)
36
# Adducts for which get_polarity() answers "positive".
positive_adducts = [
    "M+H", "M+2H", "M+H+NH4", "M+H+Na", "M+H+K", "M+ACN+2H",
    "M+2Na", "M+H-2H2O", "M+H-H2O", "M+NH4", "M+Na", "M+CH3OH+H",
    "M+K", "M+ACN+H", "M+2Na-H", "M+IsoProp+H", "M+ACN+Na", "M+2K+H",
    "M+DMSO+H", "M+2ACN+H", "2M+H", "2M+NH4", "2M+Na", "2M+K",
]

# Adducts for which get_polarity() answers "negative".
negative_adducts = [
    "M-H", "M-2H", "M-H2O-H", "M+Cl", "M+FA-H", "M+Hac-H",
    "M-H+HCOONa", "M+Br", "M+TFA-H", "2M-H", "2M+FA-H", "2M+Hac-H",
]

# Every value accepted by the --adducts option: positives first, then
# negatives.
adduct_choices = [*positive_adducts, *negative_adducts]
80
# Column names for taxonomy records.
taxonomy_column_choices = [
    "class", "kingdom", "molecular_framework",
    "sub_class", "super_class", "id",
]

# Per-annotation columns. The spelling "monisotopic_molecular_weight" is
# deliberate: it matches the attribute name read in explode_compounds().
annotation_column_choices = [
    "adduct", "kendricks_mass", "kendricks_mass_defect",
    "monisotopic_molecular_weight", "nominal_mass", "polarity",
    "annotation_id",
]

# Columns selectable in the `compound` command: the compound's own fields
# followed by every annotation column.
compound_column_choices = [
    "database", "metabolite_name", "chemical_formula",
    "hmdb_id", "inchikey", "compound_id",
    *annotation_column_choices,
]
109
110
@click.group()
def cli():
    # Root command group: it does nothing by itself and only hosts the
    # sub-commands registered with @cli.command. A comment is used instead of
    # a docstring because click would show a docstring as the group's help.
    pass
114
115
@cli.command(help="")
@click.option(
    "--version",
    is_flag=True,
)
@click.option(
    "--mz-ratio",
    default=[303.05],
    show_default=True,
    multiple=True,
    help="Provide the mz-ratio."
)
@click.option(
    "--database",
    default=["farid"],
    show_default=True,
    multiple=True,
    help="Provide the database."
)
@click.option(
    "--mass-tolerance",
    default=10.5,
    show_default=True,
    help="Provide the mass-tolerance."
)
@click.option(
    "--adducts",
    default=["M+H"],
    type=click.Choice(adduct_choices),
    multiple=True,
    show_default=True,
    show_choices=False,
    help="Provide the adducts."
)
@click.option(
    "--columns",
    default=compound_column_choices[:],
    type=click.Choice(compound_column_choices),
    multiple=True,
    show_default=True,
    show_choices=False,
    help="Provide the outputed columns."
)
@click.option(
    "--output-path",
    help="Provide the output path."
)
def compound(*args, **kwargs):
    """Search compounds and write the matches to a tab-separated file.

    With ``--version`` the tool version is printed and the process exits
    with status 0 before anything else happens. Otherwise the remaining
    options are forwarded to ``find_compound`` (``database`` renamed to
    ``database_list``, plus a ``polarity_list`` derived from the adducts),
    and the selected columns are written to ``--output-path``. Each compound
    is expanded to one row per annotation when at least one annotation
    column is requested.

    Raises:
        ValueError: if no output path is given, or if a requested column is
            absent from the results.
    """
    if kwargs.pop("version"):
        print(__version__)
        # The `exit` builtin is injected by the `site` module for interactive
        # sessions and is absent under `python -S`; raising SystemExit is the
        # equivalent that always works.
        raise SystemExit(0)

    # Fail fast: output_csv_result() rejects a missing path with this same
    # error, but only after the remote query has already been paid for.
    if not kwargs.get("output_path"):
        raise ValueError("Missing output path. Cannot output CSV results.")

    adducts = kwargs.pop("adducts")
    polarity = get_polarity(adducts)

    other_kwargs, compound_kwargs = build_kwargs(
        adducts=adducts,
        polarity=polarity,
        **kwargs
    )
    columns = other_kwargs["columns"]
    result = find_compound(**compound_kwargs)
    # Annotations are only expanded when at least one annotation column was
    # requested.
    wants_annotations = any(
        column in columns for column in annotation_column_choices
    )
    result = explode_compounds(result, with_annotations=wants_annotations)
    check_columns_in_result(result, columns)
    output_csv_result(
        result,
        columns,
        other_kwargs.get("output_path"),
        delimiter="\t",
    )
193
194
def explode_compounds(result, with_annotations):
    """Flatten compound objects into a list of plain row dictionaries.

    Args:
        result: iterable of compound objects exposing ``database``,
            ``metabolite_name``, ``chemical_formula``, ``hmdb_id``,
            ``inchikey`` and ``id`` (plus ``annotations`` when
            ``with_annotations`` is true).
        with_annotations: when false, one row per compound holding only the
            compound fields; when true, one row per (compound, annotation)
            pair holding the compound fields followed by the annotation
            fields.

    Returns:
        A list of dicts keyed by output column name. With
        ``with_annotations`` true, a compound that has no annotations
        produces no row at all.
    """
    rows = []
    for cpd in result:
        # Compound-level fields are shared by both output shapes, so they are
        # built once here instead of being repeated in two dict literals.
        compound_fields = {
            "database": cpd.database,
            "metabolite_name": cpd.metabolite_name,
            "chemical_formula": cpd.chemical_formula,
            "hmdb_id": cpd.hmdb_id,
            "inchikey": cpd.inchikey,
            "compound_id": cpd.id,
        }
        if not with_annotations:
            rows.append(compound_fields)
            continue
        for annotation in cpd.annotations:
            rows.append({
                **compound_fields,
                "adduct": annotation.name,
                "kendricks_mass": annotation.kendricks_mass,
                "kendricks_mass_defect": annotation.kendricks_mass_defect,
                "monisotopic_molecular_weight":
                    annotation.monisotopic_molecular_weight,
                "nominal_mass": annotation.nominal_mass,
                "polarity": annotation.polarity,
                "annotation_id": annotation.id,
            })
    return rows
227
228
@cli.command(help="")
@click.option(
    "--id",
    type=int,
    help="Provide the wanted annotation's id."
)
@click.option(
    "--columns",
    default=annotation_column_choices[:],
    type=click.Choice(annotation_column_choices),
    multiple=True,
    show_default=True,
    show_choices=False,
    help="Provide the outputed columns."
)
@click.option(
    "--output-path",
    help="Provide the output path."
)
def annotation(*args, **kwargs):
    """Fetch one annotation by id and write the selected columns as CSV.

    The value of ``--id`` is passed to ``get_annotation``; the single result
    is checked against ``--columns`` and written to ``--output-path`` with
    the csv module's default dialect.

    Raises:
        ValueError: if a requested column is absent from the result, or if
            no output path is given.
    """
    result = get_annotation(id=kwargs.pop("id"))
    # write_result() reads each column by subscription, whereas
    # check_columns_in_result() only validates a private to_dict() copy of
    # non-dict items. Convert once here so the rows that are validated are
    # the same rows that get written.
    # NOTE(review): assumes a non-dict result exposes to_dict(), exactly as
    # check_columns_in_result() already does -- confirm against the client.
    row = result if isinstance(result, dict) else result.to_dict()
    rows = [row]
    columns = kwargs["columns"]
    check_columns_in_result(rows, columns)
    output_csv_result(
        rows,
        columns,
        kwargs.get("output_path")
    )
258
259
def get_polarity(adducts):
    """Return the polarity label implied by a collection of adduct names.

    "positive" is returned as soon as any adduct belongs to
    ``positive_adducts``; otherwise "negative" if any belongs to
    ``negative_adducts``; otherwise ``None``. Only one label is ever
    returned, so a mix of both kinds yields "positive".
    """
    labelled_groups = (
        ("positive", positive_adducts),
        ("negative", negative_adducts),
    )
    for label, known_adducts in labelled_groups:
        if any(adduct in known_adducts for adduct in adducts):
            return label
    return None
270
271
def build_kwargs(**kwargs):
    """Split keyword arguments into local options and API arguments.

    ``database`` and ``polarity`` are renamed to ``database_list`` and
    ``polarity_list``. The keys ``columns``, ``output_path`` and
    ``with_annotations`` are then moved out into a separate dict.

    Returns:
        A ``(other_kwargs, api_kwargs)`` tuple: the extracted local options
        and everything that remains.
    """
    renames = {
        "database": "database_list",
        "polarity": "polarity_list",
    }
    for old_name, new_name in renames.items():
        if old_name in kwargs:
            kwargs[new_name] = kwargs.pop(old_name)

    local_only = ("columns", "output_path", "with_annotations")
    other_kwargs = {}
    for name in local_only:
        if name in kwargs:
            other_kwargs[name] = kwargs.pop(name)
    return other_kwargs, kwargs
285
286
def check_columns_in_result(result, columns):
    """Ensure every requested column exists in the first result row.

    Args:
        result: sequence of rows. Rows are either dicts or objects exposing
            a ``to_dict()`` method. An empty or falsy sequence is accepted
            without any check.
        columns: iterable of column names that must be present.

    Raises:
        ValueError: naming the column(s) missing from the first row.
    """
    if not result:
        return
    # Only the first row is inspected, so it is the only one converted;
    # converting every row just to discard the copies was wasted work.
    first_row = result[0]
    if not isinstance(first_row, dict):
        first_row = first_row.to_dict()
    missing = [
        column for column in columns
        if column not in first_row
    ]
    if not missing:
        return
    if len(missing) == 1:
        raise ValueError(
            f"Could not find the column {missing[0]} in the results."
        )
    raise ValueError(
        "Could not find any of the columns "
        + ','.join(missing)
        + " in the results."
    )
308
309
def output_csv_result(result, columns, output_path, **csv_parameters):
    """Write the selected columns of ``result`` to a CSV file.

    Args:
        result: rows to write, passed on to ``write_result``.
        columns: column names; written as the header and used to pick the
            values of each row.
        output_path: destination file path; the file is overwritten.
        **csv_parameters: extra keyword arguments for ``csv.writer`` (for
            example ``delimiter``).

    Raises:
        ValueError: if ``output_path`` is empty or ``None``.
    """
    if not output_path:
        raise ValueError("Missing output path. Cannot output CSV results.")
    # The encoding is pinned so the output does not depend on the locale of
    # the machine running the tool; newline='' is what the csv module
    # requires so it can manage line endings itself.
    with open(
        output_path, mode="w", newline='', encoding="utf-8"
    ) as output_file:
        writer = csv.writer(output_file, **csv_parameters)
        write_result(result, columns, writer)
316
317
def write_result(result, columns, writer):
    """Write a header row, then one row per item of ``result``.

    Args:
        result: iterable of subscriptable rows (e.g. dicts keyed by column).
        columns: column names; they form the header and fix the order of the
            values in each row.
        writer: object with a ``writerow`` method, such as the one returned
            by ``csv.writer``.
    """
    writer.writerow(columns)
    for row in result:
        writer.writerow([row[column] for column in columns])
325
326
# Run the click command group only when the file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    cli()