Mercurial > repos > bimib > cobraxy
comparison COBRAxy/utils/model_utils.py @ 419:ed2c1f9e20ba draft
Uploaded
author | francesco_lapi |
---|---|
date | Tue, 09 Sep 2025 09:08:17 +0000 |
parents | 919b5b71a61c |
children | 00a78da611ba |
comparison
equal
deleted
inserted
replaced
418:919b5b71a61c | 419:ed2c1f9e20ba |
---|---|
2 import csv | 2 import csv |
3 import cobra | 3 import cobra |
4 import pickle | 4 import pickle |
5 import argparse | 5 import argparse |
6 import pandas as pd | 6 import pandas as pd |
7 from typing import Optional, Tuple, Union, List, Dict | 7 import re |
8 from typing import Optional, Tuple, Union, List, Dict, Set | |
8 import utils.general_utils as utils | 9 import utils.general_utils as utils |
9 import utils.rule_parsing as rulesUtils | 10 import utils.rule_parsing as rulesUtils |
11 import utils.reaction_parsing as reactionUtils | |
12 from cobra import Model as cobraModel, Reaction, Metabolite | |
10 | 13 |
11 ################################- DATA GENERATION -################################ | 14 ################################- DATA GENERATION -################################ |
12 ReactionId = str | 15 ReactionId = str |
13 def generate_rules(model: cobra.Model, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: | 16 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: |
14 """ | 17 """ |
15 Generates a dictionary mapping reaction ids to rules from the model. | 18 Generates a dictionary mapping reaction ids to rules from the model. |
16 | 19 |
17 Args: | 20 Args: |
18 model : the model to derive data from. | 21 model : the model to derive data from. |
32 return { | 35 return { |
33 reaction.id : ruleExtractor(reaction) | 36 reaction.id : ruleExtractor(reaction) |
34 for reaction in model.reactions | 37 for reaction in model.reactions |
35 if reaction.gene_reaction_rule } | 38 if reaction.gene_reaction_rule } |
36 | 39 |
37 def generate_reactions(model :cobra.Model, *, asParsed = True) -> Dict[ReactionId, str]: | 40 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]: |
38 """ | 41 """ |
39 Generates a dictionary mapping reaction ids to reaction formulas from the model. | 42 Generates a dictionary mapping reaction ids to reaction formulas from the model. |
40 | 43 |
41 Args: | 44 Args: |
42 model : the model to derive data from. | 45 model : the model to derive data from. |
54 | 57 |
55 if not asParsed: return unparsedReactions | 58 if not asParsed: return unparsedReactions |
56 | 59 |
57 return reactionUtils.create_reaction_dict(unparsedReactions) | 60 return reactionUtils.create_reaction_dict(unparsedReactions) |
58 | 61 |
59 def get_medium(model:cobra.Model) -> pd.DataFrame: | 62 def get_medium(model:cobraModel) -> pd.DataFrame: |
60 trueMedium=[] | 63 trueMedium=[] |
61 for r in model.reactions: | 64 for r in model.reactions: |
62 positiveCoeff=0 | 65 positiveCoeff=0 |
63 for m in r.metabolites: | 66 for m in r.metabolites: |
64 if r.get_coefficient(m.id)>0: | 67 if r.get_coefficient(m.id)>0: |
68 | 71 |
69 df_medium = pd.DataFrame() | 72 df_medium = pd.DataFrame() |
70 df_medium["reaction"] = trueMedium | 73 df_medium["reaction"] = trueMedium |
71 return df_medium | 74 return df_medium |
72 | 75 |
73 def generate_bounds(model:cobra.Model) -> pd.DataFrame: | 76 def generate_bounds(model:cobraModel) -> pd.DataFrame: |
74 | 77 |
75 rxns = [] | 78 rxns = [] |
76 for reaction in model.reactions: | 79 for reaction in model.reactions: |
77 rxns.append(reaction.id) | 80 rxns.append(reaction.id) |
78 | 81 |
82 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound] | 85 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound] |
83 return bounds | 86 return bounds |
84 | 87 |
85 | 88 |
86 | 89 |
87 def generate_compartments(model: cobra.Model) -> pd.DataFrame: | 90 def generate_compartments(model: cobraModel) -> pd.DataFrame: |
88 """ | 91 """ |
89 Generates a DataFrame containing compartment information for each reaction. | 92 Generates a DataFrame containing compartment information for each reaction. |
90 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.) | 93 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.) |
91 | 94 |
92 Args: | 95 Args: |
125 row[col_name] = None # or "" if you prefer empty strings | 128 row[col_name] = None # or "" if you prefer empty strings |
126 | 129 |
127 pathway_data.append(row) | 130 pathway_data.append(row) |
128 | 131 |
129 return pd.DataFrame(pathway_data) | 132 return pd.DataFrame(pathway_data) |
133 | |
134 | |
135 | |
136 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel: | |
137 """ | |
138 Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni. | |
139 | |
140 Args: | |
141 csv_path: Path al file CSV (separato da tab) | |
142 model_id: ID del modello da creare | |
143 | |
144 Returns: | |
145 cobra.Model: Il modello COBRApy costruito | |
146 """ | |
147 | |
148 # Leggi i dati dal CSV | |
149 df = pd.read_csv(csv_path, sep='\t') | |
150 | |
151 # Crea il modello vuoto | |
152 model = cobraModel(model_id) | |
153 | |
154 # Dict per tenere traccia di metaboliti e compartimenti | |
155 metabolites_dict = {} | |
156 compartments_dict = {} | |
157 | |
158 print(f"Costruendo modello da {len(df)} reazioni...") | |
159 | |
160 # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni | |
161 for idx, row in df.iterrows(): | |
162 reaction_formula = str(row['Reaction']).strip() | |
163 if not reaction_formula or reaction_formula == 'nan': | |
164 continue | |
165 | |
166 # Estrai metaboliti dalla formula della reazione | |
167 metabolites = extract_metabolites_from_reaction(reaction_formula) | |
168 | |
169 for met_id in metabolites: | |
170 compartment = extract_compartment_from_metabolite(met_id) | |
171 | |
172 # Aggiungi compartimento se non esiste | |
173 if compartment not in compartments_dict: | |
174 compartments_dict[compartment] = compartment | |
175 | |
176 # Aggiungi metabolita se non esiste | |
177 if met_id not in metabolites_dict: | |
178 metabolites_dict[met_id] = Metabolite( | |
179 id=met_id, | |
180 compartment=compartment, | |
181 name=met_id.replace(f"_{compartment}", "").replace("__", "_") | |
182 ) | |
183 | |
184 # Aggiungi compartimenti al modello | |
185 model.compartments = compartments_dict | |
186 | |
187 # Aggiungi metaboliti al modello | |
188 model.add_metabolites(list(metabolites_dict.values())) | |
189 | |
190 print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti") | |
191 | |
192 # Seconda passata: aggiungi le reazioni | |
193 reactions_added = 0 | |
194 reactions_skipped = 0 | |
195 | |
196 for idx, row in df.iterrows(): | |
197 | |
198 reaction_id = str(row['ReactionID']).strip() | |
199 reaction_formula = str(row['Reaction']).strip() | |
200 | |
201 # Salta reazioni senza formula | |
202 if not reaction_formula or reaction_formula == 'nan': | |
203 raise ValueError(f"Formula della reazione mancante {reaction_id}") | |
204 | |
205 # Crea la reazione | |
206 reaction = Reaction(reaction_id) | |
207 reaction.name = reaction_id | |
208 | |
209 # Imposta bounds | |
210 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0 | |
211 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0 | |
212 | |
213 # Aggiungi gene rule se presente | |
214 if pd.notna(row['Rule']) and str(row['Rule']).strip(): | |
215 reaction.gene_reaction_rule = str(row['Rule']).strip() | |
216 | |
217 # Parse della formula della reazione | |
218 try: | |
219 parse_reaction_formula(reaction, reaction_formula, metabolites_dict) | |
220 except Exception as e: | |
221 print(f"Errore nel parsing della reazione {reaction_id}: {e}") | |
222 reactions_skipped += 1 | |
223 continue | |
224 | |
225 # Aggiungi la reazione al modello | |
226 model.add_reactions([reaction]) | |
227 reactions_added += 1 | |
228 | |
229 | |
230 print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni") | |
231 | |
232 # Imposta l'obiettivo di biomassa | |
233 set_biomass_objective(model) | |
234 | |
235 # Imposta il medium | |
236 set_medium_from_data(model, df) | |
237 | |
238 print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti") | |
239 | |
240 return model | |
241 | |
242 | |
243 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore) | |
244 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: | |
245 """ | |
246 Estrae gli ID dei metaboliti da una formula di reazione. | |
247 Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e) | |
248 e permette che comincino con cifre o underscore. | |
249 """ | |
250 metabolites = set() | |
251 # coefficiente opzionale seguito da un token che termina con _<letters> | |
252 pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)' | |
253 matches = re.findall(pattern, reaction_formula) | |
254 metabolites.update(matches) | |
255 return metabolites | |
256 | |
257 | |
258 def extract_compartment_from_metabolite(metabolite_id: str) -> str: | |
259 """ | |
260 Estrae il compartimento dall'ID del metabolita. | |
261 """ | |
262 # Il compartimento รจ solitamente l'ultima lettera dopo l'underscore | |
263 if '_' in metabolite_id: | |
264 return metabolite_id.split('_')[-1] | |
265 return 'c' # default cytoplasm | |
266 | |
267 | |
268 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]): | |
269 """ | |
270 Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti. | |
271 """ | |
272 | |
273 if reaction.id == 'EX_thbpt_e': | |
274 print(reaction.id) | |
275 print(formula) | |
276 # Dividi in parte sinistra e destra | |
277 if '<=>' in formula: | |
278 left, right = formula.split('<=>') | |
279 reversible = True | |
280 elif '<--' in formula: | |
281 left, right = formula.split('<--') | |
282 reversible = False | |
283 left, right = left, right | |
284 elif '-->' in formula: | |
285 left, right = formula.split('-->') | |
286 reversible = False | |
287 elif '<-' in formula: | |
288 left, right = formula.split('<-') | |
289 reversible = False | |
290 left, right = left, right | |
291 else: | |
292 raise ValueError(f"Formato reazione non riconosciuto: {formula}") | |
293 | |
294 # Parse dei metaboliti e coefficienti | |
295 reactants = parse_metabolites_side(left.strip()) | |
296 products = parse_metabolites_side(right.strip()) | |
297 | |
298 # Aggiungi metaboliti alla reazione | |
299 metabolites_to_add = {} | |
300 | |
301 # Reagenti (coefficienti negativi) | |
302 for met_id, coeff in reactants.items(): | |
303 if met_id in metabolites_dict: | |
304 metabolites_to_add[metabolites_dict[met_id]] = -coeff | |
305 | |
306 # Prodotti (coefficienti positivi) | |
307 for met_id, coeff in products.items(): | |
308 if met_id in metabolites_dict: | |
309 metabolites_to_add[metabolites_dict[met_id]] = coeff | |
310 | |
311 reaction.add_metabolites(metabolites_to_add) | |
312 | |
313 | |
314 def parse_metabolites_side(side_str: str) -> Dict[str, float]: | |
315 """ | |
316 Parsa un lato della reazione per estrarre metaboliti e coefficienti. | |
317 """ | |
318 metabolites = {} | |
319 if not side_str or side_str.strip() == '': | |
320 return metabolites | |
321 | |
322 terms = side_str.split('+') | |
323 for term in terms: | |
324 term = term.strip() | |
325 if not term: | |
326 continue | |
327 | |
328 # pattern allineato: coefficiente opzionale + id che termina con _<compartimento> | |
329 match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term) | |
330 if match: | |
331 coeff_str, met_id = match.groups() | |
332 coeff = float(coeff_str) if coeff_str else 1.0 | |
333 metabolites[met_id] = coeff | |
334 | |
335 return metabolites | |
336 | |
337 | |
338 | |
339 def set_biomass_objective(model: cobraModel): | |
340 """ | |
341 Imposta la reazione di biomassa come obiettivo. | |
342 """ | |
343 biomass_reactions = [r for r in model.reactions if 'biomass' in r.id.lower()] | |
344 | |
345 if biomass_reactions: | |
346 model.objective = biomass_reactions[0].id | |
347 print(f"Obiettivo impostato su: {biomass_reactions[0].id}") | |
348 else: | |
349 print("Nessuna reazione di biomassa trovata") | |
350 | |
351 | |
352 def set_medium_from_data(model: cobraModel, df: pd.DataFrame): | |
353 """ | |
354 Imposta il medium basato sulla colonna InMedium. | |
355 """ | |
356 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist() | |
357 | |
358 medium_dict = {} | |
359 for rxn_id in medium_reactions: | |
360 if rxn_id in [r.id for r in model.reactions]: | |
361 reaction = model.reactions.get_by_id(rxn_id) | |
362 if reaction.lower_bound < 0: # Solo reazioni di uptake | |
363 medium_dict[rxn_id] = abs(reaction.lower_bound) | |
364 | |
365 if medium_dict: | |
366 model.medium = medium_dict | |
367 print(f"Medium impostato con {len(medium_dict)} componenti") | |
368 | |
369 | |
370 def validate_model(model: cobraModel) -> Dict[str, any]: | |
371 """ | |
372 Valida il modello e fornisce statistiche di base. | |
373 """ | |
374 validation = { | |
375 'num_reactions': len(model.reactions), | |
376 'num_metabolites': len(model.metabolites), | |
377 'num_genes': len(model.genes), | |
378 'num_compartments': len(model.compartments), | |
379 'objective': str(model.objective), | |
380 'medium_size': len(model.medium), | |
381 'reversible_reactions': len([r for r in model.reactions if r.reversibility]), | |
382 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]), | |
383 } | |
384 | |
385 try: | |
386 # Test di crescita | |
387 solution = model.optimize() | |
388 validation['growth_rate'] = solution.objective_value | |
389 validation['status'] = solution.status | |
390 except Exception as e: | |
391 validation['growth_rate'] = None | |
392 validation['status'] = f"Error: {e}" | |
393 | |
394 return validation | |
395 | |
396 def convert_genes(model,annotation): | |
397 from cobra.manipulation import rename_genes | |
398 model2=model.copy() | |
399 try: | |
400 dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes} | |
401 except: | |
402 print("No annotation in gene dict!") | |
403 return -1 | |
404 rename_genes(model2,dict_genes) | |
405 | |
406 return model2 |