comparison COBRAxy/utils/model_utils.py @ 456:a6e45049c1b9 draft default tip

Uploaded
author francesco_lapi
date Fri, 12 Sep 2025 17:28:45 +0000
parents 4e2bc80764b6
children
comparison
equal deleted inserted replaced
455:4e2bc80764b6 456:a6e45049c1b9
1 """
2 Utilities for generating and manipulating COBRA models and related metadata.
3
4 This module includes helpers to:
5 - extract rules, reactions, bounds, objective coefficients, and compartments
6 - build a COBRA model from a tabular file
7 - set objective and medium from dataframes
8 - validate a model and convert gene identifiers
9 - translate model GPRs using mapping tables
10 """
1 import os 11 import os
2 import csv
3 import cobra 12 import cobra
4 import pickle
5 import argparse
6 import pandas as pd 13 import pandas as pd
7 import re 14 import re
8 import logging 15 import logging
9 from typing import Optional, Tuple, Union, List, Dict, Set 16 from typing import Optional, Tuple, Union, List, Dict, Set
10 from collections import defaultdict 17 from collections import defaultdict
11 import utils.general_utils as utils
12 import utils.rule_parsing as rulesUtils 18 import utils.rule_parsing as rulesUtils
13 import utils.reaction_parsing as reactionUtils 19 import utils.reaction_parsing as reactionUtils
14 from cobra import Model as cobraModel, Reaction, Metabolite 20 from cobra import Model as cobraModel, Reaction, Metabolite
15 21
16 ################################- DATA GENERATION -################################ 22 ################################- DATA GENERATION -################################
17 ReactionId = str 23 ReactionId = str
18 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: 24 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
19 """ 25 """
20 Generates a dictionary mapping reaction ids to rules from the model. 26 Generate a dictionary mapping reaction IDs to GPR rules from the model.
21 27
22 Args: 28 Args:
23 model : the model to derive data from. 29 model: COBRA model to derive data from.
24 asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings. 30 asParsed: If True, parse rules into a nested list structure; otherwise keep raw strings.
25 31
26 Returns: 32 Returns:
27 Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules. 33 Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID.
28 Dict[ReactionId, str] : the generated dictionary of raw rules. 34 Dict[ReactionId, str]: Raw rules by reaction ID.
29 """ 35 """
30 # Is the below approach convoluted? yes
31 # Ok but is it inefficient? probably
32 # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane)
33 _ruleGetter = lambda reaction : reaction.gene_reaction_rule 36 _ruleGetter = lambda reaction : reaction.gene_reaction_rule
34 ruleExtractor = (lambda reaction : 37 ruleExtractor = (lambda reaction :
35 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter 38 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter
36 39
37 return { 40 return {
39 for reaction in model.reactions 42 for reaction in model.reactions
40 if reaction.gene_reaction_rule } 43 if reaction.gene_reaction_rule }
41 44
42 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]: 45 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
43 """ 46 """
44 Generates a dictionary mapping reaction ids to reaction formulas from the model. 47 Generate a dictionary mapping reaction IDs to reaction formulas from the model.
45 48
46 Args: 49 Args:
47 model : the model to derive data from. 50 model: COBRA model to derive data from.
48 asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are. 51 asParsed: If True, convert formulas into a parsed representation; otherwise keep raw strings.
49 52
50 Returns: 53 Returns:
51 Dict[ReactionId, str] : the generated dictionary. 54 Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested).
52 """ 55 """
53 56
54 unparsedReactions = { 57 unparsedReactions = {
55 reaction.id : reaction.reaction 58 reaction.id : reaction.reaction
56 for reaction in model.reactions 59 for reaction in model.reactions
60 if not asParsed: return unparsedReactions 63 if not asParsed: return unparsedReactions
61 64
62 return reactionUtils.create_reaction_dict(unparsedReactions) 65 return reactionUtils.create_reaction_dict(unparsedReactions)
63 66
64 def get_medium(model:cobraModel) -> pd.DataFrame: 67 def get_medium(model:cobraModel) -> pd.DataFrame:
68 """
69 Extract the uptake reactions representing the model medium.
70
71 Returns a DataFrame with a single column 'reaction' listing exchange reactions
72 with negative lower bound and no positive stoichiometric coefficients (uptake only).
73 """
65 trueMedium=[] 74 trueMedium=[]
66 for r in model.reactions: 75 for r in model.reactions:
67 positiveCoeff=0 76 positiveCoeff=0
68 for m in r.metabolites: 77 for m in r.metabolites:
69 if r.get_coefficient(m.id)>0: 78 if r.get_coefficient(m.id)>0:
75 df_medium["reaction"] = trueMedium 84 df_medium["reaction"] = trueMedium
76 return df_medium 85 return df_medium
77 86
78 def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame: 87 def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame:
79 """ 88 """
80 Estrae i coefficienti della funzione obiettivo per ciascuna reazione del modello. 89 Extract objective coefficients for each reaction.
81 90
82 Args: 91 Args:
83 model : cobra.Model 92 model: COBRA model
84 93
85 Returns: 94 Returns:
86 pd.DataFrame con colonne: ReactionID, ObjectiveCoefficient 95 pd.DataFrame with columns: ReactionID, ObjectiveCoefficient
87 """ 96 """
88 coeffs = [] 97 coeffs = []
89 # model.objective.expression è un'espressione lineare 98 # model.objective.expression is a linear expression
90 objective_expr = model.objective.expression.as_coefficients_dict() 99 objective_expr = model.objective.expression.as_coefficients_dict()
91 100
92 for reaction in model.reactions: 101 for reaction in model.reactions:
93 coeff = objective_expr.get(reaction.forward_variable, 0.0) 102 coeff = objective_expr.get(reaction.forward_variable, 0.0)
94 coeffs.append({ 103 coeffs.append({
97 }) 106 })
98 107
99 return pd.DataFrame(coeffs) 108 return pd.DataFrame(coeffs)
100 109
101 def generate_bounds(model:cobraModel) -> pd.DataFrame: 110 def generate_bounds(model:cobraModel) -> pd.DataFrame:
111 """
112 Build a DataFrame of lower/upper bounds for all reactions.
113
114 Returns:
115 pd.DataFrame indexed by reaction IDs with columns ['lower_bound', 'upper_bound'].
116 """
102 117
103 rxns = [] 118 rxns = []
104 for reaction in model.reactions: 119 for reaction in model.reactions:
105 rxns.append(reaction.id) 120 rxns.append(reaction.id)
106 121
158 173
159 174
160 175
161 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel: 176 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
162 """ 177 """
163 Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni. 178 Build a COBRApy model from a tabular file with reaction data.
164 179
165 Args: 180 Args:
166 csv_path: Path al file CSV (separato da tab) 181 csv_path: Path to the tab-separated file.
167 model_id: ID del modello da creare 182 model_id: ID for the newly created model.
168 183
169 Returns: 184 Returns:
170 cobra.Model: Il modello COBRApy costruito 185 cobra.Model: The constructed COBRApy model.
171 """ 186 """
172 187
173 # Leggi i dati dal CSV
174 df = pd.read_csv(csv_path, sep='\t') 188 df = pd.read_csv(csv_path, sep='\t')
175 189
176 # Crea il modello vuoto
177 model = cobraModel(model_id) 190 model = cobraModel(model_id)
178 191
179 # Dict per tenere traccia di metaboliti e compartimenti
180 metabolites_dict = {} 192 metabolites_dict = {}
181 compartments_dict = {} 193 compartments_dict = {}
182 194
183 print(f"Costruendo modello da {len(df)} reazioni...") 195 print(f"Building model from {len(df)} reactions...")
184 196
185 # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni
186 for idx, row in df.iterrows(): 197 for idx, row in df.iterrows():
187 reaction_formula = str(row['Formula']).strip() 198 reaction_formula = str(row['Formula']).strip()
188 if not reaction_formula or reaction_formula == 'nan': 199 if not reaction_formula or reaction_formula == 'nan':
189 continue 200 continue
190 201
191 # Estrai metaboliti dalla formula della reazione
192 metabolites = extract_metabolites_from_reaction(reaction_formula) 202 metabolites = extract_metabolites_from_reaction(reaction_formula)
193 203
194 for met_id in metabolites: 204 for met_id in metabolites:
195 compartment = extract_compartment_from_metabolite(met_id) 205 compartment = extract_compartment_from_metabolite(met_id)
196 206
197 # Aggiungi compartimento se non esiste
198 if compartment not in compartments_dict: 207 if compartment not in compartments_dict:
199 compartments_dict[compartment] = compartment 208 compartments_dict[compartment] = compartment
200 209
201 # Aggiungi metabolita se non esiste
202 if met_id not in metabolites_dict: 210 if met_id not in metabolites_dict:
203 metabolites_dict[met_id] = Metabolite( 211 metabolites_dict[met_id] = Metabolite(
204 id=met_id, 212 id=met_id,
205 compartment=compartment, 213 compartment=compartment,
206 name=met_id.replace(f"_{compartment}", "").replace("__", "_") 214 name=met_id.replace(f"_{compartment}", "").replace("__", "_")
207 ) 215 )
208 216
209 # Aggiungi compartimenti al modello
210 model.compartments = compartments_dict 217 model.compartments = compartments_dict
211 218
212 # Aggiungi metaboliti al modello
213 model.add_metabolites(list(metabolites_dict.values())) 219 model.add_metabolites(list(metabolites_dict.values()))
214 220
215 print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti") 221 print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments")
216 222
217 # Seconda passata: aggiungi le reazioni
218 reactions_added = 0 223 reactions_added = 0
219 reactions_skipped = 0 224 reactions_skipped = 0
220 225
221 for idx, row in df.iterrows(): 226 for idx, row in df.iterrows():
222 227
223 reaction_id = str(row['ReactionID']).strip() 228 reaction_id = str(row['ReactionID']).strip()
224 reaction_formula = str(row['Formula']).strip() 229 reaction_formula = str(row['Formula']).strip()
225 230
226 # Salta reazioni senza formula
227 if not reaction_formula or reaction_formula == 'nan': 231 if not reaction_formula or reaction_formula == 'nan':
228 raise ValueError(f"Formula della reazione mancante {reaction_id}") 232 raise ValueError(f"Missing reaction formula for {reaction_id}")
229 233
230 # Crea la reazione
231 reaction = Reaction(reaction_id) 234 reaction = Reaction(reaction_id)
232 reaction.name = reaction_id 235 reaction.name = reaction_id
233 236
234 # Imposta bounds
235 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0 237 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
236 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0 238 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
237 239
238 # Aggiungi gene rule se presente
239 if pd.notna(row['GPR']) and str(row['GPR']).strip(): 240 if pd.notna(row['GPR']) and str(row['GPR']).strip():
240 reaction.gene_reaction_rule = str(row['GPR']).strip() 241 reaction.gene_reaction_rule = str(row['GPR']).strip()
241 242
242 # Parse della formula della reazione
243 try: 243 try:
244 parse_reaction_formula(reaction, reaction_formula, metabolites_dict) 244 parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
245 except Exception as e: 245 except Exception as e:
246 print(f"Errore nel parsing della reazione {reaction_id}: {e}") 246 print(f"Error parsing reaction {reaction_id}: {e}")
247 reactions_skipped += 1 247 reactions_skipped += 1
248 continue 248 continue
249 249
250 # Aggiungi la reazione al modello
251 model.add_reactions([reaction]) 250 model.add_reactions([reaction])
252 reactions_added += 1 251 reactions_added += 1
253 252
254 253
255 print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni") 254 print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions")
256 255
257 # set objective function 256 # set objective function
258 set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient") 257 set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient")
259 258
260 # Imposta il medium
261 set_medium_from_data(model, df) 259 set_medium_from_data(model, df)
262 260
263 print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti") 261 print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites")
264 262
265 return model 263 return model
266 264
267 265
268 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore) 266 # Extract all metabolite IDs from the formula (handles numeric prefixes + underscores)
269 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: 267 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
270 """ 268 """
271 Estrae gli ID dei metaboliti da una formula di reazione. 269 Extract metabolite IDs from a reaction formula.
272 Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e) 270 Robust pattern: tokens ending with _<compartment> (e.g., _c, _m, _e),
273 e permette che comincino con cifre o underscore. 271 allowing leading digits/underscores.
274 """ 272 """
275 metabolites = set() 273 metabolites = set()
276 # coefficiente opzionale seguito da un token che termina con _<letters> 274 # optional coefficient followed by a token ending with _<letters>
277 pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)' 275 pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)'
278 matches = re.findall(pattern, reaction_formula) 276 matches = re.findall(pattern, reaction_formula)
279 metabolites.update(matches) 277 metabolites.update(matches)
280 return metabolites 278 return metabolites
281 279
282 280
283 def extract_compartment_from_metabolite(metabolite_id: str) -> str: 281 def extract_compartment_from_metabolite(metabolite_id: str) -> str:
284 """ 282 """Extract the compartment from a metabolite ID."""
285 Estrae il compartimento dall'ID del metabolita.
286 """
287 # Il compartimento è solitamente l'ultima lettera dopo l'underscore
288 if '_' in metabolite_id: 283 if '_' in metabolite_id:
289 return metabolite_id.split('_')[-1] 284 return metabolite_id.split('_')[-1]
290 return 'c' # default cytoplasm 285 return 'c' # default cytoplasm
291 286
292 287
293 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]): 288 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
294 """ 289 """Parse a reaction formula and set metabolites with their coefficients."""
295 Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti. 290
296 """
297
298 if reaction.id == 'EX_thbpt_e':
299 print(reaction.id)
300 print(formula)
301 # Dividi in parte sinistra e destra
302 if '<=>' in formula: 291 if '<=>' in formula:
303 left, right = formula.split('<=>') 292 left, right = formula.split('<=>')
304 reversible = True 293 reversible = True
305 elif '<--' in formula: 294 elif '<--' in formula:
306 left, right = formula.split('<--') 295 left, right = formula.split('<--')
307 reversible = False 296 reversible = False
308 left, right = left, right
309 elif '-->' in formula: 297 elif '-->' in formula:
310 left, right = formula.split('-->') 298 left, right = formula.split('-->')
311 reversible = False 299 reversible = False
312 elif '<-' in formula: 300 elif '<-' in formula:
313 left, right = formula.split('<-') 301 left, right = formula.split('<-')
314 reversible = False 302 reversible = False
315 left, right = left, right
316 else: 303 else:
317 raise ValueError(f"Formato reazione non riconosciuto: {formula}") 304 raise ValueError(f"Unrecognized reaction format: {formula}")
318 305
319 # Parse dei metaboliti e coefficienti
320 reactants = parse_metabolites_side(left.strip()) 306 reactants = parse_metabolites_side(left.strip())
321 products = parse_metabolites_side(right.strip()) 307 products = parse_metabolites_side(right.strip())
322 308
323 # Aggiungi metaboliti alla reazione
324 metabolites_to_add = {} 309 metabolites_to_add = {}
325 310
326 # Reagenti (coefficienti negativi)
327 for met_id, coeff in reactants.items(): 311 for met_id, coeff in reactants.items():
328 if met_id in metabolites_dict: 312 if met_id in metabolites_dict:
329 metabolites_to_add[metabolites_dict[met_id]] = -coeff 313 metabolites_to_add[metabolites_dict[met_id]] = -coeff
330 314
331 # Prodotti (coefficienti positivi)
332 for met_id, coeff in products.items(): 315 for met_id, coeff in products.items():
333 if met_id in metabolites_dict: 316 if met_id in metabolites_dict:
334 metabolites_to_add[metabolites_dict[met_id]] = coeff 317 metabolites_to_add[metabolites_dict[met_id]] = coeff
335 318
336 reaction.add_metabolites(metabolites_to_add) 319 reaction.add_metabolites(metabolites_to_add)
337 320
338 321
339 def parse_metabolites_side(side_str: str) -> Dict[str, float]: 322 def parse_metabolites_side(side_str: str) -> Dict[str, float]:
340 """ 323 """Parse one side of a reaction and extract metabolites with coefficients."""
341 Parsa un lato della reazione per estrarre metaboliti e coefficienti.
342 """
343 metabolites = {} 324 metabolites = {}
344 if not side_str or side_str.strip() == '': 325 if not side_str or side_str.strip() == '':
345 return metabolites 326 return metabolites
346 327
347 terms = side_str.split('+') 328 terms = side_str.split('+')
348 for term in terms: 329 for term in terms:
349 term = term.strip() 330 term = term.strip()
350 if not term: 331 if not term:
351 continue 332 continue
352 333
353 # pattern allineato: coefficiente opzionale + id che termina con _<compartimento> 334 # optional coefficient + id ending with _<compartment>
354 match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term) 335 match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term)
355 if match: 336 if match:
356 coeff_str, met_id = match.groups() 337 coeff_str, met_id = match.groups()
357 coeff = float(coeff_str) if coeff_str else 1.0 338 coeff = float(coeff_str) if coeff_str else 1.0
358 metabolites[met_id] = coeff 339 metabolites[met_id] = coeff
370 351
371 for idx, row in df.iterrows(): 352 for idx, row in df.iterrows():
372 reaction_id = str(row['ReactionID']).strip() 353 reaction_id = str(row['ReactionID']).strip()
373 coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0 354 coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0
374 if coeff != 0: 355 if coeff != 0:
375 # Assicuriamoci che la reazione esista nel modello
376 if reaction_id in model.reactions: 356 if reaction_id in model.reactions:
377 obj_dict[model.reactions.get_by_id(reaction_id)] = coeff 357 obj_dict[model.reactions.get_by_id(reaction_id)] = coeff
378 else: 358 else:
379 print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.") 359 print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.")
380 360
381 if not obj_dict: 361 if not obj_dict:
382 raise ValueError("No reactions found with non-zero objective coefficient.") 362 raise ValueError("No reactions found with non-zero objective coefficient.")
383 363
384 # Imposta la funzione obiettivo
385 model.objective = obj_dict 364 model.objective = obj_dict
386 print(f"Objective set with {len(obj_dict)} reactions.") 365 print(f"Objective set with {len(obj_dict)} reactions.")
387 366
388 367
389 368
390 369
391 def set_medium_from_data(model: cobraModel, df: pd.DataFrame): 370 def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
392 """ 371 """Set the medium based on the 'InMedium' column in the dataframe."""
393 Imposta il medium basato sulla colonna InMedium.
394 """
395 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist() 372 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
396 373
397 medium_dict = {} 374 medium_dict = {}
398 for rxn_id in medium_reactions: 375 for rxn_id in medium_reactions:
399 if rxn_id in [r.id for r in model.reactions]: 376 if rxn_id in [r.id for r in model.reactions]:
401 if reaction.lower_bound < 0: # Solo reazioni di uptake 378 if reaction.lower_bound < 0: # Uptake reactions only
402 medium_dict[rxn_id] = abs(reaction.lower_bound) 379 medium_dict[rxn_id] = abs(reaction.lower_bound)
403 380
404 if medium_dict: 381 if medium_dict:
405 model.medium = medium_dict 382 model.medium = medium_dict
406 print(f"Medium impostato con {len(medium_dict)} componenti") 383 print(f"Medium set with {len(medium_dict)} components")
407 384
408 385
409 def validate_model(model: cobraModel) -> Dict[str, any]: 386 def validate_model(model: cobraModel) -> Dict[str, any]:
410 """ 387 """Validate the model and return basic statistics."""
411 Valida il modello e fornisce statistiche di base.
412 """
413 validation = { 388 validation = {
414 'num_reactions': len(model.reactions), 389 'num_reactions': len(model.reactions),
415 'num_metabolites': len(model.metabolites), 390 'num_metabolites': len(model.metabolites),
416 'num_genes': len(model.genes), 391 'num_genes': len(model.genes),
417 'num_compartments': len(model.compartments), 392 'num_compartments': len(model.compartments),
420 'reversible_reactions': len([r for r in model.reactions if r.reversibility]), 395 'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
421 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]), 396 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
422 } 397 }
423 398
424 try: 399 try:
425 # Test di crescita 400 # Growth test
426 solution = model.optimize() 401 solution = model.optimize()
427 validation['growth_rate'] = solution.objective_value 402 validation['growth_rate'] = solution.objective_value
428 validation['status'] = solution.status 403 validation['status'] = solution.status
429 except Exception as e: 404 except Exception as e:
430 validation['growth_rate'] = None 405 validation['growth_rate'] = None
431 validation['status'] = f"Error: {e}" 406 validation['status'] = f"Error: {e}"
432 407
433 return validation 408 return validation
434 409
435 def convert_genes(model,annotation): 410 def convert_genes(model, annotation):
411 """Rename genes using a selected annotation key in gene.notes; returns a model copy."""
436 from cobra.manipulation import rename_genes 412 from cobra.manipulation import rename_genes
437 model2=model.copy() 413 model2=model.copy()
438 try: 414 try:
439 dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes} 415 dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes}
440 except: 416 except:
448 def _normalize_colname(col: str) -> str: 424 def _normalize_colname(col: str) -> str:
449 return col.strip().lower().replace(' ', '_') 425 return col.strip().lower().replace(' ', '_')
450 426
451 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]: 427 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]:
452 """ 428 """
453 Cerca colonne utili e ritorna dict {ensg: colname1, hgnc_id: colname2, ...} 429 Find useful columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}.
454 Lancia ValueError se non trova almeno un mapping utile. 430 Raise ValueError if no suitable mapping is found.
455 """ 431 """
456 cols = { _normalize_colname(c): c for c in mapping_df.columns } 432 cols = { _normalize_colname(c): c for c in mapping_df.columns }
457 chosen = {} 433 chosen = {}
458 # possibili nomi per ciascuna categoria 434 # candidate names for each category
459 candidates = { 435 candidates = {
460 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'], 436 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
461 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'], 437 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
462 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'], 438 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'],
463 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'], 439 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'],
474 source_col: str, 450 source_col: str,
475 target_col: str, 451 target_col: str,
476 model_source_genes: Optional[Set[str]] = None, 452 model_source_genes: Optional[Set[str]] = None,
477 logger: Optional[logging.Logger] = None) -> None: 453 logger: Optional[logging.Logger] = None) -> None:
478 """ 454 """
479 Verifica che, nel mapping_df (eventualmente già filtrato sui source di interesse), 455 Check that, within the filtered mapping_df, each target maps to at most one source.
480 ogni target sia associato ad al massimo un source. Se trova target associati a 456 Log examples if duplicates are found.
481 >1 source solleva ValueError mostrando esempi.
482
483 - mapping_df: DataFrame con colonne source_col, target_col
484 - model_source_genes: se fornito, è un set di source normalizzati che stiamo traducendo
485 (se None, si usa tutto mapping_df)
486 """ 457 """
487 if logger is None: 458 if logger is None:
488 logger = logging.getLogger(__name__) 459 logger = logging.getLogger(__name__)
489 460
490 if mapping_df.empty: 461 if mapping_df.empty:
491 logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.") 462 logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.")
492 return 463 return
493 464
494 # normalizza le colonne temporanee per gruppi (senza modificare il df originale) 465 # normalize temporary columns for grouping (without altering the original df)
495 tmp = mapping_df[[source_col, target_col]].copy() 466 tmp = mapping_df[[source_col, target_col]].copy()
496 tmp['_src_norm'] = tmp[source_col].astype(str).map(_normalize_gene_id) 467 tmp['_src_norm'] = tmp[source_col].astype(str).map(_normalize_gene_id)
497 tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip() 468 tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip()
498 469
499 # se è passato un insieme di geni modello, filtra qui (già fatto nella chiamata, ma doppio-check ok) 470 # optionally filter to the set of model source genes
500 if model_source_genes is not None: 471 if model_source_genes is not None:
501 tmp = tmp[tmp['_src_norm'].isin(model_source_genes)] 472 tmp = tmp[tmp['_src_norm'].isin(model_source_genes)]
502 473
503 if tmp.empty: 474 if tmp.empty:
504 logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.") 475 logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.")
505 return 476 return
506 477
507 # costruisci il reverse mapping target -> set(sources) 478 # build reverse mapping: target -> set(sources)
508 grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna())) 479 grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna()))
509 # trova target con più di 1 source 480 # find targets with more than one source
510 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1} 481 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}
511 482
512 if problematic: 483 if problematic:
513 # prepara messaggio di errore con esempi (fino a 20) 484 # prepare warning message with examples (limited subset)
514 sample_items = list(problematic.items()) 485 sample_items = list(problematic.items())
515 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."] 486 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
516 for tgt, sources in sample_items: 487 for tgt, sources in sample_items:
517 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}") 488 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}")
518 full_msg = "\n".join(msg_lines) 489 full_msg = "\n".join(msg_lines)
519 # loggare e sollevare errore 490 # log warning
520 logger.warning(full_msg) 491 logger.warning(full_msg)
521 492
522 # se tutto ok 493 # if everything is fine
523 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).") 494 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
524 495
525 496
526 def _normalize_gene_id(g: str) -> str: 497 def _normalize_gene_id(g: str) -> str:
527 """Rendi consistente un gene id per l'uso come chiave (rimuove prefissi come 'HGNC:' e strip).""" 498 """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips)."""
528 if g is None: 499 if g is None:
529 return "" 500 return ""
530 g = str(g).strip() 501 g = str(g).strip()
531 # remove common prefixes 502 # remove common prefixes
532 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE) 503 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE)
533 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE) 504 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
534 return g 505 return g
535 506
536 def _simplify_boolean_expression(expr: str) -> str: 507 def _simplify_boolean_expression(expr: str) -> str:
537 """ 508 """
538 Semplifica un'espressione booleana rimuovendo duplicati e riducendo ridondanze. 509 Simplify a boolean expression by removing duplicates and redundancies.
539 Gestisce espressioni con 'and' e 'or'. 510 Handles expressions with 'and' and 'or'.
540 """ 511 """
541 if not expr or not expr.strip(): 512 if not expr or not expr.strip():
542 return expr 513 return expr
543 514
544 # Normalizza gli operatori 515 # normalize operators
545 expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') 516 expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
546 517
547 # Funzione ricorsiva per processare le espressioni 518 # recursive helper to process expressions
548 def process_expression(s: str) -> str: 519 def process_expression(s: str) -> str:
549 s = s.strip() 520 s = s.strip()
550 if not s: 521 if not s:
551 return s 522 return s
552 523
553 # Gestisci le parentesi 524 # handle parentheses
554 while '(' in s: 525 while '(' in s:
555 # Trova la parentesi più interna 526 # find the innermost parentheses
556 start = -1 527 start = -1
557 for i, c in enumerate(s): 528 for i, c in enumerate(s):
558 if c == '(': 529 if c == '(':
559 start = i 530 start = i
560 elif c == ')' and start != -1: 531 elif c == ')' and start != -1:
561 # Processa il contenuto tra parentesi 532 # process inner content
562 inner = s[start+1:i] 533 inner = s[start+1:i]
563 processed_inner = process_expression(inner) 534 processed_inner = process_expression(inner)
564 s = s[:start] + processed_inner + s[i+1:] 535 s = s[:start] + processed_inner + s[i+1:]
565 break 536 break
566 else: 537 else:
567 break 538 break
568 539
569 # Dividi per 'or' al livello più alto 540 # split by 'or' at top level
570 or_parts = [] 541 or_parts = []
571 current_part = "" 542 current_part = ""
572 paren_count = 0 543 paren_count = 0
573 544
574 tokens = s.split() 545 tokens = s.split()
588 i += 1 559 i += 1
589 560
590 if current_part.strip(): 561 if current_part.strip():
591 or_parts.append(current_part.strip()) 562 or_parts.append(current_part.strip())
592 563
593 # Processa ogni parte OR 564 # process each OR part
594 processed_or_parts = [] 565 processed_or_parts = []
595 for or_part in or_parts: 566 for or_part in or_parts:
596 # Dividi per 'and' all'interno di ogni parte OR 567 # split by 'and' within each OR part
597 and_parts = [] 568 and_parts = []
598 current_and = "" 569 current_and = ""
599 paren_count = 0 570 paren_count = 0
600 571
601 and_tokens = or_part.split() 572 and_tokens = or_part.split()
615 j += 1 586 j += 1
616 587
617 if current_and.strip(): 588 if current_and.strip():
618 and_parts.append(current_and.strip()) 589 and_parts.append(current_and.strip())
619 590
620 # Rimuovi duplicati nelle parti AND 591 # deduplicate AND parts
621 unique_and_parts = list(dict.fromkeys(and_parts)) # mantiene l'ordine 592 unique_and_parts = list(dict.fromkeys(and_parts)) # preserves order
622 593
623 if len(unique_and_parts) == 1: 594 if len(unique_and_parts) == 1:
624 processed_or_parts.append(unique_and_parts[0]) 595 processed_or_parts.append(unique_and_parts[0])
625 elif len(unique_and_parts) > 1: 596 elif len(unique_and_parts) > 1:
626 processed_or_parts.append(" and ".join(unique_and_parts)) 597 processed_or_parts.append(" and ".join(unique_and_parts))
627 598
628 # Rimuovi duplicati nelle parti OR 599 # deduplicate OR parts
629 unique_or_parts = list(dict.fromkeys(processed_or_parts)) 600 unique_or_parts = list(dict.fromkeys(processed_or_parts))
630 601
631 if len(unique_or_parts) == 1: 602 if len(unique_or_parts) == 1:
632 return unique_or_parts[0] 603 return unique_or_parts[0]
633 elif len(unique_or_parts) > 1: 604 elif len(unique_or_parts) > 1:
636 return "" 607 return ""
637 608
638 try: 609 try:
639 return process_expression(expr) 610 return process_expression(expr)
640 except Exception: 611 except Exception:
641 # Se la semplificazione fallisce, ritorna l'espressione originale 612 # if simplification fails, return the original expression
642 return expr 613 return expr
643 614
644 # ---------- Main public function ---------- 615 # ---------- Main public function ----------
645 def translate_model_genes(model: 'cobra.Model', 616 def translate_model_genes(model: 'cobra.Model',
646 mapping_df: 'pd.DataFrame', 617 mapping_df: 'pd.DataFrame',
647 target_nomenclature: str, 618 target_nomenclature: str,
648 source_nomenclature: str = 'hgnc_id', 619 source_nomenclature: str = 'hgnc_id',
649 allow_many_to_one: bool = False, 620 allow_many_to_one: bool = False,
650 logger: Optional[logging.Logger] = None) -> 'cobra.Model': 621 logger: Optional[logging.Logger] = None) -> 'cobra.Model':
651 """ 622 """
652 Translate model genes from source_nomenclature to target_nomenclature. 623 Translate model genes from source_nomenclature to target_nomenclature using a mapping table.
653 mapping_df should contain at least columns that allow the mapping 624 mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez).
654 (e.g. ensg, hgnc_id, hgnc_symbol, entrez). 625
655
656 Args: 626 Args:
657 allow_many_to_one: Se True, permette che più source vengano mappati allo stesso target 627 model: COBRA model to translate.
658 e gestisce i duplicati nelle GPR. Se False, valida l'unicità dei target. 628 mapping_df: DataFrame containing the mapping information.
629 target_nomenclature: Desired target key (e.g., 'hgnc_symbol').
630 source_nomenclature: Current source key in the model (default 'hgnc_id').
631 allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs.
632 logger: Optional logger.
659 """ 633 """
660 if logger is None: 634 if logger is None:
661 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') 635 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
662 logger = logging.getLogger(__name__) 636 logger = logging.getLogger(__name__)
663 637
709 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy() 683 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy()
710 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).map(_normalize_gene_id) 684 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).map(_normalize_gene_id)
711 685
712 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy() 686 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()
713 687
714 # Se non ci sono righe rilevanti, avvisa
715 if filtered_map.empty: 688 if filtered_map.empty:
716 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).") 689 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")
717 690
718 # --- VALIDAZIONE: opzionale in base al parametro allow_many_to_one ---
719 if not allow_many_to_one: 691 if not allow_many_to_one:
720 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger) 692 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
721 693
722 # Crea il mapping 694 # Create the mapping
723 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger) 695 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)
737 for rxn in model_copy.reactions: 709 for rxn in model_copy.reactions:
738 gpr = rxn.gene_reaction_rule 710 gpr = rxn.gene_reaction_rule
739 if gpr and gpr.strip(): 711 if gpr and gpr.strip():
740 new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger) 712 new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger)
741 if new_gpr != gpr: 713 if new_gpr != gpr:
742 # Semplifica l'espressione booleana per rimuovere duplicati
743 simplified_gpr = _simplify_boolean_expression(new_gpr) 714 simplified_gpr = _simplify_boolean_expression(new_gpr)
744 if simplified_gpr != new_gpr: 715 if simplified_gpr != new_gpr:
745 stats['simplified_gprs'] += 1 716 stats['simplified_gprs'] += 1
746 logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'") 717 logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
747 rxn.gene_reaction_rule = simplified_gpr 718 rxn.gene_reaction_rule = simplified_gpr