Mercurial > repos > bimib > cobraxy
comparison COBRAxy/utils/model_utils.py @ 419:ed2c1f9e20ba draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Tue, 09 Sep 2025 09:08:17 +0000 |
| parents | 919b5b71a61c |
| children | 00a78da611ba |
comparison
equal
deleted
inserted
replaced
| 418:919b5b71a61c | 419:ed2c1f9e20ba |
|---|---|
| 2 import csv | 2 import csv |
| 3 import cobra | 3 import cobra |
| 4 import pickle | 4 import pickle |
| 5 import argparse | 5 import argparse |
| 6 import pandas as pd | 6 import pandas as pd |
| 7 from typing import Optional, Tuple, Union, List, Dict | 7 import re |
| 8 from typing import Optional, Tuple, Union, List, Dict, Set | |
| 8 import utils.general_utils as utils | 9 import utils.general_utils as utils |
| 9 import utils.rule_parsing as rulesUtils | 10 import utils.rule_parsing as rulesUtils |
| 11 import utils.reaction_parsing as reactionUtils | |
| 12 from cobra import Model as cobraModel, Reaction, Metabolite | |
| 10 | 13 |
| 11 ################################- DATA GENERATION -################################ | 14 ################################- DATA GENERATION -################################ |
| 12 ReactionId = str | 15 ReactionId = str |
| 13 def generate_rules(model: cobra.Model, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: | 16 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: |
| 14 """ | 17 """ |
| 15 Generates a dictionary mapping reaction ids to rules from the model. | 18 Generates a dictionary mapping reaction ids to rules from the model. |
| 16 | 19 |
| 17 Args: | 20 Args: |
| 18 model : the model to derive data from. | 21 model : the model to derive data from. |
| 32 return { | 35 return { |
| 33 reaction.id : ruleExtractor(reaction) | 36 reaction.id : ruleExtractor(reaction) |
| 34 for reaction in model.reactions | 37 for reaction in model.reactions |
| 35 if reaction.gene_reaction_rule } | 38 if reaction.gene_reaction_rule } |
| 36 | 39 |
| 37 def generate_reactions(model :cobra.Model, *, asParsed = True) -> Dict[ReactionId, str]: | 40 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]: |
| 38 """ | 41 """ |
| 39 Generates a dictionary mapping reaction ids to reaction formulas from the model. | 42 Generates a dictionary mapping reaction ids to reaction formulas from the model. |
| 40 | 43 |
| 41 Args: | 44 Args: |
| 42 model : the model to derive data from. | 45 model : the model to derive data from. |
| 54 | 57 |
| 55 if not asParsed: return unparsedReactions | 58 if not asParsed: return unparsedReactions |
| 56 | 59 |
| 57 return reactionUtils.create_reaction_dict(unparsedReactions) | 60 return reactionUtils.create_reaction_dict(unparsedReactions) |
| 58 | 61 |
| 59 def get_medium(model:cobra.Model) -> pd.DataFrame: | 62 def get_medium(model:cobraModel) -> pd.DataFrame: |
| 60 trueMedium=[] | 63 trueMedium=[] |
| 61 for r in model.reactions: | 64 for r in model.reactions: |
| 62 positiveCoeff=0 | 65 positiveCoeff=0 |
| 63 for m in r.metabolites: | 66 for m in r.metabolites: |
| 64 if r.get_coefficient(m.id)>0: | 67 if r.get_coefficient(m.id)>0: |
| 68 | 71 |
| 69 df_medium = pd.DataFrame() | 72 df_medium = pd.DataFrame() |
| 70 df_medium["reaction"] = trueMedium | 73 df_medium["reaction"] = trueMedium |
| 71 return df_medium | 74 return df_medium |
| 72 | 75 |
| 73 def generate_bounds(model:cobra.Model) -> pd.DataFrame: | 76 def generate_bounds(model:cobraModel) -> pd.DataFrame: |
| 74 | 77 |
| 75 rxns = [] | 78 rxns = [] |
| 76 for reaction in model.reactions: | 79 for reaction in model.reactions: |
| 77 rxns.append(reaction.id) | 80 rxns.append(reaction.id) |
| 78 | 81 |
| 82 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound] | 85 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound] |
| 83 return bounds | 86 return bounds |
| 84 | 87 |
| 85 | 88 |
| 86 | 89 |
| 87 def generate_compartments(model: cobra.Model) -> pd.DataFrame: | 90 def generate_compartments(model: cobraModel) -> pd.DataFrame: |
| 88 """ | 91 """ |
| 89 Generates a DataFrame containing compartment information for each reaction. | 92 Generates a DataFrame containing compartment information for each reaction. |
| 90 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.) | 93 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.) |
| 91 | 94 |
| 92 Args: | 95 Args: |
| 125 row[col_name] = None # or "" if you prefer empty strings | 128 row[col_name] = None # or "" if you prefer empty strings |
| 126 | 129 |
| 127 pathway_data.append(row) | 130 pathway_data.append(row) |
| 128 | 131 |
| 129 return pd.DataFrame(pathway_data) | 132 return pd.DataFrame(pathway_data) |
| 133 | |
| 134 | |
| 135 | |
| 136 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel: | |
| 137 """ | |
| 138 Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni. | |
| 139 | |
| 140 Args: | |
| 141 csv_path: Path al file CSV (separato da tab) | |
| 142 model_id: ID del modello da creare | |
| 143 | |
| 144 Returns: | |
| 145 cobra.Model: Il modello COBRApy costruito | |
| 146 """ | |
| 147 | |
| 148 # Leggi i dati dal CSV | |
| 149 df = pd.read_csv(csv_path, sep='\t') | |
| 150 | |
| 151 # Crea il modello vuoto | |
| 152 model = cobraModel(model_id) | |
| 153 | |
| 154 # Dict per tenere traccia di metaboliti e compartimenti | |
| 155 metabolites_dict = {} | |
| 156 compartments_dict = {} | |
| 157 | |
| 158 print(f"Costruendo modello da {len(df)} reazioni...") | |
| 159 | |
| 160 # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni | |
| 161 for idx, row in df.iterrows(): | |
| 162 reaction_formula = str(row['Reaction']).strip() | |
| 163 if not reaction_formula or reaction_formula == 'nan': | |
| 164 continue | |
| 165 | |
| 166 # Estrai metaboliti dalla formula della reazione | |
| 167 metabolites = extract_metabolites_from_reaction(reaction_formula) | |
| 168 | |
| 169 for met_id in metabolites: | |
| 170 compartment = extract_compartment_from_metabolite(met_id) | |
| 171 | |
| 172 # Aggiungi compartimento se non esiste | |
| 173 if compartment not in compartments_dict: | |
| 174 compartments_dict[compartment] = compartment | |
| 175 | |
| 176 # Aggiungi metabolita se non esiste | |
| 177 if met_id not in metabolites_dict: | |
| 178 metabolites_dict[met_id] = Metabolite( | |
| 179 id=met_id, | |
| 180 compartment=compartment, | |
| 181 name=met_id.replace(f"_{compartment}", "").replace("__", "_") | |
| 182 ) | |
| 183 | |
| 184 # Aggiungi compartimenti al modello | |
| 185 model.compartments = compartments_dict | |
| 186 | |
| 187 # Aggiungi metaboliti al modello | |
| 188 model.add_metabolites(list(metabolites_dict.values())) | |
| 189 | |
| 190 print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti") | |
| 191 | |
| 192 # Seconda passata: aggiungi le reazioni | |
| 193 reactions_added = 0 | |
| 194 reactions_skipped = 0 | |
| 195 | |
| 196 for idx, row in df.iterrows(): | |
| 197 | |
| 198 reaction_id = str(row['ReactionID']).strip() | |
| 199 reaction_formula = str(row['Reaction']).strip() | |
| 200 | |
| 201 # Salta reazioni senza formula | |
| 202 if not reaction_formula or reaction_formula == 'nan': | |
| 203 raise ValueError(f"Formula della reazione mancante {reaction_id}") | |
| 204 | |
| 205 # Crea la reazione | |
| 206 reaction = Reaction(reaction_id) | |
| 207 reaction.name = reaction_id | |
| 208 | |
| 209 # Imposta bounds | |
| 210 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0 | |
| 211 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0 | |
| 212 | |
| 213 # Aggiungi gene rule se presente | |
| 214 if pd.notna(row['Rule']) and str(row['Rule']).strip(): | |
| 215 reaction.gene_reaction_rule = str(row['Rule']).strip() | |
| 216 | |
| 217 # Parse della formula della reazione | |
| 218 try: | |
| 219 parse_reaction_formula(reaction, reaction_formula, metabolites_dict) | |
| 220 except Exception as e: | |
| 221 print(f"Errore nel parsing della reazione {reaction_id}: {e}") | |
| 222 reactions_skipped += 1 | |
| 223 continue | |
| 224 | |
| 225 # Aggiungi la reazione al modello | |
| 226 model.add_reactions([reaction]) | |
| 227 reactions_added += 1 | |
| 228 | |
| 229 | |
| 230 print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni") | |
| 231 | |
| 232 # Imposta l'obiettivo di biomassa | |
| 233 set_biomass_objective(model) | |
| 234 | |
| 235 # Imposta il medium | |
| 236 set_medium_from_data(model, df) | |
| 237 | |
| 238 print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti") | |
| 239 | |
| 240 return model | |
| 241 | |
| 242 | |
| 243 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore) | |
| 244 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: | |
| 245 """ | |
| 246 Estrae gli ID dei metaboliti da una formula di reazione. | |
| 247 Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e) | |
| 248 e permette che comincino con cifre o underscore. | |
| 249 """ | |
| 250 metabolites = set() | |
| 251 # coefficiente opzionale seguito da un token che termina con _<letters> | |
| 252 pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)' | |
| 253 matches = re.findall(pattern, reaction_formula) | |
| 254 metabolites.update(matches) | |
| 255 return metabolites | |
| 256 | |
| 257 | |
| 258 def extract_compartment_from_metabolite(metabolite_id: str) -> str: | |
| 259 """ | |
| 260 Estrae il compartimento dall'ID del metabolita. | |
| 261 """ | |
| 262 # Il compartimento รจ solitamente l'ultima lettera dopo l'underscore | |
| 263 if '_' in metabolite_id: | |
| 264 return metabolite_id.split('_')[-1] | |
| 265 return 'c' # default cytoplasm | |
| 266 | |
| 267 | |
| 268 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]): | |
| 269 """ | |
| 270 Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti. | |
| 271 """ | |
| 272 | |
| 273 if reaction.id == 'EX_thbpt_e': | |
| 274 print(reaction.id) | |
| 275 print(formula) | |
| 276 # Dividi in parte sinistra e destra | |
| 277 if '<=>' in formula: | |
| 278 left, right = formula.split('<=>') | |
| 279 reversible = True | |
| 280 elif '<--' in formula: | |
| 281 left, right = formula.split('<--') | |
| 282 reversible = False | |
| 283 left, right = left, right | |
| 284 elif '-->' in formula: | |
| 285 left, right = formula.split('-->') | |
| 286 reversible = False | |
| 287 elif '<-' in formula: | |
| 288 left, right = formula.split('<-') | |
| 289 reversible = False | |
| 290 left, right = left, right | |
| 291 else: | |
| 292 raise ValueError(f"Formato reazione non riconosciuto: {formula}") | |
| 293 | |
| 294 # Parse dei metaboliti e coefficienti | |
| 295 reactants = parse_metabolites_side(left.strip()) | |
| 296 products = parse_metabolites_side(right.strip()) | |
| 297 | |
| 298 # Aggiungi metaboliti alla reazione | |
| 299 metabolites_to_add = {} | |
| 300 | |
| 301 # Reagenti (coefficienti negativi) | |
| 302 for met_id, coeff in reactants.items(): | |
| 303 if met_id in metabolites_dict: | |
| 304 metabolites_to_add[metabolites_dict[met_id]] = -coeff | |
| 305 | |
| 306 # Prodotti (coefficienti positivi) | |
| 307 for met_id, coeff in products.items(): | |
| 308 if met_id in metabolites_dict: | |
| 309 metabolites_to_add[metabolites_dict[met_id]] = coeff | |
| 310 | |
| 311 reaction.add_metabolites(metabolites_to_add) | |
| 312 | |
| 313 | |
| 314 def parse_metabolites_side(side_str: str) -> Dict[str, float]: | |
| 315 """ | |
| 316 Parsa un lato della reazione per estrarre metaboliti e coefficienti. | |
| 317 """ | |
| 318 metabolites = {} | |
| 319 if not side_str or side_str.strip() == '': | |
| 320 return metabolites | |
| 321 | |
| 322 terms = side_str.split('+') | |
| 323 for term in terms: | |
| 324 term = term.strip() | |
| 325 if not term: | |
| 326 continue | |
| 327 | |
| 328 # pattern allineato: coefficiente opzionale + id che termina con _<compartimento> | |
| 329 match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term) | |
| 330 if match: | |
| 331 coeff_str, met_id = match.groups() | |
| 332 coeff = float(coeff_str) if coeff_str else 1.0 | |
| 333 metabolites[met_id] = coeff | |
| 334 | |
| 335 return metabolites | |
| 336 | |
| 337 | |
| 338 | |
| 339 def set_biomass_objective(model: cobraModel): | |
| 340 """ | |
| 341 Imposta la reazione di biomassa come obiettivo. | |
| 342 """ | |
| 343 biomass_reactions = [r for r in model.reactions if 'biomass' in r.id.lower()] | |
| 344 | |
| 345 if biomass_reactions: | |
| 346 model.objective = biomass_reactions[0].id | |
| 347 print(f"Obiettivo impostato su: {biomass_reactions[0].id}") | |
| 348 else: | |
| 349 print("Nessuna reazione di biomassa trovata") | |
| 350 | |
| 351 | |
| 352 def set_medium_from_data(model: cobraModel, df: pd.DataFrame): | |
| 353 """ | |
| 354 Imposta il medium basato sulla colonna InMedium. | |
| 355 """ | |
| 356 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist() | |
| 357 | |
| 358 medium_dict = {} | |
| 359 for rxn_id in medium_reactions: | |
| 360 if rxn_id in [r.id for r in model.reactions]: | |
| 361 reaction = model.reactions.get_by_id(rxn_id) | |
| 362 if reaction.lower_bound < 0: # Solo reazioni di uptake | |
| 363 medium_dict[rxn_id] = abs(reaction.lower_bound) | |
| 364 | |
| 365 if medium_dict: | |
| 366 model.medium = medium_dict | |
| 367 print(f"Medium impostato con {len(medium_dict)} componenti") | |
| 368 | |
| 369 | |
| 370 def validate_model(model: cobraModel) -> Dict[str, any]: | |
| 371 """ | |
| 372 Valida il modello e fornisce statistiche di base. | |
| 373 """ | |
| 374 validation = { | |
| 375 'num_reactions': len(model.reactions), | |
| 376 'num_metabolites': len(model.metabolites), | |
| 377 'num_genes': len(model.genes), | |
| 378 'num_compartments': len(model.compartments), | |
| 379 'objective': str(model.objective), | |
| 380 'medium_size': len(model.medium), | |
| 381 'reversible_reactions': len([r for r in model.reactions if r.reversibility]), | |
| 382 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]), | |
| 383 } | |
| 384 | |
| 385 try: | |
| 386 # Test di crescita | |
| 387 solution = model.optimize() | |
| 388 validation['growth_rate'] = solution.objective_value | |
| 389 validation['status'] = solution.status | |
| 390 except Exception as e: | |
| 391 validation['growth_rate'] = None | |
| 392 validation['status'] = f"Error: {e}" | |
| 393 | |
| 394 return validation | |
| 395 | |
| 396 def convert_genes(model,annotation): | |
| 397 from cobra.manipulation import rename_genes | |
| 398 model2=model.copy() | |
| 399 try: | |
| 400 dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes} | |
| 401 except: | |
| 402 print("No annotation in gene dict!") | |
| 403 return -1 | |
| 404 rename_genes(model2,dict_genes) | |
| 405 | |
| 406 return model2 |
