418
|
1 import os
|
|
2 import csv
|
|
3 import cobra
|
|
4 import pickle
|
|
5 import argparse
|
|
6 import pandas as pd
|
419
|
7 import re
|
426
|
8 import logging
|
419
|
9 from typing import Optional, Tuple, Union, List, Dict, Set
|
426
|
10 from collections import defaultdict
|
418
|
11 import utils.general_utils as utils
|
|
12 import utils.rule_parsing as rulesUtils
|
419
|
13 import utils.reaction_parsing as reactionUtils
|
|
14 from cobra import Model as cobraModel, Reaction, Metabolite
|
418
|
15
|
|
16 ################################- DATA GENERATION -################################
|
|
17 ReactionId = str
|
419
|
18 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
|
418
|
19 """
|
|
20 Generates a dictionary mapping reaction ids to rules from the model.
|
|
21
|
|
22 Args:
|
|
23 model : the model to derive data from.
|
|
24 asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings.
|
|
25
|
|
26 Returns:
|
|
27 Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules.
|
|
28 Dict[ReactionId, str] : the generated dictionary of raw rules.
|
|
29 """
|
|
30 # Is the below approach convoluted? yes
|
|
31 # Ok but is it inefficient? probably
|
|
32 # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane)
|
|
33 _ruleGetter = lambda reaction : reaction.gene_reaction_rule
|
|
34 ruleExtractor = (lambda reaction :
|
|
35 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter
|
|
36
|
|
37 return {
|
|
38 reaction.id : ruleExtractor(reaction)
|
|
39 for reaction in model.reactions
|
|
40 if reaction.gene_reaction_rule }
|
|
41
|
419
|
42 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
|
418
|
43 """
|
|
44 Generates a dictionary mapping reaction ids to reaction formulas from the model.
|
|
45
|
|
46 Args:
|
|
47 model : the model to derive data from.
|
|
48 asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are.
|
|
49
|
|
50 Returns:
|
|
51 Dict[ReactionId, str] : the generated dictionary.
|
|
52 """
|
|
53
|
|
54 unparsedReactions = {
|
|
55 reaction.id : reaction.reaction
|
|
56 for reaction in model.reactions
|
|
57 if reaction.reaction
|
|
58 }
|
|
59
|
|
60 if not asParsed: return unparsedReactions
|
|
61
|
|
62 return reactionUtils.create_reaction_dict(unparsedReactions)
|
|
63
|
419
|
64 def get_medium(model:cobraModel) -> pd.DataFrame:
|
418
|
65 trueMedium=[]
|
|
66 for r in model.reactions:
|
|
67 positiveCoeff=0
|
|
68 for m in r.metabolites:
|
|
69 if r.get_coefficient(m.id)>0:
|
|
70 positiveCoeff=1;
|
|
71 if (positiveCoeff==0 and r.lower_bound<0):
|
|
72 trueMedium.append(r.id)
|
|
73
|
|
74 df_medium = pd.DataFrame()
|
|
75 df_medium["reaction"] = trueMedium
|
|
76 return df_medium
|
|
77
|
426
|
78 def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame:
|
|
79 """
|
|
80 Estrae i coefficienti della funzione obiettivo per ciascuna reazione del modello.
|
|
81
|
|
82 Args:
|
|
83 model : cobra.Model
|
|
84
|
|
85 Returns:
|
|
86 pd.DataFrame con colonne: ReactionID, ObjectiveCoefficient
|
|
87 """
|
|
88 coeffs = []
|
|
89 # model.objective.expression รจ un'espressione lineare
|
|
90 objective_expr = model.objective.expression.as_coefficients_dict()
|
|
91
|
|
92 for reaction in model.reactions:
|
|
93 coeff = objective_expr.get(reaction.forward_variable, 0.0)
|
|
94 coeffs.append({
|
|
95 "ReactionID": reaction.id,
|
|
96 "ObjectiveCoefficient": coeff
|
|
97 })
|
|
98
|
|
99 return pd.DataFrame(coeffs)
|
|
100
|
419
|
101 def generate_bounds(model:cobraModel) -> pd.DataFrame:
|
418
|
102
|
|
103 rxns = []
|
|
104 for reaction in model.reactions:
|
|
105 rxns.append(reaction.id)
|
|
106
|
|
107 bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns)
|
|
108
|
|
109 for reaction in model.reactions:
|
|
110 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
|
|
111 return bounds
|
|
112
|
|
113
|
|
114
|
419
|
115 def generate_compartments(model: cobraModel) -> pd.DataFrame:
|
418
|
116 """
|
|
117 Generates a DataFrame containing compartment information for each reaction.
|
|
118 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.)
|
|
119
|
|
120 Args:
|
|
121 model: the COBRA model to extract compartment data from.
|
|
122
|
|
123 Returns:
|
|
124 pd.DataFrame: DataFrame with ReactionID and compartment columns
|
|
125 """
|
|
126 pathway_data = []
|
|
127
|
|
128 # First pass: determine the maximum number of pathways any reaction has
|
|
129 max_pathways = 0
|
|
130 reaction_pathways = {}
|
|
131
|
|
132 for reaction in model.reactions:
|
|
133 # Get unique pathways from all metabolites in the reaction
|
|
134 if type(reaction.annotation['pathways']) == list:
|
|
135 reaction_pathways[reaction.id] = reaction.annotation['pathways']
|
|
136 max_pathways = max(max_pathways, len(reaction.annotation['pathways']))
|
|
137 else:
|
|
138 reaction_pathways[reaction.id] = [reaction.annotation['pathways']]
|
|
139
|
|
140 # Create column names for pathways
|
|
141 pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]
|
|
142
|
|
143 # Second pass: create the data
|
|
144 for reaction_id, pathways in reaction_pathways.items():
|
|
145 row = {"ReactionID": reaction_id}
|
|
146
|
|
147 # Fill pathway columns
|
|
148 for i in range(max_pathways):
|
|
149 col_name = pathway_columns[i]
|
|
150 if i < len(pathways):
|
|
151 row[col_name] = pathways[i]
|
|
152 else:
|
|
153 row[col_name] = None # or "" if you prefer empty strings
|
|
154
|
|
155 pathway_data.append(row)
|
|
156
|
419
|
157 return pd.DataFrame(pathway_data)
|
|
158
|
|
159
|
|
160
|
|
161 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
|
|
162 """
|
|
163 Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni.
|
|
164
|
|
165 Args:
|
|
166 csv_path: Path al file CSV (separato da tab)
|
|
167 model_id: ID del modello da creare
|
|
168
|
|
169 Returns:
|
|
170 cobra.Model: Il modello COBRApy costruito
|
|
171 """
|
|
172
|
|
173 # Leggi i dati dal CSV
|
|
174 df = pd.read_csv(csv_path, sep='\t')
|
|
175
|
|
176 # Crea il modello vuoto
|
|
177 model = cobraModel(model_id)
|
|
178
|
|
179 # Dict per tenere traccia di metaboliti e compartimenti
|
|
180 metabolites_dict = {}
|
|
181 compartments_dict = {}
|
|
182
|
|
183 print(f"Costruendo modello da {len(df)} reazioni...")
|
|
184
|
|
185 # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni
|
|
186 for idx, row in df.iterrows():
|
448
|
187 reaction_formula = str(row['Formula']).strip()
|
419
|
188 if not reaction_formula or reaction_formula == 'nan':
|
|
189 continue
|
|
190
|
|
191 # Estrai metaboliti dalla formula della reazione
|
|
192 metabolites = extract_metabolites_from_reaction(reaction_formula)
|
|
193
|
|
194 for met_id in metabolites:
|
|
195 compartment = extract_compartment_from_metabolite(met_id)
|
|
196
|
|
197 # Aggiungi compartimento se non esiste
|
|
198 if compartment not in compartments_dict:
|
|
199 compartments_dict[compartment] = compartment
|
|
200
|
|
201 # Aggiungi metabolita se non esiste
|
|
202 if met_id not in metabolites_dict:
|
|
203 metabolites_dict[met_id] = Metabolite(
|
|
204 id=met_id,
|
|
205 compartment=compartment,
|
|
206 name=met_id.replace(f"_{compartment}", "").replace("__", "_")
|
|
207 )
|
|
208
|
|
209 # Aggiungi compartimenti al modello
|
|
210 model.compartments = compartments_dict
|
|
211
|
|
212 # Aggiungi metaboliti al modello
|
|
213 model.add_metabolites(list(metabolites_dict.values()))
|
|
214
|
|
215 print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti")
|
|
216
|
|
217 # Seconda passata: aggiungi le reazioni
|
|
218 reactions_added = 0
|
|
219 reactions_skipped = 0
|
|
220
|
|
221 for idx, row in df.iterrows():
|
|
222
|
|
223 reaction_id = str(row['ReactionID']).strip()
|
427
|
224 reaction_formula = str(row['Formula']).strip()
|
419
|
225
|
|
226 # Salta reazioni senza formula
|
|
227 if not reaction_formula or reaction_formula == 'nan':
|
|
228 raise ValueError(f"Formula della reazione mancante {reaction_id}")
|
|
229
|
|
230 # Crea la reazione
|
|
231 reaction = Reaction(reaction_id)
|
|
232 reaction.name = reaction_id
|
|
233
|
|
234 # Imposta bounds
|
|
235 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
|
|
236 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
|
|
237
|
|
238 # Aggiungi gene rule se presente
|
427
|
239 if pd.notna(row['GPR']) and str(row['GPR']).strip():
|
|
240 reaction.gene_reaction_rule = str(row['GPR']).strip()
|
419
|
241
|
|
242 # Parse della formula della reazione
|
|
243 try:
|
|
244 parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
|
|
245 except Exception as e:
|
|
246 print(f"Errore nel parsing della reazione {reaction_id}: {e}")
|
|
247 reactions_skipped += 1
|
|
248 continue
|
|
249
|
|
250 # Aggiungi la reazione al modello
|
|
251 model.add_reactions([reaction])
|
|
252 reactions_added += 1
|
|
253
|
|
254
|
|
255 print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni")
|
|
256
|
430
|
257 # set objective function
|
|
258 set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient")
|
|
259
|
419
|
260 # Imposta il medium
|
|
261 set_medium_from_data(model, df)
|
|
262
|
|
263 print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti")
|
|
264
|
|
265 return model
|
|
266
|
|
267
|
|
268 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
|
|
269 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
|
|
270 """
|
|
271 Estrae gli ID dei metaboliti da una formula di reazione.
|
|
272 Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e)
|
|
273 e permette che comincino con cifre o underscore.
|
|
274 """
|
|
275 metabolites = set()
|
|
276 # coefficiente opzionale seguito da un token che termina con _<letters>
|
|
277 pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)'
|
|
278 matches = re.findall(pattern, reaction_formula)
|
|
279 metabolites.update(matches)
|
|
280 return metabolites
|
|
281
|
|
282
|
|
283 def extract_compartment_from_metabolite(metabolite_id: str) -> str:
|
|
284 """
|
|
285 Estrae il compartimento dall'ID del metabolita.
|
|
286 """
|
|
287 # Il compartimento รจ solitamente l'ultima lettera dopo l'underscore
|
|
288 if '_' in metabolite_id:
|
|
289 return metabolite_id.split('_')[-1]
|
|
290 return 'c' # default cytoplasm
|
|
291
|
|
292
|
|
293 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
|
|
294 """
|
|
295 Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti.
|
|
296 """
|
|
297
|
|
298 if reaction.id == 'EX_thbpt_e':
|
|
299 print(reaction.id)
|
|
300 print(formula)
|
|
301 # Dividi in parte sinistra e destra
|
|
302 if '<=>' in formula:
|
|
303 left, right = formula.split('<=>')
|
|
304 reversible = True
|
|
305 elif '<--' in formula:
|
|
306 left, right = formula.split('<--')
|
|
307 reversible = False
|
|
308 left, right = left, right
|
|
309 elif '-->' in formula:
|
|
310 left, right = formula.split('-->')
|
|
311 reversible = False
|
|
312 elif '<-' in formula:
|
|
313 left, right = formula.split('<-')
|
|
314 reversible = False
|
|
315 left, right = left, right
|
|
316 else:
|
|
317 raise ValueError(f"Formato reazione non riconosciuto: {formula}")
|
|
318
|
|
319 # Parse dei metaboliti e coefficienti
|
|
320 reactants = parse_metabolites_side(left.strip())
|
|
321 products = parse_metabolites_side(right.strip())
|
|
322
|
|
323 # Aggiungi metaboliti alla reazione
|
|
324 metabolites_to_add = {}
|
|
325
|
|
326 # Reagenti (coefficienti negativi)
|
|
327 for met_id, coeff in reactants.items():
|
|
328 if met_id in metabolites_dict:
|
|
329 metabolites_to_add[metabolites_dict[met_id]] = -coeff
|
|
330
|
|
331 # Prodotti (coefficienti positivi)
|
|
332 for met_id, coeff in products.items():
|
|
333 if met_id in metabolites_dict:
|
|
334 metabolites_to_add[metabolites_dict[met_id]] = coeff
|
|
335
|
|
336 reaction.add_metabolites(metabolites_to_add)
|
|
337
|
|
338
|
|
339 def parse_metabolites_side(side_str: str) -> Dict[str, float]:
|
|
340 """
|
|
341 Parsa un lato della reazione per estrarre metaboliti e coefficienti.
|
|
342 """
|
|
343 metabolites = {}
|
|
344 if not side_str or side_str.strip() == '':
|
|
345 return metabolites
|
|
346
|
|
347 terms = side_str.split('+')
|
|
348 for term in terms:
|
|
349 term = term.strip()
|
|
350 if not term:
|
|
351 continue
|
|
352
|
|
353 # pattern allineato: coefficiente opzionale + id che termina con _<compartimento>
|
|
354 match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term)
|
|
355 if match:
|
|
356 coeff_str, met_id = match.groups()
|
|
357 coeff = float(coeff_str) if coeff_str else 1.0
|
|
358 metabolites[met_id] = coeff
|
|
359
|
|
360 return metabolites
|
|
361
|
|
362
|
|
363
|
430
|
364 def set_objective_from_csv(model: cobra.Model, df: pd.DataFrame, obj_col: str = "ObjectiveCoefficient"):
|
419
|
365 """
|
430
|
366 Sets the model's objective function based on a column of coefficients in the CSV.
|
|
367 Can be any reaction(s), not necessarily biomass.
|
419
|
368 """
|
430
|
369 obj_dict = {}
|
419
|
370
|
430
|
371 for idx, row in df.iterrows():
|
|
372 reaction_id = str(row['ReactionID']).strip()
|
|
373 coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0
|
|
374 if coeff != 0:
|
|
375 # Assicuriamoci che la reazione esista nel modello
|
|
376 if reaction_id in model.reactions:
|
|
377 obj_dict[model.reactions.get_by_id(reaction_id)] = coeff
|
|
378 else:
|
|
379 print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.")
|
|
380
|
|
381 if not obj_dict:
|
|
382 raise ValueError("No reactions found with non-zero objective coefficient.")
|
|
383
|
|
384 # Imposta la funzione obiettivo
|
|
385 model.objective = obj_dict
|
|
386 print(f"Objective set with {len(obj_dict)} reactions.")
|
|
387
|
|
388
|
419
|
389
|
|
390
|
|
391 def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
|
|
392 """
|
|
393 Imposta il medium basato sulla colonna InMedium.
|
|
394 """
|
|
395 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
|
|
396
|
|
397 medium_dict = {}
|
|
398 for rxn_id in medium_reactions:
|
|
399 if rxn_id in [r.id for r in model.reactions]:
|
|
400 reaction = model.reactions.get_by_id(rxn_id)
|
|
401 if reaction.lower_bound < 0: # Solo reazioni di uptake
|
|
402 medium_dict[rxn_id] = abs(reaction.lower_bound)
|
|
403
|
|
404 if medium_dict:
|
|
405 model.medium = medium_dict
|
|
406 print(f"Medium impostato con {len(medium_dict)} componenti")
|
|
407
|
|
408
|
|
409 def validate_model(model: cobraModel) -> Dict[str, any]:
|
|
410 """
|
|
411 Valida il modello e fornisce statistiche di base.
|
|
412 """
|
|
413 validation = {
|
|
414 'num_reactions': len(model.reactions),
|
|
415 'num_metabolites': len(model.metabolites),
|
|
416 'num_genes': len(model.genes),
|
|
417 'num_compartments': len(model.compartments),
|
|
418 'objective': str(model.objective),
|
|
419 'medium_size': len(model.medium),
|
|
420 'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
|
|
421 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
|
|
422 }
|
|
423
|
|
424 try:
|
|
425 # Test di crescita
|
|
426 solution = model.optimize()
|
|
427 validation['growth_rate'] = solution.objective_value
|
|
428 validation['status'] = solution.status
|
|
429 except Exception as e:
|
|
430 validation['growth_rate'] = None
|
|
431 validation['status'] = f"Error: {e}"
|
|
432
|
|
433 return validation
|
|
434
|
|
435 def convert_genes(model,annotation):
|
|
436 from cobra.manipulation import rename_genes
|
|
437 model2=model.copy()
|
|
438 try:
|
|
439 dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes}
|
|
440 except:
|
|
441 print("No annotation in gene dict!")
|
|
442 return -1
|
|
443 rename_genes(model2,dict_genes)
|
|
444
|
426
|
445 return model2
|
|
446
|
|
447 # ---------- Utility helpers ----------
|
|
448 def _normalize_colname(col: str) -> str:
|
|
449 return col.strip().lower().replace(' ', '_')
|
|
450
|
|
451 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]:
|
|
452 """
|
|
453 Cerca colonne utili e ritorna dict {ensg: colname1, hgnc_id: colname2, ...}
|
|
454 Lancia ValueError se non trova almeno un mapping utile.
|
|
455 """
|
|
456 cols = { _normalize_colname(c): c for c in mapping_df.columns }
|
|
457 chosen = {}
|
|
458 # possibili nomi per ciascuna categoria
|
|
459 candidates = {
|
|
460 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
|
|
461 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
|
444
|
462 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'],
|
455
|
463 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'],
|
|
464 'gene_number': ['gene_number']
|
426
|
465 }
|
|
466 for key, names in candidates.items():
|
|
467 for n in names:
|
|
468 if n in cols:
|
|
469 chosen[key] = cols[n]
|
|
470 break
|
|
471 return chosen
|
|
472
|
|
473 def _validate_target_uniqueness(mapping_df: 'pd.DataFrame',
|
|
474 source_col: str,
|
|
475 target_col: str,
|
|
476 model_source_genes: Optional[Set[str]] = None,
|
|
477 logger: Optional[logging.Logger] = None) -> None:
|
|
478 """
|
|
479 Verifica che, nel mapping_df (eventualmente giร filtrato sui source di interesse),
|
|
480 ogni target sia associato ad al massimo un source. Se trova target associati a
|
|
481 >1 source solleva ValueError mostrando esempi.
|
|
482
|
|
483 - mapping_df: DataFrame con colonne source_col, target_col
|
|
484 - model_source_genes: se fornito, รจ un set di source normalizzati che stiamo traducendo
|
|
485 (se None, si usa tutto mapping_df)
|
|
486 """
|
|
487 if logger is None:
|
|
488 logger = logging.getLogger(__name__)
|
|
489
|
|
490 if mapping_df.empty:
|
|
491 logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.")
|
|
492 return
|
|
493
|
|
494 # normalizza le colonne temporanee per gruppi (senza modificare il df originale)
|
|
495 tmp = mapping_df[[source_col, target_col]].copy()
|
|
496 tmp['_src_norm'] = tmp[source_col].astype(str).map(_normalize_gene_id)
|
|
497 tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip()
|
|
498
|
|
499 # se รจ passato un insieme di geni modello, filtra qui (giร fatto nella chiamata, ma doppio-check ok)
|
|
500 if model_source_genes is not None:
|
|
501 tmp = tmp[tmp['_src_norm'].isin(model_source_genes)]
|
|
502
|
|
503 if tmp.empty:
|
|
504 logger.warning("After filtering to model source genes, mapping table is empty โ nothing to validate.")
|
|
505 return
|
|
506
|
|
507 # costruisci il reverse mapping target -> set(sources)
|
|
508 grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna()))
|
|
509 # trova target con piรน di 1 source
|
|
510 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}
|
|
511
|
|
512 if problematic:
|
|
513 # prepara messaggio di errore con esempi (fino a 20)
|
455
|
514 sample_items = list(problematic.items())
|
426
|
515 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
|
|
516 for tgt, sources in sample_items:
|
|
517 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}")
|
|
518 full_msg = "\n".join(msg_lines)
|
|
519 # loggare e sollevare errore
|
455
|
520 logger.warning(full_msg)
|
426
|
521
|
|
522 # se tutto ok
|
|
523 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
|
|
524
|
|
525
|
|
526 def _normalize_gene_id(g: str) -> str:
|
|
527 """Rendi consistente un gene id per l'uso come chiave (rimuove prefissi come 'HGNC:' e strip)."""
|
|
528 if g is None:
|
|
529 return ""
|
|
530 g = str(g).strip()
|
|
531 # remove common prefixes
|
|
532 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE)
|
|
533 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
|
|
534 return g
|
|
535
|
455
|
536 def _simplify_boolean_expression(expr: str) -> str:
|
|
537 """
|
|
538 Semplifica un'espressione booleana rimuovendo duplicati e riducendo ridondanze.
|
|
539 Gestisce espressioni con 'and' e 'or'.
|
|
540 """
|
|
541 if not expr or not expr.strip():
|
|
542 return expr
|
|
543
|
|
544 # Normalizza gli operatori
|
|
545 expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
|
|
546
|
|
547 # Funzione ricorsiva per processare le espressioni
|
|
548 def process_expression(s: str) -> str:
|
|
549 s = s.strip()
|
|
550 if not s:
|
|
551 return s
|
|
552
|
|
553 # Gestisci le parentesi
|
|
554 while '(' in s:
|
|
555 # Trova la parentesi piรน interna
|
|
556 start = -1
|
|
557 for i, c in enumerate(s):
|
|
558 if c == '(':
|
|
559 start = i
|
|
560 elif c == ')' and start != -1:
|
|
561 # Processa il contenuto tra parentesi
|
|
562 inner = s[start+1:i]
|
|
563 processed_inner = process_expression(inner)
|
|
564 s = s[:start] + processed_inner + s[i+1:]
|
|
565 break
|
|
566 else:
|
|
567 break
|
|
568
|
|
569 # Dividi per 'or' al livello piรน alto
|
|
570 or_parts = []
|
|
571 current_part = ""
|
|
572 paren_count = 0
|
|
573
|
|
574 tokens = s.split()
|
|
575 i = 0
|
|
576 while i < len(tokens):
|
|
577 token = tokens[i]
|
|
578 if token == 'or' and paren_count == 0:
|
|
579 if current_part.strip():
|
|
580 or_parts.append(current_part.strip())
|
|
581 current_part = ""
|
|
582 else:
|
|
583 if token.count('(') > token.count(')'):
|
|
584 paren_count += token.count('(') - token.count(')')
|
|
585 elif token.count(')') > token.count('('):
|
|
586 paren_count -= token.count(')') - token.count('(')
|
|
587 current_part += token + " "
|
|
588 i += 1
|
|
589
|
|
590 if current_part.strip():
|
|
591 or_parts.append(current_part.strip())
|
|
592
|
|
593 # Processa ogni parte OR
|
|
594 processed_or_parts = []
|
|
595 for or_part in or_parts:
|
|
596 # Dividi per 'and' all'interno di ogni parte OR
|
|
597 and_parts = []
|
|
598 current_and = ""
|
|
599 paren_count = 0
|
|
600
|
|
601 and_tokens = or_part.split()
|
|
602 j = 0
|
|
603 while j < len(and_tokens):
|
|
604 token = and_tokens[j]
|
|
605 if token == 'and' and paren_count == 0:
|
|
606 if current_and.strip():
|
|
607 and_parts.append(current_and.strip())
|
|
608 current_and = ""
|
|
609 else:
|
|
610 if token.count('(') > token.count(')'):
|
|
611 paren_count += token.count('(') - token.count(')')
|
|
612 elif token.count(')') > token.count('('):
|
|
613 paren_count -= token.count(')') - token.count('(')
|
|
614 current_and += token + " "
|
|
615 j += 1
|
|
616
|
|
617 if current_and.strip():
|
|
618 and_parts.append(current_and.strip())
|
|
619
|
|
620 # Rimuovi duplicati nelle parti AND
|
|
621 unique_and_parts = list(dict.fromkeys(and_parts)) # mantiene l'ordine
|
|
622
|
|
623 if len(unique_and_parts) == 1:
|
|
624 processed_or_parts.append(unique_and_parts[0])
|
|
625 elif len(unique_and_parts) > 1:
|
|
626 processed_or_parts.append(" and ".join(unique_and_parts))
|
|
627
|
|
628 # Rimuovi duplicati nelle parti OR
|
|
629 unique_or_parts = list(dict.fromkeys(processed_or_parts))
|
|
630
|
|
631 if len(unique_or_parts) == 1:
|
|
632 return unique_or_parts[0]
|
|
633 elif len(unique_or_parts) > 1:
|
|
634 return " or ".join(unique_or_parts)
|
|
635 else:
|
|
636 return ""
|
|
637
|
|
638 try:
|
|
639 return process_expression(expr)
|
|
640 except Exception:
|
|
641 # Se la semplificazione fallisce, ritorna l'espressione originale
|
|
642 return expr
|
|
643
|
426
|
644 # ---------- Main public function ----------
|
|
645 def translate_model_genes(model: 'cobra.Model',
|
|
646 mapping_df: 'pd.DataFrame',
|
|
647 target_nomenclature: str,
|
|
648 source_nomenclature: str = 'hgnc_id',
|
455
|
649 allow_many_to_one: bool = False,
|
426
|
650 logger: Optional[logging.Logger] = None) -> 'cobra.Model':
|
|
651 """
|
|
652 Translate model genes from source_nomenclature to target_nomenclature.
|
|
653 mapping_df should contain at least columns that allow the mapping
|
|
654 (e.g. ensg, hgnc_id, hgnc_symbol, entrez).
|
455
|
655
|
|
656 Args:
|
|
657 allow_many_to_one: Se True, permette che piรน source vengano mappati allo stesso target
|
|
658 e gestisce i duplicati nelle GPR. Se False, valida l'unicitร dei target.
|
426
|
659 """
|
|
660 if logger is None:
|
|
661 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
662 logger = logging.getLogger(__name__)
|
|
663
|
|
664 logger.info(f"Translating genes from '{source_nomenclature}' to '{target_nomenclature}'")
|
|
665
|
|
666 # normalize column names and choose relevant columns
|
|
667 chosen = _choose_columns(mapping_df)
|
|
668 if not chosen:
|
|
669 raise ValueError("Could not detect useful columns in mapping_df. Expected at least one of: ensg, hgnc_id, hgnc_symbol, entrez.")
|
|
670
|
|
671 # map source/target to actual dataframe column names (allow user-specified source/target keys)
|
|
672 # normalize input args
|
|
673 src_key = source_nomenclature.strip().lower()
|
|
674 tgt_key = target_nomenclature.strip().lower()
|
|
675
|
|
676 # try to find the actual column names for requested keys
|
|
677 col_for_src = None
|
|
678 col_for_tgt = None
|
|
679 # first, try exact match
|
|
680 for k, actual in chosen.items():
|
|
681 if k == src_key:
|
|
682 col_for_src = actual
|
|
683 if k == tgt_key:
|
|
684 col_for_tgt = actual
|
|
685
|
|
686 # if not found, try mapping common names
|
|
687 if col_for_src is None:
|
|
688 possible_src_names = {k: v for k, v in chosen.items()}
|
|
689 # try to match by contained substring
|
|
690 for k, actual in possible_src_names.items():
|
|
691 if src_key in k:
|
|
692 col_for_src = actual
|
|
693 break
|
|
694
|
|
695 if col_for_tgt is None:
|
|
696 for k, actual in chosen.items():
|
|
697 if tgt_key in k:
|
|
698 col_for_tgt = actual
|
|
699 break
|
|
700
|
|
701 if col_for_src is None:
|
|
702 raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.")
|
|
703 if col_for_tgt is None:
|
|
704 raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.")
|
|
705
|
|
706 model_source_genes = { _normalize_gene_id(g.id) for g in model.genes }
|
|
707 logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).")
|
|
708
|
|
709 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy()
|
|
710 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).map(_normalize_gene_id)
|
|
711
|
|
712 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()
|
|
713
|
455
|
714 # Se non ci sono righe rilevanti, avvisa
|
426
|
715 if filtered_map.empty:
|
|
716 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")
|
|
717
|
455
|
718 # --- VALIDAZIONE: opzionale in base al parametro allow_many_to_one ---
|
|
719 if not allow_many_to_one:
|
|
720 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
|
426
|
721
|
455
|
722 # Crea il mapping
|
426
|
723 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)
|
|
724
|
|
725 # copy model
|
|
726 model_copy = model.copy()
|
|
727
|
|
728 # statistics
|
455
|
729 stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0}
|
426
|
730 unmapped = []
|
|
731 multi = []
|
|
732
|
|
733 original_genes = {g.id for g in model_copy.genes}
|
|
734 logger.info(f"Original genes count: {len(original_genes)}")
|
|
735
|
|
736 # translate GPRs
|
|
737 for rxn in model_copy.reactions:
|
|
738 gpr = rxn.gene_reaction_rule
|
|
739 if gpr and gpr.strip():
|
|
740 new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger)
|
|
741 if new_gpr != gpr:
|
455
|
742 # Semplifica l'espressione booleana per rimuovere duplicati
|
|
743 simplified_gpr = _simplify_boolean_expression(new_gpr)
|
|
744 if simplified_gpr != new_gpr:
|
|
745 stats['simplified_gprs'] += 1
|
|
746 logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
|
|
747 rxn.gene_reaction_rule = simplified_gpr
|
|
748 logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'")
|
426
|
749
|
|
750 # update model genes based on new GPRs
|
|
751 _update_model_genes(model_copy, logger)
|
|
752
|
|
753 # final logging
|
|
754 _log_translation_statistics(stats, unmapped, multi, original_genes, model_copy.genes, logger)
|
|
755
|
|
756 logger.info("Translation finished")
|
|
757 return model_copy
|
|
758
|
|
759
|
|
760 # ---------- helper functions ----------
|
|
761 def _create_gene_mapping(mapping_df, source_col: str, target_col: str, logger: logging.Logger) -> Dict[str, List[str]]:
|
|
762 """
|
|
763 Build mapping dict: source_id -> list of target_ids
|
|
764 Normalizes IDs (removes prefixes like 'HGNC:' etc).
|
|
765 """
|
|
766 df = mapping_df[[source_col, target_col]].dropna().copy()
|
|
767 # normalize to string
|
|
768 df[source_col] = df[source_col].astype(str).map(_normalize_gene_id)
|
|
769 df[target_col] = df[target_col].astype(str).str.strip()
|
|
770
|
|
771 df = df.drop_duplicates()
|
|
772
|
|
773 logger.info(f"Creating mapping from {len(df)} rows")
|
|
774
|
|
775 mapping = defaultdict(list)
|
|
776 for _, row in df.iterrows():
|
|
777 s = row[source_col]
|
|
778 t = row[target_col]
|
|
779 if t not in mapping[s]:
|
|
780 mapping[s].append(t)
|
|
781
|
|
782 # stats
|
|
783 one_to_one = sum(1 for v in mapping.values() if len(v) == 1)
|
|
784 one_to_many = sum(1 for v in mapping.values() if len(v) > 1)
|
|
785 logger.info(f"Mapping: {len(mapping)} source keys, {one_to_one} 1:1, {one_to_many} 1:many")
|
|
786 return dict(mapping)
|
|
787
|
|
788
|
|
789 def _translate_gpr(gpr_string: str,
|
|
790 gene_mapping: Dict[str, List[str]],
|
|
791 stats: Dict[str, int],
|
|
792 unmapped_genes: List[str],
|
|
793 multi_mapping_genes: List[Tuple[str, List[str]]],
|
|
794 logger: logging.Logger) -> str:
|
|
795 """
|
|
796 Translate genes inside a GPR string using gene_mapping.
|
|
797 Returns new GPR string.
|
|
798 """
|
|
799 # Generic token pattern: letters, digits, :, _, -, ., (captures HGNC:1234, ENSG000..., symbols)
|
|
800 token_pattern = r'\b[A-Za-z0-9:_.-]+\b'
|
|
801 tokens = re.findall(token_pattern, gpr_string)
|
|
802
|
|
803 logical = {'and', 'or', 'AND', 'OR', '(', ')'}
|
|
804 tokens = [t for t in tokens if t not in logical]
|
|
805
|
|
806 new_gpr = gpr_string
|
|
807
|
|
808 for token in sorted(set(tokens), key=lambda x: -len(x)): # longer tokens first to avoid partial replacement
|
|
809 norm = _normalize_gene_id(token)
|
|
810 if norm in gene_mapping:
|
|
811 targets = gene_mapping[norm]
|
|
812 stats['translated'] += 1
|
|
813 if len(targets) == 1:
|
|
814 stats['one_to_one'] += 1
|
|
815 replacement = targets[0]
|
|
816 else:
|
|
817 stats['one_to_many'] += 1
|
|
818 multi_mapping_genes.append((token, targets))
|
|
819 replacement = "(" + " or ".join(targets) + ")"
|
|
820
|
|
821 pattern = r'\b' + re.escape(token) + r'\b'
|
|
822 new_gpr = re.sub(pattern, replacement, new_gpr)
|
|
823 else:
|
|
824 stats['not_found'] += 1
|
|
825 if token not in unmapped_genes:
|
|
826 unmapped_genes.append(token)
|
|
827 logger.debug(f"Token not found in mapping (left as-is): {token}")
|
|
828
|
|
829 return new_gpr
|
|
830
|
|
831
|
|
832 def _update_model_genes(model: 'cobra.Model', logger: logging.Logger):
|
|
833 """
|
|
834 Rebuild model.genes from gene_reaction_rule content.
|
|
835 Removes genes not referenced and adds missing ones.
|
|
836 """
|
|
837 # collect genes in GPRs
|
|
838 gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
|
|
839 logical = {'and', 'or', 'AND', 'OR', '(', ')'}
|
|
840 genes_in_gpr: Set[str] = set()
|
|
841
|
|
842 for rxn in model.reactions:
|
|
843 gpr = rxn.gene_reaction_rule
|
|
844 if gpr and gpr.strip():
|
|
845 toks = re.findall(gene_pattern, gpr)
|
|
846 toks = [t for t in toks if t not in logical]
|
|
847 # normalize IDs consistent with mapping normalization
|
|
848 toks = [_normalize_gene_id(t) for t in toks]
|
|
849 genes_in_gpr.update(toks)
|
|
850
|
|
851 # existing gene ids
|
|
852 existing = {g.id for g in model.genes}
|
|
853
|
|
854 # remove obsolete genes
|
|
855 to_remove = [gid for gid in existing if gid not in genes_in_gpr]
|
|
856 removed = 0
|
|
857 for gid in to_remove:
|
|
858 try:
|
|
859 gene_obj = model.genes.get_by_id(gid)
|
|
860 model.genes.remove(gene_obj)
|
|
861 removed += 1
|
|
862 except Exception:
|
|
863 # safe-ignore
|
|
864 pass
|
|
865
|
|
866 # add new genes
|
|
867 added = 0
|
|
868 for gid in genes_in_gpr:
|
|
869 if gid not in existing:
|
|
870 new_gene = cobra.Gene(gid)
|
|
871 try:
|
|
872 model.genes.add(new_gene)
|
|
873 except Exception:
|
|
874 # fallback: if model.genes doesn't support add, try append or model.add_genes
|
|
875 try:
|
|
876 model.genes.append(new_gene)
|
|
877 except Exception:
|
|
878 try:
|
|
879 model.add_genes([new_gene])
|
|
880 except Exception:
|
|
881 logger.warning(f"Could not add gene object for {gid}")
|
|
882 added += 1
|
|
883
|
|
884 logger.info(f"Model genes updated: removed {removed}, added {added}")
|
|
885
|
|
886
|
|
887 def _log_translation_statistics(stats: Dict[str, int],
|
|
888 unmapped_genes: List[str],
|
|
889 multi_mapping_genes: List[Tuple[str, List[str]]],
|
|
890 original_genes: Set[str],
|
|
891 final_genes,
|
|
892 logger: logging.Logger):
|
|
893 logger.info("=== TRANSLATION STATISTICS ===")
|
|
894 logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})")
|
|
895 logger.info(f"Not found tokens: {stats.get('not_found', 0)}")
|
455
|
896 logger.info(f"Simplified GPRs: {stats.get('simplified_gprs', 0)}")
|
426
|
897
|
|
898 final_ids = {g.id for g in final_genes}
|
|
899 logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}")
|
|
900
|
|
901 if unmapped_genes:
|
|
902 logger.warning(f"Unmapped tokens ({len(unmapped_genes)}): {', '.join(unmapped_genes[:20])}{(' ...' if len(unmapped_genes)>20 else '')}")
|
|
903 if multi_mapping_genes:
|
|
904 logger.info(f"Multi-mapping examples ({len(multi_mapping_genes)}):")
|
|
905 for orig, targets in multi_mapping_genes[:10]:
|
|
906 logger.info(f" {orig} -> {', '.join(targets)}") |