| 
456
 | 
     1 """
 | 
| 
 | 
     2 Utilities for generating and manipulating COBRA models and related metadata.
 | 
| 
 | 
     3 
 | 
| 
 | 
     4 This module includes helpers to:
 | 
| 
 | 
     5 - extract rules, reactions, bounds, objective coefficients, and compartments
 | 
| 
 | 
     6 - build a COBRA model from a tabular file
 | 
| 
 | 
     7 - set objective and medium from dataframes
 | 
| 
 | 
     8 - validate a model and convert gene identifiers
 | 
| 
 | 
     9 - translate model GPRs using mapping tables
 | 
| 
 | 
    10 """
 | 
| 
418
 | 
    11 import os
 | 
| 
 | 
    12 import cobra
 | 
| 
 | 
    13 import pandas as pd
 | 
| 
419
 | 
    14 import re
 | 
| 
426
 | 
    15 import logging
 | 
| 
419
 | 
    16 from typing import Optional, Tuple, Union, List, Dict, Set
 | 
| 
426
 | 
    17 from collections import defaultdict
 | 
| 
418
 | 
    18 import utils.rule_parsing  as rulesUtils
 | 
| 
419
 | 
    19 import utils.reaction_parsing as reactionUtils
 | 
| 
 | 
    20 from cobra import Model as cobraModel, Reaction, Metabolite
 | 
| 
490
 | 
    21 import sys
 | 
| 
 | 
    22 
 | 
| 
 | 
    23 
 | 
| 
 | 
    24 ############################ check_methods ####################################
 | 
| 
 | 
    25 def gene_type(l :str, name :str) -> str:
 | 
| 
 | 
    26     """
 | 
| 
 | 
    27     Determine the type of gene ID.
 | 
| 
 | 
    28 
 | 
| 
 | 
    29     Args:
 | 
| 
 | 
    30         l (str): The gene identifier to check.
 | 
| 
 | 
    31         name (str): The name of the dataset, used in error messages.
 | 
| 
 | 
    32 
 | 
| 
 | 
    33     Returns:
 | 
| 
538
 | 
    34         str: The type of gene ID ('HGNC_ID', 'ENSG', 'HGNC_symbol', or 'entrez_id').
 | 
| 
490
 | 
    35 
 | 
| 
 | 
    36     Raises:
 | 
| 
 | 
    37         sys.exit: If the gene ID type is not supported, the execution is aborted.
 | 
| 
 | 
    38     """
 | 
| 
 | 
    39     if check_hgnc(l):
 | 
| 
538
 | 
    40         return 'HGNC_ID'
 | 
| 
490
 | 
    41     elif check_ensembl(l):
 | 
| 
538
 | 
    42         return 'ENSG'
 | 
| 
490
 | 
    43     elif check_symbol(l):
 | 
| 
538
 | 
    44         return 'HGNC_symbol'
 | 
| 
490
 | 
    45     elif check_entrez(l):
 | 
| 
 | 
    46         return 'entrez_id'
 | 
| 
 | 
    47     else:
 | 
| 
 | 
    48         sys.exit('Execution aborted:\n' +
 | 
| 
 | 
    49                  'gene ID type in ' + name + ' not supported. Supported ID'+
 | 
| 
 | 
    50                  'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n')
 | 
| 
 | 
    51 
 | 
| 
 | 
    52 def check_hgnc(l :str) -> bool:
 | 
| 
 | 
    53     """
 | 
| 
 | 
    54     Check if a gene identifier follows the HGNC format.
 | 
| 
 | 
    55 
 | 
| 
 | 
    56     Args:
 | 
| 
 | 
    57         l (str): The gene identifier to check.
 | 
| 
 | 
    58 
 | 
| 
 | 
    59     Returns:
 | 
| 
 | 
    60         bool: True if the gene identifier follows the HGNC format, False otherwise.
 | 
| 
 | 
    61     """
 | 
| 
 | 
    62     if len(l) > 5:
 | 
| 
 | 
    63         if (l.upper()).startswith('HGNC:'):
 | 
| 
 | 
    64             return l[5:].isdigit()
 | 
| 
 | 
    65         else:
 | 
| 
 | 
    66             return False
 | 
| 
 | 
    67     else:
 | 
| 
 | 
    68         return False
 | 
| 
 | 
    69 
 | 
| 
 | 
    70 def check_ensembl(l :str) -> bool:
 | 
| 
 | 
    71     """
 | 
| 
 | 
    72     Check if a gene identifier follows the Ensembl format.
 | 
| 
 | 
    73 
 | 
| 
 | 
    74     Args:
 | 
| 
 | 
    75         l (str): The gene identifier to check.
 | 
| 
 | 
    76 
 | 
| 
 | 
    77     Returns:
 | 
| 
 | 
    78         bool: True if the gene identifier follows the Ensembl format, False otherwise.
 | 
| 
 | 
    79     """
 | 
| 
 | 
    80     return l.upper().startswith('ENS')
 | 
| 
 | 
    81  
 | 
| 
 | 
    82 
 | 
| 
 | 
    83 def check_symbol(l :str) -> bool:
 | 
| 
 | 
    84     """
 | 
| 
 | 
    85     Check if a gene identifier follows the symbol format.
 | 
| 
 | 
    86 
 | 
| 
 | 
    87     Args:
 | 
| 
 | 
    88         l (str): The gene identifier to check.
 | 
| 
 | 
    89 
 | 
| 
 | 
    90     Returns:
 | 
| 
 | 
    91         bool: True if the gene identifier follows the symbol format, False otherwise.
 | 
| 
 | 
    92     """
 | 
| 
 | 
    93     if len(l) > 0:
 | 
| 
 | 
    94         if l[0].isalpha() and l[1:].isalnum():
 | 
| 
 | 
    95             return True
 | 
| 
 | 
    96         else:
 | 
| 
 | 
    97             return False
 | 
| 
 | 
    98     else:
 | 
| 
 | 
    99         return False
 | 
| 
 | 
   100 
 | 
| 
 | 
   101 def check_entrez(l :str) -> bool:
 | 
| 
 | 
   102     """
 | 
| 
 | 
   103     Check if a gene identifier follows the Entrez ID format.
 | 
| 
 | 
   104 
 | 
| 
 | 
   105     Args:
 | 
| 
 | 
   106         l (str): The gene identifier to check.
 | 
| 
 | 
   107 
 | 
| 
 | 
   108     Returns:
 | 
| 
 | 
   109         bool: True if the gene identifier follows the Entrez ID format, False otherwise.
 | 
| 
 | 
   110     """ 
 | 
| 
 | 
   111     if len(l) > 0:
 | 
| 
 | 
   112         return l.isdigit()
 | 
| 
 | 
   113     else: 
 | 
| 
 | 
   114         return False
 | 
| 
418
 | 
   115 
 | 
| 
 | 
   116 ################################- DATA GENERATION -################################
 | 
| 
 | 
   117 ReactionId = str
 | 
| 
419
 | 
   118 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
 | 
| 
418
 | 
   119     """
 | 
| 
456
 | 
   120     Generate a dictionary mapping reaction IDs to GPR rules from the model.
 | 
| 
418
 | 
   121 
 | 
| 
 | 
   122     Args:
 | 
| 
456
 | 
   123         model: COBRA model to derive data from.
 | 
| 
 | 
   124         asParsed: If True, parse rules into a nested list structure; otherwise keep raw strings.
 | 
| 
418
 | 
   125 
 | 
| 
 | 
   126     Returns:
 | 
| 
456
 | 
   127         Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID.
 | 
| 
 | 
   128         Dict[ReactionId, str]: Raw rules by reaction ID.
 | 
| 
418
 | 
   129     """
 | 
| 
 | 
   130     _ruleGetter   =  lambda reaction : reaction.gene_reaction_rule
 | 
| 
 | 
   131     ruleExtractor = (lambda reaction :
 | 
| 
 | 
   132         rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter
 | 
| 
 | 
   133 
 | 
| 
 | 
   134     return {
 | 
| 
 | 
   135         reaction.id : ruleExtractor(reaction)
 | 
| 
 | 
   136         for reaction in model.reactions
 | 
| 
 | 
   137         if reaction.gene_reaction_rule }
 | 
| 
 | 
   138 
 | 
| 
419
 | 
   139 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
 | 
| 
418
 | 
   140     """
 | 
| 
456
 | 
   141     Generate a dictionary mapping reaction IDs to reaction formulas from the model.
 | 
| 
418
 | 
   142 
 | 
| 
 | 
   143     Args:
 | 
| 
456
 | 
   144         model: COBRA model to derive data from.
 | 
| 
 | 
   145         asParsed: If True, convert formulas into a parsed representation; otherwise keep raw strings.
 | 
| 
418
 | 
   146 
 | 
| 
 | 
   147     Returns:
 | 
| 
456
 | 
   148         Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested).
 | 
| 
418
 | 
   149     """
 | 
| 
 | 
   150 
 | 
| 
 | 
   151     unparsedReactions = {
 | 
| 
 | 
   152         reaction.id : reaction.reaction
 | 
| 
 | 
   153         for reaction in model.reactions
 | 
| 
 | 
   154         if reaction.reaction 
 | 
| 
 | 
   155     }
 | 
| 
 | 
   156 
 | 
| 
 | 
   157     if not asParsed: return unparsedReactions
 | 
| 
 | 
   158     
 | 
| 
 | 
   159     return reactionUtils.create_reaction_dict(unparsedReactions)
 | 
| 
 | 
   160 
 | 
| 
419
 | 
   161 def get_medium(model:cobraModel) -> pd.DataFrame:
 | 
| 
456
 | 
   162     """
 | 
| 
 | 
   163     Extract the uptake reactions representing the model medium.
 | 
| 
 | 
   164 
 | 
| 
 | 
   165     Returns a DataFrame with a single column 'reaction' listing exchange reactions
 | 
| 
 | 
   166     with negative lower bound and no positive stoichiometric coefficients (uptake only).
 | 
| 
 | 
   167     """
 | 
| 
418
 | 
   168     trueMedium=[]
 | 
| 
 | 
   169     for r in model.reactions:
 | 
| 
 | 
   170         positiveCoeff=0
 | 
| 
 | 
   171         for m in r.metabolites:
 | 
| 
 | 
   172             if r.get_coefficient(m.id)>0:
 | 
| 
 | 
   173                 positiveCoeff=1;
 | 
| 
 | 
   174         if (positiveCoeff==0 and r.lower_bound<0):
 | 
| 
 | 
   175             trueMedium.append(r.id)
 | 
| 
 | 
   176 
 | 
| 
 | 
   177     df_medium = pd.DataFrame()
 | 
| 
 | 
   178     df_medium["reaction"] = trueMedium
 | 
| 
 | 
   179     return df_medium
 | 
| 
 | 
   180 
 | 
| 
426
 | 
   181 def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame:
 | 
| 
 | 
   182     """
 | 
| 
456
 | 
   183     Extract objective coefficients for each reaction.
 | 
| 
 | 
   184 
 | 
| 
426
 | 
   185     Args:
 | 
| 
456
 | 
   186         model: COBRA model
 | 
| 
 | 
   187 
 | 
| 
426
 | 
   188     Returns:
 | 
| 
456
 | 
   189         pd.DataFrame with columns: ReactionID, ObjectiveCoefficient
 | 
| 
426
 | 
   190     """
 | 
| 
 | 
   191     coeffs = []
 | 
| 
456
 | 
   192     # model.objective.expression is a linear expression
 | 
| 
426
 | 
   193     objective_expr = model.objective.expression.as_coefficients_dict()
 | 
| 
 | 
   194     
 | 
| 
 | 
   195     for reaction in model.reactions:
 | 
| 
 | 
   196         coeff = objective_expr.get(reaction.forward_variable, 0.0)
 | 
| 
 | 
   197         coeffs.append({
 | 
| 
 | 
   198             "ReactionID": reaction.id,
 | 
| 
 | 
   199             "ObjectiveCoefficient": coeff
 | 
| 
 | 
   200         })
 | 
| 
 | 
   201     
 | 
| 
 | 
   202     return pd.DataFrame(coeffs)
 | 
| 
 | 
   203 
 | 
| 
419
 | 
   204 def generate_bounds(model:cobraModel) -> pd.DataFrame:
 | 
| 
456
 | 
   205     """
 | 
| 
 | 
   206     Build a DataFrame of lower/upper bounds for all reactions.
 | 
| 
 | 
   207 
 | 
| 
 | 
   208     Returns:
 | 
| 
 | 
   209         pd.DataFrame indexed by reaction IDs with columns ['lower_bound', 'upper_bound'].
 | 
| 
 | 
   210     """
 | 
| 
418
 | 
   211 
 | 
| 
 | 
   212     rxns = []
 | 
| 
 | 
   213     for reaction in model.reactions:
 | 
| 
 | 
   214         rxns.append(reaction.id)
 | 
| 
 | 
   215 
 | 
| 
 | 
   216     bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns)
 | 
| 
 | 
   217 
 | 
| 
 | 
   218     for reaction in model.reactions:
 | 
| 
 | 
   219         bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
 | 
| 
 | 
   220     return bounds
 | 
| 
 | 
   221 
 | 
| 
 | 
   222 
 | 
| 
 | 
   223 
 | 
| 
419
 | 
   224 def generate_compartments(model: cobraModel) -> pd.DataFrame:
 | 
| 
418
 | 
   225     """
 | 
| 
506
 | 
   226     Generates a DataFrame containing pathway information for each reaction.
 | 
| 
 | 
   227     Creates columns for each pathway position (Pathway_1, Pathway_2, etc.) only if pathways exist.
 | 
| 
418
 | 
   228     
 | 
| 
 | 
   229     Args:
 | 
| 
506
 | 
   230         model: the COBRA model to extract pathway data from.
 | 
| 
418
 | 
   231         
 | 
| 
 | 
   232     Returns:
 | 
| 
506
 | 
   233         pd.DataFrame: DataFrame with ReactionID and pathway columns (if any pathways exist)
 | 
| 
418
 | 
   234     """
 | 
| 
 | 
   235     pathway_data = []
 | 
| 
 | 
   236 
 | 
| 
 | 
   237     # First pass: determine the maximum number of pathways any reaction has
 | 
| 
 | 
   238     max_pathways = 0
 | 
| 
 | 
   239     reaction_pathways = {}
 | 
| 
506
 | 
   240     has_any_pathways = False
 | 
| 
418
 | 
   241 
 | 
| 
 | 
   242     for reaction in model.reactions:
 | 
| 
 | 
   243         # Get unique pathways from all metabolites in the reaction
 | 
| 
506
 | 
   244         if 'pathways' in reaction.annotation and reaction.annotation['pathways']:
 | 
| 
503
 | 
   245             if type(reaction.annotation['pathways']) == list:
 | 
| 
506
 | 
   246                 # Filter out empty/None values
 | 
| 
 | 
   247                 valid_pathways = [p for p in reaction.annotation['pathways'] if p]
 | 
| 
 | 
   248                 if valid_pathways:
 | 
| 
 | 
   249                     reaction_pathways[reaction.id] = valid_pathways
 | 
| 
 | 
   250                     max_pathways = max(max_pathways, len(valid_pathways))
 | 
| 
 | 
   251                     has_any_pathways = True
 | 
| 
 | 
   252                 else:
 | 
| 
 | 
   253                     reaction_pathways[reaction.id] = []
 | 
| 
503
 | 
   254             else:
 | 
| 
506
 | 
   255                 # Single pathway value
 | 
| 
 | 
   256                 if reaction.annotation['pathways']:
 | 
| 
 | 
   257                     reaction_pathways[reaction.id] = [reaction.annotation['pathways']]
 | 
| 
 | 
   258                     max_pathways = max(max_pathways, 1)
 | 
| 
 | 
   259                     has_any_pathways = True
 | 
| 
 | 
   260                 else:
 | 
| 
 | 
   261                     reaction_pathways[reaction.id] = []
 | 
| 
418
 | 
   262         else:
 | 
| 
503
 | 
   263             # No pathway annotation - use empty list
 | 
| 
 | 
   264             reaction_pathways[reaction.id] = []
 | 
| 
418
 | 
   265 
 | 
| 
506
 | 
   266     # If no pathways exist in the model, return DataFrame with only ReactionID
 | 
| 
 | 
   267     if not has_any_pathways:
 | 
| 
 | 
   268         return None
 | 
| 
 | 
   269 
 | 
| 
 | 
   270     # Create column names for pathways only if they exist
 | 
| 
418
 | 
   271     pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]
 | 
| 
 | 
   272 
 | 
| 
506
 | 
   273     # Second pass: create the data with pathway columns
 | 
| 
418
 | 
   274     for reaction_id, pathways in reaction_pathways.items():
 | 
| 
 | 
   275         row = {"ReactionID": reaction_id}
 | 
| 
 | 
   276         
 | 
| 
 | 
   277         # Fill pathway columns
 | 
| 
 | 
   278         for i in range(max_pathways):
 | 
| 
 | 
   279             col_name = pathway_columns[i]
 | 
| 
 | 
   280             if i < len(pathways):
 | 
| 
 | 
   281                 row[col_name] = pathways[i]
 | 
| 
 | 
   282             else:
 | 
| 
506
 | 
   283                 row[col_name] = None
 | 
| 
418
 | 
   284 
 | 
| 
 | 
   285         pathway_data.append(row)
 | 
| 
 | 
   286 
 | 
| 
419
 | 
   287     return pd.DataFrame(pathway_data)
 | 
| 
 | 
   288 
 | 
| 
505
 | 
   289 def set_annotation_pathways_from_data(model: cobraModel, df: pd.DataFrame):
 | 
| 
 | 
   290     """Set reaction pathways based on 'Pathway_1', 'Pathway_2', ... columns in the dataframe."""
 | 
| 
 | 
   291     pathway_cols = [col for col in df.columns if col.startswith('Pathway_')]
 | 
| 
 | 
   292     if not pathway_cols:
 | 
| 
 | 
   293         print("No 'Pathway_' columns found, skipping pathway annotation")
 | 
| 
 | 
   294         return
 | 
| 
 | 
   295     
 | 
| 
 | 
   296     pathway_data = defaultdict(list)
 | 
| 
 | 
   297     
 | 
| 
 | 
   298     for idx, row in df.iterrows():
 | 
| 
 | 
   299         reaction_id = str(row['ReactionID']).strip()
 | 
| 
 | 
   300         if reaction_id not in model.reactions:
 | 
| 
 | 
   301             continue
 | 
| 
 | 
   302         
 | 
| 
 | 
   303         pathways = []
 | 
| 
 | 
   304         for col in pathway_cols:
 | 
| 
 | 
   305             if pd.notna(row[col]) and str(row[col]).strip():
 | 
| 
 | 
   306                 pathways.append(str(row[col]).strip())
 | 
| 
 | 
   307         
 | 
| 
 | 
   308         if pathways:
 | 
| 
506
 | 
   309 
 | 
| 
505
 | 
   310             reaction = model.reactions.get_by_id(reaction_id)
 | 
| 
 | 
   311             if len(pathways) == 1:
 | 
| 
 | 
   312                 reaction.annotation['pathways'] = pathways[0]
 | 
| 
 | 
   313             else:
 | 
| 
 | 
   314                 reaction.annotation['pathways'] = pathways
 | 
| 
419
 | 
   315 
 | 
| 
505
 | 
   316             pathway_data[reaction_id] = pathways
 | 
| 
 | 
   317     
 | 
| 
 | 
   318     print(f"Annotated {len(pathway_data)} reactions with pathways.")
 | 
| 
419
 | 
   319 
 | 
| 
 | 
   320 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
 | 
| 
 | 
   321     """
 | 
| 
456
 | 
   322     Build a COBRApy model from a tabular file with reaction data.
 | 
| 
 | 
   323 
 | 
| 
419
 | 
   324     Args:
 | 
| 
456
 | 
   325         csv_path: Path to the tab-separated file.
 | 
| 
 | 
   326         model_id: ID for the newly created model.
 | 
| 
 | 
   327 
 | 
| 
419
 | 
   328     Returns:
 | 
| 
456
 | 
   329         cobra.Model: The constructed COBRApy model.
 | 
| 
419
 | 
   330     """
 | 
| 
 | 
   331     
 | 
| 
501
 | 
   332     # Try to detect separator
 | 
| 
 | 
   333     with open(csv_path, 'r') as f:
 | 
| 
 | 
   334         first_line = f.readline()
 | 
| 
 | 
   335         sep = '\t' if '\t' in first_line else ','
 | 
| 
 | 
   336     
 | 
| 
 | 
   337     df = pd.read_csv(csv_path, sep=sep)
 | 
| 
 | 
   338     
 | 
| 
 | 
   339     # Check required columns
 | 
| 
 | 
   340     required_cols = ['ReactionID', 'Formula']
 | 
| 
 | 
   341     missing_cols = [col for col in required_cols if col not in df.columns]
 | 
| 
 | 
   342     if missing_cols:
 | 
| 
 | 
   343         raise ValueError(f"Missing required columns: {missing_cols}. Available columns: {list(df.columns)}")
 | 
| 
419
 | 
   344     
 | 
| 
 | 
   345     model = cobraModel(model_id)
 | 
| 
 | 
   346     
 | 
| 
 | 
   347     metabolites_dict = {}
 | 
| 
 | 
   348     compartments_dict = {}
 | 
| 
 | 
   349     
 | 
| 
456
 | 
   350     print(f"Building model from {len(df)} reactions...")
 | 
| 
419
 | 
   351     
 | 
| 
 | 
   352     for idx, row in df.iterrows():
 | 
| 
448
 | 
   353         reaction_formula = str(row['Formula']).strip()
 | 
| 
419
 | 
   354         if not reaction_formula or reaction_formula == 'nan':
 | 
| 
 | 
   355             continue
 | 
| 
 | 
   356             
 | 
| 
 | 
   357         metabolites = extract_metabolites_from_reaction(reaction_formula)
 | 
| 
 | 
   358         
 | 
| 
 | 
   359         for met_id in metabolites:
 | 
| 
 | 
   360             compartment = extract_compartment_from_metabolite(met_id)
 | 
| 
 | 
   361             
 | 
| 
 | 
   362             if compartment not in compartments_dict:
 | 
| 
 | 
   363                 compartments_dict[compartment] = compartment
 | 
| 
 | 
   364             
 | 
| 
 | 
   365             if met_id not in metabolites_dict:
 | 
| 
 | 
   366                 metabolites_dict[met_id] = Metabolite(
 | 
| 
 | 
   367                     id=met_id,
 | 
| 
 | 
   368                     compartment=compartment,
 | 
| 
 | 
   369                     name=met_id.replace(f"_{compartment}", "").replace("__", "_")
 | 
| 
 | 
   370                 )
 | 
| 
 | 
   371     
 | 
| 
 | 
   372     model.compartments = compartments_dict
 | 
| 
 | 
   373     
 | 
| 
 | 
   374     model.add_metabolites(list(metabolites_dict.values()))
 | 
| 
 | 
   375     
 | 
| 
456
 | 
   376     print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments")
 | 
| 
419
 | 
   377     
 | 
| 
 | 
   378     reactions_added = 0
 | 
| 
 | 
   379     reactions_skipped = 0
 | 
| 
 | 
   380     
 | 
| 
 | 
   381     for idx, row in df.iterrows():
 | 
| 
 | 
   382 
 | 
| 
 | 
   383         reaction_id = str(row['ReactionID']).strip()
 | 
| 
427
 | 
   384         reaction_formula = str(row['Formula']).strip()
 | 
| 
419
 | 
   385         
 | 
| 
 | 
   386         if not reaction_formula or reaction_formula == 'nan':
 | 
| 
456
 | 
   387             raise ValueError(f"Missing reaction formula for {reaction_id}")
 | 
| 
419
 | 
   388         
 | 
| 
 | 
   389         reaction = Reaction(reaction_id)
 | 
| 
 | 
   390         reaction.name = reaction_id
 | 
| 
 | 
   391         
 | 
| 
 | 
   392         reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
 | 
| 
 | 
   393         reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
 | 
| 
 | 
   394         
 | 
| 
427
 | 
   395         if pd.notna(row['GPR']) and str(row['GPR']).strip():
 | 
| 
 | 
   396             reaction.gene_reaction_rule = str(row['GPR']).strip()
 | 
| 
419
 | 
   397         
 | 
| 
 | 
   398         try:
 | 
| 
 | 
   399             parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
 | 
| 
 | 
   400         except Exception as e:
 | 
| 
456
 | 
   401             print(f"Error parsing reaction {reaction_id}: {e}")
 | 
| 
419
 | 
   402             reactions_skipped += 1
 | 
| 
 | 
   403             continue
 | 
| 
 | 
   404         
 | 
| 
 | 
   405         model.add_reactions([reaction])
 | 
| 
 | 
   406         reactions_added += 1
 | 
| 
 | 
   407             
 | 
| 
 | 
   408     
 | 
| 
456
 | 
   409     print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions")
 | 
| 
419
 | 
   410     
 | 
| 
430
 | 
   411     # set objective function
 | 
| 
 | 
   412     set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient")
 | 
| 
 | 
   413 
 | 
| 
419
 | 
   414     set_medium_from_data(model, df)
 | 
| 
505
 | 
   415 
 | 
| 
 | 
   416     set_annotation_pathways_from_data(model, df)
 | 
| 
419
 | 
   417     
 | 
| 
456
 | 
   418     print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites")
 | 
| 
419
 | 
   419     
 | 
| 
 | 
   420     return model
 | 
| 
 | 
   421 
 | 
| 
 | 
   422 
 | 
| 
 | 
   423 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
 | 
| 
499
 | 
   424 #def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
 | 
| 
 | 
   425 #    """
 | 
| 
 | 
   426 #    Extract metabolite IDs from a reaction formula.
 | 
| 
 | 
   427 #    Robust pattern: tokens ending with _<compartment> (e.g., _c, _m, _e),
 | 
| 
 | 
   428 #    allowing leading digits/underscores.
 | 
| 
 | 
   429 #    """
 | 
| 
 | 
   430 #    metabolites = set()
 | 
| 
 | 
   431 #    # optional coefficient followed by a token ending with _<letters>
 | 
| 
 | 
   432 #    if reaction_formula[-1] == ']' and reaction_formula[-3] == '[':
 | 
| 
 | 
   433 #        pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+[[A-Za-z0-9]]+)'
 | 
| 
 | 
   434 #    else:
 | 
| 
 | 
   435 #        pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[A-Za-z0-9]+)'
 | 
| 
 | 
   436 #    matches = re.findall(pattern, reaction_formula)
 | 
| 
 | 
   437 #    metabolites.update(matches)
 | 
| 
 | 
   438 #    return metabolites
 | 
| 
 | 
   439 
 | 
| 
 | 
   440 
 | 
| 
419
 | 
   441 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
 | 
| 
 | 
   442     """
 | 
| 
500
 | 
   443     Extract metabolite IDs from a reaction formula.
 | 
| 
 | 
   444 
 | 
| 
 | 
   445     Handles:
 | 
| 
 | 
   446       - optional stoichiometric coefficients (integers or decimals)
 | 
| 
 | 
   447       - compartment tags at the end of the metabolite, either [c] or _c
 | 
| 
 | 
   448 
 | 
| 
 | 
   449     Returns the IDs including the compartment suffix exactly as written.
 | 
| 
419
 | 
   450     """
 | 
| 
499
 | 
   451     pattern = re.compile(
 | 
| 
500
 | 
   452         r'(?:^|(?<=\s)|(?<=\+)|(?<=,)|(?<==)|(?<=:))'              # left boundary (start, space, +, comma, =, :)
 | 
| 
501
 | 
   453         r'(?:\d+(?:\.\d+)?\s+)?'                                   # optional coefficient (requires space after)
 | 
| 
 | 
   454         r'([A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+))'  # metabolite + compartment (can start with number)
 | 
| 
499
 | 
   455     )
 | 
| 
 | 
   456     return {m.group(1) for m in pattern.finditer(reaction_formula)}
 | 
| 
419
 | 
   457 
 | 
| 
 | 
   458 
 | 
| 
500
 | 
   459 
 | 
| 
419
 | 
   460 def extract_compartment_from_metabolite(metabolite_id: str) -> str:
 | 
| 
456
 | 
   461     """Extract the compartment from a metabolite ID."""
 | 
| 
500
 | 
   462     if '_' == metabolite_id[-2]:
 | 
| 
419
 | 
   463         return metabolite_id.split('_')[-1]
 | 
| 
493
 | 
   464     if metabolite_id[-1] == ']' and metabolite_id[-3] == '[':
 | 
| 
 | 
   465         return metabolite_id[-2]
 | 
| 
419
 | 
   466     return 'c'  # default cytoplasm
 | 
| 
 | 
   467 
 | 
| 
 | 
   468 
 | 
| 
 | 
   469 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
 | 
| 
456
 | 
   470     """Parse a reaction formula and set metabolites with their coefficients."""
 | 
| 
419
 | 
   471 
 | 
| 
 | 
   472     if '<=>' in formula:
 | 
| 
501
 | 
   473         parts = formula.split('<=>')
 | 
| 
419
 | 
   474         reversible = True
 | 
| 
 | 
   475     elif '<--' in formula:
 | 
| 
501
 | 
   476         parts = formula.split('<--')
 | 
| 
419
 | 
   477         reversible = False
 | 
| 
 | 
   478     elif '-->' in formula:
 | 
| 
501
 | 
   479         parts = formula.split('-->')
 | 
| 
419
 | 
   480         reversible = False
 | 
| 
 | 
   481     elif '<-' in formula:
 | 
| 
501
 | 
   482         parts = formula.split('<-')
 | 
| 
419
 | 
   483         reversible = False
 | 
| 
 | 
   484     else:
 | 
| 
456
 | 
   485         raise ValueError(f"Unrecognized reaction format: {formula}")
 | 
| 
419
 | 
   486     
 | 
| 
501
 | 
   487     # Handle cases where one side might be empty (exchange reactions)
 | 
| 
 | 
   488     if len(parts) != 2:
 | 
| 
 | 
   489         raise ValueError(f"Invalid reaction format, expected 2 parts: {formula}")
 | 
| 
 | 
   490     
 | 
| 
 | 
   491     left, right = parts[0].strip(), parts[1].strip()
 | 
| 
 | 
   492     
 | 
| 
 | 
   493     reactants = parse_metabolites_side(left) if left else {}
 | 
| 
 | 
   494     products = parse_metabolites_side(right) if right else {}
 | 
| 
419
 | 
   495     
 | 
| 
 | 
   496     metabolites_to_add = {}
 | 
| 
 | 
   497     
 | 
| 
 | 
   498     for met_id, coeff in reactants.items():
 | 
| 
 | 
   499         if met_id in metabolites_dict:
 | 
| 
 | 
   500             metabolites_to_add[metabolites_dict[met_id]] = -coeff
 | 
| 
 | 
   501     
 | 
| 
 | 
   502     for met_id, coeff in products.items():
 | 
| 
 | 
   503         if met_id in metabolites_dict:
 | 
| 
 | 
   504             metabolites_to_add[metabolites_dict[met_id]] = coeff
 | 
| 
 | 
   505     
 | 
| 
 | 
   506     reaction.add_metabolites(metabolites_to_add)
 | 
| 
 | 
   507 
 | 
| 
 | 
   508 
 | 
| 
 | 
   509 def parse_metabolites_side(side_str: str) -> Dict[str, float]:
 | 
| 
456
 | 
   510     """Parse one side of a reaction and extract metabolites with coefficients."""
 | 
| 
419
 | 
   511     metabolites = {}
 | 
| 
 | 
   512     if not side_str or side_str.strip() == '':
 | 
| 
 | 
   513         return metabolites
 | 
| 
 | 
   514 
 | 
| 
 | 
   515     terms = side_str.split('+')
 | 
| 
 | 
   516     for term in terms:
 | 
| 
 | 
   517         term = term.strip()
 | 
| 
 | 
   518         if not term:
 | 
| 
 | 
   519             continue
 | 
| 
 | 
   520 
 | 
| 
501
 | 
   521         # First check if term has space-separated coefficient and metabolite
 | 
| 
 | 
   522         parts = term.split()
 | 
| 
 | 
   523         if len(parts) == 2:
 | 
| 
 | 
   524             # Two parts: potential coefficient + metabolite
 | 
| 
 | 
   525             try:
 | 
| 
 | 
   526                 coeff = float(parts[0])
 | 
| 
 | 
   527                 met_id = parts[1]
 | 
| 
 | 
   528                 # Verify the second part looks like a metabolite with compartment
 | 
| 
 | 
   529                 if re.match(r'[A-Za-z0-9_]+(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', met_id):
 | 
| 
 | 
   530                     metabolites[met_id] = coeff
 | 
| 
 | 
   531                     continue
 | 
| 
 | 
   532             except ValueError:
 | 
| 
 | 
   533                 pass
 | 
| 
 | 
   534         
 | 
| 
 | 
   535         # Single term - check if it's a metabolite (no coefficient)  
 | 
| 
 | 
   536         # Updated pattern to include metabolites starting with numbers
 | 
| 
 | 
   537         if re.match(r'[A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', term):
 | 
| 
 | 
   538             metabolites[term] = 1.0
 | 
| 
 | 
   539         else:
 | 
| 
 | 
   540             print(f"Warning: Could not parse metabolite term: '{term}'")
 | 
| 
419
 | 
   541 
 | 
| 
 | 
   542     return metabolites
 | 
| 
 | 
   543 
 | 
| 
 | 
   544 
 | 
| 
 | 
   545 
 | 
| 
430
 | 
   546 def set_objective_from_csv(model: cobra.Model, df: pd.DataFrame, obj_col: str = "ObjectiveCoefficient"):
 | 
| 
419
 | 
   547     """
 | 
| 
430
 | 
   548     Sets the model's objective function based on a column of coefficients in the CSV.
 | 
| 
 | 
   549     Can be any reaction(s), not necessarily biomass.
 | 
| 
419
 | 
   550     """
 | 
| 
430
 | 
   551     obj_dict = {}
 | 
| 
419
 | 
   552     
 | 
| 
430
 | 
   553     for idx, row in df.iterrows():
 | 
| 
 | 
   554         reaction_id = str(row['ReactionID']).strip()
 | 
| 
 | 
   555         coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0
 | 
| 
 | 
   556         if coeff != 0:
 | 
| 
 | 
   557             if reaction_id in model.reactions:
 | 
| 
 | 
   558                 obj_dict[model.reactions.get_by_id(reaction_id)] = coeff
 | 
| 
 | 
   559             else:
 | 
| 
 | 
   560                 print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.")
 | 
| 
 | 
   561 
 | 
| 
 | 
   562     if not obj_dict:
 | 
| 
 | 
   563         raise ValueError("No reactions found with non-zero objective coefficient.")
 | 
| 
 | 
   564 
 | 
| 
 | 
   565     model.objective = obj_dict
 | 
| 
 | 
   566     print(f"Objective set with {len(obj_dict)} reactions.")
 | 
| 
 | 
   567 
 | 
| 
 | 
   568 
 | 
| 
419
 | 
   569 
 | 
| 
 | 
   570 
 | 
| 
 | 
   571 def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
 | 
| 
456
 | 
   572     """Set the medium based on the 'InMedium' column in the dataframe."""
 | 
| 
501
 | 
   573     if 'InMedium' not in df.columns:
 | 
| 
 | 
   574         print("No 'InMedium' column found, skipping medium setup")
 | 
| 
 | 
   575         return
 | 
| 
 | 
   576         
 | 
| 
419
 | 
   577     medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
 | 
| 
 | 
   578     
 | 
| 
 | 
   579     medium_dict = {}
 | 
| 
 | 
   580     for rxn_id in medium_reactions:
 | 
| 
 | 
   581         if rxn_id in [r.id for r in model.reactions]:
 | 
| 
 | 
   582             reaction = model.reactions.get_by_id(rxn_id)
 | 
| 
501
 | 
   583             if reaction.lower_bound < 0: 
 | 
| 
419
 | 
   584                 medium_dict[rxn_id] = abs(reaction.lower_bound)
 | 
| 
 | 
   585     
 | 
| 
 | 
   586     if medium_dict:
 | 
| 
 | 
   587         model.medium = medium_dict
 | 
| 
456
 | 
   588         print(f"Medium set with {len(medium_dict)} components")
 | 
| 
501
 | 
   589     else:
 | 
| 
 | 
   590         print("No medium components found")
 | 
| 
419
 | 
   591 def validate_model(model: cobraModel) -> Dict[str, any]:
 | 
| 
456
 | 
   592     """Validate the model and return basic statistics."""
 | 
| 
419
 | 
   593     validation = {
 | 
| 
 | 
   594         'num_reactions': len(model.reactions),
 | 
| 
 | 
   595         'num_metabolites': len(model.metabolites),
 | 
| 
 | 
   596         'num_genes': len(model.genes),
 | 
| 
 | 
   597         'num_compartments': len(model.compartments),
 | 
| 
 | 
   598         'objective': str(model.objective),
 | 
| 
 | 
   599         'medium_size': len(model.medium),
 | 
| 
 | 
   600         'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
 | 
| 
 | 
   601         'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
 | 
| 
 | 
   602     }
 | 
| 
 | 
   603     
 | 
| 
 | 
   604     try:
 | 
| 
456
 | 
   605         # Growth test
 | 
| 
419
 | 
   606         solution = model.optimize()
 | 
| 
 | 
   607         validation['growth_rate'] = solution.objective_value
 | 
| 
 | 
   608         validation['status'] = solution.status
 | 
| 
 | 
   609     except Exception as e:
 | 
| 
 | 
   610         validation['growth_rate'] = None
 | 
| 
 | 
   611         validation['status'] = f"Error: {e}"
 | 
| 
 | 
   612     
 | 
| 
 | 
   613     return validation
 | 
| 
 | 
   614 
 | 
| 
456
 | 
   615 def convert_genes(model, annotation):
 | 
| 
 | 
   616     """Rename genes using a selected annotation key in gene.notes; returns a model copy."""
 | 
| 
419
 | 
   617     from cobra.manipulation import rename_genes
 | 
| 
 | 
   618     model2=model.copy()
 | 
| 
 | 
   619     try:
 | 
| 
 | 
   620         dict_genes={gene.id:gene.notes[annotation]  for gene in model2.genes}
 | 
| 
 | 
   621     except:
 | 
| 
 | 
   622         print("No annotation in gene dict!")
 | 
| 
 | 
   623         return -1
 | 
| 
 | 
   624     rename_genes(model2,dict_genes)
 | 
| 
 | 
   625 
 | 
| 
426
 | 
   626     return model2
 | 
| 
 | 
   627 
 | 
| 
 | 
   628 # ---------- Utility helpers ----------
 | 
| 
 | 
   629 def _normalize_colname(col: str) -> str:
 | 
| 
 | 
   630     return col.strip().lower().replace(' ', '_')
 | 
| 
 | 
   631 
 | 
| 
 | 
   632 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]:
 | 
| 
 | 
   633     """
 | 
| 
456
 | 
   634     Find useful columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}.
 | 
| 
 | 
   635     Raise ValueError if no suitable mapping is found.
 | 
| 
426
 | 
   636     """
 | 
| 
 | 
   637     cols = { _normalize_colname(c): c for c in mapping_df.columns }
 | 
| 
 | 
   638     chosen = {}
 | 
| 
456
 | 
   639     # candidate names for each category
 | 
| 
426
 | 
   640     candidates = {
 | 
| 
 | 
   641         'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
 | 
| 
 | 
   642         'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
 | 
| 
444
 | 
   643         'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'],
 | 
| 
455
 | 
   644         'entrez_id': ['entrez', 'entrez_id', 'entrezgene'],
 | 
| 
 | 
   645         'gene_number': ['gene_number']
 | 
| 
426
 | 
   646     }
 | 
| 
 | 
   647     for key, names in candidates.items():
 | 
| 
 | 
   648         for n in names:
 | 
| 
 | 
   649             if n in cols:
 | 
| 
 | 
   650                 chosen[key] = cols[n]
 | 
| 
 | 
   651                 break
 | 
| 
 | 
   652     return chosen
 | 
| 
 | 
   653 
 | 
| 
 | 
   654 def _validate_target_uniqueness(mapping_df: 'pd.DataFrame',
 | 
| 
 | 
   655                                 source_col: str,
 | 
| 
 | 
   656                                 target_col: str,
 | 
| 
 | 
   657                                 model_source_genes: Optional[Set[str]] = None,
 | 
| 
 | 
   658                                 logger: Optional[logging.Logger] = None) -> None:
 | 
| 
 | 
   659     """
 | 
| 
456
 | 
   660         Check that, within the filtered mapping_df, each target maps to at most one source.
 | 
| 
 | 
   661         Log examples if duplicates are found.
 | 
| 
426
 | 
   662     """
 | 
| 
 | 
   663     if logger is None:
 | 
| 
 | 
   664         logger = logging.getLogger(__name__)
 | 
| 
 | 
   665 
 | 
| 
 | 
   666     if mapping_df.empty:
 | 
| 
 | 
   667         logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.")
 | 
| 
 | 
   668         return
 | 
| 
 | 
   669 
 | 
| 
456
 | 
   670     # normalize temporary columns for grouping (without altering the original df)
 | 
| 
426
 | 
   671     tmp = mapping_df[[source_col, target_col]].copy()
 | 
| 
503
 | 
   672     tmp['_src_norm'] = tmp[source_col].astype(str).apply(_normalize_gene_id)
 | 
| 
426
 | 
   673     tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip()
 | 
| 
 | 
   674 
 | 
| 
456
 | 
   675     # optionally filter to the set of model source genes
 | 
| 
426
 | 
   676     if model_source_genes is not None:
 | 
| 
 | 
   677         tmp = tmp[tmp['_src_norm'].isin(model_source_genes)]
 | 
| 
 | 
   678 
 | 
| 
 | 
   679     if tmp.empty:
 | 
| 
 | 
   680         logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.")
 | 
| 
 | 
   681         return
 | 
| 
 | 
   682 
 | 
| 
456
 | 
   683     # build reverse mapping: target -> set(sources)
 | 
| 
426
 | 
   684     grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna()))
 | 
| 
456
 | 
   685     # find targets with more than one source
 | 
| 
426
 | 
   686     problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}
 | 
| 
 | 
   687 
 | 
| 
 | 
   688     if problematic:
 | 
| 
456
 | 
   689     # prepare warning message with examples (limited subset)
 | 
| 
455
 | 
   690         sample_items = list(problematic.items())
 | 
| 
426
 | 
   691         msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
 | 
| 
 | 
   692         for tgt, sources in sample_items:
 | 
| 
 | 
   693             msg_lines.append(f"  - target '{tgt}' <- sources: {', '.join(sources)}")
 | 
| 
 | 
   694         full_msg = "\n".join(msg_lines)
 | 
| 
456
 | 
   695     # log warning
 | 
| 
455
 | 
   696         logger.warning(full_msg)
 | 
| 
426
 | 
   697 
 | 
| 
456
 | 
   698     # if everything is fine
 | 
| 
426
 | 
   699     logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
 | 
| 
 | 
   700 
 | 
| 
 | 
   701 
 | 
| 
 | 
   702 def _normalize_gene_id(g: str) -> str:
 | 
| 
456
 | 
   703     """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips)."""
 | 
| 
426
 | 
   704     if g is None:
 | 
| 
 | 
   705         return ""
 | 
| 
 | 
   706     g = str(g).strip()
 | 
| 
 | 
   707     # remove common prefixes
 | 
| 
 | 
   708     g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE)
 | 
| 
 | 
   709     g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
 | 
| 
 | 
   710     return g
 | 
| 
 | 
   711 
 | 
| 
493
 | 
   712 def _is_or_only_expression(expr: str) -> bool:
 | 
| 
 | 
   713     """
 | 
| 
 | 
   714     Check if a GPR expression contains only OR operators (no AND operators).
 | 
| 
 | 
   715     
 | 
| 
 | 
   716     Args:
 | 
| 
 | 
   717         expr: GPR expression string
 | 
| 
 | 
   718         
 | 
| 
 | 
   719     Returns:
 | 
| 
 | 
   720         bool: True if expression contains only OR (and parentheses) and has multiple genes, False otherwise
 | 
| 
 | 
   721     """
 | 
| 
 | 
   722     if not expr or not expr.strip():
 | 
| 
 | 
   723         return False
 | 
| 
 | 
   724         
 | 
| 
 | 
   725     # Normalize the expression
 | 
| 
 | 
   726     normalized = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
 | 
| 
 | 
   727     
 | 
| 
 | 
   728     # Check if it contains any AND operators
 | 
| 
 | 
   729     has_and = ' and ' in normalized.lower()
 | 
| 
 | 
   730     
 | 
| 
 | 
   731     # Check if it contains OR operators
 | 
| 
 | 
   732     has_or = ' or ' in normalized.lower()
 | 
| 
 | 
   733     
 | 
| 
 | 
   734     # Must have OR operators and no AND operators
 | 
| 
 | 
   735     return has_or and not has_and
 | 
| 
 | 
   736 
 | 
| 
 | 
   737 
 | 
| 
 | 
   738 def _flatten_or_only_gpr(expr: str) -> str:
 | 
| 
 | 
   739     """
 | 
| 
 | 
   740     Flatten a GPR expression that contains only OR operators by:
 | 
| 
 | 
   741     1. Removing all parentheses
 | 
| 
 | 
   742     2. Extracting unique gene names
 | 
| 
 | 
   743     3. Joining them with ' or '
 | 
| 
 | 
   744     
 | 
| 
 | 
   745     Args:
 | 
| 
 | 
   746         expr: GPR expression string with only OR operators
 | 
| 
 | 
   747         
 | 
| 
 | 
   748     Returns:
 | 
| 
 | 
   749         str: Flattened GPR expression
 | 
| 
 | 
   750     """
 | 
| 
 | 
   751     if not expr or not expr.strip():
 | 
| 
 | 
   752         return expr
 | 
| 
 | 
   753         
 | 
| 
 | 
   754     # Extract all gene tokens (exclude logical operators and parentheses)
 | 
| 
 | 
   755     gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
 | 
| 
 | 
   756     logical = {'and', 'or', 'AND', 'OR', '(', ')'}
 | 
| 
 | 
   757     
 | 
| 
 | 
   758     tokens = re.findall(gene_pattern, expr)
 | 
| 
 | 
   759     genes = [t for t in tokens if t not in logical]
 | 
| 
 | 
   760     
 | 
| 
 | 
   761     # Create set to remove duplicates, then convert back to list to maintain some order
 | 
| 
 | 
   762     unique_genes = list(dict.fromkeys(genes))  # Preserves insertion order
 | 
| 
 | 
   763     
 | 
| 
 | 
   764     if len(unique_genes) == 0:
 | 
| 
 | 
   765         return expr
 | 
| 
 | 
   766     elif len(unique_genes) == 1:
 | 
| 
 | 
   767         return unique_genes[0]
 | 
| 
 | 
   768     else:
 | 
| 
 | 
   769         return ' or '.join(unique_genes)
 | 
| 
 | 
   770 
 | 
| 
 | 
   771 
 | 
| 
455
 | 
   772 def _simplify_boolean_expression(expr: str) -> str:
 | 
| 
 | 
   773     """
 | 
| 
490
 | 
   774     Simplify a boolean expression by removing duplicates while strictly preserving semantics.
 | 
| 
 | 
   775     This function handles simple duplicates within parentheses while being conservative about
 | 
| 
 | 
   776     complex expressions that could change semantics.
 | 
| 
455
 | 
   777     """
 | 
| 
 | 
   778     if not expr or not expr.strip():
 | 
| 
 | 
   779         return expr
 | 
| 
 | 
   780     
 | 
| 
490
 | 
   781     # Normalize operators and whitespace
 | 
| 
455
 | 
   782     expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
 | 
| 
490
 | 
   783     expr = ' '.join(expr.split())  # Normalize whitespace
 | 
| 
455
 | 
   784     
 | 
| 
490
 | 
   785     def simplify_parentheses_content(match_obj):
 | 
| 
 | 
   786         """Helper function to simplify content within parentheses."""
 | 
| 
 | 
   787         content = match_obj.group(1)  # Content inside parentheses
 | 
| 
455
 | 
   788         
 | 
| 
490
 | 
   789         # Only simplify if it's a pure OR or pure AND chain
 | 
| 
 | 
   790         if ' or ' in content and ' and ' not in content:
 | 
| 
 | 
   791             # Pure OR chain - safe to deduplicate
 | 
| 
 | 
   792             parts = [p.strip() for p in content.split(' or ') if p.strip()]
 | 
| 
 | 
   793             unique_parts = []
 | 
| 
 | 
   794             seen = set()
 | 
| 
 | 
   795             for part in parts:
 | 
| 
 | 
   796                 if part not in seen:
 | 
| 
 | 
   797                     unique_parts.append(part)
 | 
| 
 | 
   798                     seen.add(part)
 | 
| 
455
 | 
   799             
 | 
| 
490
 | 
   800             if len(unique_parts) == 1:
 | 
| 
 | 
   801                 return unique_parts[0]  # Remove unnecessary parentheses for single items
 | 
| 
 | 
   802             else:
 | 
| 
 | 
   803                 return '(' + ' or '.join(unique_parts) + ')'
 | 
| 
 | 
   804                 
 | 
| 
 | 
   805         elif ' and ' in content and ' or ' not in content:
 | 
| 
 | 
   806             # Pure AND chain - safe to deduplicate  
 | 
| 
 | 
   807             parts = [p.strip() for p in content.split(' and ') if p.strip()]
 | 
| 
 | 
   808             unique_parts = []
 | 
| 
 | 
   809             seen = set()
 | 
| 
 | 
   810             for part in parts:
 | 
| 
 | 
   811                 if part not in seen:
 | 
| 
 | 
   812                     unique_parts.append(part)
 | 
| 
 | 
   813                     seen.add(part)
 | 
| 
455
 | 
   814             
 | 
| 
490
 | 
   815             if len(unique_parts) == 1:
 | 
| 
 | 
   816                 return unique_parts[0]  # Remove unnecessary parentheses for single items
 | 
| 
 | 
   817             else:
 | 
| 
 | 
   818                 return '(' + ' and '.join(unique_parts) + ')'
 | 
| 
 | 
   819         else:
 | 
| 
 | 
   820             # Mixed operators or single item - return with parentheses as-is
 | 
| 
 | 
   821             return '(' + content + ')'
 | 
| 
 | 
   822     
 | 
| 
 | 
   823     def remove_duplicates_simple(parts_str: str, separator: str) -> str:
 | 
| 
 | 
   824         """Remove duplicates from a simple chain of operations."""
 | 
| 
 | 
   825         parts = [p.strip() for p in parts_str.split(separator) if p.strip()]
 | 
| 
455
 | 
   826         
 | 
| 
490
 | 
   827         # Remove duplicates while preserving order
 | 
| 
 | 
   828         unique_parts = []
 | 
| 
 | 
   829         seen = set()
 | 
| 
 | 
   830         for part in parts:
 | 
| 
 | 
   831             if part not in seen:
 | 
| 
 | 
   832                 unique_parts.append(part)
 | 
| 
 | 
   833                 seen.add(part)
 | 
| 
455
 | 
   834         
 | 
| 
490
 | 
   835         if len(unique_parts) == 1:
 | 
| 
 | 
   836             return unique_parts[0]
 | 
| 
455
 | 
   837         else:
 | 
| 
490
 | 
   838             return f' {separator} '.join(unique_parts)
 | 
| 
455
 | 
   839     
 | 
| 
 | 
   840     try:
 | 
| 
490
 | 
   841         import re
 | 
| 
 | 
   842         
 | 
| 
 | 
   843         # First, simplify content within parentheses
 | 
| 
 | 
   844         # This handles cases like (A or A) -> A and (B and B) -> B
 | 
| 
 | 
   845         expr_simplified = re.sub(r'\(([^()]+)\)', simplify_parentheses_content, expr)
 | 
| 
 | 
   846         
 | 
| 
 | 
   847         # Check if the resulting expression has mixed operators  
 | 
| 
 | 
   848         has_and = ' and ' in expr_simplified
 | 
| 
 | 
   849         has_or = ' or ' in expr_simplified
 | 
| 
 | 
   850         
 | 
| 
 | 
   851         # Only simplify top-level if it's pure AND or pure OR
 | 
| 
 | 
   852         if has_and and not has_or and '(' not in expr_simplified:
 | 
| 
 | 
   853             # Pure AND chain at top level - safe to deduplicate
 | 
| 
 | 
   854             return remove_duplicates_simple(expr_simplified, 'and')
 | 
| 
 | 
   855         elif has_or and not has_and and '(' not in expr_simplified:
 | 
| 
 | 
   856             # Pure OR chain at top level - safe to deduplicate  
 | 
| 
 | 
   857             return remove_duplicates_simple(expr_simplified, 'or')
 | 
| 
 | 
   858         else:
 | 
| 
 | 
   859             # Mixed operators or has parentheses - return the simplified version (with parentheses content cleaned)
 | 
| 
 | 
   860             return expr_simplified
 | 
| 
 | 
   861             
 | 
| 
455
 | 
   862     except Exception:
 | 
| 
490
 | 
   863         # If anything goes wrong, return the original expression
 | 
| 
455
 | 
   864         return expr
 | 
| 
 | 
   865 
 | 
| 
492
 | 
   866 
 | 
| 
426
 | 
   867 def translate_model_genes(model: 'cobra.Model',
 | 
| 
 | 
   868                          mapping_df: 'pd.DataFrame',
 | 
| 
 | 
   869                          target_nomenclature: str,
 | 
| 
 | 
   870                          source_nomenclature: str = 'hgnc_id',
 | 
| 
455
 | 
   871                          allow_many_to_one: bool = False,
 | 
| 
490
 | 
   872                          logger: Optional[logging.Logger] = None) -> Tuple['cobra.Model', Dict[str, str]]:
 | 
| 
426
 | 
   873     """
 | 
| 
456
 | 
   874     Translate model genes from source_nomenclature to target_nomenclature using a mapping table.
 | 
| 
 | 
   875     mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez).
 | 
| 
 | 
   876 
 | 
| 
455
 | 
   877     Args:
 | 
| 
456
 | 
   878         model: COBRA model to translate.
 | 
| 
 | 
   879         mapping_df: DataFrame containing the mapping information.
 | 
| 
 | 
   880         target_nomenclature: Desired target key (e.g., 'hgnc_symbol').
 | 
| 
 | 
   881         source_nomenclature: Current source key in the model (default 'hgnc_id').
 | 
| 
 | 
   882         allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs.
 | 
| 
 | 
   883         logger: Optional logger.
 | 
| 
490
 | 
   884     
 | 
| 
 | 
   885     Returns:
 | 
| 
 | 
   886         Tuple containing:
 | 
| 
 | 
   887         - Translated COBRA model
 | 
| 
 | 
   888         - Dictionary mapping reaction IDs to translation issue descriptions
 | 
| 
426
 | 
   889     """
 | 
| 
 | 
   890     if logger is None:
 | 
| 
 | 
   891         logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 | 
| 
 | 
   892         logger = logging.getLogger(__name__)
 | 
| 
 | 
   893 
 | 
| 
 | 
   894     logger.info(f"Translating genes from '{source_nomenclature}' to '{target_nomenclature}'")
 | 
| 
 | 
   895 
 | 
| 
 | 
   896     # normalize column names and choose relevant columns
 | 
| 
 | 
   897     chosen = _choose_columns(mapping_df)
 | 
| 
 | 
   898     if not chosen:
 | 
| 
 | 
   899         raise ValueError("Could not detect useful columns in mapping_df. Expected at least one of: ensg, hgnc_id, hgnc_symbol, entrez.")
 | 
| 
 | 
   900 
 | 
| 
 | 
   901     # map source/target to actual dataframe column names (allow user-specified source/target keys)
 | 
| 
 | 
   902     # normalize input args
 | 
| 
 | 
   903     src_key = source_nomenclature.strip().lower()
 | 
| 
 | 
   904     tgt_key = target_nomenclature.strip().lower()
 | 
| 
 | 
   905 
 | 
| 
 | 
   906     # try to find the actual column names for requested keys
 | 
| 
 | 
   907     col_for_src = None
 | 
| 
 | 
   908     col_for_tgt = None
 | 
| 
 | 
   909     # first, try exact match
 | 
| 
 | 
   910     for k, actual in chosen.items():
 | 
| 
 | 
   911         if k == src_key:
 | 
| 
 | 
   912             col_for_src = actual
 | 
| 
 | 
   913         if k == tgt_key:
 | 
| 
 | 
   914             col_for_tgt = actual
 | 
| 
 | 
   915 
 | 
| 
 | 
   916     # if not found, try mapping common names
 | 
| 
 | 
   917     if col_for_src is None:
 | 
| 
 | 
   918         possible_src_names = {k: v for k, v in chosen.items()}
 | 
| 
 | 
   919         # try to match by contained substring
 | 
| 
 | 
   920         for k, actual in possible_src_names.items():
 | 
| 
 | 
   921             if src_key in k:
 | 
| 
 | 
   922                 col_for_src = actual
 | 
| 
 | 
   923                 break
 | 
| 
 | 
   924 
 | 
| 
 | 
   925     if col_for_tgt is None:
 | 
| 
 | 
   926         for k, actual in chosen.items():
 | 
| 
 | 
   927             if tgt_key in k:
 | 
| 
 | 
   928                 col_for_tgt = actual
 | 
| 
 | 
   929                 break
 | 
| 
 | 
   930 
 | 
| 
 | 
   931     if col_for_src is None:
 | 
| 
 | 
   932         raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.")
 | 
| 
 | 
   933     if col_for_tgt is None:
 | 
| 
 | 
   934         raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.")
 | 
| 
 | 
   935 
 | 
| 
 | 
   936     model_source_genes = { _normalize_gene_id(g.id) for g in model.genes }
 | 
| 
 | 
   937     logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).")
 | 
| 
 | 
   938 
 | 
| 
 | 
   939     tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy()
 | 
| 
503
 | 
   940     tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).apply(_normalize_gene_id)
 | 
| 
426
 | 
   941 
 | 
| 
 | 
   942     filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()
 | 
| 
 | 
   943 
 | 
| 
 | 
   944     if filtered_map.empty:
 | 
| 
 | 
   945         logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")
 | 
| 
 | 
   946 
 | 
| 
455
 | 
   947     if not allow_many_to_one:
 | 
| 
 | 
   948         _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
 | 
| 
426
 | 
   949 
 | 
| 
455
 | 
   950     # Crea il mapping
 | 
| 
426
 | 
   951     gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)
 | 
| 
 | 
   952 
 | 
| 
 | 
   953     # copy model
 | 
| 
 | 
   954     model_copy = model.copy()
 | 
| 
 | 
   955 
 | 
| 
 | 
   956     # statistics
 | 
| 
493
 | 
   957     stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0, 'flattened_or_gprs': 0}
 | 
| 
426
 | 
   958     unmapped = []
 | 
| 
 | 
   959     multi = []
 | 
| 
490
 | 
   960     
 | 
| 
 | 
   961     # Dictionary to store translation issues per reaction
 | 
| 
 | 
   962     reaction_translation_issues = {}
 | 
| 
426
 | 
   963 
 | 
| 
 | 
   964     original_genes = {g.id for g in model_copy.genes}
 | 
| 
 | 
   965     logger.info(f"Original genes count: {len(original_genes)}")
 | 
| 
 | 
   966 
 | 
| 
 | 
   967     # translate GPRs
 | 
| 
 | 
   968     for rxn in model_copy.reactions:
 | 
| 
 | 
   969         gpr = rxn.gene_reaction_rule
 | 
| 
 | 
   970         if gpr and gpr.strip():
 | 
| 
490
 | 
   971             new_gpr, rxn_issues = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger, track_issues=True)
 | 
| 
 | 
   972             if rxn_issues:
 | 
| 
 | 
   973                 reaction_translation_issues[rxn.id] = rxn_issues
 | 
| 
 | 
   974             
 | 
| 
426
 | 
   975             if new_gpr != gpr:
 | 
| 
493
 | 
   976                 # Check if this GPR has translation issues and contains only OR operators
 | 
| 
 | 
   977                 if rxn_issues and _is_or_only_expression(new_gpr):
 | 
| 
 | 
   978                     # Flatten the GPR: remove parentheses and create set of unique genes
 | 
| 
 | 
   979                     flattened_gpr = _flatten_or_only_gpr(new_gpr)
 | 
| 
 | 
   980                     if flattened_gpr != new_gpr:
 | 
| 
 | 
   981                         stats['flattened_or_gprs'] += 1
 | 
| 
 | 
   982                         logger.debug(f"Flattened OR-only GPR with issues for {rxn.id}: '{new_gpr}' -> '{flattened_gpr}'")
 | 
| 
 | 
   983                         new_gpr = flattened_gpr
 | 
| 
 | 
   984                 
 | 
| 
455
 | 
   985                 simplified_gpr = _simplify_boolean_expression(new_gpr)
 | 
| 
 | 
   986                 if simplified_gpr != new_gpr:
 | 
| 
 | 
   987                     stats['simplified_gprs'] += 1
 | 
| 
 | 
   988                     logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
 | 
| 
 | 
   989                 rxn.gene_reaction_rule = simplified_gpr
 | 
| 
 | 
   990                 logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'")
 | 
| 
426
 | 
   991 
 | 
| 
 | 
   992     # update model genes based on new GPRs
 | 
| 
 | 
   993     _update_model_genes(model_copy, logger)
 | 
| 
 | 
   994 
 | 
| 
 | 
   995     # final logging
 | 
| 
 | 
   996     _log_translation_statistics(stats, unmapped, multi, original_genes, model_copy.genes, logger)
 | 
| 
 | 
   997 
 | 
| 
 | 
   998     logger.info("Translation finished")
 | 
| 
490
 | 
   999     return model_copy, reaction_translation_issues
 | 
| 
426
 | 
  1000 
 | 
| 
 | 
  1001 
 | 
| 
 | 
  1002 # ---------- helper functions ----------
 | 
| 
 | 
  1003 def _create_gene_mapping(mapping_df, source_col: str, target_col: str, logger: logging.Logger) -> Dict[str, List[str]]:
 | 
| 
 | 
  1004     """
 | 
| 
 | 
  1005     Build mapping dict: source_id -> list of target_ids
 | 
| 
 | 
  1006     Normalizes IDs (removes prefixes like 'HGNC:' etc).
 | 
| 
 | 
  1007     """
 | 
| 
 | 
  1008     df = mapping_df[[source_col, target_col]].dropna().copy()
 | 
| 
 | 
  1009     # normalize to string
 | 
| 
503
 | 
  1010     df[source_col] = df[source_col].astype(str).apply(_normalize_gene_id)
 | 
| 
426
 | 
  1011     df[target_col] = df[target_col].astype(str).str.strip()
 | 
| 
 | 
  1012 
 | 
| 
 | 
  1013     df = df.drop_duplicates()
 | 
| 
 | 
  1014 
 | 
| 
 | 
  1015     logger.info(f"Creating mapping from {len(df)} rows")
 | 
| 
 | 
  1016 
 | 
| 
 | 
  1017     mapping = defaultdict(list)
 | 
| 
 | 
  1018     for _, row in df.iterrows():
 | 
| 
 | 
  1019         s = row[source_col]
 | 
| 
 | 
  1020         t = row[target_col]
 | 
| 
 | 
  1021         if t not in mapping[s]:
 | 
| 
 | 
  1022             mapping[s].append(t)
 | 
| 
 | 
  1023 
 | 
| 
 | 
  1024     # stats
 | 
| 
 | 
  1025     one_to_one = sum(1 for v in mapping.values() if len(v) == 1)
 | 
| 
 | 
  1026     one_to_many = sum(1 for v in mapping.values() if len(v) > 1)
 | 
| 
 | 
  1027     logger.info(f"Mapping: {len(mapping)} source keys, {one_to_one} 1:1, {one_to_many} 1:many")
 | 
| 
 | 
  1028     return dict(mapping)
 | 
| 
 | 
  1029 
 | 
| 
 | 
  1030 
 | 
| 
 | 
  1031 def _translate_gpr(gpr_string: str,
 | 
| 
 | 
  1032                    gene_mapping: Dict[str, List[str]],
 | 
| 
 | 
  1033                    stats: Dict[str, int],
 | 
| 
 | 
  1034                    unmapped_genes: List[str],
 | 
| 
 | 
  1035                    multi_mapping_genes: List[Tuple[str, List[str]]],
 | 
| 
490
 | 
  1036                    logger: logging.Logger,
 | 
| 
 | 
  1037                    track_issues: bool = False) -> Union[str, Tuple[str, str]]:
 | 
| 
426
 | 
  1038     """
 | 
| 
 | 
  1039     Translate genes inside a GPR string using gene_mapping.
 | 
| 
490
 | 
  1040     Returns new GPR string, and optionally translation issues if track_issues=True.
 | 
| 
426
 | 
  1041     """
 | 
| 
 | 
  1042     # Generic token pattern: letters, digits, :, _, -, ., (captures HGNC:1234, ENSG000..., symbols)
 | 
| 
 | 
  1043     token_pattern = r'\b[A-Za-z0-9:_.-]+\b'
 | 
| 
 | 
  1044     tokens = re.findall(token_pattern, gpr_string)
 | 
| 
 | 
  1045 
 | 
| 
 | 
  1046     logical = {'and', 'or', 'AND', 'OR', '(', ')'}
 | 
| 
 | 
  1047     tokens = [t for t in tokens if t not in logical]
 | 
| 
 | 
  1048 
 | 
| 
 | 
  1049     new_gpr = gpr_string
 | 
| 
490
 | 
  1050     issues = []
 | 
| 
426
 | 
  1051 
 | 
| 
 | 
  1052     for token in sorted(set(tokens), key=lambda x: -len(x)):  # longer tokens first to avoid partial replacement
 | 
| 
 | 
  1053         norm = _normalize_gene_id(token)
 | 
| 
 | 
  1054         if norm in gene_mapping:
 | 
| 
 | 
  1055             targets = gene_mapping[norm]
 | 
| 
 | 
  1056             stats['translated'] += 1
 | 
| 
 | 
  1057             if len(targets) == 1:
 | 
| 
 | 
  1058                 stats['one_to_one'] += 1
 | 
| 
 | 
  1059                 replacement = targets[0]
 | 
| 
 | 
  1060             else:
 | 
| 
 | 
  1061                 stats['one_to_many'] += 1
 | 
| 
 | 
  1062                 multi_mapping_genes.append((token, targets))
 | 
| 
 | 
  1063                 replacement = "(" + " or ".join(targets) + ")"
 | 
| 
490
 | 
  1064                 if track_issues:
 | 
| 
 | 
  1065                     issues.append(f"{token} -> {' or '.join(targets)}")
 | 
| 
426
 | 
  1066 
 | 
| 
 | 
  1067             pattern = r'\b' + re.escape(token) + r'\b'
 | 
| 
 | 
  1068             new_gpr = re.sub(pattern, replacement, new_gpr)
 | 
| 
 | 
  1069         else:
 | 
| 
 | 
  1070             stats['not_found'] += 1
 | 
| 
 | 
  1071             if token not in unmapped_genes:
 | 
| 
 | 
  1072                 unmapped_genes.append(token)
 | 
| 
 | 
  1073             logger.debug(f"Token not found in mapping (left as-is): {token}")
 | 
| 
 | 
  1074 
 | 
| 
490
 | 
  1075     # Check for many-to-one cases (multiple source genes mapping to same target)
 | 
| 
 | 
  1076     if track_issues:
 | 
| 
 | 
  1077         # Build reverse mapping to detect many-to-one cases from original tokens
 | 
| 
 | 
  1078         original_to_target = {}
 | 
| 
 | 
  1079         
 | 
| 
 | 
  1080         for orig_token in tokens:
 | 
| 
 | 
  1081             norm = _normalize_gene_id(orig_token)
 | 
| 
 | 
  1082             if norm in gene_mapping:
 | 
| 
 | 
  1083                 targets = gene_mapping[norm]
 | 
| 
 | 
  1084                 for target in targets:
 | 
| 
 | 
  1085                     if target not in original_to_target:
 | 
| 
 | 
  1086                         original_to_target[target] = []
 | 
| 
 | 
  1087                     if orig_token not in original_to_target[target]:
 | 
| 
 | 
  1088                         original_to_target[target].append(orig_token)
 | 
| 
 | 
  1089         
 | 
| 
 | 
  1090         # Identify many-to-one mappings in this specific GPR
 | 
| 
 | 
  1091         for target, original_genes in original_to_target.items():
 | 
| 
 | 
  1092             if len(original_genes) > 1:
 | 
| 
 | 
  1093                 issues.append(f"{' or '.join(original_genes)} -> {target}")
 | 
| 
 | 
  1094     
 | 
| 
 | 
  1095     issue_text = "; ".join(issues) if issues else ""
 | 
| 
 | 
  1096     
 | 
| 
 | 
  1097     if track_issues:
 | 
| 
 | 
  1098         return new_gpr, issue_text
 | 
| 
 | 
  1099     else:
 | 
| 
 | 
  1100         return new_gpr
 | 
| 
426
 | 
  1101 
 | 
| 
 | 
  1102 
 | 
| 
 | 
  1103 def _update_model_genes(model: 'cobra.Model', logger: logging.Logger):
 | 
| 
 | 
  1104     """
 | 
| 
 | 
  1105     Rebuild model.genes from gene_reaction_rule content.
 | 
| 
 | 
  1106     Removes genes not referenced and adds missing ones.
 | 
| 
 | 
  1107     """
 | 
| 
 | 
  1108     # collect genes in GPRs
 | 
| 
 | 
  1109     gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
 | 
| 
 | 
  1110     logical = {'and', 'or', 'AND', 'OR', '(', ')'}
 | 
| 
 | 
  1111     genes_in_gpr: Set[str] = set()
 | 
| 
 | 
  1112 
 | 
| 
 | 
  1113     for rxn in model.reactions:
 | 
| 
 | 
  1114         gpr = rxn.gene_reaction_rule
 | 
| 
 | 
  1115         if gpr and gpr.strip():
 | 
| 
 | 
  1116             toks = re.findall(gene_pattern, gpr)
 | 
| 
 | 
  1117             toks = [t for t in toks if t not in logical]
 | 
| 
 | 
  1118             # normalize IDs consistent with mapping normalization
 | 
| 
 | 
  1119             toks = [_normalize_gene_id(t) for t in toks]
 | 
| 
 | 
  1120             genes_in_gpr.update(toks)
 | 
| 
 | 
  1121 
 | 
| 
 | 
  1122     # existing gene ids
 | 
| 
 | 
  1123     existing = {g.id for g in model.genes}
 | 
| 
 | 
  1124 
 | 
| 
 | 
  1125     # remove obsolete genes
 | 
| 
 | 
  1126     to_remove = [gid for gid in existing if gid not in genes_in_gpr]
 | 
| 
 | 
  1127     removed = 0
 | 
| 
 | 
  1128     for gid in to_remove:
 | 
| 
 | 
  1129         try:
 | 
| 
 | 
  1130             gene_obj = model.genes.get_by_id(gid)
 | 
| 
 | 
  1131             model.genes.remove(gene_obj)
 | 
| 
 | 
  1132             removed += 1
 | 
| 
 | 
  1133         except Exception:
 | 
| 
 | 
  1134             # safe-ignore
 | 
| 
 | 
  1135             pass
 | 
| 
 | 
  1136 
 | 
| 
 | 
  1137     # add new genes
 | 
| 
 | 
  1138     added = 0
 | 
| 
 | 
  1139     for gid in genes_in_gpr:
 | 
| 
 | 
  1140         if gid not in existing:
 | 
| 
 | 
  1141             new_gene = cobra.Gene(gid)
 | 
| 
 | 
  1142             try:
 | 
| 
 | 
  1143                 model.genes.add(new_gene)
 | 
| 
 | 
  1144             except Exception:
 | 
| 
 | 
  1145                 # fallback: if model.genes doesn't support add, try append or model.add_genes
 | 
| 
 | 
  1146                 try:
 | 
| 
 | 
  1147                     model.genes.append(new_gene)
 | 
| 
 | 
  1148                 except Exception:
 | 
| 
 | 
  1149                     try:
 | 
| 
 | 
  1150                         model.add_genes([new_gene])
 | 
| 
 | 
  1151                     except Exception:
 | 
| 
 | 
  1152                         logger.warning(f"Could not add gene object for {gid}")
 | 
| 
 | 
  1153             added += 1
 | 
| 
 | 
  1154 
 | 
| 
 | 
  1155     logger.info(f"Model genes updated: removed {removed}, added {added}")
 | 
| 
 | 
  1156 
 | 
| 
 | 
  1157 
 | 
| 
508
 | 
  1158 def export_model_to_tabular(model: cobraModel, 
 | 
| 
 | 
  1159                            output_path: str,
 | 
| 
 | 
  1160                            translation_issues: Dict = None,
 | 
| 
 | 
  1161                            include_objective: bool = True,
 | 
| 
 | 
  1162                            save_function = None) -> pd.DataFrame:
 | 
| 
 | 
  1163     """
 | 
| 
 | 
  1164     Export a COBRA model to tabular format with optional components.
 | 
| 
 | 
  1165     
 | 
| 
 | 
  1166     Args:
 | 
| 
 | 
  1167         model: COBRA model to export
 | 
| 
 | 
  1168         output_path: Path where to save the tabular file
 | 
| 
 | 
  1169         translation_issues: Optional dict of {reaction_id: issues} from gene translation
 | 
| 
 | 
  1170         include_objective: Whether to include objective coefficient column
 | 
| 
 | 
  1171         save_function: Optional custom save function, if None uses pd.DataFrame.to_csv
 | 
| 
 | 
  1172         
 | 
| 
 | 
  1173     Returns:
 | 
| 
 | 
  1174         pd.DataFrame: The merged tabular data
 | 
| 
 | 
  1175     """
 | 
| 
 | 
  1176     # Generate model data
 | 
| 
 | 
  1177     rules = generate_rules(model, asParsed=False)
 | 
| 
 | 
  1178     
 | 
| 
 | 
  1179     reactions = generate_reactions(model, asParsed=False)
 | 
| 
 | 
  1180     bounds = generate_bounds(model)
 | 
| 
 | 
  1181     medium = get_medium(model)
 | 
| 
 | 
  1182     compartments = generate_compartments(model)
 | 
| 
 | 
  1183     
 | 
| 
 | 
  1184     # Create base DataFrames
 | 
| 
 | 
  1185     df_rules = pd.DataFrame(list(rules.items()), columns=["ReactionID", "GPR"])
 | 
| 
 | 
  1186     df_reactions = pd.DataFrame(list(reactions.items()), columns=["ReactionID", "Formula"])
 | 
| 
 | 
  1187     df_bounds = bounds.reset_index().rename(columns={"index": "ReactionID"})
 | 
| 
 | 
  1188     df_medium = medium.rename(columns={"reaction": "ReactionID"})
 | 
| 
 | 
  1189     df_medium["InMedium"] = True
 | 
| 
 | 
  1190     
 | 
| 
 | 
  1191     # Start merging
 | 
| 
 | 
  1192     merged = df_reactions.merge(df_rules, on="ReactionID", how="outer")
 | 
| 
 | 
  1193     merged = merged.merge(df_bounds, on="ReactionID", how="outer")
 | 
| 
 | 
  1194     
 | 
| 
 | 
  1195     # Add objective coefficients if requested
 | 
| 
 | 
  1196     if include_objective:
 | 
| 
 | 
  1197         objective_function = extract_objective_coefficients(model)
 | 
| 
 | 
  1198         merged = merged.merge(objective_function, on="ReactionID", how="outer")
 | 
| 
 | 
  1199     
 | 
| 
 | 
  1200     # Add compartments/pathways if they exist
 | 
| 
 | 
  1201     if compartments is not None:
 | 
| 
 | 
  1202         merged = merged.merge(compartments, on="ReactionID", how="outer")
 | 
| 
 | 
  1203     
 | 
| 
 | 
  1204     # Add medium information
 | 
| 
 | 
  1205     merged = merged.merge(df_medium, on="ReactionID", how="left")
 | 
| 
 | 
  1206     
 | 
| 
 | 
  1207     # Add translation issues if provided
 | 
| 
 | 
  1208     if translation_issues:
 | 
| 
 | 
  1209         df_translation_issues = pd.DataFrame([
 | 
| 
 | 
  1210             {"ReactionID": rxn_id, "TranslationIssues": issues}
 | 
| 
 | 
  1211             for rxn_id, issues in translation_issues.items()
 | 
| 
 | 
  1212         ])
 | 
| 
 | 
  1213         if not df_translation_issues.empty:
 | 
| 
 | 
  1214             merged = merged.merge(df_translation_issues, on="ReactionID", how="left")
 | 
| 
 | 
  1215             merged["TranslationIssues"] = merged["TranslationIssues"].fillna("")
 | 
| 
 | 
  1216     
 | 
| 
 | 
  1217     # Final processing
 | 
| 
 | 
  1218     merged["InMedium"] = merged["InMedium"].fillna(False)
 | 
| 
 | 
  1219     merged = merged.sort_values(by="InMedium", ascending=False)
 | 
| 
 | 
  1220     
 | 
| 
 | 
  1221     # Save the file
 | 
| 
 | 
  1222     if save_function:
 | 
| 
 | 
  1223         save_function(merged, output_path)
 | 
| 
 | 
  1224     else:
 | 
| 
 | 
  1225         merged.to_csv(output_path, sep="\t", index=False)
 | 
| 
 | 
  1226     
 | 
| 
 | 
  1227     return merged
 | 
| 
 | 
  1228 
 | 
| 
 | 
  1229 
 | 
| 
426
 | 
  1230 def _log_translation_statistics(stats: Dict[str, int],
 | 
| 
 | 
  1231                                unmapped_genes: List[str],
 | 
| 
 | 
  1232                                multi_mapping_genes: List[Tuple[str, List[str]]],
 | 
| 
 | 
  1233                                original_genes: Set[str],
 | 
| 
 | 
  1234                                final_genes,
 | 
| 
 | 
  1235                                logger: logging.Logger):
 | 
| 
 | 
  1236     logger.info("=== TRANSLATION STATISTICS ===")
 | 
| 
 | 
  1237     logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})")
 | 
| 
 | 
  1238     logger.info(f"Not found tokens: {stats.get('not_found', 0)}")
 | 
| 
455
 | 
  1239     logger.info(f"Simplified GPRs: {stats.get('simplified_gprs', 0)}")
 | 
| 
493
 | 
  1240     logger.info(f"Flattened OR-only GPRs with issues: {stats.get('flattened_or_gprs', 0)}")
 | 
| 
426
 | 
  1241 
 | 
| 
 | 
  1242     final_ids = {g.id for g in final_genes}
 | 
| 
 | 
  1243     logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}")
 | 
| 
 | 
  1244 
 | 
| 
 | 
  1245     if unmapped_genes:
 | 
| 
 | 
  1246         logger.warning(f"Unmapped tokens ({len(unmapped_genes)}): {', '.join(unmapped_genes[:20])}{(' ...' if len(unmapped_genes)>20 else '')}")
 | 
| 
 | 
  1247     if multi_mapping_genes:
 | 
| 
 | 
  1248         logger.info(f"Multi-mapping examples ({len(multi_mapping_genes)}):")
 | 
| 
 | 
  1249         for orig, targets in multi_mapping_genes[:10]:
 | 
| 
490
 | 
  1250             logger.info(f"  {orig} -> {', '.join(targets)}")
 | 
| 
493
 | 
  1251     
 | 
| 
 | 
  1252     # Log summary of flattened GPRs if any
 | 
| 
 | 
  1253     if stats.get('flattened_or_gprs', 0) > 0:
 | 
| 
 | 
  1254         logger.info(f"Flattened {stats['flattened_or_gprs']} OR-only GPRs that had translation issues (removed parentheses, created unique gene sets)")
 | 
| 
490
 | 
  1255 
 | 
| 
 | 
  1256      |