diff COBRAxy/utils/model_utils.py @ 419:ed2c1f9e20ba draft

Uploaded
author francesco_lapi
date Tue, 09 Sep 2025 09:08:17 +0000
parents 919b5b71a61c
children 00a78da611ba
line wrap: on
line diff
--- a/COBRAxy/utils/model_utils.py	Tue Sep 09 07:36:30 2025 +0000
+++ b/COBRAxy/utils/model_utils.py	Tue Sep 09 09:08:17 2025 +0000
@@ -4,13 +4,16 @@
 import pickle
 import argparse
 import pandas as pd
-from typing import Optional, Tuple, Union, List, Dict
+import re
+from typing import Optional, Tuple, Union, List, Dict, Set
 import utils.general_utils as utils
 import utils.rule_parsing  as rulesUtils
+import utils.reaction_parsing as reactionUtils
+from cobra import Model as cobraModel, Reaction, Metabolite
 
 ################################- DATA GENERATION -################################
 ReactionId = str
-def generate_rules(model: cobra.Model, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
+def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
     """
     Generates a dictionary mapping reaction ids to rules from the model.
 
@@ -34,7 +37,7 @@
         for reaction in model.reactions
         if reaction.gene_reaction_rule }
 
-def generate_reactions(model :cobra.Model, *, asParsed = True) -> Dict[ReactionId, str]:
+def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
     """
     Generates a dictionary mapping reaction ids to reaction formulas from the model.
 
@@ -56,7 +59,7 @@
     
     return reactionUtils.create_reaction_dict(unparsedReactions)
 
-def get_medium(model:cobra.Model) -> pd.DataFrame:
+def get_medium(model:cobraModel) -> pd.DataFrame:
     trueMedium=[]
     for r in model.reactions:
         positiveCoeff=0
@@ -70,7 +73,7 @@
     df_medium["reaction"] = trueMedium
     return df_medium
 
-def generate_bounds(model:cobra.Model) -> pd.DataFrame:
+def generate_bounds(model:cobraModel) -> pd.DataFrame:
 
     rxns = []
     for reaction in model.reactions:
@@ -84,7 +87,7 @@
 
 
 
-def generate_compartments(model: cobra.Model) -> pd.DataFrame:
+def generate_compartments(model: cobraModel) -> pd.DataFrame:
     """
     Generates a DataFrame containing compartment information for each reaction.
     Creates columns for each compartment position (Compartment_1, Compartment_2, etc.)
@@ -126,4 +129,278 @@
 
         pathway_data.append(row)
 
-    return pd.DataFrame(pathway_data)
\ No newline at end of file
+    return pd.DataFrame(pathway_data)
+
+
+
+def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
+    """
+    Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni.
+    
+    Args:
+        csv_path: Path al file CSV (separato da tab)
+        model_id: ID del modello da creare
+        
+    Returns:
+        cobra.Model: Il modello COBRApy costruito
+    """
+    
+    # Leggi i dati dal CSV
+    df = pd.read_csv(csv_path, sep='\t')
+    
+    # Crea il modello vuoto
+    model = cobraModel(model_id)
+    
+    # Dict per tenere traccia di metaboliti e compartimenti
+    metabolites_dict = {}
+    compartments_dict = {}
+    
+    print(f"Costruendo modello da {len(df)} reazioni...")
+    
+    # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni
+    for idx, row in df.iterrows():
+        reaction_formula = str(row['Reaction']).strip()
+        if not reaction_formula or reaction_formula == 'nan':
+            continue
+            
+        # Estrai metaboliti dalla formula della reazione
+        metabolites = extract_metabolites_from_reaction(reaction_formula)
+        
+        for met_id in metabolites:
+            compartment = extract_compartment_from_metabolite(met_id)
+            
+            # Aggiungi compartimento se non esiste
+            if compartment not in compartments_dict:
+                compartments_dict[compartment] = compartment
+            
+            # Aggiungi metabolita se non esiste
+            if met_id not in metabolites_dict:
+                metabolites_dict[met_id] = Metabolite(
+                    id=met_id,
+                    compartment=compartment,
+                    name=met_id.replace(f"_{compartment}", "").replace("__", "_")
+                )
+    
+    # Aggiungi compartimenti al modello
+    model.compartments = compartments_dict
+    
+    # Aggiungi metaboliti al modello  
+    model.add_metabolites(list(metabolites_dict.values()))
+    
+    print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti")
+    
+    # Seconda passata: aggiungi le reazioni
+    reactions_added = 0
+    reactions_skipped = 0
+    
+    for idx, row in df.iterrows():
+
+        reaction_id = str(row['ReactionID']).strip()
+        reaction_formula = str(row['Reaction']).strip()
+        
+        # Salta reazioni senza formula
+        if not reaction_formula or reaction_formula == 'nan':
+            raise ValueError(f"Formula della reazione mancante {reaction_id}")
+        
+        # Crea la reazione
+        reaction = Reaction(reaction_id)
+        reaction.name = reaction_id
+        
+        # Imposta bounds
+        reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
+        reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
+        
+        # Aggiungi gene rule se presente
+        if pd.notna(row['Rule']) and str(row['Rule']).strip():
+            reaction.gene_reaction_rule = str(row['Rule']).strip()
+        
+        # Parse della formula della reazione
+        try:
+            parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
+        except Exception as e:
+            print(f"Errore nel parsing della reazione {reaction_id}: {e}")
+            reactions_skipped += 1
+            continue
+        
+        # Aggiungi la reazione al modello
+        model.add_reactions([reaction])
+        reactions_added += 1
+            
+    
+    print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni")
+    
+    # Imposta l'obiettivo di biomassa
+    set_biomass_objective(model)
+    
+    # Imposta il medium
+    set_medium_from_data(model, df)
+    
+    print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti")
+    
+    return model
+
+
+# Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
+def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
+    """
+    Estrae gli ID dei metaboliti da una formula di reazione.
+    Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e)
+    e permette che comincino con cifre o underscore.
+    """
+    metabolites = set()
+    # coefficiente opzionale seguito da un token che termina con _<letters>
+    pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)'
+    matches = re.findall(pattern, reaction_formula)
+    metabolites.update(matches)
+    return metabolites
+
+
+def extract_compartment_from_metabolite(metabolite_id: str) -> str:
+    """
+    Estrae il compartimento dall'ID del metabolita.
+    """
+    # Il compartimento รจ solitamente l'ultima lettera dopo l'underscore
+    if '_' in metabolite_id:
+        return metabolite_id.split('_')[-1]
+    return 'c'  # default cytoplasm
+
+
+def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
+    """
+    Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti.
+    """
+
+    if reaction.id == 'EX_thbpt_e':
+        print(reaction.id)
+        print(formula)
+    # Dividi in parte sinistra e destra
+    if '<=>' in formula:
+        left, right = formula.split('<=>')
+        reversible = True
+    elif '<--' in formula:
+        left, right = formula.split('<--')
+        reversible = False
+        left, right = left, right
+    elif '-->' in formula:
+        left, right = formula.split('-->')
+        reversible = False
+    elif '<-' in formula:
+        left, right = formula.split('<-')
+        reversible = False
+        left, right = left, right
+    else:
+        raise ValueError(f"Formato reazione non riconosciuto: {formula}")
+    
+    # Parse dei metaboliti e coefficienti
+    reactants = parse_metabolites_side(left.strip())
+    products = parse_metabolites_side(right.strip())
+    
+    # Aggiungi metaboliti alla reazione
+    metabolites_to_add = {}
+    
+    # Reagenti (coefficienti negativi)
+    for met_id, coeff in reactants.items():
+        if met_id in metabolites_dict:
+            metabolites_to_add[metabolites_dict[met_id]] = -coeff
+    
+    # Prodotti (coefficienti positivi)
+    for met_id, coeff in products.items():
+        if met_id in metabolites_dict:
+            metabolites_to_add[metabolites_dict[met_id]] = coeff
+    
+    reaction.add_metabolites(metabolites_to_add)
+
+
+def parse_metabolites_side(side_str: str) -> Dict[str, float]:
+    """
+    Parsa un lato della reazione per estrarre metaboliti e coefficienti.
+    """
+    metabolites = {}
+    if not side_str or side_str.strip() == '':
+        return metabolites
+
+    terms = side_str.split('+')
+    for term in terms:
+        term = term.strip()
+        if not term:
+            continue
+
+        # pattern allineato: coefficiente opzionale + id che termina con _<compartimento>
+        match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term)
+        if match:
+            coeff_str, met_id = match.groups()
+            coeff = float(coeff_str) if coeff_str else 1.0
+            metabolites[met_id] = coeff
+
+    return metabolites
+
+
+
+def set_biomass_objective(model: cobraModel):
+    """
+    Imposta la reazione di biomassa come obiettivo.
+    """
+    biomass_reactions = [r for r in model.reactions if 'biomass' in r.id.lower()]
+    
+    if biomass_reactions:
+        model.objective = biomass_reactions[0].id
+        print(f"Obiettivo impostato su: {biomass_reactions[0].id}")
+    else:
+        print("Nessuna reazione di biomassa trovata")
+
+
+def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
+    """
+    Imposta il medium basato sulla colonna InMedium.
+    """
+    medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
+    
+    medium_dict = {}
+    for rxn_id in medium_reactions:
+        if rxn_id in [r.id for r in model.reactions]:
+            reaction = model.reactions.get_by_id(rxn_id)
+            if reaction.lower_bound < 0:  # Solo reazioni di uptake
+                medium_dict[rxn_id] = abs(reaction.lower_bound)
+    
+    if medium_dict:
+        model.medium = medium_dict
+        print(f"Medium impostato con {len(medium_dict)} componenti")
+
+
+def validate_model(model: cobraModel) -> Dict[str, any]:
+    """
+    Valida il modello e fornisce statistiche di base.
+    """
+    validation = {
+        'num_reactions': len(model.reactions),
+        'num_metabolites': len(model.metabolites),
+        'num_genes': len(model.genes),
+        'num_compartments': len(model.compartments),
+        'objective': str(model.objective),
+        'medium_size': len(model.medium),
+        'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
+        'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
+    }
+    
+    try:
+        # Test di crescita
+        solution = model.optimize()
+        validation['growth_rate'] = solution.objective_value
+        validation['status'] = solution.status
+    except Exception as e:
+        validation['growth_rate'] = None
+        validation['status'] = f"Error: {e}"
+    
+    return validation
+
+def convert_genes(model,annotation):
+    from cobra.manipulation import rename_genes
+    model2=model.copy()
+    try:
+        dict_genes={gene.id:gene.notes[annotation]  for gene in model2.genes}
+    except:
+        print("No annotation in gene dict!")
+        return -1
+    rename_genes(model2,dict_genes)
+
+    return model2
\ No newline at end of file