Mercurial > repos > workflow4metabolomics > kmd_hmdb_data_plot
comparison kmd_hmdb_interrogator.py @ 0:59c8bad5f6bc draft default tip
planemo upload for repository https://github.com/workflow4metabolomics/tools-metabolomics/blob/master/tools/kmd_hmdb_data_plot/ commit 7fa454b6a4268b89fe18043e8dd10f30a7b4c7ca
author | workflow4metabolomics |
---|---|
date | Tue, 29 Aug 2023 09:45:16 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:59c8bad5f6bc |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 import csv | |
4 import operator | |
5 | |
6 import click | |
7 | |
8 import kmd_hmdb_api_client.client | |
9 from kmd_hmdb_api_client.api.default import ( | |
10 api_annotation_get, | |
11 api_compound_find, | |
12 api_taxonomy_get, | |
13 ) | |
14 | |
# Version string printed by the `compound --version` flag.
__version__ = "1.0.0"


# Single API client instance, passed as `client=` by find_compound,
# get_taxonomy and get_annotation.
# NOTE(review): verify_ssl=False presumably turns off TLS certificate
# verification for every request -- confirm this is intentional.
# NOTE(review): the unit of timeout=500 is not visible in this file
# (presumably seconds) -- confirm against the client library.
kmd_hmdb_client = kmd_hmdb_api_client.client.Client(
    "https://kmd-hmdb-rest-api.metabolomics-chopin.e-metabohub.fr",
    verify_ssl=False,
    timeout=500,
)
23 | |
# The three wrappers below forward their arguments unchanged to the
# generated API functions and always supply the module-level client, so
# callers must not pass `client=` themselves.


def find_compound(*args, **kwargs):
    """Call ``api_compound_find.sync`` with the shared ``kmd_hmdb_client``."""
    return api_compound_find.sync(*args, **kwargs, client=kmd_hmdb_client)


def get_taxonomy(*args, **kwargs):
    """Call ``api_taxonomy_get.sync`` with the shared ``kmd_hmdb_client``."""
    return api_taxonomy_get.sync(*args, **kwargs, client=kmd_hmdb_client)


def get_annotation(*args, **kwargs):
    """Call ``api_annotation_get.sync`` with the shared ``kmd_hmdb_client``."""
    return api_annotation_get.sync(*args, **kwargs, client=kmd_hmdb_client)
36 | |
# Adduct names for which get_polarity() reports "positive".
positive_adducts = [
    "M+H",
    "M+2H",
    "M+H+NH4",
    "M+H+Na",
    "M+H+K",
    "M+ACN+2H",
    "M+2Na",
    "M+H-2H2O",
    "M+H-H2O",
    "M+NH4",
    "M+Na",
    "M+CH3OH+H",
    "M+K",
    "M+ACN+H",
    "M+2Na-H",
    "M+IsoProp+H",
    "M+ACN+Na",
    "M+2K+H",
    "M+DMSO+H",
    "M+2ACN+H",
    "2M+H",
    "2M+NH4",
    "2M+Na",
    "2M+K",
]

# Adduct names for which get_polarity() reports "negative" (only reached
# when no positive adduct was requested).
negative_adducts = [
    "M-H",
    "M-2H",
    "M-H2O-H",
    "M+Cl",
    "M+FA-H",
    "M+Hac-H",
    "M-H+HCOONa",
    "M+Br",
    "M+TFA-H",
    "2M-H",
    "2M+FA-H",
    "2M+Hac-H",
]

# Every value accepted by the `compound --adducts` option.
adduct_choices = positive_adducts + negative_adducts
80 | |
# Taxonomy column names. Not referenced by any command in the visible
# part of this file.
taxonomy_column_choices = [
    "class",
    "kingdom",
    "molecular_framework",
    "sub_class",
    "super_class",
    "id",
]

# Values accepted (and used as default) by `annotation --columns`.
# These are also the annotation keys produced by explode_compounds().
# The spelling "monisotopic_molecular_weight" matches the attribute read
# from annotation objects in explode_compounds(); it is a runtime key, so
# do not "fix" it here alone.
annotation_column_choices = [
    "adduct",
    "kendricks_mass",
    "kendricks_mass_defect",
    "monisotopic_molecular_weight",
    "nominal_mass",
    "polarity",
    "annotation_id",
]

# Values accepted (and used as default) by `compound --columns`:
# the compound-level keys followed by every annotation column.
compound_column_choices = [

    "database",
    "metabolite_name",
    "chemical_formula",
    "hmdb_id",
    "inchikey",
    "compound_id",
] + annotation_column_choices
109 | |
110 | |
# Root click group; the `compound` and `annotation` sub-commands are
# registered on it through @cli.command.
# Documented with comments rather than a docstring on purpose: no help=
# is passed to click.group(), so a docstring would become the group's
# --help text.
@click.group()
def cli():
    pass
114 | |
115 | |
@cli.command(help="")
@click.option(
    "--version",
    is_flag=True,
)
@click.option(
    "--mz-ratio",
    default=[303.05],
    show_default=True,
    multiple=True,
    help="Provide the mz-ratio."
)
@click.option(
    "--database",
    default=["farid"],
    show_default=True,
    multiple=True,
    help="Provide the database."
)
@click.option(
    "--mass-tolerance",
    default=10.5,
    show_default=True,
    help="Provide the mass-tolerance."
)
@click.option(
    "--adducts",
    default=["M+H"],
    type=click.Choice(adduct_choices),
    multiple=True,
    show_default=True,
    show_choices=False,
    help="Provide the adducts."
)
@click.option(
    "--columns",
    default=compound_column_choices[:],
    type=click.Choice(compound_column_choices),
    multiple=True,
    show_default=True,
    show_choices=False,
    help="Provide the outputed columns."
)
@click.option(
    "--output-path",
    help="Provide the output path."
)
def compound(*args, **kwargs):
    """Search compounds through the API and write them as a TSV file.

    ``kwargs`` holds the parsed click options declared above.  With
    ``--version`` the module version is printed and the process exits
    with status 0 before any request is made.  Otherwise the remaining
    options are split by build_kwargs() into API arguments and local
    ones, the compounds are fetched with find_compound(), flattened by
    explode_compounds() and written tab-separated to ``--output-path``.

    Raises:
        ValueError: from check_columns_in_result() when a requested
            column is absent from the rows, or from output_csv_result()
            when no output path was given.
    """
    # help="" is passed explicitly to cli.command, so this docstring is
    # not picked up as the command's help text.
    if kwargs.pop("version"):
        print(__version__)
        # `exit` is injected by the `site` module for interactive use and
        # may be missing (e.g. `python -S`); raising SystemExit directly
        # is equivalent and always available.
        raise SystemExit(0)

    adducts = kwargs.pop("adducts")
    polarity = get_polarity(adducts)

    other_kwargs, compound_kwargs = build_kwargs(
        adducts=adducts,
        polarity=polarity,
        **kwargs
    )
    columns = other_kwargs["columns"]
    result = find_compound(**compound_kwargs)
    # One row per (compound, annotation) pair is only needed when at
    # least one annotation-level column was requested.
    wants_annotations = any(
        column in annotation_column_choices
        for column in columns
    )
    result = explode_compounds(result, with_annotations=wants_annotations)
    check_columns_in_result(result, columns)
    output_csv_result(
        result,
        columns,
        other_kwargs.get("output_path"),
        delimiter="\t",
    )
193 | |
194 | |
def _compound_fields(cpd):
    """Return the compound-level columns of ``cpd`` as a dict."""
    return {
        "database": cpd.database,
        "metabolite_name": cpd.metabolite_name,
        "chemical_formula": cpd.chemical_formula,
        "hmdb_id": cpd.hmdb_id,
        "inchikey": cpd.inchikey,
        "compound_id": cpd.id,
    }


def _annotation_fields(annotation):
    """Return the annotation-level columns of ``annotation`` as a dict."""
    return {
        "adduct": annotation.name,
        "kendricks_mass": annotation.kendricks_mass,
        "kendricks_mass_defect": annotation.kendricks_mass_defect,
        "monisotopic_molecular_weight":
            annotation.monisotopic_molecular_weight,
        "nominal_mass": annotation.nominal_mass,
        "polarity": annotation.polarity,
        "annotation_id": annotation.id,
    }


def explode_compounds(result, with_annotations):
    """Flatten compound objects into a list of plain dict rows.

    Args:
        result: iterable of compound objects exposing the attributes
            read by the helpers above (and ``annotations`` when
            ``with_annotations`` is true).
        with_annotations: when false, one row per compound holding only
            the compound columns.  When true, one row per
            (compound, annotation) pair holding compound columns followed
            by annotation columns; a compound whose ``annotations`` is
            empty yields no row at all.

    Returns:
        A list of dicts keyed by column name.
    """
    if not with_annotations:
        return [_compound_fields(cpd) for cpd in result]

    rows = []
    for cpd in result:
        for annotation in cpd.annotations:
            row = _compound_fields(cpd)
            row.update(_annotation_fields(annotation))
            rows.append(row)
    return rows
227 | |
228 | |
@cli.command(help="")
@click.option(
    "--id",
    type=int,
    help="Provide the wanted annotation's id."
)
@click.option(
    "--columns",
    default=annotation_column_choices[:],
    type=click.Choice(annotation_column_choices),
    multiple=True,
    show_default=True,
    show_choices=False,
    help="Provide the outputed columns."
)
@click.option(
    "--output-path",
    help="Provide the output path."
)
def annotation(*args, **kwargs):
    """Fetch one annotation by id and write it as a CSV file.

    ``kwargs`` holds the parsed click options declared above.  The
    annotation is requested with get_annotation(), wrapped in a
    one-element list, checked against the requested columns and written
    to ``--output-path`` with the csv module's default delimiter.

    Raises:
        ValueError: when the API yields no annotation for the id, when a
            requested column is absent from the result, or when no
            output path was given.
    """
    # help="" is passed explicitly to cli.command, so this docstring is
    # not picked up as the command's help text.
    annotation_id = kwargs.pop("id")
    result = get_annotation(id=annotation_id)
    # NOTE(review): presumably the client returns None when nothing
    # matches -- confirm.  Without this guard a None result only fails
    # later, inside check_columns_in_result(), with an AttributeError on
    # `None.to_dict()`.
    if result is None:
        raise ValueError(
            f"Could not find any annotation with the id {annotation_id}."
        )
    # NOTE(review): explode_compounds() maps annotation.name to "adduct"
    # and annotation.id to "annotation_id"; no such renaming happens
    # here, so those default columns may be reported as missing --
    # confirm against the API model.
    result = [result]
    columns = kwargs["columns"]
    check_columns_in_result(result, columns)
    output_csv_result(
        result,
        columns,
        kwargs.get("output_path")
    )
258 | |
259 | |
def get_polarity(adducts):
    """Return the polarity implied by the requested adducts.

    Args:
        adducts: iterable of adduct names.

    Returns:
        "positive" when at least one adduct is listed in
        ``positive_adducts``; otherwise "negative" when at least one is
        listed in ``negative_adducts``; otherwise None.

    Note:
        A mix of positive and negative adducts yields "positive" only.
        An earlier, disabled variant collected both polarities in a list
        instead.
    """
    candidates = (
        ("positive", positive_adducts),
        ("negative", negative_adducts),
    )
    for label, known_adducts in candidates:
        if any(adduct in known_adducts for adduct in adducts):
            return label
    return None
270 | |
271 | |
def build_kwargs(**kwargs):
    """Split keyword arguments into local ones and API ones.

    "database" and "polarity" are renamed to "database_list" and
    "polarity_list".  "columns", "output_path" and "with_annotations"
    are moved out into a separate dict when present.

    Returns:
        A ``(other_kwargs, api_kwargs)`` tuple: the extracted local
        options, then everything that remains (after renaming).
    """
    renamed = {
        "database": "database_list",
        "polarity": "polarity_list",
    }
    for old_name, new_name in renamed.items():
        if old_name in kwargs:
            kwargs[new_name] = kwargs.pop(old_name)

    other_kwargs = {}
    for local_name in ("columns", "output_path", "with_annotations"):
        if local_name not in kwargs:
            continue
        other_kwargs[local_name] = kwargs.pop(local_name)
    return other_kwargs, kwargs
285 | |
286 | |
def check_columns_in_result(result, columns):
    """Ensure every requested column exists in the first result row.

    An empty ``result`` is accepted without any check.  When the first
    item is not a dict, every item is converted with its ``to_dict()``
    method; the conversion is local and does not modify the caller's
    list.  Only the keys of the first row are inspected.

    Raises:
        ValueError: naming the missing column(s).
    """
    if not result:
        return

    rows = result
    if not isinstance(rows[0], dict):
        rows = [entry.to_dict() for entry in rows]
    available = rows[0].keys()

    absent = [name for name in columns if name not in available]
    if not absent:
        return

    if len(absent) == 1:
        message = f"Could not find the column {absent[0]} in the results."
    else:
        message = (
            "Could not find any of the columns "
            + ','.join(absent)
            + " in the results."
        )
    raise ValueError(message)
308 | |
309 | |
def output_csv_result(result, columns, output_path, **csv_parameters):
    """Write ``result`` to ``output_path`` as delimited text.

    Args:
        result: rows handed to write_result().
        columns: column names; written as the header and used to pick
            the values of each row.
        output_path: destination file, overwritten if it exists.
        **csv_parameters: forwarded to ``csv.writer`` (e.g. delimiter).

    Raises:
        ValueError: when ``output_path`` is empty or None.
    """
    if not output_path:
        raise ValueError("Missing output path. Cannot output CSV results.")
    # Explicit UTF-8: without it open() uses the locale's encoding, which
    # can fail on non-ASCII values and makes the output depend on the
    # machine.  newline='' is what the csv module requires.
    with open(
        output_path, mode="w", newline='', encoding="utf-8"
    ) as output_file:
        writer = csv.writer(output_file, **csv_parameters)
        write_result(result, columns, writer)
316 | |
317 | |
def write_result(result, columns, writer):
    """Write a header row and one row per item of ``result``.

    Args:
        result: iterable of rows.  Each row is either a mapping indexed
            by column name, or an object exposing ``to_dict()`` that
            returns such a mapping.
        columns: column names, in output order.
        writer: object with ``writerow`` / ``writerows`` methods, such as
            the one returned by ``csv.writer``.

    Raises:
        KeyError: when a row lacks one of the requested columns.
    """
    def as_mapping(row):
        # check_columns_in_result() converts non-dict rows with to_dict()
        # only for its own check, so rows may still reach this point as
        # raw objects; normalise them the same way before indexing.
        if isinstance(row, dict) or not hasattr(row, "to_dict"):
            return row
        return row.to_dict()

    writer.writerow(columns)
    writer.writerows(
        [row[column] for column in columns]
        for row in map(as_mapping, result)
    )
325 | |
326 | |
# Run the click group only when the file is executed as a script, so that
# importing the module does not start the CLI.
if __name__ == "__main__":
    cli()