Mercurial > repos > bimib > cobraxy
comparison COBRAxy/rps_generator_beta.py @ 456:a6e45049c1b9 draft default tip
Uploaded
author | francesco_lapi |
---|---|
date | Fri, 12 Sep 2025 17:28:45 +0000 |
parents | 187cee1a00e2 |
children |
comparison
equal
deleted
inserted
replaced
455:4e2bc80764b6 | 456:a6e45049c1b9 |
---|---|
1 """ | |
2 Compute Reaction Propensity Scores (RPS) from metabolite abundances and reaction stoichiometry. | |
3 | |
4 Given a tabular dataset (metabolites x samples) and a reaction set, this script | |
5 maps metabolite names via synonyms, fills missing metabolites, and computes RPS | |
6 per reaction for each sample using a log-normalized formula. | |
7 """ | |
1 import math | 8 import math |
2 import argparse | 9 import argparse |
3 | 10 |
4 import numpy as np | 11 import numpy as np |
5 import pickle as pk | 12 import pickle as pk |
20 args (list): List of command-line arguments. | 27 args (list): List of command-line arguments. |
21 | 28 |
22 Returns: | 29 Returns: |
23 Namespace: An object containing parsed arguments. | 30 Namespace: An object containing parsed arguments. |
24 """ | 31 """ |
25 parser = argparse.ArgumentParser(usage = '%(prog)s [options]', | 32 parser = argparse.ArgumentParser( |
26 description = 'process some value\'s'+ | 33 usage='%(prog)s [options]', |
27 ' abundances and reactions to create RPS scores.') | 34 description='Process abundances and reactions to create RPS scores.' |
35 ) | |
28 | 36 |
29 parser.add_argument("-rl", "--model_upload", type = str, | 37 parser.add_argument("-rl", "--model_upload", type = str, |
30 help = "path to input file containing the reactions") | 38 help = "path to input file containing the reactions") |
31 | 39 |
32 # model_upload custom | |
33 parser.add_argument('-td', '--tool_dir', | 40 parser.add_argument('-td', '--tool_dir', |
34 type = str, | 41 type = str, |
35 required = True, | 42 required = True, |
36 help = 'your tool directory') | 43 help = 'your tool directory') |
37 parser.add_argument('-ol', '--out_log', | 44 parser.add_argument('-ol', '--out_log', |
52 def name_dataset(name_data :str, count :int) -> str: | 59 def name_dataset(name_data :str, count :int) -> str: |
53 """ | 60 """ |
54 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. | 61 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. |
55 | 62 |
56 Args: | 63 Args: |
57 name_data : name associated with the dataset (from frontend input params) | 64 name_data: Name associated with the dataset (from frontend input params). |
58 count : counter from 1 to make these names unique (external) | 65 count: Counter starting at 1 to make names unique when default. |
59 | 66 |
60 Returns: | 67 Returns: |
61 str : the name made unique | 68 str : the name made unique |
62 """ | 69 """ |
63 if str(name_data) == 'Dataset': | 70 if str(name_data) == 'Dataset': |
79 pd.Series or None: A series containing abundance values for the specified cell line. | 86 pd.Series or None: A series containing abundance values for the specified cell line. |
80 The name of the series is the name of the cell line. | 87 The name of the series is the name of the cell line. |
81 Returns None if the cell index is invalid. | 88 Returns None if the cell index is invalid. |
82 """ | 89 """ |
83 if cell_line_index < 0 or cell_line_index >= len(dataset.index): | 90 if cell_line_index < 0 or cell_line_index >= len(dataset.index): |
84 print(f"Errore: This cell line index: '{cell_line_index}' is not valid.") | 91 print(f"Error: cell line index '{cell_line_index}' is not valid.") |
85 return None | 92 return None |
86 | 93 |
87 cell_line_name = dataset.columns[cell_line_index] | 94 cell_line_name = dataset.columns[cell_line_index] |
88 abundances_series = dataset[cell_line_name][1:] | 95 abundances_series = dataset[cell_line_name][1:] |
89 | 96 |
136 | 143 |
137 Returns: | 144 Returns: |
138 list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their abundances were set to 1. | 145 list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their abundances were set to 1. |
139 | 146 |
140 Side effects: | 147 Side effects: |
141 dataset_by_rows : mut | 148 dataset_by_rows: mutated to include missing metabolites with default abundances. |
142 """ | 149 """ |
143 missing_list = [] | 150 missing_list = [] |
144 for reaction in reactions.values(): | 151 for reaction in reactions.values(): |
145 for metabolite in reaction.keys(): | 152 for metabolite in reaction.keys(): |
146 if metabolite not in dataset_by_rows: | 153 if metabolite not in dataset_by_rows: |
201 | 208 |
202 cell_lines = dataset[0][1:] | 209 cell_lines = dataset[0][1:] |
203 abundances_dict = {} | 210 abundances_dict = {} |
204 | 211 |
205 for row in dataset[1:]: | 212 for row in dataset[1:]: |
206 id = get_metabolite_id(row[0], syn_dict) #if translationIsApplied else row[0] | 213 id = get_metabolite_id(row[0], syn_dict) |
207 if id: | 214 if id: |
208 abundances_dict[id] = list(map(utils.Float(), row[1:])) | 215 abundances_dict[id] = list(map(utils.Float(), row[1:])) |
209 | 216 |
210 missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines))) | 217 missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines))) |
211 | 218 |
212 rps_scores :Dict[Dict[str, float]] = {} | 219 rps_scores :Dict[Dict[str, float]] = {} |
215 | 222 |
216 rps_scores[cell_line_name] = calculate_rps(reactions, abundances, black_list, missing_list, substrateFreqTable) | 223 rps_scores[cell_line_name] = calculate_rps(reactions, abundances, black_list, missing_list, substrateFreqTable) |
217 | 224 |
218 df = pd.DataFrame.from_dict(rps_scores) | 225 df = pd.DataFrame.from_dict(rps_scores) |
219 df = df.loc[list(reactions.keys()),:] | 226 df = df.loc[list(reactions.keys()),:] |
220 print(df.head(10)) | 227 # Optional preview: first 10 rows |
228 # print(df.head(10)) | |
221 df.index.name = 'Reactions' | 229 df.index.name = 'Reactions' |
222 df.to_csv(ARGS.rps_output, sep='\t', na_rep='None', index=True) | 230 df.to_csv(ARGS.rps_output, sep='\t', na_rep='None', index=True) |
223 | 231 |
224 ############################ main #################################### | 232 ############################ main #################################### |
225 def main(args:List[str] = None) -> None: | 233 def main(args:List[str] = None) -> None: |
230 None | 238 None |
231 """ | 239 """ |
232 global ARGS | 240 global ARGS |
233 ARGS = process_args(args) | 241 ARGS = process_args(args) |
234 | 242 |
235 # TODO:use utils functions vvv | 243 # Load support data (black list and synonyms) |
236 with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl: | 244 with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl: |
237 black_list = pk.load(bl) | 245 black_list = pk.load(bl) |
238 | 246 |
239 with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd: | 247 with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd: |
240 syn_dict = pk.load(sd) | 248 syn_dict = pk.load(sd) |
241 | 249 |
242 dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader = False) | 250 dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader=False) |
243 tmp_dict = None | 251 tmp_dict = None |
244 #if ARGS.reaction_choice == 'default': | 252 |
245 # reactions = pk.load(open(ARGS.tool_dir + '/local/pickle files/reactions.pickle', 'rb')) | 253 # Parse custom reactions from uploaded file |
246 # substrateFreqTable = pk.load(open(ARGS.tool_dir + '/local/pickle files/substrate_frequencies.pickle', 'rb')) | |
247 | |
248 #elif ARGS.reaction_choice == 'custom': | |
249 reactions = reactionUtils.parse_custom_reactions(ARGS.model_upload) | 254 reactions = reactionUtils.parse_custom_reactions(ARGS.model_upload) |
250 for r, s in reactions.items(): | 255 for r, s in reactions.items(): |
251 tmp_list = list(s.keys()) | 256 tmp_list = list(s.keys()) |
252 for k in tmp_list: | 257 for k in tmp_list: |
253 if k[-2] == '_': | 258 if k[-2] == '_': |
256 for _, substrates in reactions.items(): | 261 for _, substrates in reactions.items(): |
257 for substrateName, _ in substrates.items(): | 262 for substrateName, _ in substrates.items(): |
258 if substrateName not in substrateFreqTable: substrateFreqTable[substrateName] = 0 | 263 if substrateName not in substrateFreqTable: substrateFreqTable[substrateName] = 0 |
259 substrateFreqTable[substrateName] += 1 | 264 substrateFreqTable[substrateName] += 1 |
260 | 265 |
261 print(f"Reactions: {reactions}") | 266 # Debug prints (can be enabled during troubleshooting) |
262 print(f"Substrate Frequencies: {substrateFreqTable}") | 267 # print(f"Reactions: {reactions}") |
263 print(f"Synonyms: {syn_dict}") | 268 # print(f"Substrate Frequencies: {substrateFreqTable}") |
269 # print(f"Synonyms: {syn_dict}") | |
264 tmp_dict = {} | 270 tmp_dict = {} |
265 for metabName, freq in substrateFreqTable.items(): | 271 for metabName, freq in substrateFreqTable.items(): |
266 tmp_metabName = clean_metabolite_name(metabName) | 272 tmp_metabName = clean_metabolite_name(metabName) |
267 for syn_key, syn_list in syn_dict.items(): | 273 for syn_key, syn_list in syn_dict.items(): |
268 if tmp_metabName in syn_list or tmp_metabName == clean_metabolite_name(syn_key): | 274 if tmp_metabName in syn_list or tmp_metabName == clean_metabolite_name(syn_key): |
269 print(f"Mapping {tmp_metabName} to {syn_key}") | 275 # print(f"Mapping {tmp_metabName} to {syn_key}") |
270 tmp_dict[syn_key] = syn_list | 276 tmp_dict[syn_key] = syn_list |
271 tmp_dict[syn_key].append(tmp_metabName) | 277 tmp_dict[syn_key].append(tmp_metabName) |
272 | 278 |
273 rps_for_cell_lines(dataset, reactions, black_list, syn_dict, substrateFreqTable) | 279 rps_for_cell_lines(dataset, reactions, black_list, syn_dict, substrateFreqTable) |
274 print('Execution succeded') | 280 print('Execution succeeded') |
275 | 281 |
276 ############################################################################## | 282 ############################################################################## |
277 if __name__ == "__main__": main() | 283 if __name__ == "__main__": main() |