| 539 | 1 """ | 
|  | 2 Utilities for generating and manipulating COBRA models and related metadata. | 
|  | 3 | 
|  | 4 This module includes helpers to: | 
|  | 5 - extract rules, reactions, bounds, objective coefficients, and compartments | 
|  | 6 - build a COBRA model from a tabular file | 
|  | 7 - set objective and medium from dataframes | 
|  | 8 - validate a model and convert gene identifiers | 
|  | 9 - translate model GPRs using mapping tables | 
|  | 10 """ | 
import logging
import os
import re
import sys
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, Union

import cobra
import pandas as pd
from cobra import Model as cobraModel, Reaction, Metabolite
|  | 20 | 
try:
    from . import rule_parsing as rulesUtils
    from . import reaction_parsing as reactionUtils
except ImportError:
    # Fallback for running this file as a plain script instead of a package
    # module. Catch only ImportError so unrelated failures are not swallowed.
    import rule_parsing as rulesUtils
    import reaction_parsing as reactionUtils
|  | 27 | 
| 539 | 28 | 
|  | 29 ############################ check_methods #################################### | 
def gene_type(l :str, name :str) -> str:
    """
    Determine the type of a gene identifier.

    Args:
        l (str): The gene identifier to check.
        name (str): The name of the dataset, used in the error message.

    Returns:
        str: The type of gene ID ('HGNC_ID', 'ENSG', 'HGNC_symbol', or 'entrez_id').

    Raises:
        SystemExit: If the gene ID type is not supported, execution is aborted
        via sys.exit.
    """
    # Checks are ordered from most to least specific.
    if check_hgnc(l):
        return 'HGNC_ID'
    if check_ensembl(l):
        return 'ENSG'
    if check_symbol(l):
        return 'HGNC_symbol'
    if check_entrez(l):
        return 'entrez_id'
    # Fix: the original concatenation produced "Supported IDtypes" (missing space).
    sys.exit('Execution aborted:\n' +
             'gene ID type in ' + name + ' not supported. Supported ID ' +
             'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n')
|  | 56 | 
def check_hgnc(l :str) -> bool:
    """
    Return True when the identifier is a well-formed HGNC ID.

    A valid HGNC ID looks like 'HGNC:<digits>' (prefix matched
    case-insensitively) and therefore must be longer than 5 characters.

    Args:
        l (str): The gene identifier to check.

    Returns:
        bool: True for 'HGNC:<digits>', False otherwise.
    """
    if len(l) <= 5:
        return False
    return l.upper().startswith('HGNC:') and l[5:].isdigit()
|  | 74 | 
def check_ensembl(l :str) -> bool:
    """
    Return True when the identifier looks like an Ensembl ID.

    The check is purely prefix-based: any identifier starting with 'ENS'
    (case-insensitive) is accepted.

    Args:
        l (str): The gene identifier to check.

    Returns:
        bool: True if the identifier starts with 'ENS', False otherwise.
    """
    normalized = l.upper()
    return normalized.startswith('ENS')
|  | 86 | 
|  | 87 | 
def check_symbol(l :str) -> bool:
    """
    Return True when the identifier looks like an HGNC gene symbol.

    Accepted shape: a leading letter followed by at least one alphanumeric
    character (so single-character strings are rejected, matching the
    historical behavior).

    Args:
        l (str): The gene identifier to check.

    Returns:
        bool: True for symbol-shaped identifiers, False otherwise.
    """
    if not l:
        return False
    return l[0].isalpha() and l[1:].isalnum()
|  | 105 | 
def check_entrez(l :str) -> bool:
    """
    Return True when the identifier is a plausible Entrez ID (all digits).

    Args:
        l (str): The gene identifier to check.

    Returns:
        bool: True for a non-empty all-digit string, False otherwise.
    """
    return bool(l) and l.isdigit()
|  | 120 | 
|  | 121 ################################- DATA GENERATION -################################ | 
ReactionId = str
def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
    """
    Collect the GPR rule of every reaction that has one, keyed by reaction ID.

    Args:
        model: COBRA model to derive data from.
        asParsed: If True, each rule is parsed into a nested list structure;
            otherwise the raw rule strings are returned.

    Returns:
        Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID, or
        Dict[ReactionId, str]: Raw rules by reaction ID.
    """
    rawRules = {
        rxn.id: rxn.gene_reaction_rule
        for rxn in model.reactions
        if rxn.gene_reaction_rule
    }
    if not asParsed:
        return rawRules
    return {
        rxnId: rulesUtils.parseRuleToNestedList(rule)
        for rxnId, rule in rawRules.items()
    }
|  | 143 | 
def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
    """
    Collect the formula of every reaction that has one, keyed by reaction ID.

    Args:
        model: COBRA model to derive data from.
        asParsed: If True, convert formulas into the parsed representation
            produced by reactionUtils; otherwise keep raw formula strings.

    Returns:
        Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested).
    """
    formulas = {}
    for rxn in model.reactions:
        if rxn.reaction:
            formulas[rxn.id] = rxn.reaction

    if asParsed:
        return reactionUtils.create_reaction_dict(formulas)
    return formulas
|  | 165 | 
def get_medium(model:cobraModel) -> pd.DataFrame:
    """
    Extract the uptake reactions representing the model medium.

    A reaction qualifies when it has a negative lower bound and no metabolite
    with a positive stoichiometric coefficient (i.e. it only consumes).

    Returns:
        pd.DataFrame with a single column 'reaction' listing those reaction IDs.
    """
    uptake_ids = []
    for rxn in model.reactions:
        produces_something = any(
            rxn.get_coefficient(met.id) > 0 for met in rxn.metabolites
        )
        if not produces_something and rxn.lower_bound < 0:
            uptake_ids.append(rxn.id)

    medium_df = pd.DataFrame()
    medium_df["reaction"] = uptake_ids
    return medium_df
|  | 185 | 
def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame:
    """
    Extract the objective coefficient of every reaction in the model.

    Args:
        model: COBRA model.

    Returns:
        pd.DataFrame with columns: ReactionID, ObjectiveCoefficient
        (0.0 for reactions absent from the objective expression).
    """
    # The objective expression is linear; map each forward variable to its coefficient.
    objective_terms = model.objective.expression.as_coefficients_dict()

    records = [
        {
            "ReactionID": rxn.id,
            "ObjectiveCoefficient": objective_terms.get(rxn.forward_variable, 0.0),
        }
        for rxn in model.reactions
    ]
    return pd.DataFrame(records)
|  | 208 | 
def generate_bounds(model:cobraModel) -> pd.DataFrame:
    """
    Build a DataFrame of lower/upper bounds for all reactions.

    Returns:
        pd.DataFrame indexed by reaction IDs with columns
        ['lower_bound', 'upper_bound'].
    """
    reaction_ids = [rxn.id for rxn in model.reactions]

    # Create the frame first, then fill row by row (preserves original dtypes).
    bounds_df = pd.DataFrame(columns=["lower_bound", "upper_bound"], index=reaction_ids)
    for rxn in model.reactions:
        bounds_df.loc[rxn.id] = [rxn.lower_bound, rxn.upper_bound]
    return bounds_df
|  | 226 | 
|  | 227 | 
|  | 228 | 
def generate_compartments(model: cobraModel) -> pd.DataFrame:
    """
    Generate a DataFrame of pathway annotations for each reaction.

    One 'Pathway_N' column is created per pathway position (Pathway_1,
    Pathway_2, ...), sized to the reaction with the most pathways.

    Args:
        model: the COBRA model to extract pathway data from (reads
            reaction.annotation['pathways'], which may be a single value
            or a list).

    Returns:
        pd.DataFrame with a 'ReactionID' column plus the pathway columns,
        or None when no reaction in the model carries a pathway annotation
        (kept for backward compatibility with existing callers).
    """
    # First pass: normalize every reaction's annotation to a list of
    # non-empty pathway values and track the widest one.
    reaction_pathways: Dict[str, list] = {}
    max_pathways = 0

    for reaction in model.reactions:
        raw = reaction.annotation.get('pathways')
        if isinstance(raw, list):
            # Filter out empty/None entries.
            pathways = [p for p in raw if p]
        elif raw:
            # Single (truthy) pathway value.
            pathways = [raw]
        else:
            pathways = []
        reaction_pathways[reaction.id] = pathways
        max_pathways = max(max_pathways, len(pathways))

    # No pathways anywhere in the model: preserve the historical None return.
    if max_pathways == 0:
        return None

    pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]

    # Second pass: build one row per reaction, padding short lists with None.
    pathway_data = []
    for reaction_id, pathways in reaction_pathways.items():
        row = {"ReactionID": reaction_id}
        for i, col_name in enumerate(pathway_columns):
            row[col_name] = pathways[i] if i < len(pathways) else None
        pathway_data.append(row)

    return pd.DataFrame(pathway_data)
|  | 293 | 
def set_annotation_pathways_from_data(model: cobraModel, df: pd.DataFrame):
    """Set reaction.annotation['pathways'] from 'Pathway_1', 'Pathway_2', ... columns of df.

    Rows whose ReactionID is not in the model, or with no non-empty pathway
    cell, are skipped. A single pathway is stored as a scalar, several as a list.
    """
    pathway_cols = [c for c in df.columns if c.startswith('Pathway_')]
    if not pathway_cols:
        print("No 'Pathway_' columns found, skipping pathway annotation")
        return

    annotated = {}

    for _, row in df.iterrows():
        rxn_id = str(row['ReactionID']).strip()
        if rxn_id not in model.reactions:
            continue

        values = [
            str(row[c]).strip()
            for c in pathway_cols
            if pd.notna(row[c]) and str(row[c]).strip()
        ]
        if not values:
            continue

        rxn = model.reactions.get_by_id(rxn_id)
        # Scalar for a single pathway, list otherwise (matches reader logic).
        rxn.annotation['pathways'] = values[0] if len(values) == 1 else values
        annotated[rxn_id] = values

    print(f"Annotated {len(annotated)} reactions with pathways.")
|  | 324 | 
def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
    """
    Build a COBRApy model from a tabular file with reaction data.

    The file must contain 'ReactionID' and 'Formula' columns. Optional columns
    used when present: 'lower_bound', 'upper_bound', 'GPR',
    'ObjectiveCoefficient', 'InMedium' and 'Pathway_*'.

    Args:
        csv_path: Path to the tab- or comma-separated file.
        model_id: ID for the newly created model.

    Returns:
        cobra.Model: The constructed COBRApy model.

    Raises:
        ValueError: if required columns are missing or a row has no formula.
    """
    # Detect the separator from the first line (tab wins over comma).
    with open(csv_path, 'r') as f:
        first_line = f.readline()
        sep = '\t' if '\t' in first_line else ','

    df = pd.read_csv(csv_path, sep=sep)

    # Check required columns
    required_cols = ['ReactionID', 'Formula']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}. Available columns: {list(df.columns)}")

    model = cobraModel(model_id)

    metabolites_dict = {}
    compartments_dict = {}

    print(f"Building model from {len(df)} reactions...")

    # First pass: collect every metabolite and compartment seen in a formula.
    for idx, row in df.iterrows():
        reaction_formula = str(row['Formula']).strip()
        if not reaction_formula or reaction_formula == 'nan':
            continue

        metabolites = extract_metabolites_from_reaction(reaction_formula)

        for met_id in metabolites:
            compartment = extract_compartment_from_metabolite(met_id)

            if compartment not in compartments_dict:
                compartments_dict[compartment] = compartment

            if met_id not in metabolites_dict:
                metabolites_dict[met_id] = Metabolite(
                    id=met_id,
                    compartment=compartment,
                    name=met_id.replace(f"_{compartment}", "").replace("__", "_")
                )

    model.compartments = compartments_dict

    model.add_metabolites(list(metabolites_dict.values()))

    print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments")

    reactions_added = 0
    reactions_skipped = 0

    # Second pass: create reactions with bounds, GPRs and stoichiometry.
    for idx, row in df.iterrows():

        reaction_id = str(row['ReactionID']).strip()
        reaction_formula = str(row['Formula']).strip()

        if not reaction_formula or reaction_formula == 'nan':
            raise ValueError(f"Missing reaction formula for {reaction_id}")

        reaction = Reaction(reaction_id)
        reaction.name = reaction_id

        # Fix: use row.get so a missing optional column no longer raises
        # KeyError; pd.notna(None) is False, so the defaults apply.
        lower = row.get('lower_bound')
        upper = row.get('upper_bound')
        reaction.lower_bound = float(lower) if pd.notna(lower) else -1000.0
        reaction.upper_bound = float(upper) if pd.notna(upper) else 1000.0

        gpr = row.get('GPR')
        if pd.notna(gpr) and str(gpr).strip():
            reaction.gene_reaction_rule = str(gpr).strip()

        try:
            parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
        except Exception as e:
            # Best-effort: report and skip unparsable reactions.
            print(f"Error parsing reaction {reaction_id}: {e}")
            reactions_skipped += 1
            continue

        model.add_reactions([reaction])
        reactions_added += 1

    print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions")

    # set objective function
    set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient")

    set_medium_from_data(model, df)

    set_annotation_pathways_from_data(model, df)

    print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites")

    return model
|  | 426 | 
|  | 427 | 
# Extracts all metabolite IDs from the formula (handles numeric prefixes + underscores)
|  | 429 #def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: | 
|  | 430 #    """ | 
|  | 431 #    Extract metabolite IDs from a reaction formula. | 
|  | 432 #    Robust pattern: tokens ending with _<compartment> (e.g., _c, _m, _e), | 
|  | 433 #    allowing leading digits/underscores. | 
|  | 434 #    """ | 
|  | 435 #    metabolites = set() | 
|  | 436 #    # optional coefficient followed by a token ending with _<letters> | 
|  | 437 #    if reaction_formula[-1] == ']' and reaction_formula[-3] == '[': | 
|  | 438 #        pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+[[A-Za-z0-9]]+)' | 
|  | 439 #    else: | 
|  | 440 #        pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[A-Za-z0-9]+)' | 
|  | 441 #    matches = re.findall(pattern, reaction_formula) | 
|  | 442 #    metabolites.update(matches) | 
|  | 443 #    return metabolites | 
|  | 444 | 
|  | 445 | 
def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
    """
    Extract metabolite IDs from a reaction formula.

    Handles:
      - optional stoichiometric coefficients (integers or decimals)
      - compartment tags at the end of the metabolite, either [c] or _c

    Returns the IDs including the compartment suffix exactly as written.
    """
    token_pattern = (
        r'(?:^|(?<=\s)|(?<=\+)|(?<=,)|(?<==)|(?<=:))'              # left boundary (start, space, +, comma, =, :)
        r'(?:\d+(?:\.\d+)?\s+)?'                                   # optional coefficient (requires space after)
        r'([A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+))'  # metabolite + compartment (can start with number)
    )
    return set(re.findall(token_pattern, reaction_formula))
|  | 462 | 
|  | 463 | 
|  | 464 | 
def extract_compartment_from_metabolite(metabolite_id: str) -> str:
    """Extract the compartment tag from a metabolite ID.

    Supports the two common single-letter conventions:
      - underscore suffix: 'glc__D_e' -> 'e'
      - bracket suffix:    'glc[c]'   -> 'c'
    Falls back to 'c' (cytoplasm) when neither applies.
    """
    # Fix: length guards prevent IndexError on very short IDs (e.g. 'h' or '').
    if len(metabolite_id) >= 2 and metabolite_id[-2] == '_':
        return metabolite_id.split('_')[-1]
    if len(metabolite_id) >= 3 and metabolite_id[-1] == ']' and metabolite_id[-3] == '[':
        return metabolite_id[-2]
    return 'c'  # default cytoplasm
|  | 472 | 
|  | 473 | 
def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
    """Parse a reaction formula and add its metabolites with signed coefficients.

    Reactants get negative coefficients, products positive ones. Metabolite
    IDs not present in metabolites_dict are silently skipped (they were not
    discovered during the metabolite-collection pass).

    Args:
        reaction: Reaction object to populate (mutated in place).
        formula: Formula string such as 'a_c + 2 b_c --> c_c'.
        metabolites_dict: Known Metabolite objects keyed by ID.

    Raises:
        ValueError: if no arrow separator is found or the formula does not
        split into exactly two sides.
    """
    # Order matters: '<--' must be tested before '<-' (substring overlap).
    # Note: reversibility is implied by the reaction bounds, so the arrow
    # direction itself is not recorded here (the old unused flag was removed).
    for arrow in ('<=>', '<--', '-->', '<-'):
        if arrow in formula:
            parts = formula.split(arrow)
            break
    else:
        raise ValueError(f"Unrecognized reaction format: {formula}")

    # Handle cases where one side might be empty (exchange reactions)
    if len(parts) != 2:
        raise ValueError(f"Invalid reaction format, expected 2 parts: {formula}")

    left, right = parts[0].strip(), parts[1].strip()

    reactants = parse_metabolites_side(left) if left else {}
    products = parse_metabolites_side(right) if right else {}

    metabolites_to_add = {}

    for met_id, coeff in reactants.items():
        if met_id in metabolites_dict:
            metabolites_to_add[metabolites_dict[met_id]] = -coeff

    for met_id, coeff in products.items():
        if met_id in metabolites_dict:
            # NOTE(review): a metabolite on both sides overwrites its reactant
            # entry instead of summing — confirm this is intended upstream.
            metabolites_to_add[metabolites_dict[met_id]] = coeff

    reaction.add_metabolites(metabolites_to_add)
|  | 512 | 
|  | 513 | 
def parse_metabolites_side(side_str: str) -> Dict[str, float]:
    """Parse one side of a reaction into {metabolite_id: coefficient}.

    Terms are '+'-separated; each term is either '<coeff> <met_id>' or a bare
    '<met_id>' (implicit coefficient 1.0). Unparsable terms trigger a warning
    and are dropped.
    """
    parsed: Dict[str, float] = {}
    if not side_str or not side_str.strip():
        return parsed

    for raw_term in side_str.split('+'):
        term = raw_term.strip()
        if not term:
            continue

        tokens = term.split()
        if len(tokens) == 2:
            # Possible '<coefficient> <metabolite>' pair.
            try:
                coefficient = float(tokens[0])
            except ValueError:
                coefficient = None
            if coefficient is not None and re.match(r'[A-Za-z0-9_]+(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', tokens[1]):
                parsed[tokens[1]] = coefficient
                continue

        # Bare metabolite (may start with a digit), implicit coefficient 1.0.
        if re.match(r'[A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', term):
            parsed[term] = 1.0
        else:
            print(f"Warning: Could not parse metabolite term: '{term}'")

    return parsed
|  | 548 | 
|  | 549 | 
|  | 550 | 
def set_objective_from_csv(model: cobra.Model, df: pd.DataFrame, obj_col: str = "ObjectiveCoefficient"):
    """
    Set the model objective from a coefficient column of the dataframe.

    Any reaction(s) may carry a non-zero coefficient, not necessarily biomass.

    Raises:
        ValueError: if no reaction has a non-zero coefficient.
    """
    objective_coefficients = {}

    for _, row in df.iterrows():
        reaction_id = str(row['ReactionID']).strip()
        cell = row[obj_col]
        coefficient = float(cell) if pd.notna(cell) else 0.0
        if coefficient == 0:
            continue
        if reaction_id not in model.reactions:
            print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.")
            continue
        objective_coefficients[model.reactions.get_by_id(reaction_id)] = coefficient

    if not objective_coefficients:
        raise ValueError("No reactions found with non-zero objective coefficient.")

    model.objective = objective_coefficients
    print(f"Objective set with {len(objective_coefficients)} reactions.")
|  | 572 | 
|  | 573 | 
|  | 574 | 
|  | 575 | 
def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
    """Set the model medium from the boolean 'InMedium' column of df.

    Only reactions present in the model with a negative lower bound are used;
    each contributes abs(lower_bound) as its medium uptake value. Skips
    silently when the column is absent.
    """
    if 'InMedium' not in df.columns:
        print("No 'InMedium' column found, skipping medium setup")
        return

    medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()

    # Fix: build the id set once instead of rebuilding a list for every
    # membership test (previously O(n^2) over the model's reactions).
    model_reaction_ids = {r.id for r in model.reactions}

    medium_dict = {}
    for rxn_id in medium_reactions:
        if rxn_id in model_reaction_ids:
            reaction = model.reactions.get_by_id(rxn_id)
            if reaction.lower_bound < 0:
                medium_dict[rxn_id] = abs(reaction.lower_bound)

    if medium_dict:
        model.medium = medium_dict
        print(f"Medium set with {len(medium_dict)} components")
    else:
        print("No medium components found")
def validate_model(model: cobraModel) -> Dict[str, Any]:
    """Validate the model and return basic statistics.

    Returns:
        dict with counts (reactions, metabolites, genes, compartments), the
        objective string, medium size, reversible/exchange reaction counts,
        plus 'growth_rate' and 'status' from a test optimization
        (None / "Error: ..." when optimization fails).
    """
    # Fix: annotation used the builtin function `any` as a type; it is now
    # the intended typing.Any.
    validation = {
        'num_reactions': len(model.reactions),
        'num_metabolites': len(model.metabolites),
        'num_genes': len(model.genes),
        'num_compartments': len(model.compartments),
        'objective': str(model.objective),
        'medium_size': len(model.medium),
        'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
        'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
    }

    try:
        # Growth test: a solver failure must not crash validation.
        solution = model.optimize()
        validation['growth_rate'] = solution.objective_value
        validation['status'] = solution.status
    except Exception as e:
        validation['growth_rate'] = None
        validation['status'] = f"Error: {e}"

    return validation
|  | 619 | 
def convert_genes(model, annotation):
    """Return a copy of the model with genes renamed via gene.notes[annotation].

    Args:
        model: COBRA model whose genes carry the requested key in their .notes.
        annotation: key into gene.notes providing the new gene identifier.

    Returns:
        The renamed model copy, or -1 when some gene lacks the annotation key
        (the -1 sentinel is kept for backward compatibility with callers).
    """
    from cobra.manipulation import rename_genes
    model_copy = model.copy()
    try:
        gene_mapping = {gene.id: gene.notes[annotation] for gene in model_copy.genes}
    except KeyError:
        # Fix: was a bare `except:`; only the missing-key case is expected here.
        print("No annotation in gene dict!")
        return -1
    rename_genes(model_copy, gene_mapping)

    return model_copy
|  | 632 | 
|  | 633 # ---------- Utility helpers ---------- | 
|  | 634 def _normalize_colname(col: str) -> str: | 
|  | 635     return col.strip().lower().replace(' ', '_') | 
|  | 636 | 
def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]:
    """
    Locate useful ID columns in the mapping dataframe.

    Returns a dict mapping canonical keys ('ensg', 'hgnc_id', 'hgnc_symbol',
    'entrez_id', 'gene_number') to the actual column names found; categories
    with no matching column are simply absent from the result.
    """
    normalized_to_original = {_normalize_colname(c): c for c in mapping_df.columns}

    # Accepted (normalized) aliases for each category, in preference order.
    candidates = {
        'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
        'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
        'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'],
        'entrez_id': ['entrez', 'entrez_id', 'entrezgene'],
        'gene_number': ['gene_number']
    }

    selected: Dict[str, str] = {}
    for category, aliases in candidates.items():
        for alias in aliases:
            if alias in normalized_to_original:
                selected[category] = normalized_to_original[alias]
                break
    return selected
|  | 658 | 
def _validate_target_uniqueness(mapping_df: 'pd.DataFrame',
                                source_col: str,
                                target_col: str,
                                model_source_genes: Optional[Set[str]] = None,
                                logger: Optional[logging.Logger] = None) -> None:
    """
    Check that, within the filtered mapping_df, each target maps to at most one source.

    Logs a warning listing examples when duplicates are found, otherwise logs
    an info message. Optionally restricts the check to `model_source_genes`
    (normalized source IDs).
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    if mapping_df.empty:
        logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.")
        return

    # Normalize into temporary columns for grouping (without altering the original df).
    tmp = mapping_df[[source_col, target_col]].copy()
    tmp['_src_norm'] = tmp[source_col].astype(str).apply(_normalize_gene_id)
    tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip()

    # Optionally filter to the set of model source genes.
    if model_source_genes is not None:
        tmp = tmp[tmp['_src_norm'].isin(model_source_genes)]

    if tmp.empty:
        logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.")
        return

    # Build reverse mapping: target -> set(sources).
    grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna()))
    # Targets associated with more than one source are problematic.
    problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}

    if problematic:
        # Prepare warning message with examples (limited subset).
        sample_items = list(problematic.items())
        msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
        for tgt, sources in sample_items:
            msg_lines.append(f"  - target '{tgt}' <- sources: {', '.join(sources)}")
        logger.warning("\n".join(msg_lines))
        # Fix: return here so the success message below is not also logged
        # when duplicates were found.
        return

    logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
|  | 705 | 
|  | 706 | 
|  | 707 def _normalize_gene_id(g: str) -> str: | 
|  | 708     """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips).""" | 
|  | 709     if g is None: | 
|  | 710         return "" | 
|  | 711     g = str(g).strip() | 
|  | 712     # remove common prefixes | 
|  | 713     g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE) | 
|  | 714     g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE) | 
|  | 715     return g | 
|  | 716 | 
|  | 717 def _is_or_only_expression(expr: str) -> bool: | 
|  | 718     """ | 
|  | 719     Check if a GPR expression contains only OR operators (no AND operators). | 
|  | 720 | 
|  | 721     Args: | 
|  | 722         expr: GPR expression string | 
|  | 723 | 
|  | 724     Returns: | 
|  | 725         bool: True if expression contains only OR (and parentheses) and has multiple genes, False otherwise | 
|  | 726     """ | 
|  | 727     if not expr or not expr.strip(): | 
|  | 728         return False | 
|  | 729 | 
|  | 730     # Normalize the expression | 
|  | 731     normalized = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') | 
|  | 732 | 
|  | 733     # Check if it contains any AND operators | 
|  | 734     has_and = ' and ' in normalized.lower() | 
|  | 735 | 
|  | 736     # Check if it contains OR operators | 
|  | 737     has_or = ' or ' in normalized.lower() | 
|  | 738 | 
|  | 739     # Must have OR operators and no AND operators | 
|  | 740     return has_or and not has_and | 
|  | 741 | 
|  | 742 | 
|  | 743 def _flatten_or_only_gpr(expr: str) -> str: | 
|  | 744     """ | 
|  | 745     Flatten a GPR expression that contains only OR operators by: | 
|  | 746     1. Removing all parentheses | 
|  | 747     2. Extracting unique gene names | 
|  | 748     3. Joining them with ' or ' | 
|  | 749 | 
|  | 750     Args: | 
|  | 751         expr: GPR expression string with only OR operators | 
|  | 752 | 
|  | 753     Returns: | 
|  | 754         str: Flattened GPR expression | 
|  | 755     """ | 
|  | 756     if not expr or not expr.strip(): | 
|  | 757         return expr | 
|  | 758 | 
|  | 759     # Extract all gene tokens (exclude logical operators and parentheses) | 
|  | 760     gene_pattern = r'\b[A-Za-z0-9:_.-]+\b' | 
|  | 761     logical = {'and', 'or', 'AND', 'OR', '(', ')'} | 
|  | 762 | 
|  | 763     tokens = re.findall(gene_pattern, expr) | 
|  | 764     genes = [t for t in tokens if t not in logical] | 
|  | 765 | 
|  | 766     # Create set to remove duplicates, then convert back to list to maintain some order | 
|  | 767     unique_genes = list(dict.fromkeys(genes))  # Preserves insertion order | 
|  | 768 | 
|  | 769     if len(unique_genes) == 0: | 
|  | 770         return expr | 
|  | 771     elif len(unique_genes) == 1: | 
|  | 772         return unique_genes[0] | 
|  | 773     else: | 
|  | 774         return ' or '.join(unique_genes) | 
|  | 775 | 
|  | 776 | 
|  | 777 def _simplify_boolean_expression(expr: str) -> str: | 
|  | 778     """ | 
|  | 779     Simplify a boolean expression by removing duplicates while strictly preserving semantics. | 
|  | 780     This function handles simple duplicates within parentheses while being conservative about | 
|  | 781     complex expressions that could change semantics. | 
|  | 782     """ | 
|  | 783     if not expr or not expr.strip(): | 
|  | 784         return expr | 
|  | 785 | 
|  | 786     # Normalize operators and whitespace | 
|  | 787     expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') | 
|  | 788     expr = ' '.join(expr.split())  # Normalize whitespace | 
|  | 789 | 
|  | 790     def simplify_parentheses_content(match_obj): | 
|  | 791         """Helper function to simplify content within parentheses.""" | 
|  | 792         content = match_obj.group(1)  # Content inside parentheses | 
|  | 793 | 
|  | 794         # Only simplify if it's a pure OR or pure AND chain | 
|  | 795         if ' or ' in content and ' and ' not in content: | 
|  | 796             # Pure OR chain - safe to deduplicate | 
|  | 797             parts = [p.strip() for p in content.split(' or ') if p.strip()] | 
|  | 798             unique_parts = [] | 
|  | 799             seen = set() | 
|  | 800             for part in parts: | 
|  | 801                 if part not in seen: | 
|  | 802                     unique_parts.append(part) | 
|  | 803                     seen.add(part) | 
|  | 804 | 
|  | 805             if len(unique_parts) == 1: | 
|  | 806                 return unique_parts[0]  # Remove unnecessary parentheses for single items | 
|  | 807             else: | 
|  | 808                 return '(' + ' or '.join(unique_parts) + ')' | 
|  | 809 | 
|  | 810         elif ' and ' in content and ' or ' not in content: | 
|  | 811             # Pure AND chain - safe to deduplicate | 
|  | 812             parts = [p.strip() for p in content.split(' and ') if p.strip()] | 
|  | 813             unique_parts = [] | 
|  | 814             seen = set() | 
|  | 815             for part in parts: | 
|  | 816                 if part not in seen: | 
|  | 817                     unique_parts.append(part) | 
|  | 818                     seen.add(part) | 
|  | 819 | 
|  | 820             if len(unique_parts) == 1: | 
|  | 821                 return unique_parts[0]  # Remove unnecessary parentheses for single items | 
|  | 822             else: | 
|  | 823                 return '(' + ' and '.join(unique_parts) + ')' | 
|  | 824         else: | 
|  | 825             # Mixed operators or single item - return with parentheses as-is | 
|  | 826             return '(' + content + ')' | 
|  | 827 | 
|  | 828     def remove_duplicates_simple(parts_str: str, separator: str) -> str: | 
|  | 829         """Remove duplicates from a simple chain of operations.""" | 
|  | 830         parts = [p.strip() for p in parts_str.split(separator) if p.strip()] | 
|  | 831 | 
|  | 832         # Remove duplicates while preserving order | 
|  | 833         unique_parts = [] | 
|  | 834         seen = set() | 
|  | 835         for part in parts: | 
|  | 836             if part not in seen: | 
|  | 837                 unique_parts.append(part) | 
|  | 838                 seen.add(part) | 
|  | 839 | 
|  | 840         if len(unique_parts) == 1: | 
|  | 841             return unique_parts[0] | 
|  | 842         else: | 
|  | 843             return f' {separator} '.join(unique_parts) | 
|  | 844 | 
|  | 845     try: | 
|  | 846         import re | 
|  | 847 | 
|  | 848         # First, simplify content within parentheses | 
|  | 849         # This handles cases like (A or A) -> A and (B and B) -> B | 
|  | 850         expr_simplified = re.sub(r'\(([^()]+)\)', simplify_parentheses_content, expr) | 
|  | 851 | 
|  | 852         # Check if the resulting expression has mixed operators | 
|  | 853         has_and = ' and ' in expr_simplified | 
|  | 854         has_or = ' or ' in expr_simplified | 
|  | 855 | 
|  | 856         # Only simplify top-level if it's pure AND or pure OR | 
|  | 857         if has_and and not has_or and '(' not in expr_simplified: | 
|  | 858             # Pure AND chain at top level - safe to deduplicate | 
|  | 859             return remove_duplicates_simple(expr_simplified, 'and') | 
|  | 860         elif has_or and not has_and and '(' not in expr_simplified: | 
|  | 861             # Pure OR chain at top level - safe to deduplicate | 
|  | 862             return remove_duplicates_simple(expr_simplified, 'or') | 
|  | 863         else: | 
|  | 864             # Mixed operators or has parentheses - return the simplified version (with parentheses content cleaned) | 
|  | 865             return expr_simplified | 
|  | 866 | 
|  | 867     except Exception: | 
|  | 868         # If anything goes wrong, return the original expression | 
|  | 869         return expr | 
|  | 870 | 
|  | 871 | 
def translate_model_genes(model: 'cobra.Model',
                         mapping_df: 'pd.DataFrame',
                         target_nomenclature: str,
                         source_nomenclature: str = 'hgnc_id',
                         allow_many_to_one: bool = False,
                         logger: Optional[logging.Logger] = None) -> Tuple['cobra.Model', Dict[str, str]]:
    """
    Translate model genes from source_nomenclature to target_nomenclature using a mapping table.
    mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez).

    The input model is never modified: translation is performed on a copy,
    which is returned.

    Args:
        model: COBRA model to translate.
        mapping_df: DataFrame containing the mapping information.
        target_nomenclature: Desired target key (e.g., 'hgnc_symbol').
        source_nomenclature: Current source key in the model (default 'hgnc_id').
        allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs.
        logger: Optional logger. When omitted, logging.basicConfig is invoked
            (a process-wide side effect) and the module logger is used.

    Returns:
        Tuple containing:
        - Translated COBRA model
        - Dictionary mapping reaction IDs to translation issue descriptions

    Raises:
        ValueError: If no usable columns are detected in mapping_df, or the
            requested source/target columns cannot be resolved.
    """
    if logger is None:
        # NOTE: basicConfig configures the root logger for the entire process.
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        logger = logging.getLogger(__name__)

    logger.info(f"Translating genes from '{source_nomenclature}' to '{target_nomenclature}'")

    # normalize column names and choose relevant columns
    # (_choose_columns is defined elsewhere in this module; presumably it maps
    # canonical keys like 'hgnc_id' to actual dataframe column names — verify there.)
    chosen = _choose_columns(mapping_df)
    if not chosen:
        raise ValueError("Could not detect useful columns in mapping_df. Expected at least one of: ensg, hgnc_id, hgnc_symbol, entrez.")

    # map source/target to actual dataframe column names (allow user-specified source/target keys)
    # normalize input args
    src_key = source_nomenclature.strip().lower()
    tgt_key = target_nomenclature.strip().lower()

    # try to find the actual column names for requested keys
    col_for_src = None
    col_for_tgt = None
    # first, try exact match against the canonical keys
    for k, actual in chosen.items():
        if k == src_key:
            col_for_src = actual
        if k == tgt_key:
            col_for_tgt = actual

    # if not found, fall back to substring matching (e.g. 'hgnc' matches 'hgnc_id')
    if col_for_src is None:
        possible_src_names = {k: v for k, v in chosen.items()}
        # try to match by contained substring
        for k, actual in possible_src_names.items():
            if src_key in k:
                col_for_src = actual
                break

    if col_for_tgt is None:
        for k, actual in chosen.items():
            if tgt_key in k:
                col_for_tgt = actual
                break

    if col_for_src is None:
        raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.")
    if col_for_tgt is None:
        raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.")

    # Normalized IDs of genes actually present in the model; used to shrink the mapping.
    model_source_genes = { _normalize_gene_id(g.id) for g in model.genes }
    logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).")

    tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy()
    tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).apply(_normalize_gene_id)

    # Keep only mapping rows whose (normalized) source gene occurs in the model.
    filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()

    if filtered_map.empty:
        logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")

    # Unless many-to-one mappings are explicitly allowed, warn about targets
    # reachable from multiple sources (validation logs; it does not raise).
    if not allow_many_to_one:
        _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)

    # Build the source -> list-of-targets mapping dictionary.
    gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)

    # Work on a copy so the caller's model stays untouched.
    model_copy = model.copy()

    # Counters accumulated in place by _translate_gpr across all reactions.
    stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0, 'flattened_or_gprs': 0}
    unmapped = []
    multi = []

    # Dictionary to store translation issues per reaction
    reaction_translation_issues = {}

    original_genes = {g.id for g in model_copy.genes}
    logger.info(f"Original genes count: {len(original_genes)}")

    # translate GPRs reaction by reaction
    for rxn in model_copy.reactions:
        gpr = rxn.gene_reaction_rule
        if gpr and gpr.strip():
            new_gpr, rxn_issues = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger, track_issues=True)
            if rxn_issues:
                reaction_translation_issues[rxn.id] = rxn_issues

            if new_gpr != gpr:
                # Check if this GPR has translation issues and contains only OR operators
                if rxn_issues and _is_or_only_expression(new_gpr):
                    # Flatten the GPR: remove parentheses and create set of unique genes
                    flattened_gpr = _flatten_or_only_gpr(new_gpr)
                    if flattened_gpr != new_gpr:
                        stats['flattened_or_gprs'] += 1
                        logger.debug(f"Flattened OR-only GPR with issues for {rxn.id}: '{new_gpr}' -> '{flattened_gpr}'")
                        new_gpr = flattened_gpr

                # Deduplicate operands conservatively before writing back.
                simplified_gpr = _simplify_boolean_expression(new_gpr)
                if simplified_gpr != new_gpr:
                    stats['simplified_gprs'] += 1
                    logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
                rxn.gene_reaction_rule = simplified_gpr
                logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'")

    # update model genes based on new GPRs (removes orphans, adds new targets)
    _update_model_genes(model_copy, logger)

    # final logging of counters and examples
    _log_translation_statistics(stats, unmapped, multi, original_genes, model_copy.genes, logger)

    logger.info("Translation finished")
    return model_copy, reaction_translation_issues
|  | 1005 | 
|  | 1006 | 
|  | 1007 # ---------- helper functions ---------- | 
def _create_gene_mapping(mapping_df, source_col: str, target_col: str, logger: logging.Logger) -> Dict[str, List[str]]:
    """
    Build a mapping dict: source_id -> list of target_ids.

    Source IDs are normalized (prefixes like 'HGNC:' removed); target IDs are
    whitespace-stripped. Duplicate (source, target) pairs are collapsed and
    summary statistics are logged.
    """
    pairs = mapping_df[[source_col, target_col]].dropna().copy()
    # Normalize both columns to clean strings before grouping.
    pairs[source_col] = pairs[source_col].astype(str).apply(_normalize_gene_id)
    pairs[target_col] = pairs[target_col].astype(str).str.strip()
    pairs = pairs.drop_duplicates()

    logger.info(f"Creating mapping from {len(pairs)} rows")

    mapping: Dict[str, List[str]] = defaultdict(list)
    for src, tgt in zip(pairs[source_col], pairs[target_col]):
        targets = mapping[src]
        if tgt not in targets:
            targets.append(tgt)

    # Summary: how many source keys resolve uniquely vs. to multiple targets.
    n_one_to_one = sum(1 for targets in mapping.values() if len(targets) == 1)
    n_one_to_many = sum(1 for targets in mapping.values() if len(targets) > 1)
    logger.info(f"Mapping: {len(mapping)} source keys, {n_one_to_one} 1:1, {n_one_to_many} 1:many")
    return dict(mapping)
|  | 1034 | 
|  | 1035 | 
def _translate_gpr(gpr_string: str,
                   gene_mapping: Dict[str, List[str]],
                   stats: Dict[str, int],
                   unmapped_genes: List[str],
                   multi_mapping_genes: List[Tuple[str, List[str]]],
                   logger: logging.Logger,
                   track_issues: bool = False) -> Union[str, Tuple[str, str]]:
    """
    Translate genes inside a GPR string using gene_mapping.

    Mutates `stats`, `unmapped_genes` and `multi_mapping_genes` in place so
    counters accumulate across calls (one call per reaction).

    Args:
        gpr_string: The original gene_reaction_rule text.
        gene_mapping: Normalized source gene ID -> list of target IDs.
        stats: Counter dict updated in place ('translated', 'one_to_one',
            'one_to_many', 'not_found').
        unmapped_genes: Accumulator for tokens missing from gene_mapping.
        multi_mapping_genes: Accumulator of (token, targets) 1:many examples.
        logger: Logger for per-token debug messages.
        track_issues: When True, also return a text description of the 1:many
            and many:1 mappings detected in this GPR.

    Returns:
        The translated GPR string; or (new_gpr, issue_text) when
        track_issues=True (issue_text is "" when there are no issues).
    """
    # Generic token pattern: letters, digits, :, _, -, ., (captures HGNC:1234, ENSG000..., symbols)
    token_pattern = r'\b[A-Za-z0-9:_.-]+\b'
    tokens = re.findall(token_pattern, gpr_string)

    logical = {'and', 'or', 'AND', 'OR', '(', ')'}
    tokens = [t for t in tokens if t not in logical]

    new_gpr = gpr_string
    issues = []

    for token in sorted(set(tokens), key=lambda x: -len(x)):  # longer tokens first to avoid partial replacement
        # Lookup uses the normalized form, but replacement targets the literal token.
        norm = _normalize_gene_id(token)
        if norm in gene_mapping:
            targets = gene_mapping[norm]
            stats['translated'] += 1
            if len(targets) == 1:
                stats['one_to_one'] += 1
                replacement = targets[0]
            else:
                # 1:many mapping: expand into an OR-group of all targets.
                stats['one_to_many'] += 1
                multi_mapping_genes.append((token, targets))
                replacement = "(" + " or ".join(targets) + ")"
                if track_issues:
                    issues.append(f"{token} -> {' or '.join(targets)}")

            # Word-boundary substitution of every occurrence of this token.
            pattern = r'\b' + re.escape(token) + r'\b'
            new_gpr = re.sub(pattern, replacement, new_gpr)
        else:
            stats['not_found'] += 1
            if token not in unmapped_genes:
                unmapped_genes.append(token)
            logger.debug(f"Token not found in mapping (left as-is): {token}")

    # Check for many-to-one cases (multiple source genes mapping to same target)
    if track_issues:
        # Build reverse mapping to detect many-to-one cases from original tokens
        original_to_target = {}

        for orig_token in tokens:
            norm = _normalize_gene_id(orig_token)
            if norm in gene_mapping:
                targets = gene_mapping[norm]
                for target in targets:
                    if target not in original_to_target:
                        original_to_target[target] = []
                    if orig_token not in original_to_target[target]:
                        original_to_target[target].append(orig_token)

        # Identify many-to-one mappings in this specific GPR
        for target, original_genes in original_to_target.items():
            if len(original_genes) > 1:
                issues.append(f"{' or '.join(original_genes)} -> {target}")

    issue_text = "; ".join(issues) if issues else ""

    if track_issues:
        return new_gpr, issue_text
    else:
        return new_gpr
|  | 1106 | 
|  | 1107 | 
def _update_model_genes(model: 'cobra.Model', logger: logging.Logger):
    """
    Rebuild model.genes from gene_reaction_rule content.

    Removes genes no longer referenced by any GPR and adds gene objects for
    newly introduced IDs. Gene IDs are normalized with _normalize_gene_id so
    the comparison is consistent with the mapping used during translation.

    Args:
        model: COBRA model whose gene list should be synchronized (mutated in place).
        logger: Logger for the removed/added summary.
    """
    # collect genes referenced in GPRs (same token pattern as _translate_gpr)
    gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
    logical = {'and', 'or', 'AND', 'OR', '(', ')'}
    genes_in_gpr: Set[str] = set()

    for rxn in model.reactions:
        gpr = rxn.gene_reaction_rule
        if gpr and gpr.strip():
            toks = re.findall(gene_pattern, gpr)
            toks = [t for t in toks if t not in logical]
            # normalize IDs consistent with mapping normalization
            toks = [_normalize_gene_id(t) for t in toks]
            genes_in_gpr.update(toks)

    # existing gene ids (snapshot before any removal)
    existing = {g.id for g in model.genes}

    # remove genes no longer referenced by any GPR
    to_remove = [gid for gid in existing if gid not in genes_in_gpr]
    removed = 0
    for gid in to_remove:
        try:
            gene_obj = model.genes.get_by_id(gid)
            model.genes.remove(gene_obj)
            removed += 1
        except Exception:
            # safe-ignore: a failed removal leaves a harmless orphan gene entry
            pass

    # add gene objects for IDs that appear in GPRs but not in the model
    added = 0
    for gid in genes_in_gpr:
        if gid not in existing:
            new_gene = cobra.Gene(gid)
            try:
                model.genes.add(new_gene)
            except Exception:
                # fallback: if model.genes doesn't support add, try append or model.add_genes
                # (API surface differs across cobra versions — keep the chain defensive)
                try:
                    model.genes.append(new_gene)
                except Exception:
                    try:
                        model.add_genes([new_gene])
                    except Exception:
                        logger.warning(f"Could not add gene object for {gid}")
            added += 1

    logger.info(f"Model genes updated: removed {removed}, added {added}")
|  | 1161 | 
|  | 1162 | 
def export_model_to_tabular(model: cobraModel,
                           output_path: str,
                           translation_issues: Optional[Dict] = None,
                           include_objective: bool = True,
                           save_function = None) -> pd.DataFrame:
    """
    Export a COBRA model to tabular format with optional components.

    Builds one row per reaction by merging GPRs, formulas, bounds, objective
    coefficients, compartments/pathways, medium membership, and (optionally)
    translation issues into a single DataFrame, then writes it to disk.

    Args:
        model: COBRA model to export
        output_path: Path where to save the tabular file
        translation_issues: Optional dict of {reaction_id: issues} from gene translation
        include_objective: Whether to include objective coefficient column
        save_function: Optional custom save function taking (DataFrame, path);
            if None uses pd.DataFrame.to_csv with tab separator

    Returns:
        pd.DataFrame: The merged tabular data
    """
    # Generate model data.
    # NOTE(review): generate_rules / generate_reactions / generate_bounds /
    # get_medium / generate_compartments / extract_objective_coefficients are
    # module-level helpers defined elsewhere in this file.
    rules = generate_rules(model, asParsed=False)

    reactions = generate_reactions(model, asParsed=False)
    bounds = generate_bounds(model)
    medium = get_medium(model)
    compartments = generate_compartments(model)

    # Create base DataFrames keyed by ReactionID
    df_rules = pd.DataFrame(list(rules.items()), columns=["ReactionID", "GPR"])
    df_reactions = pd.DataFrame(list(reactions.items()), columns=["ReactionID", "Formula"])
    df_bounds = bounds.reset_index().rename(columns={"index": "ReactionID"})
    df_medium = medium.rename(columns={"reaction": "ReactionID"})
    df_medium["InMedium"] = True

    # Start merging; outer joins keep reactions that appear in only one table
    merged = df_reactions.merge(df_rules, on="ReactionID", how="outer")
    merged = merged.merge(df_bounds, on="ReactionID", how="outer")

    # Add objective coefficients if requested
    if include_objective:
        objective_function = extract_objective_coefficients(model)
        merged = merged.merge(objective_function, on="ReactionID", how="outer")

    # Add compartments/pathways if they exist
    if compartments is not None:
        merged = merged.merge(compartments, on="ReactionID", how="outer")

    # Add medium information (left join: non-medium reactions get NaN, filled below)
    merged = merged.merge(df_medium, on="ReactionID", how="left")

    # Add translation issues if provided
    if translation_issues:
        df_translation_issues = pd.DataFrame([
            {"ReactionID": rxn_id, "TranslationIssues": issues}
            for rxn_id, issues in translation_issues.items()
        ])
        if not df_translation_issues.empty:
            merged = merged.merge(df_translation_issues, on="ReactionID", how="left")
            merged["TranslationIssues"] = merged["TranslationIssues"].fillna("")

    # Final processing: medium reactions sorted to the top
    merged["InMedium"] = merged["InMedium"].fillna(False)
    merged = merged.sort_values(by="InMedium", ascending=False)

    # Save the file (custom hook or tab-separated CSV)
    if save_function:
        save_function(merged, output_path)
    else:
        merged.to_csv(output_path, sep="\t", index=False)

    return merged
|  | 1233 | 
|  | 1234 | 
|  | 1235 def _log_translation_statistics(stats: Dict[str, int], | 
|  | 1236                                unmapped_genes: List[str], | 
|  | 1237                                multi_mapping_genes: List[Tuple[str, List[str]]], | 
|  | 1238                                original_genes: Set[str], | 
|  | 1239                                final_genes, | 
|  | 1240                                logger: logging.Logger): | 
|  | 1241     logger.info("=== TRANSLATION STATISTICS ===") | 
|  | 1242     logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})") | 
|  | 1243     logger.info(f"Not found tokens: {stats.get('not_found', 0)}") | 
|  | 1244     logger.info(f"Simplified GPRs: {stats.get('simplified_gprs', 0)}") | 
|  | 1245     logger.info(f"Flattened OR-only GPRs with issues: {stats.get('flattened_or_gprs', 0)}") | 
|  | 1246 | 
|  | 1247     final_ids = {g.id for g in final_genes} | 
|  | 1248     logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}") | 
|  | 1249 | 
|  | 1250     if unmapped_genes: | 
|  | 1251         logger.warning(f"Unmapped tokens ({len(unmapped_genes)}): {', '.join(unmapped_genes[:20])}{(' ...' if len(unmapped_genes)>20 else '')}") | 
|  | 1252     if multi_mapping_genes: | 
|  | 1253         logger.info(f"Multi-mapping examples ({len(multi_mapping_genes)}):") | 
|  | 1254         for orig, targets in multi_mapping_genes[:10]: | 
|  | 1255             logger.info(f"  {orig} -> {', '.join(targets)}") | 
|  | 1256 | 
|  | 1257     # Log summary of flattened GPRs if any | 
|  | 1258     if stats.get('flattened_or_gprs', 0) > 0: | 
|  | 1259         logger.info(f"Flattened {stats['flattened_or_gprs']} OR-only GPRs that had translation issues (removed parentheses, created unique gene sets)") | 
|  | 1260 | 
|  | 1261 |