456
|
1 """
|
|
2 Compute Reaction Propensity Scores (RPS) from metabolite abundances and reaction stoichiometry.
|
|
3
|
|
4 Given a tabular dataset (metabolites x samples) and a reaction set, this script
|
|
5 maps metabolite names via synonyms, fills missing metabolites, and computes RPS
|
|
6 per reaction for each sample using a log-normalized formula.
|
|
7 """
|
406
|
8 import math
|
|
9 import argparse
|
|
10
|
|
11 import numpy as np
|
|
12 import pickle as pk
|
|
13 import pandas as pd
|
|
14
|
|
15 from typing import Optional, List, Dict
|
|
16
|
|
17 import utils.general_utils as utils
|
|
18 import utils.reaction_parsing as reactionUtils
|
|
19
|
|
20 ########################## argparse ##########################################
|
|
21 ARGS :argparse.Namespace
|
|
22 def process_args(args:List[str] = None) -> argparse.Namespace:
|
|
23 """
|
|
24 Processes command-line arguments.
|
|
25
|
|
26 Args:
|
|
27 args (list): List of command-line arguments.
|
|
28
|
|
29 Returns:
|
|
30 Namespace: An object containing parsed arguments.
|
|
31 """
|
456
|
32 parser = argparse.ArgumentParser(
|
|
33 usage='%(prog)s [options]',
|
|
34 description='Process abundances and reactions to create RPS scores.'
|
|
35 )
|
406
|
36
|
|
37 parser.add_argument("-rl", "--model_upload", type = str,
|
|
38 help = "path to input file containing the reactions")
|
|
39
|
|
40 parser.add_argument('-td', '--tool_dir',
|
|
41 type = str,
|
|
42 required = True,
|
|
43 help = 'your tool directory')
|
|
44 parser.add_argument('-ol', '--out_log',
|
|
45 help = "Output log")
|
|
46 parser.add_argument('-id', '--input',
|
|
47 type = str,
|
|
48 required = True,
|
|
49 help = 'input dataset')
|
|
50 parser.add_argument('-rp', '--rps_output',
|
|
51 type = str,
|
|
52 required = True,
|
|
53 help = 'rps output')
|
|
54
|
|
55 args = parser.parse_args(args)
|
|
56 return args
|
|
57
|
|
58 ############################ dataset name #####################################
|
|
59 def name_dataset(name_data :str, count :int) -> str:
|
|
60 """
|
|
61 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique.
|
|
62
|
|
63 Args:
|
456
|
64 name_data: Name associated with the dataset (from frontend input params).
|
|
65 count: Counter starting at 1 to make names unique when default.
|
406
|
66
|
|
67 Returns:
|
|
68 str : the name made unique
|
|
69 """
|
|
70 if str(name_data) == 'Dataset':
|
|
71 return str(name_data) + '_' + str(count)
|
|
72 else:
|
|
73 return str(name_data)
|
|
74
|
|
75 ############################ get_abund_data ####################################
|
|
76 def get_abund_data(dataset: pd.DataFrame, cell_line_index:int) -> Optional[pd.Series]:
|
|
77 """
|
|
78 Extracts abundance data and turns it into a series for a specific cell line from the dataset, which rows are
|
|
79 metabolites and columns are cell lines.
|
|
80
|
|
81 Args:
|
|
82 dataset (pandas.DataFrame): The DataFrame containing abundance data for all cell lines and metabolites.
|
|
83 cell_line_index (int): The index of the cell line of interest in the dataset.
|
|
84
|
|
85 Returns:
|
|
86 pd.Series or None: A series containing abundance values for the specified cell line.
|
|
87 The name of the series is the name of the cell line.
|
|
88 Returns None if the cell index is invalid.
|
|
89 """
|
|
90 if cell_line_index < 0 or cell_line_index >= len(dataset.index):
|
456
|
91 print(f"Error: cell line index '{cell_line_index}' is not valid.")
|
406
|
92 return None
|
|
93
|
|
94 cell_line_name = dataset.columns[cell_line_index]
|
|
95 abundances_series = dataset[cell_line_name][1:]
|
|
96
|
|
97 return abundances_series
|
|
98
|
|
99 ############################ clean_metabolite_name ####################################
|
|
100 def clean_metabolite_name(name :str) -> str:
|
|
101 """
|
|
102 Removes some characters from a metabolite's name, provided as input, and makes it lowercase in order to simplify
|
|
103 the search of a match in the dictionary of synonyms.
|
|
104
|
|
105 Args:
|
|
106 name : the metabolite's name, as given in the dataset.
|
|
107
|
|
108 Returns:
|
|
109 str : a new string with the cleaned name.
|
|
110 """
|
|
111 return "".join(ch for ch in name if ch not in ",;-_'([{ }])").lower()
|
|
112
|
|
113 ############################ get_metabolite_id ####################################
|
|
114 def get_metabolite_id(name :str, syn_dict :Dict[str, List[str]]) -> str:
|
|
115 """
|
|
116 Looks through a dictionary of synonyms to find a match for a given metabolite's name.
|
|
117
|
|
118 Args:
|
|
119 name : the metabolite's name, as given in the dataset.
|
|
120 syn_dict : the dictionary of synonyms, using unique identifiers as keys and lists of clean synonyms as values.
|
|
121
|
|
122 Returns:
|
|
123 str : the internal :str unique identifier of that metabolite, used in all other parts of the model in use.
|
|
124 An empty string is returned if a match isn't found.
|
|
125 """
|
|
126 name = clean_metabolite_name(name)
|
|
127 for id, synonyms in syn_dict.items():
|
|
128 if name in synonyms:
|
|
129 return id
|
|
130
|
|
131 return ""
|
|
132
|
|
133 ############################ check_missing_metab ####################################
|
|
134 def check_missing_metab(reactions: Dict[str, Dict[str, int]], dataset_by_rows: Dict[str, List[float]], cell_lines_amt :int) -> List[str]:
|
|
135 """
|
|
136 Check for missing metabolites in the abundances dictionary compared to the reactions dictionary and update abundances accordingly.
|
|
137
|
|
138 Parameters:
|
|
139 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and
|
|
140 stoichiometric coefficients as values.
|
|
141 dataset_by_rows (dict): A dictionary representing abundances where keys are metabolite names and values are their corresponding abundances for all cell lines.
|
|
142 cell_lines_amt : amount of cell lines, needed to add a new list of abundances for missing metabolites.
|
|
143
|
|
144 Returns:
|
|
145 list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their aboundances were set to 1.
|
|
146
|
|
147 Side effects:
|
456
|
148 dataset_by_rows: mutated to include missing metabolites with default abundances.
|
406
|
149 """
|
|
150 missing_list = []
|
|
151 for reaction in reactions.values():
|
|
152 for metabolite in reaction.keys():
|
|
153 if metabolite not in dataset_by_rows:
|
|
154 dataset_by_rows[metabolite] = [1] * cell_lines_amt
|
|
155 missing_list.append(metabolite)
|
|
156
|
|
157 return missing_list
|
|
158
|
|
159 ############################ calculate_rps ####################################
|
|
160 def calculate_rps(reactions: Dict[str, Dict[str, int]], abundances: Dict[str, float], black_list: List[str], missing_list: List[str], substrateFreqTable: Dict[str, int]) -> Dict[str, float]:
|
|
161 """
|
|
162 Calculate the Reaction Propensity scores (RPS) based on the availability of reaction substrates, for (ideally) each input model reaction and for each sample.
|
|
163 The score is computed as the product of the concentrations of the reacting substances, with each concentration raised to a power equal to its stoichiometric coefficient
|
|
164 for each reaction using the provided coefficient and abundance values. The value is then normalized, based on how frequent the metabolite is in the selected model's reactions,
|
|
165 and log-transformed.
|
|
166
|
|
167 Parameters:
|
|
168 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values.
|
|
169 abundances (dict): A dictionary representing metabolite abundances where keys are metabolite names and values are their corresponding abundances.
|
|
170 black_list (list): A list containing metabolite names that should be excluded from the RPS calculation.
|
|
171 missing_list (list): A list containing metabolite names that were missing in the original abundances dictionary and thus their values were set to 1.
|
|
172 substrateFreqTable (dict): A dictionary where each metabolite name (key) is associated with how many times it shows up in the model's reactions (value).
|
|
173
|
|
174 Returns:
|
|
175 dict: A dictionary containing Reaction Propensity Scores (RPS) where keys are reaction names and values are the corresponding RPS scores.
|
|
176 """
|
|
177 rps_scores = {}
|
|
178
|
|
179 for reaction_name, substrates in reactions.items():
|
|
180 total_contribution = 0
|
|
181 metab_significant = False
|
|
182 for metabolite, stoichiometry in substrates.items():
|
|
183 abundance = 1 if math.isnan(abundances[metabolite]) else abundances[metabolite]
|
|
184 if metabolite not in black_list and metabolite not in missing_list:
|
|
185 metab_significant = True
|
|
186
|
|
187 total_contribution += math.log((abundance + np.finfo(float).eps) / substrateFreqTable[metabolite]) * stoichiometry
|
|
188
|
|
189 rps_scores[reaction_name] = total_contribution if metab_significant else math.nan
|
|
190
|
|
191 return rps_scores
|
|
192
|
|
193 ############################ rps_for_cell_lines ####################################
|
|
194 def rps_for_cell_lines(dataset: List[List[str]], reactions: Dict[str, Dict[str, int]], black_list: List[str], syn_dict: Dict[str, List[str]], substrateFreqTable: Dict[str, int]) -> None:
|
|
195 """
|
|
196 Calculate Reaction Propensity Scores (RPS) for each cell line represented in the dataframe and creates an output file.
|
|
197
|
|
198 Parameters:
|
|
199 dataset : the dataset's data, by rows
|
|
200 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values.
|
|
201 black_list (list): A list containing metabolite names that should be excluded from the RPS calculation.
|
|
202 syn_dict (dict): A dictionary where keys are general metabolite names and values are lists of possible synonyms.
|
|
203 substrateFreqTable (dict): A dictionary where each metabolite name (key) is associated with how many times it shows up in the model's reactions (value).
|
|
204
|
|
205 Returns:
|
|
206 None
|
|
207 """
|
|
208
|
|
209 cell_lines = dataset[0][1:]
|
|
210 abundances_dict = {}
|
|
211
|
|
212 for row in dataset[1:]:
|
456
|
213 id = get_metabolite_id(row[0], syn_dict)
|
|
214 if id:
|
406
|
215 abundances_dict[id] = list(map(utils.Float(), row[1:]))
|
|
216
|
|
217 missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines)))
|
|
218
|
|
219 rps_scores :Dict[Dict[str, float]] = {}
|
|
220 for pos, cell_line_name in enumerate(cell_lines):
|
|
221 abundances = { metab : abundances[pos] for metab, abundances in abundances_dict.items() }
|
|
222
|
|
223 rps_scores[cell_line_name] = calculate_rps(reactions, abundances, black_list, missing_list, substrateFreqTable)
|
|
224
|
|
225 df = pd.DataFrame.from_dict(rps_scores)
|
|
226 df = df.loc[list(reactions.keys()),:]
|
456
|
227 # Optional preview: first 10 rows
|
|
228 # print(df.head(10))
|
406
|
229 df.index.name = 'Reactions'
|
|
230 df.to_csv(ARGS.rps_output, sep='\t', na_rep='None', index=True)
|
|
231
|
|
232 ############################ main ####################################
|
|
233 def main(args:List[str] = None) -> None:
|
|
234 """
|
|
235 Initializes everything and sets the program in motion based on the fronted input arguments.
|
|
236
|
|
237 Returns:
|
|
238 None
|
|
239 """
|
|
240 global ARGS
|
|
241 ARGS = process_args(args)
|
|
242
|
456
|
243 # Load support data (black list and synonyms)
|
406
|
244 with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl:
|
|
245 black_list = pk.load(bl)
|
|
246
|
|
247 with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd:
|
|
248 syn_dict = pk.load(sd)
|
|
249
|
456
|
250 dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader=False)
|
406
|
251 tmp_dict = None
|
456
|
252
|
|
253 # Parse custom reactions from uploaded file
|
406
|
254 reactions = reactionUtils.parse_custom_reactions(ARGS.model_upload)
|
|
255 for r, s in reactions.items():
|
|
256 tmp_list = list(s.keys())
|
|
257 for k in tmp_list:
|
|
258 if k[-2] == '_':
|
|
259 s[k[:-2]] = s.pop(k)
|
|
260 substrateFreqTable = {}
|
|
261 for _, substrates in reactions.items():
|
|
262 for substrateName, _ in substrates.items():
|
|
263 if substrateName not in substrateFreqTable: substrateFreqTable[substrateName] = 0
|
|
264 substrateFreqTable[substrateName] += 1
|
|
265
|
456
|
266 # Debug prints (can be enabled during troubleshooting)
|
|
267 # print(f"Reactions: {reactions}")
|
|
268 # print(f"Substrate Frequencies: {substrateFreqTable}")
|
|
269 # print(f"Synonyms: {syn_dict}")
|
406
|
270 tmp_dict = {}
|
|
271 for metabName, freq in substrateFreqTable.items():
|
|
272 tmp_metabName = clean_metabolite_name(metabName)
|
|
273 for syn_key, syn_list in syn_dict.items():
|
|
274 if tmp_metabName in syn_list or tmp_metabName == clean_metabolite_name(syn_key):
|
456
|
275 # print(f"Mapping {tmp_metabName} to {syn_key}")
|
406
|
276 tmp_dict[syn_key] = syn_list
|
|
277 tmp_dict[syn_key].append(tmp_metabName)
|
|
278
|
|
279 rps_for_cell_lines(dataset, reactions, black_list, syn_dict, substrateFreqTable)
|
456
|
280 print('Execution succeeded')
|
406
|
281
|
|
282 ##############################################################################
|
|
283 if __name__ == "__main__": main()
|