Mercurial > repos > bimib > cobraxy
comparison COBRAxy/utils/model_utils.py @ 456:a6e45049c1b9 draft default tip
Uploaded
author | francesco_lapi |
---|---|
date | Fri, 12 Sep 2025 17:28:45 +0000 |
parents | 4e2bc80764b6 |
children |
comparison
equal
deleted
inserted
replaced
455:4e2bc80764b6 | 456:a6e45049c1b9 |
---|---|
1 """ | |
2 Utilities for generating and manipulating COBRA models and related metadata. | |
3 | |
4 This module includes helpers to: | |
5 - extract rules, reactions, bounds, objective coefficients, and compartments | |
6 - build a COBRA model from a tabular file | |
7 - set objective and medium from dataframes | |
8 - validate a model and convert gene identifiers | |
9 - translate model GPRs using mapping tables | |
10 """ | |
1 import os | 11 import os |
2 import csv | |
3 import cobra | 12 import cobra |
4 import pickle | |
5 import argparse | |
6 import pandas as pd | 13 import pandas as pd |
7 import re | 14 import re |
8 import logging | 15 import logging |
9 from typing import Optional, Tuple, Union, List, Dict, Set | 16 from typing import Optional, Tuple, Union, List, Dict, Set |
10 from collections import defaultdict | 17 from collections import defaultdict |
11 import utils.general_utils as utils | |
12 import utils.rule_parsing as rulesUtils | 18 import utils.rule_parsing as rulesUtils |
13 import utils.reaction_parsing as reactionUtils | 19 import utils.reaction_parsing as reactionUtils |
14 from cobra import Model as cobraModel, Reaction, Metabolite | 20 from cobra import Model as cobraModel, Reaction, Metabolite |
15 | 21 |
16 ################################- DATA GENERATION -################################ | 22 ################################- DATA GENERATION -################################ |
17 ReactionId = str | 23 ReactionId = str |
18 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: | 24 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: |
19 """ | 25 """ |
20 Generates a dictionary mapping reaction ids to rules from the model. | 26 Generate a dictionary mapping reaction IDs to GPR rules from the model. |
21 | 27 |
22 Args: | 28 Args: |
23 model : the model to derive data from. | 29 model: COBRA model to derive data from. |
24 asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings. | 30 asParsed: If True, parse rules into a nested list structure; otherwise keep raw strings. |
25 | 31 |
26 Returns: | 32 Returns: |
27 Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules. | 33 Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID. |
28 Dict[ReactionId, str] : the generated dictionary of raw rules. | 34 Dict[ReactionId, str]: Raw rules by reaction ID. |
29 """ | 35 """ |
30 # Is the below approach convoluted? yes | |
31 # Ok but is it inefficient? probably | |
32 # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane) | |
33 _ruleGetter = lambda reaction : reaction.gene_reaction_rule | 36 _ruleGetter = lambda reaction : reaction.gene_reaction_rule |
34 ruleExtractor = (lambda reaction : | 37 ruleExtractor = (lambda reaction : |
35 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter | 38 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter |
36 | 39 |
37 return { | 40 return { |
39 for reaction in model.reactions | 42 for reaction in model.reactions |
40 if reaction.gene_reaction_rule } | 43 if reaction.gene_reaction_rule } |
41 | 44 |
42 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]: | 45 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]: |
43 """ | 46 """ |
44 Generates a dictionary mapping reaction ids to reaction formulas from the model. | 47 Generate a dictionary mapping reaction IDs to reaction formulas from the model. |
45 | 48 |
46 Args: | 49 Args: |
47 model : the model to derive data from. | 50 model: COBRA model to derive data from. |
48 asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are. | 51 asParsed: If True, convert formulas into a parsed representation; otherwise keep raw strings. |
49 | 52 |
50 Returns: | 53 Returns: |
51 Dict[ReactionId, str] : the generated dictionary. | 54 Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested). |
52 """ | 55 """ |
53 | 56 |
54 unparsedReactions = { | 57 unparsedReactions = { |
55 reaction.id : reaction.reaction | 58 reaction.id : reaction.reaction |
56 for reaction in model.reactions | 59 for reaction in model.reactions |
60 if not asParsed: return unparsedReactions | 63 if not asParsed: return unparsedReactions |
61 | 64 |
62 return reactionUtils.create_reaction_dict(unparsedReactions) | 65 return reactionUtils.create_reaction_dict(unparsedReactions) |
63 | 66 |
64 def get_medium(model:cobraModel) -> pd.DataFrame: | 67 def get_medium(model:cobraModel) -> pd.DataFrame: |
68 """ | |
69 Extract the uptake reactions representing the model medium. | |
70 | |
71 Returns a DataFrame with a single column 'reaction' listing exchange reactions | |
72 with negative lower bound and no positive stoichiometric coefficients (uptake only). | |
73 """ | |
65 trueMedium=[] | 74 trueMedium=[] |
66 for r in model.reactions: | 75 for r in model.reactions: |
67 positiveCoeff=0 | 76 positiveCoeff=0 |
68 for m in r.metabolites: | 77 for m in r.metabolites: |
69 if r.get_coefficient(m.id)>0: | 78 if r.get_coefficient(m.id)>0: |
75 df_medium["reaction"] = trueMedium | 84 df_medium["reaction"] = trueMedium |
76 return df_medium | 85 return df_medium |
77 | 86 |
78 def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame: | 87 def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame: |
79 """ | 88 """ |
80 Estrae i coefficienti della funzione obiettivo per ciascuna reazione del modello. | 89 Extract objective coefficients for each reaction. |
81 | 90 |
82 Args: | 91 Args: |
83 model : cobra.Model | 92 model: COBRA model |
84 | 93 |
85 Returns: | 94 Returns: |
86 pd.DataFrame con colonne: ReactionID, ObjectiveCoefficient | 95 pd.DataFrame with columns: ReactionID, ObjectiveCoefficient |
87 """ | 96 """ |
88 coeffs = [] | 97 coeffs = [] |
89 # model.objective.expression è un'espressione lineare | 98 # model.objective.expression is a linear expression |
90 objective_expr = model.objective.expression.as_coefficients_dict() | 99 objective_expr = model.objective.expression.as_coefficients_dict() |
91 | 100 |
92 for reaction in model.reactions: | 101 for reaction in model.reactions: |
93 coeff = objective_expr.get(reaction.forward_variable, 0.0) | 102 coeff = objective_expr.get(reaction.forward_variable, 0.0) |
94 coeffs.append({ | 103 coeffs.append({ |
97 }) | 106 }) |
98 | 107 |
99 return pd.DataFrame(coeffs) | 108 return pd.DataFrame(coeffs) |
100 | 109 |
101 def generate_bounds(model:cobraModel) -> pd.DataFrame: | 110 def generate_bounds(model:cobraModel) -> pd.DataFrame: |
111 """ | |
112 Build a DataFrame of lower/upper bounds for all reactions. | |
113 | |
114 Returns: | |
115 pd.DataFrame indexed by reaction IDs with columns ['lower_bound', 'upper_bound']. | |
116 """ | |
102 | 117 |
103 rxns = [] | 118 rxns = [] |
104 for reaction in model.reactions: | 119 for reaction in model.reactions: |
105 rxns.append(reaction.id) | 120 rxns.append(reaction.id) |
106 | 121 |
158 | 173 |
159 | 174 |
160 | 175 |
def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
    """
    Build a COBRApy model from a tabular file with reaction data.

    Expects a tab-separated file with (at least) the columns 'ReactionID',
    'Formula', 'lower_bound', 'upper_bound' and 'GPR'; the helper calls at
    the end also consume 'ObjectiveCoefficient' and 'InMedium'.

    Args:
        csv_path: Path to the tab-separated file.
        model_id: ID for the newly created model.

    Returns:
        cobra.Model: The constructed COBRApy model.

    Raises:
        ValueError: From the second pass, if a row's 'Formula' is empty/NaN,
            or from set_objective_from_csv if no reaction has a non-zero
            objective coefficient.
    """

    df = pd.read_csv(csv_path, sep='\t')

    model = cobraModel(model_id)

    # metabolites_dict doubles as the lookup table handed to
    # parse_reaction_formula below; compartments_dict maps id -> id.
    metabolites_dict = {}
    compartments_dict = {}

    print(f"Building model from {len(df)} reactions...")

    # First pass: discover every metabolite and compartment referenced by a
    # formula, so all Metabolite objects exist before reactions are parsed.
    for idx, row in df.iterrows():
        reaction_formula = str(row['Formula']).strip()
        # str() of a NaN cell yields 'nan', hence the string comparison.
        if not reaction_formula or reaction_formula == 'nan':
            continue

        metabolites = extract_metabolites_from_reaction(reaction_formula)

        for met_id in metabolites:
            compartment = extract_compartment_from_metabolite(met_id)

            if compartment not in compartments_dict:
                compartments_dict[compartment] = compartment

            if met_id not in metabolites_dict:
                # Display name: drop the compartment suffix and collapse
                # double underscores.
                metabolites_dict[met_id] = Metabolite(
                    id=met_id,
                    compartment=compartment,
                    name=met_id.replace(f"_{compartment}", "").replace("__", "_")
                )

    model.compartments = compartments_dict

    model.add_metabolites(list(metabolites_dict.values()))

    print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments")

    # Second pass: create the reactions themselves.
    reactions_added = 0
    reactions_skipped = 0

    for idx, row in df.iterrows():

        reaction_id = str(row['ReactionID']).strip()
        reaction_formula = str(row['Formula']).strip()

        # NOTE(review): the first pass silently skips rows without a formula,
        # while this pass raises for the same condition — confirm intended.
        if not reaction_formula or reaction_formula == 'nan':
            raise ValueError(f"Missing reaction formula for {reaction_id}")

        reaction = Reaction(reaction_id)
        reaction.name = reaction_id

        # Missing bounds fall back to the conventional -1000/1000 defaults.
        reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
        reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0

        # Attach the GPR string only when present and non-blank.
        if pd.notna(row['GPR']) and str(row['GPR']).strip():
            reaction.gene_reaction_rule = str(row['GPR']).strip()

        try:
            parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
        except Exception as e:
            # A malformed formula skips this reaction rather than aborting
            # the whole build.
            print(f"Error parsing reaction {reaction_id}: {e}")
            reactions_skipped += 1
            continue

        model.add_reactions([reaction])
        reactions_added += 1


    print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions")

    # set objective function
    set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient")

    set_medium_from_data(model, df)

    print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites")

    return model
266 | 264 |
267 | 265 |
# Extract all metabolite IDs from a formula (handles numeric prefixes + underscores)
def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
    """
    Extract metabolite IDs from a reaction formula string.

    Matches tokens ending in _<compartment> (e.g. _c, _m, _e); tokens may
    begin with digits or underscores. An optional leading stoichiometric
    coefficient is tolerated and excluded from the captured ID.
    """
    token_pattern = re.compile(r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)')
    return set(token_pattern.findall(reaction_formula))
281 | 279 |
282 | 280 |
def extract_compartment_from_metabolite(metabolite_id: str) -> str:
    """Extract the compartment from a metabolite ID (text after the last '_');
    defaults to 'c' (cytoplasm) when the ID has no underscore."""
    _, sep, suffix = metabolite_id.rpartition('_')
    return suffix if sep else 'c'
291 | 286 |
292 | 287 |
def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
    """
    Parse a reaction formula and add its metabolites to `reaction` in place.

    Args:
        reaction: Reaction object to populate via add_metabolites.
        formula: Reaction string using one of the arrows '<=>', '<--', '-->', '<-'.
        metabolites_dict: Known Metabolite objects keyed by ID; IDs appearing
            in the formula but missing from this dict are silently skipped
            (same as the original behavior).

    Raises:
        ValueError: If the formula contains none of the recognized arrows.
    """
    # Fix: the original computed a `reversible` flag in every branch but never
    # used it (directionality lives in the reaction bounds, set elsewhere), and
    # duplicated the split logic four times. Longer arrows are tested first so
    # '<--' is never mis-split as '<-'.
    for arrow in ('<=>', '<--', '-->', '<-'):
        if arrow in formula:
            left, right = formula.split(arrow)
            break
    else:
        raise ValueError(f"Unrecognized reaction format: {formula}")

    reactants = parse_metabolites_side(left.strip())
    products = parse_metabolites_side(right.strip())

    # Reactants get negative coefficients, products positive.
    metabolites_to_add = {}
    for met_id, coeff in reactants.items():
        if met_id in metabolites_dict:
            metabolites_to_add[metabolites_dict[met_id]] = -coeff

    for met_id, coeff in products.items():
        if met_id in metabolites_dict:
            metabolites_to_add[metabolites_dict[met_id]] = coeff

    reaction.add_metabolites(metabolites_to_add)
338 | 321 |
339 def parse_metabolites_side(side_str: str) -> Dict[str, float]: | 322 def parse_metabolites_side(side_str: str) -> Dict[str, float]: |
340 """ | 323 """Parse one side of a reaction and extract metabolites with coefficients.""" |
341 Parsa un lato della reazione per estrarre metaboliti e coefficienti. | |
342 """ | |
343 metabolites = {} | 324 metabolites = {} |
344 if not side_str or side_str.strip() == '': | 325 if not side_str or side_str.strip() == '': |
345 return metabolites | 326 return metabolites |
346 | 327 |
347 terms = side_str.split('+') | 328 terms = side_str.split('+') |
348 for term in terms: | 329 for term in terms: |
349 term = term.strip() | 330 term = term.strip() |
350 if not term: | 331 if not term: |
351 continue | 332 continue |
352 | 333 |
353 # pattern allineato: coefficiente opzionale + id che termina con _<compartimento> | 334 # optional coefficient + id ending with _<compartment> |
354 match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term) | 335 match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term) |
355 if match: | 336 if match: |
356 coeff_str, met_id = match.groups() | 337 coeff_str, met_id = match.groups() |
357 coeff = float(coeff_str) if coeff_str else 1.0 | 338 coeff = float(coeff_str) if coeff_str else 1.0 |
358 metabolites[met_id] = coeff | 339 metabolites[met_id] = coeff |
370 | 351 |
371 for idx, row in df.iterrows(): | 352 for idx, row in df.iterrows(): |
372 reaction_id = str(row['ReactionID']).strip() | 353 reaction_id = str(row['ReactionID']).strip() |
373 coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0 | 354 coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0 |
374 if coeff != 0: | 355 if coeff != 0: |
375 # Assicuriamoci che la reazione esista nel modello | |
376 if reaction_id in model.reactions: | 356 if reaction_id in model.reactions: |
377 obj_dict[model.reactions.get_by_id(reaction_id)] = coeff | 357 obj_dict[model.reactions.get_by_id(reaction_id)] = coeff |
378 else: | 358 else: |
379 print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.") | 359 print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.") |
380 | 360 |
381 if not obj_dict: | 361 if not obj_dict: |
382 raise ValueError("No reactions found with non-zero objective coefficient.") | 362 raise ValueError("No reactions found with non-zero objective coefficient.") |
383 | 363 |
384 # Imposta la funzione obiettivo | |
385 model.objective = obj_dict | 364 model.objective = obj_dict |
386 print(f"Objective set with {len(obj_dict)} reactions.") | 365 print(f"Objective set with {len(obj_dict)} reactions.") |
387 | 366 |
388 | 367 |
389 | 368 |
390 | 369 |
391 def set_medium_from_data(model: cobraModel, df: pd.DataFrame): | 370 def set_medium_from_data(model: cobraModel, df: pd.DataFrame): |
392 """ | 371 """Set the medium based on the 'InMedium' column in the dataframe.""" |
393 Imposta il medium basato sulla colonna InMedium. | |
394 """ | |
395 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist() | 372 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist() |
396 | 373 |
397 medium_dict = {} | 374 medium_dict = {} |
398 for rxn_id in medium_reactions: | 375 for rxn_id in medium_reactions: |
399 if rxn_id in [r.id for r in model.reactions]: | 376 if rxn_id in [r.id for r in model.reactions]: |
401 if reaction.lower_bound < 0: # Solo reazioni di uptake | 378 if reaction.lower_bound < 0: # Solo reazioni di uptake |
402 medium_dict[rxn_id] = abs(reaction.lower_bound) | 379 medium_dict[rxn_id] = abs(reaction.lower_bound) |
403 | 380 |
404 if medium_dict: | 381 if medium_dict: |
405 model.medium = medium_dict | 382 model.medium = medium_dict |
406 print(f"Medium impostato con {len(medium_dict)} componenti") | 383 print(f"Medium set with {len(medium_dict)} components") |
407 | 384 |
408 | 385 |
409 def validate_model(model: cobraModel) -> Dict[str, any]: | 386 def validate_model(model: cobraModel) -> Dict[str, any]: |
410 """ | 387 """Validate the model and return basic statistics.""" |
411 Valida il modello e fornisce statistiche di base. | |
412 """ | |
413 validation = { | 388 validation = { |
414 'num_reactions': len(model.reactions), | 389 'num_reactions': len(model.reactions), |
415 'num_metabolites': len(model.metabolites), | 390 'num_metabolites': len(model.metabolites), |
416 'num_genes': len(model.genes), | 391 'num_genes': len(model.genes), |
417 'num_compartments': len(model.compartments), | 392 'num_compartments': len(model.compartments), |
420 'reversible_reactions': len([r for r in model.reactions if r.reversibility]), | 395 'reversible_reactions': len([r for r in model.reactions if r.reversibility]), |
421 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]), | 396 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]), |
422 } | 397 } |
423 | 398 |
424 try: | 399 try: |
425 # Test di crescita | 400 # Growth test |
426 solution = model.optimize() | 401 solution = model.optimize() |
427 validation['growth_rate'] = solution.objective_value | 402 validation['growth_rate'] = solution.objective_value |
428 validation['status'] = solution.status | 403 validation['status'] = solution.status |
429 except Exception as e: | 404 except Exception as e: |
430 validation['growth_rate'] = None | 405 validation['growth_rate'] = None |
431 validation['status'] = f"Error: {e}" | 406 validation['status'] = f"Error: {e}" |
432 | 407 |
433 return validation | 408 return validation |
434 | 409 |
435 def convert_genes(model,annotation): | 410 def convert_genes(model, annotation): |
411 """Rename genes using a selected annotation key in gene.notes; returns a model copy.""" | |
436 from cobra.manipulation import rename_genes | 412 from cobra.manipulation import rename_genes |
437 model2=model.copy() | 413 model2=model.copy() |
438 try: | 414 try: |
439 dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes} | 415 dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes} |
440 except: | 416 except: |
448 def _normalize_colname(col: str) -> str: | 424 def _normalize_colname(col: str) -> str: |
449 return col.strip().lower().replace(' ', '_') | 425 return col.strip().lower().replace(' ', '_') |
450 | 426 |
451 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]: | 427 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]: |
452 """ | 428 """ |
453 Cerca colonne utili e ritorna dict {ensg: colname1, hgnc_id: colname2, ...} | 429 Find useful columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}. |
454 Lancia ValueError se non trova almeno un mapping utile. | 430 Raise ValueError if no suitable mapping is found. |
455 """ | 431 """ |
456 cols = { _normalize_colname(c): c for c in mapping_df.columns } | 432 cols = { _normalize_colname(c): c for c in mapping_df.columns } |
457 chosen = {} | 433 chosen = {} |
458 # possibili nomi per ciascuna categoria | 434 # candidate names for each category |
459 candidates = { | 435 candidates = { |
460 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'], | 436 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'], |
461 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'], | 437 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'], |
462 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'], | 438 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'], |
463 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'], | 439 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'], |
474 source_col: str, | 450 source_col: str, |
475 target_col: str, | 451 target_col: str, |
476 model_source_genes: Optional[Set[str]] = None, | 452 model_source_genes: Optional[Set[str]] = None, |
477 logger: Optional[logging.Logger] = None) -> None: | 453 logger: Optional[logging.Logger] = None) -> None: |
478 """ | 454 """ |
479 Verifica che, nel mapping_df (eventualmente già filtrato sui source di interesse), | 455 Check that, within the filtered mapping_df, each target maps to at most one source. |
480 ogni target sia associato ad al massimo un source. Se trova target associati a | 456 Log examples if duplicates are found. |
481 >1 source solleva ValueError mostrando esempi. | |
482 | |
483 - mapping_df: DataFrame con colonne source_col, target_col | |
484 - model_source_genes: se fornito, è un set di source normalizzati che stiamo traducendo | |
485 (se None, si usa tutto mapping_df) | |
486 """ | 457 """ |
487 if logger is None: | 458 if logger is None: |
488 logger = logging.getLogger(__name__) | 459 logger = logging.getLogger(__name__) |
489 | 460 |
490 if mapping_df.empty: | 461 if mapping_df.empty: |
491 logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.") | 462 logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.") |
492 return | 463 return |
493 | 464 |
494 # normalizza le colonne temporanee per gruppi (senza modificare il df originale) | 465 # normalize temporary columns for grouping (without altering the original df) |
495 tmp = mapping_df[[source_col, target_col]].copy() | 466 tmp = mapping_df[[source_col, target_col]].copy() |
496 tmp['_src_norm'] = tmp[source_col].astype(str).map(_normalize_gene_id) | 467 tmp['_src_norm'] = tmp[source_col].astype(str).map(_normalize_gene_id) |
497 tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip() | 468 tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip() |
498 | 469 |
499 # se è passato un insieme di geni modello, filtra qui (già fatto nella chiamata, ma doppio-check ok) | 470 # optionally filter to the set of model source genes |
500 if model_source_genes is not None: | 471 if model_source_genes is not None: |
501 tmp = tmp[tmp['_src_norm'].isin(model_source_genes)] | 472 tmp = tmp[tmp['_src_norm'].isin(model_source_genes)] |
502 | 473 |
503 if tmp.empty: | 474 if tmp.empty: |
504 logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.") | 475 logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.") |
505 return | 476 return |
506 | 477 |
507 # costruisci il reverse mapping target -> set(sources) | 478 # build reverse mapping: target -> set(sources) |
508 grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna())) | 479 grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna())) |
509 # trova target con più di 1 source | 480 # find targets with more than one source |
510 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1} | 481 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1} |
511 | 482 |
512 if problematic: | 483 if problematic: |
513 # prepara messaggio di errore con esempi (fino a 20) | 484 # prepare warning message with examples (limited subset) |
514 sample_items = list(problematic.items()) | 485 sample_items = list(problematic.items()) |
515 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."] | 486 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."] |
516 for tgt, sources in sample_items: | 487 for tgt, sources in sample_items: |
517 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}") | 488 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}") |
518 full_msg = "\n".join(msg_lines) | 489 full_msg = "\n".join(msg_lines) |
519 # loggare e sollevare errore | 490 # log warning |
520 logger.warning(full_msg) | 491 logger.warning(full_msg) |
521 | 492 |
522 # se tutto ok | 493 # if everything is fine |
523 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).") | 494 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).") |
524 | 495 |
525 | 496 |
526 def _normalize_gene_id(g: str) -> str: | 497 def _normalize_gene_id(g: str) -> str: |
527 """Rendi consistente un gene id per l'uso come chiave (rimuove prefissi come 'HGNC:' e strip).""" | 498 """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips).""" |
528 if g is None: | 499 if g is None: |
529 return "" | 500 return "" |
530 g = str(g).strip() | 501 g = str(g).strip() |
531 # remove common prefixes | 502 # remove common prefixes |
532 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE) | 503 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE) |
533 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE) | 504 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE) |
534 return g | 505 return g |
535 | 506 |
536 def _simplify_boolean_expression(expr: str) -> str: | 507 def _simplify_boolean_expression(expr: str) -> str: |
537 """ | 508 """ |
538 Semplifica un'espressione booleana rimuovendo duplicati e riducendo ridondanze. | 509 Simplify a boolean expression by removing duplicates and redundancies. |
539 Gestisce espressioni con 'and' e 'or'. | 510 Handles expressions with 'and' and 'or'. |
540 """ | 511 """ |
541 if not expr or not expr.strip(): | 512 if not expr or not expr.strip(): |
542 return expr | 513 return expr |
543 | 514 |
544 # Normalizza gli operatori | 515 # normalize operators |
545 expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') | 516 expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') |
546 | 517 |
547 # Funzione ricorsiva per processare le espressioni | 518 # recursive helper to process expressions |
548 def process_expression(s: str) -> str: | 519 def process_expression(s: str) -> str: |
549 s = s.strip() | 520 s = s.strip() |
550 if not s: | 521 if not s: |
551 return s | 522 return s |
552 | 523 |
553 # Gestisci le parentesi | 524 # handle parentheses |
554 while '(' in s: | 525 while '(' in s: |
555 # Trova la parentesi più interna | 526 # find the innermost parentheses |
556 start = -1 | 527 start = -1 |
557 for i, c in enumerate(s): | 528 for i, c in enumerate(s): |
558 if c == '(': | 529 if c == '(': |
559 start = i | 530 start = i |
560 elif c == ')' and start != -1: | 531 elif c == ')' and start != -1: |
561 # Processa il contenuto tra parentesi | 532 # process inner content |
562 inner = s[start+1:i] | 533 inner = s[start+1:i] |
563 processed_inner = process_expression(inner) | 534 processed_inner = process_expression(inner) |
564 s = s[:start] + processed_inner + s[i+1:] | 535 s = s[:start] + processed_inner + s[i+1:] |
565 break | 536 break |
566 else: | 537 else: |
567 break | 538 break |
568 | 539 |
569 # Dividi per 'or' al livello più alto | 540 # split by 'or' at top level |
570 or_parts = [] | 541 or_parts = [] |
571 current_part = "" | 542 current_part = "" |
572 paren_count = 0 | 543 paren_count = 0 |
573 | 544 |
574 tokens = s.split() | 545 tokens = s.split() |
588 i += 1 | 559 i += 1 |
589 | 560 |
590 if current_part.strip(): | 561 if current_part.strip(): |
591 or_parts.append(current_part.strip()) | 562 or_parts.append(current_part.strip()) |
592 | 563 |
593 # Processa ogni parte OR | 564 # process each OR part |
594 processed_or_parts = [] | 565 processed_or_parts = [] |
595 for or_part in or_parts: | 566 for or_part in or_parts: |
596 # Dividi per 'and' all'interno di ogni parte OR | 567 # split by 'and' within each OR part |
597 and_parts = [] | 568 and_parts = [] |
598 current_and = "" | 569 current_and = "" |
599 paren_count = 0 | 570 paren_count = 0 |
600 | 571 |
601 and_tokens = or_part.split() | 572 and_tokens = or_part.split() |
615 j += 1 | 586 j += 1 |
616 | 587 |
617 if current_and.strip(): | 588 if current_and.strip(): |
618 and_parts.append(current_and.strip()) | 589 and_parts.append(current_and.strip()) |
619 | 590 |
620 # Rimuovi duplicati nelle parti AND | 591 # deduplicate AND parts |
621 unique_and_parts = list(dict.fromkeys(and_parts)) # mantiene l'ordine | 592 unique_and_parts = list(dict.fromkeys(and_parts)) # mantiene l'ordine |
622 | 593 |
623 if len(unique_and_parts) == 1: | 594 if len(unique_and_parts) == 1: |
624 processed_or_parts.append(unique_and_parts[0]) | 595 processed_or_parts.append(unique_and_parts[0]) |
625 elif len(unique_and_parts) > 1: | 596 elif len(unique_and_parts) > 1: |
626 processed_or_parts.append(" and ".join(unique_and_parts)) | 597 processed_or_parts.append(" and ".join(unique_and_parts)) |
627 | 598 |
628 # Rimuovi duplicati nelle parti OR | 599 # deduplicate OR parts |
629 unique_or_parts = list(dict.fromkeys(processed_or_parts)) | 600 unique_or_parts = list(dict.fromkeys(processed_or_parts)) |
630 | 601 |
631 if len(unique_or_parts) == 1: | 602 if len(unique_or_parts) == 1: |
632 return unique_or_parts[0] | 603 return unique_or_parts[0] |
633 elif len(unique_or_parts) > 1: | 604 elif len(unique_or_parts) > 1: |
636 return "" | 607 return "" |
637 | 608 |
638 try: | 609 try: |
639 return process_expression(expr) | 610 return process_expression(expr) |
640 except Exception: | 611 except Exception: |
641 # Se la semplificazione fallisce, ritorna l'espressione originale | 612 # if simplification fails, return the original expression |
642 return expr | 613 return expr |
643 | 614 |
644 # ---------- Main public function ---------- | 615 # ---------- Main public function ---------- |
645 def translate_model_genes(model: 'cobra.Model', | 616 def translate_model_genes(model: 'cobra.Model', |
646 mapping_df: 'pd.DataFrame', | 617 mapping_df: 'pd.DataFrame', |
647 target_nomenclature: str, | 618 target_nomenclature: str, |
648 source_nomenclature: str = 'hgnc_id', | 619 source_nomenclature: str = 'hgnc_id', |
649 allow_many_to_one: bool = False, | 620 allow_many_to_one: bool = False, |
650 logger: Optional[logging.Logger] = None) -> 'cobra.Model': | 621 logger: Optional[logging.Logger] = None) -> 'cobra.Model': |
651 """ | 622 """ |
652 Translate model genes from source_nomenclature to target_nomenclature. | 623 Translate model genes from source_nomenclature to target_nomenclature using a mapping table. |
653 mapping_df should contain at least columns that allow the mapping | 624 mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez). |
654 (e.g. ensg, hgnc_id, hgnc_symbol, entrez). | 625 |
655 | |
656 Args: | 626 Args: |
657 allow_many_to_one: Se True, permette che più source vengano mappati allo stesso target | 627 model: COBRA model to translate. |
658 e gestisce i duplicati nelle GPR. Se False, valida l'unicità dei target. | 628 mapping_df: DataFrame containing the mapping information. |
629 target_nomenclature: Desired target key (e.g., 'hgnc_symbol'). | |
630 source_nomenclature: Current source key in the model (default 'hgnc_id'). | |
631 allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs. | |
632 logger: Optional logger. | |
659 """ | 633 """ |
660 if logger is None: | 634 if logger is None: |
661 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | 635 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
662 logger = logging.getLogger(__name__) | 636 logger = logging.getLogger(__name__) |
663 | 637 |
709 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy() | 683 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy() |
710 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).map(_normalize_gene_id) | 684 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).map(_normalize_gene_id) |
711 | 685 |
712 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy() | 686 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy() |
713 | 687 |
714 # Se non ci sono righe rilevanti, avvisa | |
715 if filtered_map.empty: | 688 if filtered_map.empty: |
716 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).") | 689 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).") |
717 | 690 |
718 # --- VALIDAZIONE: opzionale in base al parametro allow_many_to_one --- | |
719 if not allow_many_to_one: | 691 if not allow_many_to_one: |
720 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger) | 692 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger) |
721 | 693 |
722 # Crea il mapping | 694 # Crea il mapping |
723 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger) | 695 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger) |
737 for rxn in model_copy.reactions: | 709 for rxn in model_copy.reactions: |
738 gpr = rxn.gene_reaction_rule | 710 gpr = rxn.gene_reaction_rule |
739 if gpr and gpr.strip(): | 711 if gpr and gpr.strip(): |
740 new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger) | 712 new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger) |
741 if new_gpr != gpr: | 713 if new_gpr != gpr: |
742 # Semplifica l'espressione booleana per rimuovere duplicati | |
743 simplified_gpr = _simplify_boolean_expression(new_gpr) | 714 simplified_gpr = _simplify_boolean_expression(new_gpr) |
744 if simplified_gpr != new_gpr: | 715 if simplified_gpr != new_gpr: |
745 stats['simplified_gprs'] += 1 | 716 stats['simplified_gprs'] += 1 |
746 logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'") | 717 logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'") |
747 rxn.gene_reaction_rule = simplified_gpr | 718 rxn.gene_reaction_rule = simplified_gpr |