comparison COBRAxy/utils/model_utils.py @ 419:ed2c1f9e20ba draft

Uploaded
author francesco_lapi
date Tue, 09 Sep 2025 09:08:17 +0000
parents 919b5b71a61c
children 00a78da611ba
comparison
equal deleted inserted replaced
418:919b5b71a61c 419:ed2c1f9e20ba
2 import csv 2 import csv
3 import cobra 3 import cobra
4 import pickle 4 import pickle
5 import argparse 5 import argparse
6 import pandas as pd 6 import pandas as pd
7 from typing import Optional, Tuple, Union, List, Dict 7 import re
8 from typing import Optional, Tuple, Union, List, Dict, Set
8 import utils.general_utils as utils 9 import utils.general_utils as utils
9 import utils.rule_parsing as rulesUtils 10 import utils.rule_parsing as rulesUtils
11 import utils.reaction_parsing as reactionUtils
12 from cobra import Model as cobraModel, Reaction, Metabolite
10 13
11 ################################- DATA GENERATION -################################ 14 ################################- DATA GENERATION -################################
12 ReactionId = str 15 ReactionId = str
13 def generate_rules(model: cobra.Model, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: 16 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
14 """ 17 """
15 Generates a dictionary mapping reaction ids to rules from the model. 18 Generates a dictionary mapping reaction ids to rules from the model.
16 19
17 Args: 20 Args:
18 model : the model to derive data from. 21 model : the model to derive data from.
32 return { 35 return {
33 reaction.id : ruleExtractor(reaction) 36 reaction.id : ruleExtractor(reaction)
34 for reaction in model.reactions 37 for reaction in model.reactions
35 if reaction.gene_reaction_rule } 38 if reaction.gene_reaction_rule }
36 39
37 def generate_reactions(model :cobra.Model, *, asParsed = True) -> Dict[ReactionId, str]: 40 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
38 """ 41 """
39 Generates a dictionary mapping reaction ids to reaction formulas from the model. 42 Generates a dictionary mapping reaction ids to reaction formulas from the model.
40 43
41 Args: 44 Args:
42 model : the model to derive data from. 45 model : the model to derive data from.
54 57
55 if not asParsed: return unparsedReactions 58 if not asParsed: return unparsedReactions
56 59
57 return reactionUtils.create_reaction_dict(unparsedReactions) 60 return reactionUtils.create_reaction_dict(unparsedReactions)
58 61
59 def get_medium(model:cobra.Model) -> pd.DataFrame: 62 def get_medium(model:cobraModel) -> pd.DataFrame:
60 trueMedium=[] 63 trueMedium=[]
61 for r in model.reactions: 64 for r in model.reactions:
62 positiveCoeff=0 65 positiveCoeff=0
63 for m in r.metabolites: 66 for m in r.metabolites:
64 if r.get_coefficient(m.id)>0: 67 if r.get_coefficient(m.id)>0:
68 71
69 df_medium = pd.DataFrame() 72 df_medium = pd.DataFrame()
70 df_medium["reaction"] = trueMedium 73 df_medium["reaction"] = trueMedium
71 return df_medium 74 return df_medium
72 75
73 def generate_bounds(model:cobra.Model) -> pd.DataFrame: 76 def generate_bounds(model:cobraModel) -> pd.DataFrame:
74 77
75 rxns = [] 78 rxns = []
76 for reaction in model.reactions: 79 for reaction in model.reactions:
77 rxns.append(reaction.id) 80 rxns.append(reaction.id)
78 81
82 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound] 85 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
83 return bounds 86 return bounds
84 87
85 88
86 89
87 def generate_compartments(model: cobra.Model) -> pd.DataFrame: 90 def generate_compartments(model: cobraModel) -> pd.DataFrame:
88 """ 91 """
89 Generates a DataFrame containing compartment information for each reaction. 92 Generates a DataFrame containing compartment information for each reaction.
90 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.) 93 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.)
91 94
92 Args: 95 Args:
125 row[col_name] = None # or "" if you prefer empty strings 128 row[col_name] = None # or "" if you prefer empty strings
126 129
127 pathway_data.append(row) 130 pathway_data.append(row)
128 131
129 return pd.DataFrame(pathway_data) 132 return pd.DataFrame(pathway_data)
133
134
135
136 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
137 """
138 Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni.
139
140 Args:
141 csv_path: Path al file CSV (separato da tab)
142 model_id: ID del modello da creare
143
144 Returns:
145 cobra.Model: Il modello COBRApy costruito
146 """
147
148 # Leggi i dati dal CSV
149 df = pd.read_csv(csv_path, sep='\t')
150
151 # Crea il modello vuoto
152 model = cobraModel(model_id)
153
154 # Dict per tenere traccia di metaboliti e compartimenti
155 metabolites_dict = {}
156 compartments_dict = {}
157
158 print(f"Costruendo modello da {len(df)} reazioni...")
159
160 # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni
161 for idx, row in df.iterrows():
162 reaction_formula = str(row['Reaction']).strip()
163 if not reaction_formula or reaction_formula == 'nan':
164 continue
165
166 # Estrai metaboliti dalla formula della reazione
167 metabolites = extract_metabolites_from_reaction(reaction_formula)
168
169 for met_id in metabolites:
170 compartment = extract_compartment_from_metabolite(met_id)
171
172 # Aggiungi compartimento se non esiste
173 if compartment not in compartments_dict:
174 compartments_dict[compartment] = compartment
175
176 # Aggiungi metabolita se non esiste
177 if met_id not in metabolites_dict:
178 metabolites_dict[met_id] = Metabolite(
179 id=met_id,
180 compartment=compartment,
181 name=met_id.replace(f"_{compartment}", "").replace("__", "_")
182 )
183
184 # Aggiungi compartimenti al modello
185 model.compartments = compartments_dict
186
187 # Aggiungi metaboliti al modello
188 model.add_metabolites(list(metabolites_dict.values()))
189
190 print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti")
191
192 # Seconda passata: aggiungi le reazioni
193 reactions_added = 0
194 reactions_skipped = 0
195
196 for idx, row in df.iterrows():
197
198 reaction_id = str(row['ReactionID']).strip()
199 reaction_formula = str(row['Reaction']).strip()
200
201 # Salta reazioni senza formula
202 if not reaction_formula or reaction_formula == 'nan':
203 raise ValueError(f"Formula della reazione mancante {reaction_id}")
204
205 # Crea la reazione
206 reaction = Reaction(reaction_id)
207 reaction.name = reaction_id
208
209 # Imposta bounds
210 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
211 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
212
213 # Aggiungi gene rule se presente
214 if pd.notna(row['Rule']) and str(row['Rule']).strip():
215 reaction.gene_reaction_rule = str(row['Rule']).strip()
216
217 # Parse della formula della reazione
218 try:
219 parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
220 except Exception as e:
221 print(f"Errore nel parsing della reazione {reaction_id}: {e}")
222 reactions_skipped += 1
223 continue
224
225 # Aggiungi la reazione al modello
226 model.add_reactions([reaction])
227 reactions_added += 1
228
229
230 print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni")
231
232 # Imposta l'obiettivo di biomassa
233 set_biomass_objective(model)
234
235 # Imposta il medium
236 set_medium_from_data(model, df)
237
238 print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti")
239
240 return model
241
242
243 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
244 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
245 """
246 Estrae gli ID dei metaboliti da una formula di reazione.
247 Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e)
248 e permette che comincino con cifre o underscore.
249 """
250 metabolites = set()
251 # coefficiente opzionale seguito da un token che termina con _<letters>
252 pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)'
253 matches = re.findall(pattern, reaction_formula)
254 metabolites.update(matches)
255 return metabolites
256
257
258 def extract_compartment_from_metabolite(metabolite_id: str) -> str:
259 """
260 Estrae il compartimento dall'ID del metabolita.
261 """
262 # Il compartimento รจ solitamente l'ultima lettera dopo l'underscore
263 if '_' in metabolite_id:
264 return metabolite_id.split('_')[-1]
265 return 'c' # default cytoplasm
266
267
268 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
269 """
270 Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti.
271 """
272
273 if reaction.id == 'EX_thbpt_e':
274 print(reaction.id)
275 print(formula)
276 # Dividi in parte sinistra e destra
277 if '<=>' in formula:
278 left, right = formula.split('<=>')
279 reversible = True
280 elif '<--' in formula:
281 left, right = formula.split('<--')
282 reversible = False
283 left, right = left, right
284 elif '-->' in formula:
285 left, right = formula.split('-->')
286 reversible = False
287 elif '<-' in formula:
288 left, right = formula.split('<-')
289 reversible = False
290 left, right = left, right
291 else:
292 raise ValueError(f"Formato reazione non riconosciuto: {formula}")
293
294 # Parse dei metaboliti e coefficienti
295 reactants = parse_metabolites_side(left.strip())
296 products = parse_metabolites_side(right.strip())
297
298 # Aggiungi metaboliti alla reazione
299 metabolites_to_add = {}
300
301 # Reagenti (coefficienti negativi)
302 for met_id, coeff in reactants.items():
303 if met_id in metabolites_dict:
304 metabolites_to_add[metabolites_dict[met_id]] = -coeff
305
306 # Prodotti (coefficienti positivi)
307 for met_id, coeff in products.items():
308 if met_id in metabolites_dict:
309 metabolites_to_add[metabolites_dict[met_id]] = coeff
310
311 reaction.add_metabolites(metabolites_to_add)
312
313
314 def parse_metabolites_side(side_str: str) -> Dict[str, float]:
315 """
316 Parsa un lato della reazione per estrarre metaboliti e coefficienti.
317 """
318 metabolites = {}
319 if not side_str or side_str.strip() == '':
320 return metabolites
321
322 terms = side_str.split('+')
323 for term in terms:
324 term = term.strip()
325 if not term:
326 continue
327
328 # pattern allineato: coefficiente opzionale + id che termina con _<compartimento>
329 match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term)
330 if match:
331 coeff_str, met_id = match.groups()
332 coeff = float(coeff_str) if coeff_str else 1.0
333 metabolites[met_id] = coeff
334
335 return metabolites
336
337
338
339 def set_biomass_objective(model: cobraModel):
340 """
341 Imposta la reazione di biomassa come obiettivo.
342 """
343 biomass_reactions = [r for r in model.reactions if 'biomass' in r.id.lower()]
344
345 if biomass_reactions:
346 model.objective = biomass_reactions[0].id
347 print(f"Obiettivo impostato su: {biomass_reactions[0].id}")
348 else:
349 print("Nessuna reazione di biomassa trovata")
350
351
352 def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
353 """
354 Imposta il medium basato sulla colonna InMedium.
355 """
356 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
357
358 medium_dict = {}
359 for rxn_id in medium_reactions:
360 if rxn_id in [r.id for r in model.reactions]:
361 reaction = model.reactions.get_by_id(rxn_id)
362 if reaction.lower_bound < 0: # Solo reazioni di uptake
363 medium_dict[rxn_id] = abs(reaction.lower_bound)
364
365 if medium_dict:
366 model.medium = medium_dict
367 print(f"Medium impostato con {len(medium_dict)} componenti")
368
369
370 def validate_model(model: cobraModel) -> Dict[str, any]:
371 """
372 Valida il modello e fornisce statistiche di base.
373 """
374 validation = {
375 'num_reactions': len(model.reactions),
376 'num_metabolites': len(model.metabolites),
377 'num_genes': len(model.genes),
378 'num_compartments': len(model.compartments),
379 'objective': str(model.objective),
380 'medium_size': len(model.medium),
381 'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
382 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
383 }
384
385 try:
386 # Test di crescita
387 solution = model.optimize()
388 validation['growth_rate'] = solution.objective_value
389 validation['status'] = solution.status
390 except Exception as e:
391 validation['growth_rate'] = None
392 validation['status'] = f"Error: {e}"
393
394 return validation
395
396 def convert_genes(model,annotation):
397 from cobra.manipulation import rename_genes
398 model2=model.copy()
399 try:
400 dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes}
401 except:
402 print("No annotation in gene dict!")
403 return -1
404 rename_genes(model2,dict_genes)
405
406 return model2