Mercurial > repos > bimib > cobraxy
comparison COBRAxy/src/utils/model_utils.py @ 539:2fb97466e404 draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Sat, 25 Oct 2025 14:55:13 +0000 |
| parents | |
| children | fcdbc81feb45 |
comparison
equal
deleted
inserted
replaced
| 538:fd53d42348bd | 539:2fb97466e404 |
|---|---|
| 1 """ | |
| 2 Utilities for generating and manipulating COBRA models and related metadata. | |
| 3 | |
| 4 This module includes helpers to: | |
| 5 - extract rules, reactions, bounds, objective coefficients, and compartments | |
| 6 - build a COBRA model from a tabular file | |
| 7 - set objective and medium from dataframes | |
| 8 - validate a model and convert gene identifiers | |
| 9 - translate model GPRs using mapping tables | |
| 10 """ | |
| 11 import os | |
| 12 import cobra | |
| 13 import pandas as pd | |
| 14 import re | |
| 15 import logging | |
| 16 from typing import Optional, Tuple, Union, List, Dict, Set | |
| 17 from collections import defaultdict | |
| 18 import utils.rule_parsing as rulesUtils | |
| 19 import utils.reaction_parsing as reactionUtils | |
| 20 from cobra import Model as cobraModel, Reaction, Metabolite | |
| 21 import sys | |
| 22 | |
| 23 | |
| 24 ############################ check_methods #################################### | |
def gene_type(l :str, name :str) -> str:
    """
    Classify a gene identifier by its format.

    Args:
        l: The gene identifier to check.
        name: Dataset name, used only in the abort message.

    Returns:
        str: One of 'HGNC_ID', 'ENSG', 'HGNC_symbol' or 'entrez_id'.

    Raises:
        SystemExit: If the identifier matches no supported format.
    """
    # Try each recognizer in priority order; first match wins.
    checks = (
        (check_hgnc, 'HGNC_ID'),
        (check_ensembl, 'ENSG'),
        (check_symbol, 'HGNC_symbol'),
        (check_entrez, 'entrez_id'),
    )
    for predicate, label in checks:
        if predicate(l):
            return label
    sys.exit('Execution aborted:\n' +
             'gene ID type in ' + name + ' not supported. Supported ID'+
             'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n')
| 51 | |
def check_hgnc(l :str) -> bool:
    """
    Check if a gene identifier follows the HGNC format.

    The format is the literal prefix 'HGNC:' (case-insensitive) followed by
    at least one digit.

    Args:
        l: The gene identifier to check.

    Returns:
        bool: True if *l* follows the HGNC format, False otherwise.
    """
    if len(l) <= 5:
        return False
    prefix, digits = l[:5], l[5:]
    return prefix.upper() == 'HGNC:' and digits.isdigit()
| 69 | |
def check_ensembl(l :str) -> bool:
    """
    Check if a gene identifier follows the Ensembl format.

    Only the case-insensitive 'ENS' prefix is tested.

    Args:
        l: The gene identifier to check.

    Returns:
        bool: True if *l* follows the Ensembl format, False otherwise.
    """
    return l[:3].upper() == 'ENS'
| 81 | |
| 82 | |
def check_symbol(l :str) -> bool:
    """
    Check if a gene identifier follows the HGNC symbol format.

    A symbol must start with a letter; any remaining characters must be
    alphanumeric. Single-letter symbols are accepted (the previous version
    rejected them because ``''.isalnum()`` is False).

    Args:
        l: The gene identifier to check.

    Returns:
        bool: True if *l* follows the symbol format, False otherwise.
    """
    if not l:
        return False
    # len(l) == 1 short-circuits the tail check so 1-char symbols pass.
    return l[0].isalpha() and (len(l) == 1 or l[1:].isalnum())
| 100 | |
def check_entrez(l :str) -> bool:
    """
    Check if a gene identifier follows the Entrez ID format (digits only).

    Args:
        l: The gene identifier to check.

    Returns:
        bool: True if *l* is a non-empty string of digits, False otherwise.
    """
    # str.isdigit() is already False for the empty string, so no length guard
    # is needed.
    return l.isdigit()
| 115 | |
| 116 ################################- DATA GENERATION -################################ | |
| 117 ReactionId = str | |
def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
    """
    Map each reaction ID to its GPR rule, skipping reactions without one.

    Args:
        model: COBRA model to derive data from.
        asParsed: If True, parse each rule into a nested list structure;
            otherwise keep the raw rule strings.

    Returns:
        Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID.
        Dict[ReactionId, str]: Raw rules by reaction ID.
    """
    rules = {}
    for rxn in model.reactions:
        raw = rxn.gene_reaction_rule
        if not raw:
            continue
        rules[rxn.id] = rulesUtils.parseRuleToNestedList(raw) if asParsed else raw
    return rules
| 138 | |
def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
    """
    Map each reaction ID to its formula, skipping reactions without one.

    Args:
        model: COBRA model to derive data from.
        asParsed: If True, convert formulas into a parsed representation;
            otherwise keep raw formula strings.

    Returns:
        Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested).
    """
    raw_reactions = {rxn.id: rxn.reaction for rxn in model.reactions if rxn.reaction}
    return reactionUtils.create_reaction_dict(raw_reactions) if asParsed else raw_reactions
| 160 | |
def get_medium(model:cobraModel) -> pd.DataFrame:
    """
    Extract the uptake reactions that constitute the model medium.

    A reaction qualifies when every stoichiometric coefficient is
    non-positive (pure consumption) and its lower bound is negative.

    Returns:
        pd.DataFrame: a single column 'reaction' listing qualifying IDs.
    """
    uptake_ids = []
    for rxn in model.reactions:
        # Equivalent to "no metabolite has a positive coefficient".
        consumes_only = all(rxn.get_coefficient(met.id) <= 0 for met in rxn.metabolites)
        if consumes_only and rxn.lower_bound < 0:
            uptake_ids.append(rxn.id)

    df_medium = pd.DataFrame()
    df_medium["reaction"] = uptake_ids
    return df_medium
| 180 | |
def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame:
    """
    Tabulate the objective coefficient of every reaction in the model.

    Args:
        model: COBRA model.

    Returns:
        pd.DataFrame with columns: ReactionID, ObjectiveCoefficient.
    """
    # The linear objective expression maps solver variables to coefficients;
    # reactions absent from it get 0.0.
    coeff_by_var = model.objective.expression.as_coefficients_dict()
    records = [
        {
            "ReactionID": rxn.id,
            "ObjectiveCoefficient": coeff_by_var.get(rxn.forward_variable, 0.0),
        }
        for rxn in model.reactions
    ]
    return pd.DataFrame(records)
| 203 | |
def generate_bounds(model:cobraModel) -> pd.DataFrame:
    """
    Collect lower/upper flux bounds for every reaction.

    Returns:
        pd.DataFrame indexed by reaction IDs with columns
        ['lower_bound', 'upper_bound'].
    """
    reaction_ids = [rxn.id for rxn in model.reactions]
    bounds = pd.DataFrame(columns=["lower_bound", "upper_bound"], index=reaction_ids)
    for rxn in model.reactions:
        bounds.loc[rxn.id] = [rxn.lower_bound, rxn.upper_bound]
    return bounds
| 221 | |
| 222 | |
| 223 | |
def generate_compartments(model: cobraModel) -> Optional[pd.DataFrame]:
    """
    Generate a DataFrame of pathway annotations for each reaction.

    Columns Pathway_1 ... Pathway_N are created, where N is the largest
    number of pathways any single reaction carries.

    Args:
        model: the COBRA model to extract pathway data from.

    Returns:
        pd.DataFrame: ReactionID plus Pathway_i columns, or None when no
        reaction in the model has any pathway annotation (the previous
        docstring claimed a DataFrame was always returned).
    """
    # First pass: normalize each reaction's 'pathways' annotation into a
    # list (possibly empty) and track the widest list seen.
    reaction_pathways: Dict[str, List[str]] = {}
    max_pathways = 0
    for reaction in model.reactions:
        raw = reaction.annotation.get('pathways')
        if isinstance(raw, list):  # was `type(...) == list`
            valid = [p for p in raw if p]  # drop empty/None entries
        elif raw:
            valid = [raw]  # single scalar pathway value
        else:
            valid = []  # missing key or falsy value
        reaction_pathways[reaction.id] = valid
        max_pathways = max(max_pathways, len(valid))

    # No pathway annotation anywhere in the model.
    if max_pathways == 0:
        return None

    pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]

    # Second pass: one row per reaction, padding short lists with None.
    pathway_data = []
    for reaction_id, pathways in reaction_pathways.items():
        row = {"ReactionID": reaction_id}
        for i, col_name in enumerate(pathway_columns):
            row[col_name] = pathways[i] if i < len(pathways) else None
        pathway_data.append(row)

    return pd.DataFrame(pathway_data)
| 288 | |
def set_annotation_pathways_from_data(model: cobraModel, df: pd.DataFrame):
    """Set reaction pathway annotations from 'Pathway_1', 'Pathway_2', ... columns."""
    pathway_cols = [c for c in df.columns if c.startswith('Pathway_')]
    if not pathway_cols:
        print("No 'Pathway_' columns found, skipping pathway annotation")
        return

    annotated = defaultdict(list)

    for _, row in df.iterrows():
        rxn_id = str(row['ReactionID']).strip()
        if rxn_id not in model.reactions:
            continue

        # Collect non-blank pathway cells for this row.
        values = [str(row[c]).strip() for c in pathway_cols
                  if pd.notna(row[c]) and str(row[c]).strip()]
        if not values:
            continue

        rxn = model.reactions.get_by_id(rxn_id)
        # A single pathway is stored as a bare string, several as a list.
        rxn.annotation['pathways'] = values[0] if len(values) == 1 else values
        annotated[rxn_id] = values

    print(f"Annotated {len(annotated)} reactions with pathways.")
| 319 | |
def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
    """
    Build a COBRApy model from a tabular file with reaction data.

    Required columns: 'ReactionID', 'Formula'. The columns 'lower_bound',
    'upper_bound' and 'GPR' are also read unconditionally (a KeyError is
    raised if they are absent — only the two required columns are
    validated). Optional columns 'ObjectiveCoefficient', 'InMedium' and
    'Pathway_*' are consumed by the helper setters called at the end.

    Args:
        csv_path: Path to the tab- or comma-separated file.
        model_id: ID for the newly created model.

    Returns:
        cobra.Model: The constructed COBRApy model.

    Raises:
        ValueError: If required columns are missing, or a row has an empty
            formula (second pass).
    """

    # Try to detect separator: tab wins if present on the first line.
    with open(csv_path, 'r') as f:
        first_line = f.readline()
        sep = '\t' if '\t' in first_line else ','

    df = pd.read_csv(csv_path, sep=sep)

    # Check required columns
    required_cols = ['ReactionID', 'Formula']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}. Available columns: {list(df.columns)}")

    model = cobraModel(model_id)

    metabolites_dict = {}
    compartments_dict = {}

    print(f"Building model from {len(df)} reactions...")

    # First pass: harvest every metabolite/compartment mentioned in any
    # formula so they can be added to the model before the reactions.
    # NOTE(review): blank formulas are skipped here but raise ValueError in
    # the second pass below — confirm that asymmetry is intended.
    for idx, row in df.iterrows():
        reaction_formula = str(row['Formula']).strip()
        if not reaction_formula or reaction_formula == 'nan':
            continue

        metabolites = extract_metabolites_from_reaction(reaction_formula)

        for met_id in metabolites:
            compartment = extract_compartment_from_metabolite(met_id)

            if compartment not in compartments_dict:
                compartments_dict[compartment] = compartment

            if met_id not in metabolites_dict:
                # Display name: metabolite ID with the compartment suffix
                # stripped and double underscores collapsed.
                metabolites_dict[met_id] = Metabolite(
                    id=met_id,
                    compartment=compartment,
                    name=met_id.replace(f"_{compartment}", "").replace("__", "_")
                )

    model.compartments = compartments_dict

    model.add_metabolites(list(metabolites_dict.values()))

    print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments")

    reactions_added = 0
    reactions_skipped = 0

    # Second pass: build one Reaction per row; parse failures are reported
    # and skipped rather than aborting the whole build.
    for idx, row in df.iterrows():

        reaction_id = str(row['ReactionID']).strip()
        reaction_formula = str(row['Formula']).strip()

        if not reaction_formula or reaction_formula == 'nan':
            raise ValueError(f"Missing reaction formula for {reaction_id}")

        reaction = Reaction(reaction_id)
        reaction.name = reaction_id

        # Default bounds ±1000 when the cell is empty/NaN.
        reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
        reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0

        if pd.notna(row['GPR']) and str(row['GPR']).strip():
            reaction.gene_reaction_rule = str(row['GPR']).strip()

        try:
            parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
        except Exception as e:
            print(f"Error parsing reaction {reaction_id}: {e}")
            reactions_skipped += 1
            continue

        model.add_reactions([reaction])
        reactions_added += 1


    print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions")

    # set objective function
    set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient")

    set_medium_from_data(model, df)

    set_annotation_pathways_from_data(model, df)

    print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites")

    return model
| 421 | |
| 422 | |
| 423 # Extracts all metabolite IDs from the formula (handles numeric prefixes + underscores) | |
| 424 #def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: | |
| 425 # """ | |
| 426 # Extract metabolite IDs from a reaction formula. | |
| 427 # Robust pattern: tokens ending with _<compartment> (e.g., _c, _m, _e), | |
| 428 # allowing leading digits/underscores. | |
| 429 # """ | |
| 430 # metabolites = set() | |
| 431 # # optional coefficient followed by a token ending with _<letters> | |
| 432 # if reaction_formula[-1] == ']' and reaction_formula[-3] == '[': | |
| 433 # pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+[[A-Za-z0-9]]+)' | |
| 434 # else: | |
| 435 # pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[A-Za-z0-9]+)' | |
| 436 # matches = re.findall(pattern, reaction_formula) | |
| 437 # metabolites.update(matches) | |
| 438 # return metabolites | |
| 439 | |
| 440 | |
def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
    """
    Extract metabolite IDs from a reaction formula.

    Handles:
    - optional stoichiometric coefficients (integers or decimals)
    - compartment tags at the end of the metabolite, either [c] or _c

    Returns the IDs including the compartment suffix exactly as written.
    """
    token_re = re.compile(
        r'(?:^|(?<=\s)|(?<=\+)|(?<=,)|(?<==)|(?<=:))'  # left boundary (start, space, +, comma, =, :)
        r'(?:\d+(?:\.\d+)?\s+)?'  # optional coefficient (requires space after)
        r'([A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+))'  # metabolite + compartment (can start with number)
    )
    found = set()
    for match in token_re.finditer(reaction_formula):
        found.add(match.group(1))
    return found
| 457 | |
| 458 | |
| 459 | |
def extract_compartment_from_metabolite(metabolite_id: str) -> str:
    """
    Extract the compartment tag from a metabolite ID.

    Recognizes both 'met[c]' bracket notation and 'met_c' suffix notation,
    including multi-character tags ('met[er]', 'met_mito') which the
    companion metabolite regex accepts but the previous index-based checks
    missed. Also fixes an IndexError on IDs shorter than two characters.

    Returns 'c' (cytoplasm) when no tag is found.
    """
    bracket = re.search(r'\[([A-Za-z0-9]+)\]$', metabolite_id)
    if bracket:
        return bracket.group(1)
    suffix = re.search(r'_([A-Za-z0-9]+)$', metabolite_id)
    if suffix:
        return suffix.group(1)
    return 'c'  # default cytoplasm
| 467 | |
| 468 | |
def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
    """
    Parse a reaction formula and set metabolites with signed coefficients.

    Left-hand metabolites get negative coefficients (reactants), right-hand
    ones positive (products). Metabolite IDs not present in
    *metabolites_dict* are silently ignored.

    Args:
        reaction: Reaction object to populate via add_metabolites.
        formula: Formula string, e.g. 'a_c + b_c --> c_c'.
        metabolites_dict: Known Metabolite objects keyed by ID.

    Raises:
        ValueError: If no recognized arrow is found, or the formula does not
            split into exactly two sides.
    """
    # NOTE(review): backward arrows ('<--', '<-') are split exactly like
    # forward ones — the left side still becomes the reactants. Confirm that
    # inputs never use backward arrows with swapped semantics.
    # (The previous version also computed an unused `reversible` flag; it
    # has been removed — reversibility is carried by the bounds.)
    for arrow in ('<=>', '<--', '-->', '<-'):  # same precedence as before
        if arrow in formula:
            parts = formula.split(arrow)
            break
    else:
        raise ValueError(f"Unrecognized reaction format: {formula}")

    # Handle cases where one side might be empty (exchange reactions)
    if len(parts) != 2:
        raise ValueError(f"Invalid reaction format, expected 2 parts: {formula}")

    left, right = parts[0].strip(), parts[1].strip()

    reactants = parse_metabolites_side(left) if left else {}
    products = parse_metabolites_side(right) if right else {}

    metabolites_to_add = {}

    for met_id, coeff in reactants.items():
        if met_id in metabolites_dict:
            metabolites_to_add[metabolites_dict[met_id]] = -coeff

    for met_id, coeff in products.items():
        if met_id in metabolites_dict:
            metabolites_to_add[metabolites_dict[met_id]] = coeff

    reaction.add_metabolites(metabolites_to_add)
| 507 | |
| 508 | |
def parse_metabolites_side(side_str: str) -> Dict[str, float]:
    """
    Parse one side of a reaction into {metabolite_id: coefficient}.

    Terms are separated by '+'; each term is either 'met_id' (implicit
    coefficient 1) or '<coeff> <met_id>'. Metabolite IDs must end with a
    compartment tag ('[c]' or '_c'). Repeated metabolites on the same side
    now have their coefficients summed — previously the last occurrence
    silently overwrote the earlier ones.
    """
    metabolites: Dict[str, float] = {}
    if not side_str or side_str.strip() == '':
        return metabolites

    for term in side_str.split('+'):
        term = term.strip()
        if not term:
            continue

        # Two whitespace-separated parts: potential coefficient + metabolite.
        parts = term.split()
        if len(parts) == 2:
            try:
                coeff = float(parts[0])
            except ValueError:
                coeff = None  # not a number: fall through to single-term path
            if coeff is not None and re.match(r'[A-Za-z0-9_]+(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', parts[1]):
                metabolites[parts[1]] = metabolites.get(parts[1], 0.0) + coeff
                continue

        # Single term (no coefficient); pattern allows IDs starting with a digit.
        if re.match(r'[A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', term):
            metabolites[term] = metabolites.get(term, 0.0) + 1.0
        else:
            print(f"Warning: Could not parse metabolite term: '{term}'")

    return metabolites
| 543 | |
| 544 | |
| 545 | |
def set_objective_from_csv(model: cobra.Model, df: pd.DataFrame, obj_col: str = "ObjectiveCoefficient"):
    """
    Set the model objective from a coefficient column of the dataframe.

    Any reaction with a non-zero coefficient participates (not necessarily
    biomass); flagged reactions missing from the model are skipped with a
    warning.

    Raises:
        ValueError: If no reaction has a non-zero coefficient.
    """
    objective = {}

    for _, row in df.iterrows():
        reaction_id = str(row['ReactionID']).strip()
        coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0
        if coeff == 0:
            continue
        if reaction_id in model.reactions:
            objective[model.reactions.get_by_id(reaction_id)] = coeff
        else:
            print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.")

    if not objective:
        raise ValueError("No reactions found with non-zero objective coefficient.")

    model.objective = objective
    print(f"Objective set with {len(objective)} reactions.")
| 567 | |
| 568 | |
| 569 | |
| 570 | |
def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
    """
    Set the growth medium from a boolean 'InMedium' column.

    Every flagged reaction that exists in the model and has a negative
    lower bound contributes an uptake limit of |lower_bound|.
    """
    if 'InMedium' not in df.columns:
        print("No 'InMedium' column found, skipping medium setup")
        return

    medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()

    # Build the ID set once — the previous version rebuilt a full ID list
    # for every membership test (O(n^2)).
    model_reaction_ids = {r.id for r in model.reactions}

    medium_dict = {}
    for rxn_id in medium_reactions:
        if rxn_id in model_reaction_ids:
            reaction = model.reactions.get_by_id(rxn_id)
            if reaction.lower_bound < 0:
                medium_dict[rxn_id] = abs(reaction.lower_bound)

    if medium_dict:
        model.medium = medium_dict
        print(f"Medium set with {len(medium_dict)} components")
    else:
        print("No medium components found")
def validate_model(model: cobraModel) -> Dict[str, object]:
    """
    Validate the model and return basic statistics.

    Returns a dict with structural counts (reactions, metabolites, genes,
    compartments), the objective, the medium size, plus 'growth_rate' and
    'status' from a test optimization. On solver failure 'growth_rate' is
    None and 'status' carries the error text.

    Note: the return annotation previously used the builtin ``any`` instead
    of a type; it is now ``Dict[str, object]``.
    """
    validation = {
        'num_reactions': len(model.reactions),
        'num_metabolites': len(model.metabolites),
        'num_genes': len(model.genes),
        'num_compartments': len(model.compartments),
        'objective': str(model.objective),
        'medium_size': len(model.medium),
        'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
        'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
    }

    try:
        # Growth test — a failing solver must not crash validation.
        solution = model.optimize()
        validation['growth_rate'] = solution.objective_value
        validation['status'] = solution.status
    except Exception as e:
        validation['growth_rate'] = None
        validation['status'] = f"Error: {e}"

    return validation
| 614 | |
def convert_genes(model, annotation):
    """
    Return a copy of *model* with genes renamed via gene.notes[annotation].

    Args:
        model: COBRA model whose genes carry target IDs in their notes dict.
        annotation: Key of gene.notes holding the replacement identifier.

    Returns:
        The renamed model copy, or -1 when any gene lacks the annotation key
        (return code kept for backward compatibility with existing callers).
    """
    from cobra.manipulation import rename_genes
    model2 = model.copy()
    try:
        dict_genes = {gene.id: gene.notes[annotation] for gene in model2.genes}
    except KeyError:  # was a bare `except:` that also swallowed unrelated errors
        print("No annotation in gene dict!")
        return -1
    rename_genes(model2, dict_genes)

    return model2
| 627 | |
| 628 # ---------- Utility helpers ---------- | |
| 629 def _normalize_colname(col: str) -> str: | |
| 630 return col.strip().lower().replace(' ', '_') | |
| 631 | |
| 632 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]: | |
| 633 """ | |
| 634 Find useful columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}. | |
| 635 Raise ValueError if no suitable mapping is found. | |
| 636 """ | |
| 637 cols = { _normalize_colname(c): c for c in mapping_df.columns } | |
| 638 chosen = {} | |
| 639 # candidate names for each category | |
| 640 candidates = { | |
| 641 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'], | |
| 642 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'], | |
| 643 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'], | |
| 644 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'], | |
| 645 'gene_number': ['gene_number'] | |
| 646 } | |
| 647 for key, names in candidates.items(): | |
| 648 for n in names: | |
| 649 if n in cols: | |
| 650 chosen[key] = cols[n] | |
| 651 break | |
| 652 return chosen | |
| 653 | |
| 654 def _validate_target_uniqueness(mapping_df: 'pd.DataFrame', | |
| 655 source_col: str, | |
| 656 target_col: str, | |
| 657 model_source_genes: Optional[Set[str]] = None, | |
| 658 logger: Optional[logging.Logger] = None) -> None: | |
| 659 """ | |
| 660 Check that, within the filtered mapping_df, each target maps to at most one source. | |
| 661 Log examples if duplicates are found. | |
| 662 """ | |
| 663 if logger is None: | |
| 664 logger = logging.getLogger(__name__) | |
| 665 | |
| 666 if mapping_df.empty: | |
| 667 logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.") | |
| 668 return | |
| 669 | |
| 670 # normalize temporary columns for grouping (without altering the original df) | |
| 671 tmp = mapping_df[[source_col, target_col]].copy() | |
| 672 tmp['_src_norm'] = tmp[source_col].astype(str).apply(_normalize_gene_id) | |
| 673 tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip() | |
| 674 | |
| 675 # optionally filter to the set of model source genes | |
| 676 if model_source_genes is not None: | |
| 677 tmp = tmp[tmp['_src_norm'].isin(model_source_genes)] | |
| 678 | |
| 679 if tmp.empty: | |
| 680 logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.") | |
| 681 return | |
| 682 | |
| 683 # build reverse mapping: target -> set(sources) | |
| 684 grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna())) | |
| 685 # find targets with more than one source | |
| 686 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1} | |
| 687 | |
| 688 if problematic: | |
| 689 # prepare warning message with examples (limited subset) | |
| 690 sample_items = list(problematic.items()) | |
| 691 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."] | |
| 692 for tgt, sources in sample_items: | |
| 693 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}") | |
| 694 full_msg = "\n".join(msg_lines) | |
| 695 # log warning | |
| 696 logger.warning(full_msg) | |
| 697 | |
| 698 # if everything is fine | |
| 699 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).") | |
| 700 | |
| 701 | |
| 702 def _normalize_gene_id(g: str) -> str: | |
| 703 """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips).""" | |
| 704 if g is None: | |
| 705 return "" | |
| 706 g = str(g).strip() | |
| 707 # remove common prefixes | |
| 708 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE) | |
| 709 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE) | |
| 710 return g | |
| 711 | |
| 712 def _is_or_only_expression(expr: str) -> bool: | |
| 713 """ | |
| 714 Check if a GPR expression contains only OR operators (no AND operators). | |
| 715 | |
| 716 Args: | |
| 717 expr: GPR expression string | |
| 718 | |
| 719 Returns: | |
| 720 bool: True if expression contains only OR (and parentheses) and has multiple genes, False otherwise | |
| 721 """ | |
| 722 if not expr or not expr.strip(): | |
| 723 return False | |
| 724 | |
| 725 # Normalize the expression | |
| 726 normalized = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') | |
| 727 | |
| 728 # Check if it contains any AND operators | |
| 729 has_and = ' and ' in normalized.lower() | |
| 730 | |
| 731 # Check if it contains OR operators | |
| 732 has_or = ' or ' in normalized.lower() | |
| 733 | |
| 734 # Must have OR operators and no AND operators | |
| 735 return has_or and not has_and | |
| 736 | |
| 737 | |
| 738 def _flatten_or_only_gpr(expr: str) -> str: | |
| 739 """ | |
| 740 Flatten a GPR expression that contains only OR operators by: | |
| 741 1. Removing all parentheses | |
| 742 2. Extracting unique gene names | |
| 743 3. Joining them with ' or ' | |
| 744 | |
| 745 Args: | |
| 746 expr: GPR expression string with only OR operators | |
| 747 | |
| 748 Returns: | |
| 749 str: Flattened GPR expression | |
| 750 """ | |
| 751 if not expr or not expr.strip(): | |
| 752 return expr | |
| 753 | |
| 754 # Extract all gene tokens (exclude logical operators and parentheses) | |
| 755 gene_pattern = r'\b[A-Za-z0-9:_.-]+\b' | |
| 756 logical = {'and', 'or', 'AND', 'OR', '(', ')'} | |
| 757 | |
| 758 tokens = re.findall(gene_pattern, expr) | |
| 759 genes = [t for t in tokens if t not in logical] | |
| 760 | |
| 761 # Create set to remove duplicates, then convert back to list to maintain some order | |
| 762 unique_genes = list(dict.fromkeys(genes)) # Preserves insertion order | |
| 763 | |
| 764 if len(unique_genes) == 0: | |
| 765 return expr | |
| 766 elif len(unique_genes) == 1: | |
| 767 return unique_genes[0] | |
| 768 else: | |
| 769 return ' or '.join(unique_genes) | |
| 770 | |
| 771 | |
def _simplify_boolean_expression(expr: str) -> str:
    """
    Simplify a boolean expression by removing duplicate operands while strictly
    preserving semantics.

    Only safe transformations are applied:
      - duplicates in a *pure* OR chain or *pure* AND chain are removed
        (inside parentheses, and at the top level when no parentheses remain);
      - parentheses around a single remaining operand are dropped.
    Expressions mixing 'and'/'or' are left structurally untouched, because
    deduplicating across different operators could change their meaning.

    Args:
        expr: Boolean (GPR-style) expression, e.g. "(A or A) and B".

    Returns:
        str: The simplified expression, or the input unchanged if it is empty
        or an unexpected error occurs during simplification.
    """
    if not expr or not expr.strip():
        return expr

    # Normalize operator case and collapse whitespace runs so that the
    # ' and ' / ' or ' separators below match reliably.
    expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
    expr = ' '.join(expr.split())

    def unique_operands(content: str, operator: str) -> List[str]:
        """Operands of a pure ` operator ` chain, deduplicated, first-seen order."""
        # Split on the operator WITH surrounding spaces: a bare split on
        # 'or'/'and' would corrupt operands containing those substrings
        # (e.g. the gene symbol 'C1orf43').
        separator = f' {operator} '
        seen = set()
        ordered = []
        for part in (p.strip() for p in content.split(separator)):
            if part and part not in seen:
                ordered.append(part)
                seen.add(part)
        return ordered

    def simplify_parentheses_content(match_obj):
        """Simplify the content of one innermost parenthesized group."""
        content = match_obj.group(1)
        has_or = ' or ' in content
        has_and = ' and ' in content
        if has_or == has_and:
            # Mixed operators (both True) or single operand (both False):
            # not safe/needed to deduplicate, keep as-is with parentheses.
            return '(' + content + ')'
        op = 'or' if has_or else 'and'
        parts = unique_operands(content, op)
        if len(parts) == 1:
            return parts[0]  # drop parentheses around a lone operand
        return '(' + f' {op} '.join(parts) + ')'

    try:
        # Simplify each innermost parenthesized group, e.g. (A or A) -> A.
        expr_simplified = re.sub(r'\(([^()]+)\)', simplify_parentheses_content, expr)

        has_and = ' and ' in expr_simplified
        has_or = ' or ' in expr_simplified

        # Deduplicate the top level only when it is a pure chain with no
        # remaining parentheses; anything else is returned as already cleaned.
        if has_and != has_or and '(' not in expr_simplified:
            op = 'and' if has_and else 'or'
            parts = unique_operands(expr_simplified, op)
            return parts[0] if len(parts) == 1 else f' {op} '.join(parts)
        return expr_simplified

    except Exception:
        # Defensive fallback: never let simplification break GPR translation.
        return expr
| 865 | |
| 866 | |
def translate_model_genes(model: 'cobra.Model',
                        mapping_df: 'pd.DataFrame',
                        target_nomenclature: str,
                        source_nomenclature: str = 'hgnc_id',
                        allow_many_to_one: bool = False,
                        logger: Optional[logging.Logger] = None) -> Tuple['cobra.Model', Dict[str, str]]:
    """
    Translate model genes from source_nomenclature to target_nomenclature using a mapping table.
    mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez).

    Args:
        model: COBRA model to translate (left untouched; a copy is returned).
        mapping_df: DataFrame containing the mapping information.
        target_nomenclature: Desired target key (e.g., 'hgnc_symbol').
        source_nomenclature: Current source key in the model (default 'hgnc_id').
        allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs.
        logger: Optional logger; a default one is configured when omitted.

    Returns:
        Tuple containing:
        - Translated COBRA model (a copy of the input)
        - Dictionary mapping reaction IDs to translation issue descriptions

    Raises:
        ValueError: If no usable columns are detected in mapping_df, or the
            requested source/target nomenclature cannot be resolved to a column.
    """
    if logger is None:
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        logger = logging.getLogger(__name__)

    logger.info(f"Translating genes from '{source_nomenclature}' to '{target_nomenclature}'")

    # Detect which columns of mapping_df hold the supported nomenclatures.
    chosen = _choose_columns(mapping_df)
    if not chosen:
        raise ValueError("Could not detect useful columns in mapping_df. Expected at least one of: ensg, hgnc_id, hgnc_symbol, entrez.")

    # Normalize the user-supplied nomenclature keys.
    src_key = source_nomenclature.strip().lower()
    tgt_key = target_nomenclature.strip().lower()

    def _resolve_column(requested_key: str) -> Optional[str]:
        """Resolve a nomenclature key to an actual mapping_df column name:
        exact match against the detected keys first, then substring match."""
        if requested_key in chosen:
            return chosen[requested_key]
        for key, actual in chosen.items():
            if requested_key in key:
                return actual
        return None

    col_for_src = _resolve_column(src_key)
    col_for_tgt = _resolve_column(tgt_key)

    if col_for_src is None:
        raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.")
    if col_for_tgt is None:
        raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.")

    # Restrict the mapping to source genes actually present in the model
    # (normalized ids, so e.g. 'HGNC:' prefixes compare correctly).
    model_source_genes = { _normalize_gene_id(g.id) for g in model.genes }
    logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).")

    tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy()
    tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).apply(_normalize_gene_id)

    filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()

    if filtered_map.empty:
        logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")

    if not allow_many_to_one:
        _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)

    # Build the source -> [targets] mapping.
    gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)

    # Work on a copy so the caller's model is untouched.
    model_copy = model.copy()

    # Translation statistics, mutated in place by _translate_gpr.
    stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0, 'flattened_or_gprs': 0}
    unmapped = []
    multi = []

    # reaction id -> human-readable description of 1:many / many:1 translations
    reaction_translation_issues = {}

    original_genes = {g.id for g in model_copy.genes}
    logger.info(f"Original genes count: {len(original_genes)}")

    # Translate every non-empty GPR.
    for rxn in model_copy.reactions:
        gpr = rxn.gene_reaction_rule
        if gpr and gpr.strip():
            new_gpr, rxn_issues = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger, track_issues=True)
            if rxn_issues:
                reaction_translation_issues[rxn.id] = rxn_issues

            if new_gpr != gpr:
                # OR-only GPRs with translation issues are flattened to a plain
                # deduplicated OR chain before the generic simplification pass.
                if rxn_issues and _is_or_only_expression(new_gpr):
                    flattened_gpr = _flatten_or_only_gpr(new_gpr)
                    if flattened_gpr != new_gpr:
                        stats['flattened_or_gprs'] += 1
                        logger.debug(f"Flattened OR-only GPR with issues for {rxn.id}: '{new_gpr}' -> '{flattened_gpr}'")
                        new_gpr = flattened_gpr

                simplified_gpr = _simplify_boolean_expression(new_gpr)
                if simplified_gpr != new_gpr:
                    stats['simplified_gprs'] += 1
                    logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
                rxn.gene_reaction_rule = simplified_gpr
                logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'")

    # Keep model.genes in sync with the rewritten GPRs.
    _update_model_genes(model_copy, logger)

    # final logging
    _log_translation_statistics(stats, unmapped, multi, original_genes, model_copy.genes, logger)

    logger.info("Translation finished")
    return model_copy, reaction_translation_issues
| 1000 | |
| 1001 | |
| 1002 # ---------- helper functions ---------- | |
def _create_gene_mapping(mapping_df, source_col: str, target_col: str, logger: logging.Logger) -> Dict[str, List[str]]:
    """
    Build mapping dict: source_id -> list of target_ids.

    Source ids are normalized via _normalize_gene_id (removes prefixes like
    'HGNC:'); target ids are only whitespace-stripped. For each source id the
    target order follows first appearance in the dataframe, without duplicates.

    Args:
        mapping_df: DataFrame containing at least source_col and target_col.
        source_col: Column holding the source gene identifiers.
        target_col: Column holding the target gene identifiers.
        logger: Logger used to report mapping statistics.

    Returns:
        Dict mapping each normalized source id to its list of target ids.
    """
    df = mapping_df[[source_col, target_col]].dropna().copy()
    # Normalize to canonical string form before deduplicating rows.
    df[source_col] = df[source_col].astype(str).apply(_normalize_gene_id)
    df[target_col] = df[target_col].astype(str).str.strip()

    df = df.drop_duplicates()

    logger.info(f"Creating mapping from {len(df)} rows")

    mapping = defaultdict(list)
    # zip over the two columns instead of DataFrame.iterrows(): same order,
    # but avoids building a Series object per row.
    for src, tgt in zip(df[source_col], df[target_col]):
        if tgt not in mapping[src]:
            mapping[src].append(tgt)

    # Report 1:1 vs 1:many distribution.
    one_to_one = sum(1 for v in mapping.values() if len(v) == 1)
    one_to_many = sum(1 for v in mapping.values() if len(v) > 1)
    logger.info(f"Mapping: {len(mapping)} source keys, {one_to_one} 1:1, {one_to_many} 1:many")
    return dict(mapping)
| 1029 | |
| 1030 | |
def _translate_gpr(gpr_string: str,
                   gene_mapping: Dict[str, List[str]],
                   stats: Dict[str, int],
                   unmapped_genes: List[str],
                   multi_mapping_genes: List[Tuple[str, List[str]]],
                   logger: logging.Logger,
                   track_issues: bool = False) -> Union[str, Tuple[str, str]]:
    """
    Translate genes inside a GPR string using gene_mapping.

    Args:
        gpr_string: Original gene-reaction rule, e.g. "(HGNC:5 and HGNC:7) or X".
        gene_mapping: Normalized source gene id -> list of target ids.
        stats: Mutated in place; increments the 'translated', 'one_to_one',
            'one_to_many' and 'not_found' counters.
        unmapped_genes: Mutated in place; accumulates tokens absent from the
            mapping (each token recorded once).
        multi_mapping_genes: Mutated in place; accumulates (token, targets)
            pairs for 1:many mappings.
        logger: Logger for debug messages about unmapped tokens.
        track_issues: When True, also build a "; "-joined description of the
            1:many and many:1 translations seen in this GPR.

    Returns:
        The new GPR string, or (new GPR string, issues text) when
        track_issues=True. Tokens without a mapping are left as-is.
    """
    # Generic token pattern: letters, digits, :, _, -, ., (captures HGNC:1234, ENSG000..., symbols)
    token_pattern = r'\b[A-Za-z0-9:_.-]+\b'
    tokens = re.findall(token_pattern, gpr_string)

    # Drop boolean operators; '(' / ')' can never match the pattern but are
    # kept in the set for safety.
    logical = {'and', 'or', 'AND', 'OR', '(', ')'}
    tokens = [t for t in tokens if t not in logical]

    new_gpr = gpr_string
    issues: List[str] = []

    for token in sorted(set(tokens), key=lambda x: -len(x)):  # longer tokens first to avoid partial replacement
        norm = _normalize_gene_id(token)
        if norm in gene_mapping:
            targets = gene_mapping[norm]
            stats['translated'] += 1
            if len(targets) == 1:
                stats['one_to_one'] += 1
                replacement = targets[0]
            else:
                # 1:many source genes become an OR group of all targets.
                stats['one_to_many'] += 1
                multi_mapping_genes.append((token, targets))
                replacement = "(" + " or ".join(targets) + ")"
                if track_issues:
                    issues.append(f"{token} -> {' or '.join(targets)}")

            # NOTE(review): replacements are applied sequentially on new_gpr;
            # if a target id were identical to another (shorter) source token,
            # a later iteration could re-translate it — confirm the mapping
            # tables in use cannot produce such collisions.
            pattern = r'\b' + re.escape(token) + r'\b'
            new_gpr = re.sub(pattern, replacement, new_gpr)
        else:
            stats['not_found'] += 1
            if token not in unmapped_genes:
                unmapped_genes.append(token)
            logger.debug(f"Token not found in mapping (left as-is): {token}")

    # Check for many-to-one cases (multiple source genes mapping to same target)
    if track_issues:
        # Build reverse mapping (target -> original tokens) to detect
        # many-to-one cases among the tokens of this specific GPR.
        original_to_target = {}

        for orig_token in tokens:
            norm = _normalize_gene_id(orig_token)
            if norm in gene_mapping:
                targets = gene_mapping[norm]
                for target in targets:
                    if target not in original_to_target:
                        original_to_target[target] = []
                    if orig_token not in original_to_target[target]:
                        original_to_target[target].append(orig_token)

        # Identify many-to-one mappings in this specific GPR
        for target, original_genes in original_to_target.items():
            if len(original_genes) > 1:
                issues.append(f"{' or '.join(original_genes)} -> {target}")

    issue_text = "; ".join(issues) if issues else ""

    if track_issues:
        return new_gpr, issue_text
    else:
        return new_gpr
| 1101 | |
| 1102 | |
def _update_model_genes(model: 'cobra.Model', logger: logging.Logger):
    """
    Rebuild model.genes from gene_reaction_rule content.

    Removes gene objects no longer referenced by any GPR and adds objects for
    newly referenced ids. Token ids are normalized with _normalize_gene_id so
    they stay consistent with the mapping normalization used elsewhere.

    Args:
        model: COBRA model whose gene list is synchronized in place.
        logger: Logger for the removed/added summary and add failures.
    """
    # Collect every gene token referenced by some GPR.
    gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
    logical = {'and', 'or', 'AND', 'OR', '(', ')'}
    genes_in_gpr: Set[str] = set()

    for rxn in model.reactions:
        gpr = rxn.gene_reaction_rule
        if gpr and gpr.strip():
            toks = [_normalize_gene_id(t)
                    for t in re.findall(gene_pattern, gpr)
                    if t not in logical]
            genes_in_gpr.update(toks)

    # existing gene ids
    existing = {g.id for g in model.genes}

    # Remove genes no longer referenced by any reaction (best-effort).
    removed = 0
    for gid in [g for g in existing if g not in genes_in_gpr]:
        try:
            model.genes.remove(model.genes.get_by_id(gid))
            removed += 1
        except Exception:
            # safe-ignore: a failed removal is not fatal
            pass

    # Add genes that appear in GPRs but have no gene object yet. Different
    # cobra versions expose different container APIs, so try them in order.
    added = 0
    for gid in genes_in_gpr:
        if gid in existing:
            continue
        new_gene = cobra.Gene(gid)
        for attach in (lambda g: model.genes.add(g),
                       lambda g: model.genes.append(g),
                       lambda g: model.add_genes([g])):
            try:
                attach(new_gene)
                # Count only genes that were actually attached (previously the
                # counter was incremented even when every attempt failed).
                added += 1
                break
            except Exception:
                continue
        else:
            logger.warning(f"Could not add gene object for {gid}")

    logger.info(f"Model genes updated: removed {removed}, added {added}")
| 1156 | |
| 1157 | |
def export_model_to_tabular(model: cobraModel,
                            output_path: str,
                            translation_issues: Optional[Dict] = None,
                            include_objective: bool = True,
                            save_function = None) -> pd.DataFrame:
    """
    Export a COBRA model to tabular format with optional components.

    The output has one row per reaction with columns: ReactionID, Formula, GPR,
    bounds, optionally the objective coefficient, pathway/compartment info when
    available, an InMedium flag, and optionally translation issues.

    Args:
        model: COBRA model to export
        output_path: Path where to save the tabular file
        translation_issues: Optional dict of {reaction_id: issues} from gene translation
        include_objective: Whether to include objective coefficient column
        save_function: Optional custom save function taking (DataFrame, path);
            if None the table is written as tab-separated CSV

    Returns:
        pd.DataFrame: The merged tabular data
    """
    # Generate per-reaction model data via the module's extraction helpers.
    rules = generate_rules(model, asParsed=False)

    reactions = generate_reactions(model, asParsed=False)
    bounds = generate_bounds(model)
    medium = get_medium(model)
    compartments = generate_compartments(model)

    # Create base DataFrames keyed on ReactionID.
    df_rules = pd.DataFrame(list(rules.items()), columns=["ReactionID", "GPR"])
    df_reactions = pd.DataFrame(list(reactions.items()), columns=["ReactionID", "Formula"])
    df_bounds = bounds.reset_index().rename(columns={"index": "ReactionID"})
    df_medium = medium.rename(columns={"reaction": "ReactionID"})
    df_medium["InMedium"] = True

    # Outer merges keep reactions that appear in only one of the sources.
    merged = df_reactions.merge(df_rules, on="ReactionID", how="outer")
    merged = merged.merge(df_bounds, on="ReactionID", how="outer")

    # Add objective coefficients if requested
    if include_objective:
        objective_function = extract_objective_coefficients(model)
        merged = merged.merge(objective_function, on="ReactionID", how="outer")

    # Add compartments/pathways if they exist
    if compartments is not None:
        merged = merged.merge(compartments, on="ReactionID", how="outer")

    # Left merge: only medium reactions get InMedium=True, the rest stay NaN
    # until the fillna(False) below.
    merged = merged.merge(df_medium, on="ReactionID", how="left")

    # Add translation issues if provided
    if translation_issues:
        df_translation_issues = pd.DataFrame([
            {"ReactionID": rxn_id, "TranslationIssues": issues}
            for rxn_id, issues in translation_issues.items()
        ])
        if not df_translation_issues.empty:
            merged = merged.merge(df_translation_issues, on="ReactionID", how="left")
            merged["TranslationIssues"] = merged["TranslationIssues"].fillna("")

    # Final processing: medium reactions first.
    merged["InMedium"] = merged["InMedium"].fillna(False)
    merged = merged.sort_values(by="InMedium", ascending=False)

    # Save the file
    if save_function:
        save_function(merged, output_path)
    else:
        merged.to_csv(output_path, sep="\t", index=False)

    return merged
| 1228 | |
| 1229 | |
def _log_translation_statistics(stats: Dict[str, int],
                                unmapped_genes: List[str],
                                multi_mapping_genes: List[Tuple[str, List[str]]],
                                original_genes: Set[str],
                                final_genes,
                                logger: logging.Logger):
    """Emit a summary of a gene-translation run to the given logger:
    counters, gene-count delta, unmapped tokens and multi-mapping examples."""
    count = stats.get  # shorthand for the repeated counter lookups

    logger.info("=== TRANSLATION STATISTICS ===")
    logger.info(f"Translated: {count('translated', 0)} (1:1 = {count('one_to_one', 0)}, 1:many = {count('one_to_many', 0)})")
    logger.info(f"Not found tokens: {count('not_found', 0)}")
    logger.info(f"Simplified GPRs: {count('simplified_gprs', 0)}")
    logger.info(f"Flattened OR-only GPRs with issues: {count('flattened_or_gprs', 0)}")

    remaining_ids = {gene.id for gene in final_genes}
    logger.info(f"Genes in model: {len(original_genes)} -> {len(remaining_ids)}")

    if unmapped_genes:
        shown = ', '.join(unmapped_genes[:20])
        suffix = ' ...' if len(unmapped_genes) > 20 else ''
        logger.warning(f"Unmapped tokens ({len(unmapped_genes)}): {shown}{suffix}")

    if multi_mapping_genes:
        logger.info(f"Multi-mapping examples ({len(multi_mapping_genes)}):")
        # Show at most ten examples to keep the log readable.
        for source_id, target_ids in multi_mapping_genes[:10]:
            logger.info(f"  {source_id} -> {', '.join(target_ids)}")

    if count('flattened_or_gprs', 0) > 0:
        logger.info(f"Flattened {stats['flattened_or_gprs']} OR-only GPRs that had translation issues (removed parentheses, created unique gene sets)")
| 1255 | |
| 1256 |
