| 
418
 | 
     1 import os
 | 
| 
 | 
     2 import csv
 | 
| 
 | 
     3 import cobra
 | 
| 
 | 
     4 import pickle
 | 
| 
 | 
     5 import argparse
 | 
| 
 | 
     6 import pandas as pd
 | 
| 
419
 | 
     7 import re
 | 
| 
426
 | 
     8 import logging
 | 
| 
419
 | 
     9 from typing import Optional, Tuple, Union, List, Dict, Set
 | 
| 
426
 | 
    10 from collections import defaultdict
 | 
| 
418
 | 
    11 import utils.general_utils as utils
 | 
| 
 | 
    12 import utils.rule_parsing  as rulesUtils
 | 
| 
419
 | 
    13 import utils.reaction_parsing as reactionUtils
 | 
| 
 | 
    14 from cobra import Model as cobraModel, Reaction, Metabolite
 | 
| 
418
 | 
    15 
 | 
| 
 | 
    16 ################################- DATA GENERATION -################################
 | 
| 
 | 
    17 ReactionId = str
 | 
| 
419
 | 
    18 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
 | 
| 
418
 | 
    19     """
 | 
| 
 | 
    20     Generates a dictionary mapping reaction ids to rules from the model.
 | 
| 
 | 
    21 
 | 
| 
 | 
    22     Args:
 | 
| 
 | 
    23         model : the model to derive data from.
 | 
| 
 | 
    24         asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings.
 | 
| 
 | 
    25 
 | 
| 
 | 
    26     Returns:
 | 
| 
 | 
    27         Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules.
 | 
| 
 | 
    28         Dict[ReactionId, str] : the generated dictionary of raw rules.
 | 
| 
 | 
    29     """
 | 
| 
 | 
    30     # Is the below approach convoluted? yes
 | 
| 
 | 
    31     # Ok but is it inefficient? probably
 | 
| 
 | 
    32     # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane)
 | 
| 
 | 
    33     _ruleGetter   =  lambda reaction : reaction.gene_reaction_rule
 | 
| 
 | 
    34     ruleExtractor = (lambda reaction :
 | 
| 
 | 
    35         rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter
 | 
| 
 | 
    36 
 | 
| 
 | 
    37     return {
 | 
| 
 | 
    38         reaction.id : ruleExtractor(reaction)
 | 
| 
 | 
    39         for reaction in model.reactions
 | 
| 
 | 
    40         if reaction.gene_reaction_rule }
 | 
| 
 | 
    41 
 | 
| 
419
 | 
    42 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
 | 
| 
418
 | 
    43     """
 | 
| 
 | 
    44     Generates a dictionary mapping reaction ids to reaction formulas from the model.
 | 
| 
 | 
    45 
 | 
| 
 | 
    46     Args:
 | 
| 
 | 
    47         model : the model to derive data from.
 | 
| 
 | 
    48         asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are.
 | 
| 
 | 
    49 
 | 
| 
 | 
    50     Returns:
 | 
| 
 | 
    51         Dict[ReactionId, str] : the generated dictionary.
 | 
| 
 | 
    52     """
 | 
| 
 | 
    53 
 | 
| 
 | 
    54     unparsedReactions = {
 | 
| 
 | 
    55         reaction.id : reaction.reaction
 | 
| 
 | 
    56         for reaction in model.reactions
 | 
| 
 | 
    57         if reaction.reaction 
 | 
| 
 | 
    58     }
 | 
| 
 | 
    59 
 | 
| 
 | 
    60     if not asParsed: return unparsedReactions
 | 
| 
 | 
    61     
 | 
| 
 | 
    62     return reactionUtils.create_reaction_dict(unparsedReactions)
 | 
| 
 | 
    63 
 | 
| 
419
 | 
    64 def get_medium(model:cobraModel) -> pd.DataFrame:
 | 
| 
418
 | 
    65     trueMedium=[]
 | 
| 
 | 
    66     for r in model.reactions:
 | 
| 
 | 
    67         positiveCoeff=0
 | 
| 
 | 
    68         for m in r.metabolites:
 | 
| 
 | 
    69             if r.get_coefficient(m.id)>0:
 | 
| 
 | 
    70                 positiveCoeff=1;
 | 
| 
 | 
    71         if (positiveCoeff==0 and r.lower_bound<0):
 | 
| 
 | 
    72             trueMedium.append(r.id)
 | 
| 
 | 
    73 
 | 
| 
 | 
    74     df_medium = pd.DataFrame()
 | 
| 
 | 
    75     df_medium["reaction"] = trueMedium
 | 
| 
 | 
    76     return df_medium
 | 
| 
 | 
    77 
 | 
| 
426
 | 
    78 def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame:
 | 
| 
 | 
    79     """
 | 
| 
 | 
    80     Estrae i coefficienti della funzione obiettivo per ciascuna reazione del modello.
 | 
| 
 | 
    81     
 | 
| 
 | 
    82     Args:
 | 
| 
 | 
    83         model : cobra.Model
 | 
| 
 | 
    84         
 | 
| 
 | 
    85     Returns:
 | 
| 
 | 
    86         pd.DataFrame con colonne: ReactionID, ObjectiveCoefficient
 | 
| 
 | 
    87     """
 | 
| 
 | 
    88     coeffs = []
 | 
| 
 | 
    89     # model.objective.expression รจ un'espressione lineare
 | 
| 
 | 
    90     objective_expr = model.objective.expression.as_coefficients_dict()
 | 
| 
 | 
    91     
 | 
| 
 | 
    92     for reaction in model.reactions:
 | 
| 
 | 
    93         coeff = objective_expr.get(reaction.forward_variable, 0.0)
 | 
| 
 | 
    94         coeffs.append({
 | 
| 
 | 
    95             "ReactionID": reaction.id,
 | 
| 
 | 
    96             "ObjectiveCoefficient": coeff
 | 
| 
 | 
    97         })
 | 
| 
 | 
    98     
 | 
| 
 | 
    99     return pd.DataFrame(coeffs)
 | 
| 
 | 
   100 
 | 
| 
419
 | 
   101 def generate_bounds(model:cobraModel) -> pd.DataFrame:
 | 
| 
418
 | 
   102 
 | 
| 
 | 
   103     rxns = []
 | 
| 
 | 
   104     for reaction in model.reactions:
 | 
| 
 | 
   105         rxns.append(reaction.id)
 | 
| 
 | 
   106 
 | 
| 
 | 
   107     bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns)
 | 
| 
 | 
   108 
 | 
| 
 | 
   109     for reaction in model.reactions:
 | 
| 
 | 
   110         bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
 | 
| 
 | 
   111     return bounds
 | 
| 
 | 
   112 
 | 
| 
 | 
   113 
 | 
| 
 | 
   114 
 | 
| 
419
 | 
   115 def generate_compartments(model: cobraModel) -> pd.DataFrame:
 | 
| 
418
 | 
   116     """
 | 
| 
 | 
   117     Generates a DataFrame containing compartment information for each reaction.
 | 
| 
 | 
   118     Creates columns for each compartment position (Compartment_1, Compartment_2, etc.)
 | 
| 
 | 
   119     
 | 
| 
 | 
   120     Args:
 | 
| 
 | 
   121         model: the COBRA model to extract compartment data from.
 | 
| 
 | 
   122         
 | 
| 
 | 
   123     Returns:
 | 
| 
 | 
   124         pd.DataFrame: DataFrame with ReactionID and compartment columns
 | 
| 
 | 
   125     """
 | 
| 
 | 
   126     pathway_data = []
 | 
| 
 | 
   127 
 | 
| 
 | 
   128     # First pass: determine the maximum number of pathways any reaction has
 | 
| 
 | 
   129     max_pathways = 0
 | 
| 
 | 
   130     reaction_pathways = {}
 | 
| 
 | 
   131 
 | 
| 
 | 
   132     for reaction in model.reactions:
 | 
| 
 | 
   133         # Get unique pathways from all metabolites in the reaction
 | 
| 
 | 
   134         if type(reaction.annotation['pathways']) == list:
 | 
| 
 | 
   135             reaction_pathways[reaction.id] = reaction.annotation['pathways']
 | 
| 
 | 
   136             max_pathways = max(max_pathways, len(reaction.annotation['pathways']))
 | 
| 
 | 
   137         else:
 | 
| 
 | 
   138             reaction_pathways[reaction.id] = [reaction.annotation['pathways']]
 | 
| 
 | 
   139 
 | 
| 
 | 
   140     # Create column names for pathways
 | 
| 
 | 
   141     pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]
 | 
| 
 | 
   142 
 | 
| 
 | 
   143     # Second pass: create the data
 | 
| 
 | 
   144     for reaction_id, pathways in reaction_pathways.items():
 | 
| 
 | 
   145         row = {"ReactionID": reaction_id}
 | 
| 
 | 
   146         
 | 
| 
 | 
   147         # Fill pathway columns
 | 
| 
 | 
   148         for i in range(max_pathways):
 | 
| 
 | 
   149             col_name = pathway_columns[i]
 | 
| 
 | 
   150             if i < len(pathways):
 | 
| 
 | 
   151                 row[col_name] = pathways[i]
 | 
| 
 | 
   152             else:
 | 
| 
 | 
   153                 row[col_name] = None  # or "" if you prefer empty strings
 | 
| 
 | 
   154 
 | 
| 
 | 
   155         pathway_data.append(row)
 | 
| 
 | 
   156 
 | 
| 
419
 | 
   157     return pd.DataFrame(pathway_data)
 | 
| 
 | 
   158 
 | 
| 
 | 
   159 
 | 
| 
 | 
   160 
 | 
| 
 | 
   161 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
 | 
| 
 | 
   162     """
 | 
| 
 | 
   163     Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni.
 | 
| 
 | 
   164     
 | 
| 
 | 
   165     Args:
 | 
| 
 | 
   166         csv_path: Path al file CSV (separato da tab)
 | 
| 
 | 
   167         model_id: ID del modello da creare
 | 
| 
 | 
   168         
 | 
| 
 | 
   169     Returns:
 | 
| 
 | 
   170         cobra.Model: Il modello COBRApy costruito
 | 
| 
 | 
   171     """
 | 
| 
 | 
   172     
 | 
| 
 | 
   173     # Leggi i dati dal CSV
 | 
| 
 | 
   174     df = pd.read_csv(csv_path, sep='\t')
 | 
| 
 | 
   175     
 | 
| 
 | 
   176     # Crea il modello vuoto
 | 
| 
 | 
   177     model = cobraModel(model_id)
 | 
| 
 | 
   178     
 | 
| 
 | 
   179     # Dict per tenere traccia di metaboliti e compartimenti
 | 
| 
 | 
   180     metabolites_dict = {}
 | 
| 
 | 
   181     compartments_dict = {}
 | 
| 
 | 
   182     
 | 
| 
 | 
   183     print(f"Costruendo modello da {len(df)} reazioni...")
 | 
| 
 | 
   184     
 | 
| 
 | 
   185     # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni
 | 
| 
 | 
   186     for idx, row in df.iterrows():
 | 
| 
 | 
   187         reaction_formula = str(row['Reaction']).strip()
 | 
| 
 | 
   188         if not reaction_formula or reaction_formula == 'nan':
 | 
| 
 | 
   189             continue
 | 
| 
 | 
   190             
 | 
| 
 | 
   191         # Estrai metaboliti dalla formula della reazione
 | 
| 
 | 
   192         metabolites = extract_metabolites_from_reaction(reaction_formula)
 | 
| 
 | 
   193         
 | 
| 
 | 
   194         for met_id in metabolites:
 | 
| 
 | 
   195             compartment = extract_compartment_from_metabolite(met_id)
 | 
| 
 | 
   196             
 | 
| 
 | 
   197             # Aggiungi compartimento se non esiste
 | 
| 
 | 
   198             if compartment not in compartments_dict:
 | 
| 
 | 
   199                 compartments_dict[compartment] = compartment
 | 
| 
 | 
   200             
 | 
| 
 | 
   201             # Aggiungi metabolita se non esiste
 | 
| 
 | 
   202             if met_id not in metabolites_dict:
 | 
| 
 | 
   203                 metabolites_dict[met_id] = Metabolite(
 | 
| 
 | 
   204                     id=met_id,
 | 
| 
 | 
   205                     compartment=compartment,
 | 
| 
 | 
   206                     name=met_id.replace(f"_{compartment}", "").replace("__", "_")
 | 
| 
 | 
   207                 )
 | 
| 
 | 
   208     
 | 
| 
 | 
   209     # Aggiungi compartimenti al modello
 | 
| 
 | 
   210     model.compartments = compartments_dict
 | 
| 
 | 
   211     
 | 
| 
 | 
   212     # Aggiungi metaboliti al modello  
 | 
| 
 | 
   213     model.add_metabolites(list(metabolites_dict.values()))
 | 
| 
 | 
   214     
 | 
| 
 | 
   215     print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti")
 | 
| 
 | 
   216     
 | 
| 
 | 
   217     # Seconda passata: aggiungi le reazioni
 | 
| 
 | 
   218     reactions_added = 0
 | 
| 
 | 
   219     reactions_skipped = 0
 | 
| 
 | 
   220     
 | 
| 
 | 
   221     for idx, row in df.iterrows():
 | 
| 
 | 
   222 
 | 
| 
 | 
   223         reaction_id = str(row['ReactionID']).strip()
 | 
| 
427
 | 
   224         reaction_formula = str(row['Formula']).strip()
 | 
| 
419
 | 
   225         
 | 
| 
 | 
   226         # Salta reazioni senza formula
 | 
| 
 | 
   227         if not reaction_formula or reaction_formula == 'nan':
 | 
| 
 | 
   228             raise ValueError(f"Formula della reazione mancante {reaction_id}")
 | 
| 
 | 
   229         
 | 
| 
 | 
   230         # Crea la reazione
 | 
| 
 | 
   231         reaction = Reaction(reaction_id)
 | 
| 
 | 
   232         reaction.name = reaction_id
 | 
| 
 | 
   233         
 | 
| 
 | 
   234         # Imposta bounds
 | 
| 
 | 
   235         reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
 | 
| 
 | 
   236         reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
 | 
| 
 | 
   237         
 | 
| 
 | 
   238         # Aggiungi gene rule se presente
 | 
| 
427
 | 
   239         if pd.notna(row['GPR']) and str(row['GPR']).strip():
 | 
| 
 | 
   240             reaction.gene_reaction_rule = str(row['GPR']).strip()
 | 
| 
419
 | 
   241         
 | 
| 
 | 
   242         # Parse della formula della reazione
 | 
| 
 | 
   243         try:
 | 
| 
 | 
   244             parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
 | 
| 
 | 
   245         except Exception as e:
 | 
| 
 | 
   246             print(f"Errore nel parsing della reazione {reaction_id}: {e}")
 | 
| 
 | 
   247             reactions_skipped += 1
 | 
| 
 | 
   248             continue
 | 
| 
 | 
   249         
 | 
| 
 | 
   250         # Aggiungi la reazione al modello
 | 
| 
 | 
   251         model.add_reactions([reaction])
 | 
| 
 | 
   252         reactions_added += 1
 | 
| 
 | 
   253             
 | 
| 
 | 
   254     
 | 
| 
 | 
   255     print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni")
 | 
| 
 | 
   256     
 | 
| 
 | 
   257     # Imposta l'obiettivo di biomassa
 | 
| 
 | 
   258     set_biomass_objective(model)
 | 
| 
 | 
   259     
 | 
| 
 | 
   260     # Imposta il medium
 | 
| 
 | 
   261     set_medium_from_data(model, df)
 | 
| 
 | 
   262     
 | 
| 
 | 
   263     print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti")
 | 
| 
 | 
   264     
 | 
| 
 | 
   265     return model
 | 
| 
 | 
   266 
 | 
| 
 | 
   267 
 | 
| 
 | 
   268 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
 | 
| 
 | 
   269 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
 | 
| 
 | 
   270     """
 | 
| 
 | 
   271     Estrae gli ID dei metaboliti da una formula di reazione.
 | 
| 
 | 
   272     Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e)
 | 
| 
 | 
   273     e permette che comincino con cifre o underscore.
 | 
| 
 | 
   274     """
 | 
| 
 | 
   275     metabolites = set()
 | 
| 
 | 
   276     # coefficiente opzionale seguito da un token che termina con _<letters>
 | 
| 
 | 
   277     pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)'
 | 
| 
 | 
   278     matches = re.findall(pattern, reaction_formula)
 | 
| 
 | 
   279     metabolites.update(matches)
 | 
| 
 | 
   280     return metabolites
 | 
| 
 | 
   281 
 | 
| 
 | 
   282 
 | 
| 
 | 
   283 def extract_compartment_from_metabolite(metabolite_id: str) -> str:
 | 
| 
 | 
   284     """
 | 
| 
 | 
   285     Estrae il compartimento dall'ID del metabolita.
 | 
| 
 | 
   286     """
 | 
| 
 | 
   287     # Il compartimento รจ solitamente l'ultima lettera dopo l'underscore
 | 
| 
 | 
   288     if '_' in metabolite_id:
 | 
| 
 | 
   289         return metabolite_id.split('_')[-1]
 | 
| 
 | 
   290     return 'c'  # default cytoplasm
 | 
| 
 | 
   291 
 | 
| 
 | 
   292 
 | 
| 
 | 
   293 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
 | 
| 
 | 
   294     """
 | 
| 
 | 
   295     Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti.
 | 
| 
 | 
   296     """
 | 
| 
 | 
   297 
 | 
| 
 | 
   298     if reaction.id == 'EX_thbpt_e':
 | 
| 
 | 
   299         print(reaction.id)
 | 
| 
 | 
   300         print(formula)
 | 
| 
 | 
   301     # Dividi in parte sinistra e destra
 | 
| 
 | 
   302     if '<=>' in formula:
 | 
| 
 | 
   303         left, right = formula.split('<=>')
 | 
| 
 | 
   304         reversible = True
 | 
| 
 | 
   305     elif '<--' in formula:
 | 
| 
 | 
   306         left, right = formula.split('<--')
 | 
| 
 | 
   307         reversible = False
 | 
| 
 | 
   308         left, right = left, right
 | 
| 
 | 
   309     elif '-->' in formula:
 | 
| 
 | 
   310         left, right = formula.split('-->')
 | 
| 
 | 
   311         reversible = False
 | 
| 
 | 
   312     elif '<-' in formula:
 | 
| 
 | 
   313         left, right = formula.split('<-')
 | 
| 
 | 
   314         reversible = False
 | 
| 
 | 
   315         left, right = left, right
 | 
| 
 | 
   316     else:
 | 
| 
 | 
   317         raise ValueError(f"Formato reazione non riconosciuto: {formula}")
 | 
| 
 | 
   318     
 | 
| 
 | 
   319     # Parse dei metaboliti e coefficienti
 | 
| 
 | 
   320     reactants = parse_metabolites_side(left.strip())
 | 
| 
 | 
   321     products = parse_metabolites_side(right.strip())
 | 
| 
 | 
   322     
 | 
| 
 | 
   323     # Aggiungi metaboliti alla reazione
 | 
| 
 | 
   324     metabolites_to_add = {}
 | 
| 
 | 
   325     
 | 
| 
 | 
   326     # Reagenti (coefficienti negativi)
 | 
| 
 | 
   327     for met_id, coeff in reactants.items():
 | 
| 
 | 
   328         if met_id in metabolites_dict:
 | 
| 
 | 
   329             metabolites_to_add[metabolites_dict[met_id]] = -coeff
 | 
| 
 | 
   330     
 | 
| 
 | 
   331     # Prodotti (coefficienti positivi)
 | 
| 
 | 
   332     for met_id, coeff in products.items():
 | 
| 
 | 
   333         if met_id in metabolites_dict:
 | 
| 
 | 
   334             metabolites_to_add[metabolites_dict[met_id]] = coeff
 | 
| 
 | 
   335     
 | 
| 
 | 
   336     reaction.add_metabolites(metabolites_to_add)
 | 
| 
 | 
   337 
 | 
| 
 | 
   338 
 | 
| 
 | 
   339 def parse_metabolites_side(side_str: str) -> Dict[str, float]:
 | 
| 
 | 
   340     """
 | 
| 
 | 
   341     Parsa un lato della reazione per estrarre metaboliti e coefficienti.
 | 
| 
 | 
   342     """
 | 
| 
 | 
   343     metabolites = {}
 | 
| 
 | 
   344     if not side_str or side_str.strip() == '':
 | 
| 
 | 
   345         return metabolites
 | 
| 
 | 
   346 
 | 
| 
 | 
   347     terms = side_str.split('+')
 | 
| 
 | 
   348     for term in terms:
 | 
| 
 | 
   349         term = term.strip()
 | 
| 
 | 
   350         if not term:
 | 
| 
 | 
   351             continue
 | 
| 
 | 
   352 
 | 
| 
 | 
   353         # pattern allineato: coefficiente opzionale + id che termina con _<compartimento>
 | 
| 
 | 
   354         match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term)
 | 
| 
 | 
   355         if match:
 | 
| 
 | 
   356             coeff_str, met_id = match.groups()
 | 
| 
 | 
   357             coeff = float(coeff_str) if coeff_str else 1.0
 | 
| 
 | 
   358             metabolites[met_id] = coeff
 | 
| 
 | 
   359 
 | 
| 
 | 
   360     return metabolites
 | 
| 
 | 
   361 
 | 
| 
 | 
   362 
 | 
| 
 | 
   363 
 | 
| 
 | 
   364 def set_biomass_objective(model: cobraModel):
 | 
| 
 | 
   365     """
 | 
| 
 | 
   366     Imposta la reazione di biomassa come obiettivo.
 | 
| 
 | 
   367     """
 | 
| 
 | 
   368     biomass_reactions = [r for r in model.reactions if 'biomass' in r.id.lower()]
 | 
| 
 | 
   369     
 | 
| 
 | 
   370     if biomass_reactions:
 | 
| 
 | 
   371         model.objective = biomass_reactions[0].id
 | 
| 
 | 
   372         print(f"Obiettivo impostato su: {biomass_reactions[0].id}")
 | 
| 
 | 
   373     else:
 | 
| 
 | 
   374         print("Nessuna reazione di biomassa trovata")
 | 
| 
 | 
   375 
 | 
| 
 | 
   376 
 | 
| 
 | 
   377 def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
 | 
| 
 | 
   378     """
 | 
| 
 | 
   379     Imposta il medium basato sulla colonna InMedium.
 | 
| 
 | 
   380     """
 | 
| 
 | 
   381     medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
 | 
| 
 | 
   382     
 | 
| 
 | 
   383     medium_dict = {}
 | 
| 
 | 
   384     for rxn_id in medium_reactions:
 | 
| 
 | 
   385         if rxn_id in [r.id for r in model.reactions]:
 | 
| 
 | 
   386             reaction = model.reactions.get_by_id(rxn_id)
 | 
| 
 | 
   387             if reaction.lower_bound < 0:  # Solo reazioni di uptake
 | 
| 
 | 
   388                 medium_dict[rxn_id] = abs(reaction.lower_bound)
 | 
| 
 | 
   389     
 | 
| 
 | 
   390     if medium_dict:
 | 
| 
 | 
   391         model.medium = medium_dict
 | 
| 
 | 
   392         print(f"Medium impostato con {len(medium_dict)} componenti")
 | 
| 
 | 
   393 
 | 
| 
 | 
   394 
 | 
| 
 | 
   395 def validate_model(model: cobraModel) -> Dict[str, any]:
 | 
| 
 | 
   396     """
 | 
| 
 | 
   397     Valida il modello e fornisce statistiche di base.
 | 
| 
 | 
   398     """
 | 
| 
 | 
   399     validation = {
 | 
| 
 | 
   400         'num_reactions': len(model.reactions),
 | 
| 
 | 
   401         'num_metabolites': len(model.metabolites),
 | 
| 
 | 
   402         'num_genes': len(model.genes),
 | 
| 
 | 
   403         'num_compartments': len(model.compartments),
 | 
| 
 | 
   404         'objective': str(model.objective),
 | 
| 
 | 
   405         'medium_size': len(model.medium),
 | 
| 
 | 
   406         'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
 | 
| 
 | 
   407         'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
 | 
| 
 | 
   408     }
 | 
| 
 | 
   409     
 | 
| 
 | 
   410     try:
 | 
| 
 | 
   411         # Test di crescita
 | 
| 
 | 
   412         solution = model.optimize()
 | 
| 
 | 
   413         validation['growth_rate'] = solution.objective_value
 | 
| 
 | 
   414         validation['status'] = solution.status
 | 
| 
 | 
   415     except Exception as e:
 | 
| 
 | 
   416         validation['growth_rate'] = None
 | 
| 
 | 
   417         validation['status'] = f"Error: {e}"
 | 
| 
 | 
   418     
 | 
| 
 | 
   419     return validation
 | 
| 
 | 
   420 
 | 
| 
 | 
   421 def convert_genes(model,annotation):
 | 
| 
 | 
   422     from cobra.manipulation import rename_genes
 | 
| 
 | 
   423     model2=model.copy()
 | 
| 
 | 
   424     try:
 | 
| 
 | 
   425         dict_genes={gene.id:gene.notes[annotation]  for gene in model2.genes}
 | 
| 
 | 
   426     except:
 | 
| 
 | 
   427         print("No annotation in gene dict!")
 | 
| 
 | 
   428         return -1
 | 
| 
 | 
   429     rename_genes(model2,dict_genes)
 | 
| 
 | 
   430 
 | 
| 
426
 | 
   431     return model2
 | 
| 
 | 
   432 
 | 
| 
 | 
   433 
 | 
| 
 | 
   434 
 | 
| 
 | 
   435 # ---------- Utility helpers ----------
 | 
| 
 | 
   436 def _normalize_colname(col: str) -> str:
 | 
| 
 | 
   437     return col.strip().lower().replace(' ', '_')
 | 
| 
 | 
   438 
 | 
| 
 | 
   439 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]:
 | 
| 
 | 
   440     """
 | 
| 
 | 
   441     Cerca colonne utili e ritorna dict {ensg: colname1, hgnc_id: colname2, ...}
 | 
| 
 | 
   442     Lancia ValueError se non trova almeno un mapping utile.
 | 
| 
 | 
   443     """
 | 
| 
 | 
   444     cols = { _normalize_colname(c): c for c in mapping_df.columns }
 | 
| 
 | 
   445     chosen = {}
 | 
| 
 | 
   446     # possibili nomi per ciascuna categoria
 | 
| 
 | 
   447     candidates = {
 | 
| 
 | 
   448         'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
 | 
| 
 | 
   449         'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
 | 
| 
 | 
   450         'hgnc_symbol': ['hgnc_symbol', 'hgnc_symbol', 'symbol'],
 | 
| 
 | 
   451         'entrez_id': ['entrez', 'entrez_id', 'entrezgene']
 | 
| 
 | 
   452     }
 | 
| 
 | 
   453     for key, names in candidates.items():
 | 
| 
 | 
   454         for n in names:
 | 
| 
 | 
   455             if n in cols:
 | 
| 
 | 
   456                 chosen[key] = cols[n]
 | 
| 
 | 
   457                 break
 | 
| 
 | 
   458     return chosen
 | 
| 
 | 
   459 
 | 
| 
 | 
   460 def _validate_target_uniqueness(mapping_df: 'pd.DataFrame',
 | 
| 
 | 
   461                                 source_col: str,
 | 
| 
 | 
   462                                 target_col: str,
 | 
| 
 | 
   463                                 model_source_genes: Optional[Set[str]] = None,
 | 
| 
 | 
   464                                 logger: Optional[logging.Logger] = None) -> None:
 | 
| 
 | 
   465     """
 | 
| 
 | 
   466     Verifica che, nel mapping_df (eventualmente giร  filtrato sui source di interesse),
 | 
| 
 | 
   467     ogni target sia associato ad al massimo un source. Se trova target associati a
 | 
| 
 | 
   468     >1 source solleva ValueError mostrando esempi.
 | 
| 
 | 
   469 
 | 
| 
 | 
   470     - mapping_df: DataFrame con colonne source_col, target_col
 | 
| 
 | 
   471     - model_source_genes: se fornito, รจ un set di source normalizzati che stiamo traducendo
 | 
| 
 | 
   472       (se None, si usa tutto mapping_df)
 | 
| 
 | 
   473     """
 | 
| 
 | 
   474     if logger is None:
 | 
| 
 | 
   475         logger = logging.getLogger(__name__)
 | 
| 
 | 
   476 
 | 
| 
 | 
   477     if mapping_df.empty:
 | 
| 
 | 
   478         logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.")
 | 
| 
 | 
   479         return
 | 
| 
 | 
   480 
 | 
| 
 | 
   481     # normalizza le colonne temporanee per gruppi (senza modificare il df originale)
 | 
| 
 | 
   482     tmp = mapping_df[[source_col, target_col]].copy()
 | 
| 
 | 
   483     tmp['_src_norm'] = tmp[source_col].astype(str).map(_normalize_gene_id)
 | 
| 
 | 
   484     tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip()
 | 
| 
 | 
   485 
 | 
| 
 | 
   486     # se รจ passato un insieme di geni modello, filtra qui (giร  fatto nella chiamata, ma doppio-check ok)
 | 
| 
 | 
   487     if model_source_genes is not None:
 | 
| 
 | 
   488         tmp = tmp[tmp['_src_norm'].isin(model_source_genes)]
 | 
| 
 | 
   489 
 | 
| 
 | 
   490     if tmp.empty:
 | 
| 
 | 
   491         logger.warning("After filtering to model source genes, mapping table is empty โ nothing to validate.")
 | 
| 
 | 
   492         return
 | 
| 
 | 
   493 
 | 
| 
 | 
   494     # costruisci il reverse mapping target -> set(sources)
 | 
| 
 | 
   495     grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna()))
 | 
| 
 | 
   496     # trova target con piรน di 1 source
 | 
| 
 | 
   497     problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}
 | 
| 
 | 
   498 
 | 
| 
 | 
   499     if problematic:
 | 
| 
 | 
   500         # prepara messaggio di errore con esempi (fino a 20)
 | 
| 
 | 
   501         sample_items = list(problematic.items())[:20]
 | 
| 
 | 
   502         msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
 | 
| 
 | 
   503         for tgt, sources in sample_items:
 | 
| 
 | 
   504             msg_lines.append(f"  - target '{tgt}' <- sources: {', '.join(sources)}")
 | 
| 
 | 
   505         if len(problematic) > len(sample_items):
 | 
| 
 | 
   506             msg_lines.append(f"  ... and {len(problematic) - len(sample_items)} more cases.")
 | 
| 
 | 
   507         full_msg = "\n".join(msg_lines)
 | 
| 
 | 
   508         # loggare e sollevare errore
 | 
| 
 | 
   509         logger.error(full_msg)
 | 
| 
 | 
   510         raise ValueError(full_msg)
 | 
| 
 | 
   511 
 | 
| 
 | 
   512     # se tutto ok
 | 
| 
 | 
   513     logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
 | 
| 
 | 
   514 
 | 
| 
 | 
   515 
 | 
| 
 | 
   516 def _normalize_gene_id(g: str) -> str:
 | 
| 
 | 
   517     """Rendi consistente un gene id per l'uso come chiave (rimuove prefissi come 'HGNC:' e strip)."""
 | 
| 
 | 
   518     if g is None:
 | 
| 
 | 
   519         return ""
 | 
| 
 | 
   520     g = str(g).strip()
 | 
| 
 | 
   521     # remove common prefixes
 | 
| 
 | 
   522     g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE)
 | 
| 
 | 
   523     g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
 | 
| 
 | 
   524     return g
 | 
| 
 | 
   525 
 | 
| 
 | 
   526 # ---------- Main public function ----------
 | 
| 
 | 
   527 def translate_model_genes(model: 'cobra.Model',
 | 
| 
 | 
   528                          mapping_df: 'pd.DataFrame',
 | 
| 
 | 
   529                          target_nomenclature: str,
 | 
| 
 | 
   530                          source_nomenclature: str = 'hgnc_id',
 | 
| 
 | 
   531                          logger: Optional[logging.Logger] = None) -> 'cobra.Model':
 | 
| 
 | 
   532     """
 | 
| 
 | 
   533     Translate model genes from source_nomenclature to target_nomenclature.
 | 
| 
 | 
   534     mapping_df should contain at least columns that allow the mapping
 | 
| 
 | 
   535     (e.g. ensg, hgnc_id, hgnc_symbol, entrez).
 | 
| 
 | 
   536     """
 | 
| 
 | 
   537     if logger is None:
 | 
| 
 | 
   538         logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 | 
| 
 | 
   539         logger = logging.getLogger(__name__)
 | 
| 
 | 
   540 
 | 
| 
 | 
   541     logger.info(f"Translating genes from '{source_nomenclature}' to '{target_nomenclature}'")
 | 
| 
 | 
   542 
 | 
| 
 | 
   543     # normalize column names and choose relevant columns
 | 
| 
 | 
   544     chosen = _choose_columns(mapping_df)
 | 
| 
 | 
   545     if not chosen:
 | 
| 
 | 
   546         raise ValueError("Could not detect useful columns in mapping_df. Expected at least one of: ensg, hgnc_id, hgnc_symbol, entrez.")
 | 
| 
 | 
   547 
 | 
| 
 | 
   548     # map source/target to actual dataframe column names (allow user-specified source/target keys)
 | 
| 
 | 
   549     # normalize input args
 | 
| 
 | 
   550     src_key = source_nomenclature.strip().lower()
 | 
| 
 | 
   551     tgt_key = target_nomenclature.strip().lower()
 | 
| 
 | 
   552 
 | 
| 
 | 
   553     # try to find the actual column names for requested keys
 | 
| 
 | 
   554     # support synonyms: user may pass "ensg" or "ENSG" etc.
 | 
| 
 | 
   555     col_for_src = None
 | 
| 
 | 
   556     col_for_tgt = None
 | 
| 
 | 
   557     # first, try exact match
 | 
| 
 | 
   558     for k, actual in chosen.items():
 | 
| 
 | 
   559         if k == src_key:
 | 
| 
 | 
   560             col_for_src = actual
 | 
| 
 | 
   561         if k == tgt_key:
 | 
| 
 | 
   562             col_for_tgt = actual
 | 
| 
 | 
   563 
 | 
| 
 | 
   564     # if not found, try mapping common names
 | 
| 
 | 
   565     if col_for_src is None:
 | 
| 
 | 
   566         # fallback: if user passed 'hgnc_id' but chosen has only 'hgnc_symbol', it's not useful
 | 
| 
 | 
   567         # we require at least the source column to exist
 | 
| 
 | 
   568         possible_src_names = {k: v for k, v in chosen.items()}
 | 
| 
 | 
   569         # try to match by contained substring
 | 
| 
 | 
   570         for k, actual in possible_src_names.items():
 | 
| 
 | 
   571             if src_key in k:
 | 
| 
 | 
   572                 col_for_src = actual
 | 
| 
 | 
   573                 break
 | 
| 
 | 
   574 
 | 
| 
 | 
   575     if col_for_tgt is None:
 | 
| 
 | 
   576         for k, actual in chosen.items():
 | 
| 
 | 
   577             if tgt_key in k:
 | 
| 
 | 
   578                 col_for_tgt = actual
 | 
| 
 | 
   579                 break
 | 
| 
 | 
   580 
 | 
| 
 | 
   581     if col_for_src is None:
 | 
| 
 | 
   582         raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.")
 | 
| 
 | 
   583     if col_for_tgt is None:
 | 
| 
 | 
   584         raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.")
 | 
| 
 | 
   585     
 | 
| 
 | 
   586 
 | 
| 
 | 
   587     model_source_genes = { _normalize_gene_id(g.id) for g in model.genes }
 | 
| 
 | 
   588     logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).")
 | 
| 
 | 
   589 
 | 
| 
 | 
   590     tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy()
 | 
| 
 | 
   591     tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).map(_normalize_gene_id)
 | 
| 
 | 
   592 
 | 
| 
 | 
   593     filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()
 | 
| 
 | 
   594 
 | 
| 
 | 
   595     # Se non ci sono righe rilevanti, avvisa (possono non esserci mapping per i geni presenti)
 | 
| 
 | 
   596     if filtered_map.empty:
 | 
| 
 | 
   597         logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")
 | 
| 
 | 
   598 
 | 
| 
 | 
   599     # --- VALIDAZIONE: nessun target deve essere mappato da piu' di un source (nell'insieme filtrato) ---
 | 
| 
 | 
   600     # Se vuoi la verifica su tutto il dataframe (non solo sui geni del modello), passa model_source_genes=None.
 | 
| 
 | 
   601     _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
 | 
| 
 | 
   602 
 | 
| 
 | 
   603     # Ora crea il mapping solo sul sottoinsieme filtrato (piu' efficiente)
 | 
| 
 | 
   604     # ATTENZIONE: _create_gene_mapping si aspetta i nomi originali delle colonne
 | 
| 
 | 
   605     # quindi passiamo filtered_map con le colonne rimappate (senza la col_for_src + "_norm")
 | 
| 
 | 
   606     gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)
 | 
| 
 | 
   607 
 | 
| 
 | 
   608     # copy model
 | 
| 
 | 
   609     model_copy = model.copy()
 | 
| 
 | 
   610 
 | 
| 
 | 
   611     # statistics
 | 
| 
 | 
   612     stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0}
 | 
| 
 | 
   613     unmapped = []
 | 
| 
 | 
   614     multi = []
 | 
| 
 | 
   615 
 | 
| 
 | 
   616     original_genes = {g.id for g in model_copy.genes}
 | 
| 
 | 
   617     logger.info(f"Original genes count: {len(original_genes)}")
 | 
| 
 | 
   618 
 | 
| 
 | 
   619     # translate GPRs
 | 
| 
 | 
   620     for rxn in model_copy.reactions:
 | 
| 
 | 
   621         gpr = rxn.gene_reaction_rule
 | 
| 
 | 
   622         if gpr and gpr.strip():
 | 
| 
 | 
   623             new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger)
 | 
| 
 | 
   624             if new_gpr != gpr:
 | 
| 
 | 
   625                 rxn.gene_reaction_rule = new_gpr
 | 
| 
 | 
   626                 logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{new_gpr}'")
 | 
| 
 | 
   627 
 | 
| 
 | 
   628     # update model genes based on new GPRs
 | 
| 
 | 
   629     _update_model_genes(model_copy, logger)
 | 
| 
 | 
   630 
 | 
| 
 | 
   631     # final logging
 | 
| 
 | 
   632     _log_translation_statistics(stats, unmapped, multi, original_genes, model_copy.genes, logger)
 | 
| 
 | 
   633 
 | 
| 
 | 
   634     logger.info("Translation finished")
 | 
| 
 | 
   635     return model_copy
 | 
| 
 | 
   636 
 | 
| 
 | 
   637 
 | 
| 
 | 
   638 # ---------- helper functions ----------
 | 
| 
 | 
   639 def _create_gene_mapping(mapping_df, source_col: str, target_col: str, logger: logging.Logger) -> Dict[str, List[str]]:
 | 
| 
 | 
   640     """
 | 
| 
 | 
   641     Build mapping dict: source_id -> list of target_ids
 | 
| 
 | 
   642     Normalizes IDs (removes prefixes like 'HGNC:' etc).
 | 
| 
 | 
   643     """
 | 
| 
 | 
   644     df = mapping_df[[source_col, target_col]].dropna().copy()
 | 
| 
 | 
   645     # normalize to string
 | 
| 
 | 
   646     df[source_col] = df[source_col].astype(str).map(_normalize_gene_id)
 | 
| 
 | 
   647     df[target_col] = df[target_col].astype(str).str.strip()
 | 
| 
 | 
   648 
 | 
| 
 | 
   649     df = df.drop_duplicates()
 | 
| 
 | 
   650 
 | 
| 
 | 
   651     logger.info(f"Creating mapping from {len(df)} rows")
 | 
| 
 | 
   652 
 | 
| 
 | 
   653     mapping = defaultdict(list)
 | 
| 
 | 
   654     for _, row in df.iterrows():
 | 
| 
 | 
   655         s = row[source_col]
 | 
| 
 | 
   656         t = row[target_col]
 | 
| 
 | 
   657         if t not in mapping[s]:
 | 
| 
 | 
   658             mapping[s].append(t)
 | 
| 
 | 
   659 
 | 
| 
 | 
   660     # stats
 | 
| 
 | 
   661     one_to_one = sum(1 for v in mapping.values() if len(v) == 1)
 | 
| 
 | 
   662     one_to_many = sum(1 for v in mapping.values() if len(v) > 1)
 | 
| 
 | 
   663     logger.info(f"Mapping: {len(mapping)} source keys, {one_to_one} 1:1, {one_to_many} 1:many")
 | 
| 
 | 
   664     return dict(mapping)
 | 
| 
 | 
   665 
 | 
| 
 | 
   666 
 | 
| 
 | 
   667 def _translate_gpr(gpr_string: str,
 | 
| 
 | 
   668                    gene_mapping: Dict[str, List[str]],
 | 
| 
 | 
   669                    stats: Dict[str, int],
 | 
| 
 | 
   670                    unmapped_genes: List[str],
 | 
| 
 | 
   671                    multi_mapping_genes: List[Tuple[str, List[str]]],
 | 
| 
 | 
   672                    logger: logging.Logger) -> str:
 | 
| 
 | 
   673     """
 | 
| 
 | 
   674     Translate genes inside a GPR string using gene_mapping.
 | 
| 
 | 
   675     Returns new GPR string.
 | 
| 
 | 
   676     """
 | 
| 
 | 
   677     # Generic token pattern: letters, digits, :, _, -, ., (captures HGNC:1234, ENSG000..., symbols)
 | 
| 
 | 
   678     token_pattern = r'\b[A-Za-z0-9:_.-]+\b'
 | 
| 
 | 
   679     tokens = re.findall(token_pattern, gpr_string)
 | 
| 
 | 
   680 
 | 
| 
 | 
   681     logical = {'and', 'or', 'AND', 'OR', '(', ')'}
 | 
| 
 | 
   682     tokens = [t for t in tokens if t not in logical]
 | 
| 
 | 
   683 
 | 
| 
 | 
   684     new_gpr = gpr_string
 | 
| 
 | 
   685 
 | 
| 
 | 
   686     for token in sorted(set(tokens), key=lambda x: -len(x)):  # longer tokens first to avoid partial replacement
 | 
| 
 | 
   687         norm = _normalize_gene_id(token)
 | 
| 
 | 
   688         if norm in gene_mapping:
 | 
| 
 | 
   689             targets = gene_mapping[norm]
 | 
| 
 | 
   690             stats['translated'] += 1
 | 
| 
 | 
   691             if len(targets) == 1:
 | 
| 
 | 
   692                 stats['one_to_one'] += 1
 | 
| 
 | 
   693                 replacement = targets[0]
 | 
| 
 | 
   694             else:
 | 
| 
 | 
   695                 stats['one_to_many'] += 1
 | 
| 
 | 
   696                 multi_mapping_genes.append((token, targets))
 | 
| 
 | 
   697                 replacement = "(" + " or ".join(targets) + ")"
 | 
| 
 | 
   698 
 | 
| 
 | 
   699             pattern = r'\b' + re.escape(token) + r'\b'
 | 
| 
 | 
   700             new_gpr = re.sub(pattern, replacement, new_gpr)
 | 
| 
 | 
   701         else:
 | 
| 
 | 
   702             stats['not_found'] += 1
 | 
| 
 | 
   703             if token not in unmapped_genes:
 | 
| 
 | 
   704                 unmapped_genes.append(token)
 | 
| 
 | 
   705             logger.debug(f"Token not found in mapping (left as-is): {token}")
 | 
| 
 | 
   706 
 | 
| 
 | 
   707     return new_gpr
 | 
| 
 | 
   708 
 | 
| 
 | 
   709 
 | 
| 
 | 
   710 def _update_model_genes(model: 'cobra.Model', logger: logging.Logger):
 | 
| 
 | 
   711     """
 | 
| 
 | 
   712     Rebuild model.genes from gene_reaction_rule content.
 | 
| 
 | 
   713     Removes genes not referenced and adds missing ones.
 | 
| 
 | 
   714     """
 | 
| 
 | 
   715     # collect genes in GPRs
 | 
| 
 | 
   716     gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
 | 
| 
 | 
   717     logical = {'and', 'or', 'AND', 'OR', '(', ')'}
 | 
| 
 | 
   718     genes_in_gpr: Set[str] = set()
 | 
| 
 | 
   719 
 | 
| 
 | 
   720     for rxn in model.reactions:
 | 
| 
 | 
   721         gpr = rxn.gene_reaction_rule
 | 
| 
 | 
   722         if gpr and gpr.strip():
 | 
| 
 | 
   723             toks = re.findall(gene_pattern, gpr)
 | 
| 
 | 
   724             toks = [t for t in toks if t not in logical]
 | 
| 
 | 
   725             # normalize IDs consistent with mapping normalization
 | 
| 
 | 
   726             toks = [_normalize_gene_id(t) for t in toks]
 | 
| 
 | 
   727             genes_in_gpr.update(toks)
 | 
| 
 | 
   728 
 | 
| 
 | 
   729     # existing gene ids
 | 
| 
 | 
   730     existing = {g.id for g in model.genes}
 | 
| 
 | 
   731 
 | 
| 
 | 
   732     # remove obsolete genes
 | 
| 
 | 
   733     to_remove = [gid for gid in existing if gid not in genes_in_gpr]
 | 
| 
 | 
   734     removed = 0
 | 
| 
 | 
   735     for gid in to_remove:
 | 
| 
 | 
   736         try:
 | 
| 
 | 
   737             gene_obj = model.genes.get_by_id(gid)
 | 
| 
 | 
   738             model.genes.remove(gene_obj)
 | 
| 
 | 
   739             removed += 1
 | 
| 
 | 
   740         except Exception:
 | 
| 
 | 
   741             # safe-ignore
 | 
| 
 | 
   742             pass
 | 
| 
 | 
   743 
 | 
| 
 | 
   744     # add new genes
 | 
| 
 | 
   745     added = 0
 | 
| 
 | 
   746     for gid in genes_in_gpr:
 | 
| 
 | 
   747         if gid not in existing:
 | 
| 
 | 
   748             new_gene = cobra.Gene(gid)
 | 
| 
 | 
   749             try:
 | 
| 
 | 
   750                 model.genes.add(new_gene)
 | 
| 
 | 
   751             except Exception:
 | 
| 
 | 
   752                 # fallback: if model.genes doesn't support add, try append or model.add_genes
 | 
| 
 | 
   753                 try:
 | 
| 
 | 
   754                     model.genes.append(new_gene)
 | 
| 
 | 
   755                 except Exception:
 | 
| 
 | 
   756                     try:
 | 
| 
 | 
   757                         model.add_genes([new_gene])
 | 
| 
 | 
   758                     except Exception:
 | 
| 
 | 
   759                         logger.warning(f"Could not add gene object for {gid}")
 | 
| 
 | 
   760             added += 1
 | 
| 
 | 
   761 
 | 
| 
 | 
   762     logger.info(f"Model genes updated: removed {removed}, added {added}")
 | 
| 
 | 
   763 
 | 
| 
 | 
   764 
 | 
| 
 | 
   765 def _log_translation_statistics(stats: Dict[str, int],
 | 
| 
 | 
   766                                unmapped_genes: List[str],
 | 
| 
 | 
   767                                multi_mapping_genes: List[Tuple[str, List[str]]],
 | 
| 
 | 
   768                                original_genes: Set[str],
 | 
| 
 | 
   769                                final_genes,
 | 
| 
 | 
   770                                logger: logging.Logger):
 | 
| 
 | 
   771     logger.info("=== TRANSLATION STATISTICS ===")
 | 
| 
 | 
   772     logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})")
 | 
| 
 | 
   773     logger.info(f"Not found tokens: {stats.get('not_found', 0)}")
 | 
| 
 | 
   774 
 | 
| 
 | 
   775     final_ids = {g.id for g in final_genes}
 | 
| 
 | 
   776     logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}")
 | 
| 
 | 
   777 
 | 
| 
 | 
   778     if unmapped_genes:
 | 
| 
 | 
   779         logger.warning(f"Unmapped tokens ({len(unmapped_genes)}): {', '.join(unmapped_genes[:20])}{(' ...' if len(unmapped_genes)>20 else '')}")
 | 
| 
 | 
   780     if multi_mapping_genes:
 | 
| 
 | 
   781         logger.info(f"Multi-mapping examples ({len(multi_mapping_genes)}):")
 | 
| 
 | 
   782         for orig, targets in multi_mapping_genes[:10]:
 | 
| 
 | 
   783             logger.info(f"  {orig} -> {', '.join(targets)}") |