Mercurial > repos > bimib > cobraxy
comparison COBRAxy/utils/model_utils.py @ 456:a6e45049c1b9 draft default tip
Uploaded
| author | francesco_lapi |
|---|---|
| date | Fri, 12 Sep 2025 17:28:45 +0000 |
| parents | 4e2bc80764b6 |
| children |
comparison
equal
deleted
inserted
replaced
| 455:4e2bc80764b6 | 456:a6e45049c1b9 |
|---|---|
| 1 """ | |
| 2 Utilities for generating and manipulating COBRA models and related metadata. | |
| 3 | |
| 4 This module includes helpers to: | |
| 5 - extract rules, reactions, bounds, objective coefficients, and compartments | |
| 6 - build a COBRA model from a tabular file | |
| 7 - set objective and medium from dataframes | |
| 8 - validate a model and convert gene identifiers | |
| 9 - translate model GPRs using mapping tables | |
| 10 """ | |
| 1 import os | 11 import os |
| 2 import csv | |
| 3 import cobra | 12 import cobra |
| 4 import pickle | |
| 5 import argparse | |
| 6 import pandas as pd | 13 import pandas as pd |
| 7 import re | 14 import re |
| 8 import logging | 15 import logging |
| 9 from typing import Optional, Tuple, Union, List, Dict, Set | 16 from typing import Optional, Tuple, Union, List, Dict, Set |
| 10 from collections import defaultdict | 17 from collections import defaultdict |
| 11 import utils.general_utils as utils | |
| 12 import utils.rule_parsing as rulesUtils | 18 import utils.rule_parsing as rulesUtils |
| 13 import utils.reaction_parsing as reactionUtils | 19 import utils.reaction_parsing as reactionUtils |
| 14 from cobra import Model as cobraModel, Reaction, Metabolite | 20 from cobra import Model as cobraModel, Reaction, Metabolite |
| 15 | 21 |
| 16 ################################- DATA GENERATION -################################ | 22 ################################- DATA GENERATION -################################ |
| 17 ReactionId = str | 23 ReactionId = str |
| 18 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: | 24 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: |
| 19 """ | 25 """ |
| 20 Generates a dictionary mapping reaction ids to rules from the model. | 26 Generate a dictionary mapping reaction IDs to GPR rules from the model. |
| 21 | 27 |
| 22 Args: | 28 Args: |
| 23 model : the model to derive data from. | 29 model: COBRA model to derive data from. |
| 24 asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings. | 30 asParsed: If True, parse rules into a nested list structure; otherwise keep raw strings. |
| 25 | 31 |
| 26 Returns: | 32 Returns: |
| 27 Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules. | 33 Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID. |
| 28 Dict[ReactionId, str] : the generated dictionary of raw rules. | 34 Dict[ReactionId, str]: Raw rules by reaction ID. |
| 29 """ | 35 """ |
| 30 # Is the below approach convoluted? yes | |
| 31 # Ok but is it inefficient? probably | |
| 32 # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane) | |
| 33 _ruleGetter = lambda reaction : reaction.gene_reaction_rule | 36 _ruleGetter = lambda reaction : reaction.gene_reaction_rule |
| 34 ruleExtractor = (lambda reaction : | 37 ruleExtractor = (lambda reaction : |
| 35 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter | 38 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter |
| 36 | 39 |
| 37 return { | 40 return { |
| 39 for reaction in model.reactions | 42 for reaction in model.reactions |
| 40 if reaction.gene_reaction_rule } | 43 if reaction.gene_reaction_rule } |
| 41 | 44 |
| 42 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]: | 45 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]: |
| 43 """ | 46 """ |
| 44 Generates a dictionary mapping reaction ids to reaction formulas from the model. | 47 Generate a dictionary mapping reaction IDs to reaction formulas from the model. |
| 45 | 48 |
| 46 Args: | 49 Args: |
| 47 model : the model to derive data from. | 50 model: COBRA model to derive data from. |
| 48 asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are. | 51 asParsed: If True, convert formulas into a parsed representation; otherwise keep raw strings. |
| 49 | 52 |
| 50 Returns: | 53 Returns: |
| 51 Dict[ReactionId, str] : the generated dictionary. | 54 Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested). |
| 52 """ | 55 """ |
| 53 | 56 |
| 54 unparsedReactions = { | 57 unparsedReactions = { |
| 55 reaction.id : reaction.reaction | 58 reaction.id : reaction.reaction |
| 56 for reaction in model.reactions | 59 for reaction in model.reactions |
| 60 if not asParsed: return unparsedReactions | 63 if not asParsed: return unparsedReactions |
| 61 | 64 |
| 62 return reactionUtils.create_reaction_dict(unparsedReactions) | 65 return reactionUtils.create_reaction_dict(unparsedReactions) |
| 63 | 66 |
| 64 def get_medium(model:cobraModel) -> pd.DataFrame: | 67 def get_medium(model:cobraModel) -> pd.DataFrame: |
| 68 """ | |
| 69 Extract the uptake reactions representing the model medium. | |
| 70 | |
| 71 Returns a DataFrame with a single column 'reaction' listing exchange reactions | |
| 72 with negative lower bound and no positive stoichiometric coefficients (uptake only). | |
| 73 """ | |
| 65 trueMedium=[] | 74 trueMedium=[] |
| 66 for r in model.reactions: | 75 for r in model.reactions: |
| 67 positiveCoeff=0 | 76 positiveCoeff=0 |
| 68 for m in r.metabolites: | 77 for m in r.metabolites: |
| 69 if r.get_coefficient(m.id)>0: | 78 if r.get_coefficient(m.id)>0: |
| 75 df_medium["reaction"] = trueMedium | 84 df_medium["reaction"] = trueMedium |
| 76 return df_medium | 85 return df_medium |
| 77 | 86 |
| 78 def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame: | 87 def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame: |
| 79 """ | 88 """ |
| 80 Estrae i coefficienti della funzione obiettivo per ciascuna reazione del modello. | 89 Extract objective coefficients for each reaction. |
| 81 | 90 |
| 82 Args: | 91 Args: |
| 83 model : cobra.Model | 92 model: COBRA model |
| 84 | 93 |
| 85 Returns: | 94 Returns: |
| 86 pd.DataFrame con colonne: ReactionID, ObjectiveCoefficient | 95 pd.DataFrame with columns: ReactionID, ObjectiveCoefficient |
| 87 """ | 96 """ |
| 88 coeffs = [] | 97 coeffs = [] |
| 89 # model.objective.expression è un'espressione lineare | 98 # model.objective.expression is a linear expression |
| 90 objective_expr = model.objective.expression.as_coefficients_dict() | 99 objective_expr = model.objective.expression.as_coefficients_dict() |
| 91 | 100 |
| 92 for reaction in model.reactions: | 101 for reaction in model.reactions: |
| 93 coeff = objective_expr.get(reaction.forward_variable, 0.0) | 102 coeff = objective_expr.get(reaction.forward_variable, 0.0) |
| 94 coeffs.append({ | 103 coeffs.append({ |
| 97 }) | 106 }) |
| 98 | 107 |
| 99 return pd.DataFrame(coeffs) | 108 return pd.DataFrame(coeffs) |
| 100 | 109 |
| 101 def generate_bounds(model:cobraModel) -> pd.DataFrame: | 110 def generate_bounds(model:cobraModel) -> pd.DataFrame: |
| 111 """ | |
| 112 Build a DataFrame of lower/upper bounds for all reactions. | |
| 113 | |
| 114 Returns: | |
| 115 pd.DataFrame indexed by reaction IDs with columns ['lower_bound', 'upper_bound']. | |
| 116 """ | |
| 102 | 117 |
| 103 rxns = [] | 118 rxns = [] |
| 104 for reaction in model.reactions: | 119 for reaction in model.reactions: |
| 105 rxns.append(reaction.id) | 120 rxns.append(reaction.id) |
| 106 | 121 |
| 158 | 173 |
| 159 | 174 |
| 160 | 175 |
| 161 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel: | 176 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel: |
| 162 """ | 177 """ |
| 163 Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni. | 178 Build a COBRApy model from a tabular file with reaction data. |
| 164 | 179 |
| 165 Args: | 180 Args: |
| 166 csv_path: Path al file CSV (separato da tab) | 181 csv_path: Path to the tab-separated file. |
| 167 model_id: ID del modello da creare | 182 model_id: ID for the newly created model. |
| 168 | 183 |
| 169 Returns: | 184 Returns: |
| 170 cobra.Model: Il modello COBRApy costruito | 185 cobra.Model: The constructed COBRApy model. |
| 171 """ | 186 """ |
| 172 | 187 |
| 173 # Leggi i dati dal CSV | |
| 174 df = pd.read_csv(csv_path, sep='\t') | 188 df = pd.read_csv(csv_path, sep='\t') |
| 175 | 189 |
| 176 # Crea il modello vuoto | |
| 177 model = cobraModel(model_id) | 190 model = cobraModel(model_id) |
| 178 | 191 |
| 179 # Dict per tenere traccia di metaboliti e compartimenti | |
| 180 metabolites_dict = {} | 192 metabolites_dict = {} |
| 181 compartments_dict = {} | 193 compartments_dict = {} |
| 182 | 194 |
| 183 print(f"Costruendo modello da {len(df)} reazioni...") | 195 print(f"Building model from {len(df)} reactions...") |
| 184 | 196 |
| 185 # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni | |
| 186 for idx, row in df.iterrows(): | 197 for idx, row in df.iterrows(): |
| 187 reaction_formula = str(row['Formula']).strip() | 198 reaction_formula = str(row['Formula']).strip() |
| 188 if not reaction_formula or reaction_formula == 'nan': | 199 if not reaction_formula or reaction_formula == 'nan': |
| 189 continue | 200 continue |
| 190 | 201 |
| 191 # Estrai metaboliti dalla formula della reazione | |
| 192 metabolites = extract_metabolites_from_reaction(reaction_formula) | 202 metabolites = extract_metabolites_from_reaction(reaction_formula) |
| 193 | 203 |
| 194 for met_id in metabolites: | 204 for met_id in metabolites: |
| 195 compartment = extract_compartment_from_metabolite(met_id) | 205 compartment = extract_compartment_from_metabolite(met_id) |
| 196 | 206 |
| 197 # Aggiungi compartimento se non esiste | |
| 198 if compartment not in compartments_dict: | 207 if compartment not in compartments_dict: |
| 199 compartments_dict[compartment] = compartment | 208 compartments_dict[compartment] = compartment |
| 200 | 209 |
| 201 # Aggiungi metabolita se non esiste | |
| 202 if met_id not in metabolites_dict: | 210 if met_id not in metabolites_dict: |
| 203 metabolites_dict[met_id] = Metabolite( | 211 metabolites_dict[met_id] = Metabolite( |
| 204 id=met_id, | 212 id=met_id, |
| 205 compartment=compartment, | 213 compartment=compartment, |
| 206 name=met_id.replace(f"_{compartment}", "").replace("__", "_") | 214 name=met_id.replace(f"_{compartment}", "").replace("__", "_") |
| 207 ) | 215 ) |
| 208 | 216 |
| 209 # Aggiungi compartimenti al modello | |
| 210 model.compartments = compartments_dict | 217 model.compartments = compartments_dict |
| 211 | 218 |
| 212 # Aggiungi metaboliti al modello | |
| 213 model.add_metabolites(list(metabolites_dict.values())) | 219 model.add_metabolites(list(metabolites_dict.values())) |
| 214 | 220 |
| 215 print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti") | 221 print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments") |
| 216 | 222 |
| 217 # Seconda passata: aggiungi le reazioni | |
| 218 reactions_added = 0 | 223 reactions_added = 0 |
| 219 reactions_skipped = 0 | 224 reactions_skipped = 0 |
| 220 | 225 |
| 221 for idx, row in df.iterrows(): | 226 for idx, row in df.iterrows(): |
| 222 | 227 |
| 223 reaction_id = str(row['ReactionID']).strip() | 228 reaction_id = str(row['ReactionID']).strip() |
| 224 reaction_formula = str(row['Formula']).strip() | 229 reaction_formula = str(row['Formula']).strip() |
| 225 | 230 |
| 226 # Salta reazioni senza formula | |
| 227 if not reaction_formula or reaction_formula == 'nan': | 231 if not reaction_formula or reaction_formula == 'nan': |
| 228 raise ValueError(f"Formula della reazione mancante {reaction_id}") | 232 raise ValueError(f"Missing reaction formula for {reaction_id}") |
| 229 | 233 |
| 230 # Crea la reazione | |
| 231 reaction = Reaction(reaction_id) | 234 reaction = Reaction(reaction_id) |
| 232 reaction.name = reaction_id | 235 reaction.name = reaction_id |
| 233 | 236 |
| 234 # Imposta bounds | |
| 235 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0 | 237 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0 |
| 236 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0 | 238 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0 |
| 237 | 239 |
| 238 # Aggiungi gene rule se presente | |
| 239 if pd.notna(row['GPR']) and str(row['GPR']).strip(): | 240 if pd.notna(row['GPR']) and str(row['GPR']).strip(): |
| 240 reaction.gene_reaction_rule = str(row['GPR']).strip() | 241 reaction.gene_reaction_rule = str(row['GPR']).strip() |
| 241 | 242 |
| 242 # Parse della formula della reazione | |
| 243 try: | 243 try: |
| 244 parse_reaction_formula(reaction, reaction_formula, metabolites_dict) | 244 parse_reaction_formula(reaction, reaction_formula, metabolites_dict) |
| 245 except Exception as e: | 245 except Exception as e: |
| 246 print(f"Errore nel parsing della reazione {reaction_id}: {e}") | 246 print(f"Error parsing reaction {reaction_id}: {e}") |
| 247 reactions_skipped += 1 | 247 reactions_skipped += 1 |
| 248 continue | 248 continue |
| 249 | 249 |
| 250 # Aggiungi la reazione al modello | |
| 251 model.add_reactions([reaction]) | 250 model.add_reactions([reaction]) |
| 252 reactions_added += 1 | 251 reactions_added += 1 |
| 253 | 252 |
| 254 | 253 |
| 255 print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni") | 254 print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions") |
| 256 | 255 |
| 257 # set objective function | 256 # set objective function |
| 258 set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient") | 257 set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient") |
| 259 | 258 |
| 260 # Imposta il medium | |
| 261 set_medium_from_data(model, df) | 259 set_medium_from_data(model, df) |
| 262 | 260 |
| 263 print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti") | 261 print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites") |
| 264 | 262 |
| 265 return model | 263 return model |
| 266 | 264 |
| 267 | 265 |
| 268 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore) | 266 # Extract all metabolite IDs from the formula (handles numeric prefixes + underscores) |
| 269 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: | 267 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: |
| 270 """ | 268 """ |
| 271 Estrae gli ID dei metaboliti da una formula di reazione. | 269 Extract metabolite IDs from a reaction formula. |
| 272 Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e) | 270 Robust pattern: tokens ending with _<compartment> (e.g., _c, _m, _e), |
| 273 e permette che comincino con cifre o underscore. | 271 allowing leading digits/underscores. |
| 274 """ | 272 """ |
| 275 metabolites = set() | 273 metabolites = set() |
| 276 # coefficiente opzionale seguito da un token che termina con _<letters> | 274 # optional coefficient followed by a token ending with _<letters> |
| 277 pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)' | 275 pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)' |
| 278 matches = re.findall(pattern, reaction_formula) | 276 matches = re.findall(pattern, reaction_formula) |
| 279 metabolites.update(matches) | 277 metabolites.update(matches) |
| 280 return metabolites | 278 return metabolites |
| 281 | 279 |
| 282 | 280 |
| 283 def extract_compartment_from_metabolite(metabolite_id: str) -> str: | 281 def extract_compartment_from_metabolite(metabolite_id: str) -> str: |
| 284 """ | 282 """Extract the compartment from a metabolite ID.""" |
| 285 Estrae il compartimento dall'ID del metabolita. | |
| 286 """ | |
| 287 # Il compartimento è solitamente l'ultima lettera dopo l'underscore | |
| 288 if '_' in metabolite_id: | 283 if '_' in metabolite_id: |
| 289 return metabolite_id.split('_')[-1] | 284 return metabolite_id.split('_')[-1] |
| 290 return 'c' # default cytoplasm | 285 return 'c' # default cytoplasm |
| 291 | 286 |
| 292 | 287 |
| 293 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]): | 288 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]): |
| 294 """ | 289 """Parse a reaction formula and set metabolites with their coefficients.""" |
| 295 Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti. | 290 |
| 296 """ | |
| 297 | |
| 298 if reaction.id == 'EX_thbpt_e': | |
| 299 print(reaction.id) | |
| 300 print(formula) | |
| 301 # Dividi in parte sinistra e destra | |
| 302 if '<=>' in formula: | 291 if '<=>' in formula: |
| 303 left, right = formula.split('<=>') | 292 left, right = formula.split('<=>') |
| 304 reversible = True | 293 reversible = True |
| 305 elif '<--' in formula: | 294 elif '<--' in formula: |
| 306 left, right = formula.split('<--') | 295 left, right = formula.split('<--') |
| 307 reversible = False | 296 reversible = False |
| 308 left, right = left, right | |
| 309 elif '-->' in formula: | 297 elif '-->' in formula: |
| 310 left, right = formula.split('-->') | 298 left, right = formula.split('-->') |
| 311 reversible = False | 299 reversible = False |
| 312 elif '<-' in formula: | 300 elif '<-' in formula: |
| 313 left, right = formula.split('<-') | 301 left, right = formula.split('<-') |
| 314 reversible = False | 302 reversible = False |
| 315 left, right = left, right | |
| 316 else: | 303 else: |
| 317 raise ValueError(f"Formato reazione non riconosciuto: {formula}") | 304 raise ValueError(f"Unrecognized reaction format: {formula}") |
| 318 | 305 |
| 319 # Parse dei metaboliti e coefficienti | |
| 320 reactants = parse_metabolites_side(left.strip()) | 306 reactants = parse_metabolites_side(left.strip()) |
| 321 products = parse_metabolites_side(right.strip()) | 307 products = parse_metabolites_side(right.strip()) |
| 322 | 308 |
| 323 # Aggiungi metaboliti alla reazione | |
| 324 metabolites_to_add = {} | 309 metabolites_to_add = {} |
| 325 | 310 |
| 326 # Reagenti (coefficienti negativi) | |
| 327 for met_id, coeff in reactants.items(): | 311 for met_id, coeff in reactants.items(): |
| 328 if met_id in metabolites_dict: | 312 if met_id in metabolites_dict: |
| 329 metabolites_to_add[metabolites_dict[met_id]] = -coeff | 313 metabolites_to_add[metabolites_dict[met_id]] = -coeff |
| 330 | 314 |
| 331 # Prodotti (coefficienti positivi) | |
| 332 for met_id, coeff in products.items(): | 315 for met_id, coeff in products.items(): |
| 333 if met_id in metabolites_dict: | 316 if met_id in metabolites_dict: |
| 334 metabolites_to_add[metabolites_dict[met_id]] = coeff | 317 metabolites_to_add[metabolites_dict[met_id]] = coeff |
| 335 | 318 |
| 336 reaction.add_metabolites(metabolites_to_add) | 319 reaction.add_metabolites(metabolites_to_add) |
| 337 | 320 |
| 338 | 321 |
| 339 def parse_metabolites_side(side_str: str) -> Dict[str, float]: | 322 def parse_metabolites_side(side_str: str) -> Dict[str, float]: |
| 340 """ | 323 """Parse one side of a reaction and extract metabolites with coefficients.""" |
| 341 Parsa un lato della reazione per estrarre metaboliti e coefficienti. | |
| 342 """ | |
| 343 metabolites = {} | 324 metabolites = {} |
| 344 if not side_str or side_str.strip() == '': | 325 if not side_str or side_str.strip() == '': |
| 345 return metabolites | 326 return metabolites |
| 346 | 327 |
| 347 terms = side_str.split('+') | 328 terms = side_str.split('+') |
| 348 for term in terms: | 329 for term in terms: |
| 349 term = term.strip() | 330 term = term.strip() |
| 350 if not term: | 331 if not term: |
| 351 continue | 332 continue |
| 352 | 333 |
| 353 # pattern allineato: coefficiente opzionale + id che termina con _<compartimento> | 334 # optional coefficient + id ending with _<compartment> |
| 354 match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term) | 335 match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term) |
| 355 if match: | 336 if match: |
| 356 coeff_str, met_id = match.groups() | 337 coeff_str, met_id = match.groups() |
| 357 coeff = float(coeff_str) if coeff_str else 1.0 | 338 coeff = float(coeff_str) if coeff_str else 1.0 |
| 358 metabolites[met_id] = coeff | 339 metabolites[met_id] = coeff |
| 370 | 351 |
| 371 for idx, row in df.iterrows(): | 352 for idx, row in df.iterrows(): |
| 372 reaction_id = str(row['ReactionID']).strip() | 353 reaction_id = str(row['ReactionID']).strip() |
| 373 coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0 | 354 coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0 |
| 374 if coeff != 0: | 355 if coeff != 0: |
| 375 # Assicuriamoci che la reazione esista nel modello | |
| 376 if reaction_id in model.reactions: | 356 if reaction_id in model.reactions: |
| 377 obj_dict[model.reactions.get_by_id(reaction_id)] = coeff | 357 obj_dict[model.reactions.get_by_id(reaction_id)] = coeff |
| 378 else: | 358 else: |
| 379 print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.") | 359 print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.") |
| 380 | 360 |
| 381 if not obj_dict: | 361 if not obj_dict: |
| 382 raise ValueError("No reactions found with non-zero objective coefficient.") | 362 raise ValueError("No reactions found with non-zero objective coefficient.") |
| 383 | 363 |
| 384 # Imposta la funzione obiettivo | |
| 385 model.objective = obj_dict | 364 model.objective = obj_dict |
| 386 print(f"Objective set with {len(obj_dict)} reactions.") | 365 print(f"Objective set with {len(obj_dict)} reactions.") |
| 387 | 366 |
| 388 | 367 |
| 389 | 368 |
| 390 | 369 |
| 391 def set_medium_from_data(model: cobraModel, df: pd.DataFrame): | 370 def set_medium_from_data(model: cobraModel, df: pd.DataFrame): |
| 392 """ | 371 """Set the medium based on the 'InMedium' column in the dataframe.""" |
| 393 Imposta il medium basato sulla colonna InMedium. | |
| 394 """ | |
| 395 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist() | 372 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist() |
| 396 | 373 |
| 397 medium_dict = {} | 374 medium_dict = {} |
| 398 for rxn_id in medium_reactions: | 375 for rxn_id in medium_reactions: |
| 399 if rxn_id in [r.id for r in model.reactions]: | 376 if rxn_id in [r.id for r in model.reactions]: |
| 401 if reaction.lower_bound < 0: # Solo reazioni di uptake | 378 if reaction.lower_bound < 0: # Only uptake reactions |
| 402 medium_dict[rxn_id] = abs(reaction.lower_bound) | 379 medium_dict[rxn_id] = abs(reaction.lower_bound) |
| 403 | 380 |
| 404 if medium_dict: | 381 if medium_dict: |
| 405 model.medium = medium_dict | 382 model.medium = medium_dict |
| 406 print(f"Medium impostato con {len(medium_dict)} componenti") | 383 print(f"Medium set with {len(medium_dict)} components") |
| 407 | 384 |
| 408 | 385 |
| 409 def validate_model(model: cobraModel) -> Dict[str, any]: | 386 def validate_model(model: cobraModel) -> Dict[str, any]: |
| 410 """ | 387 """Validate the model and return basic statistics.""" |
| 411 Valida il modello e fornisce statistiche di base. | |
| 412 """ | |
| 413 validation = { | 388 validation = { |
| 414 'num_reactions': len(model.reactions), | 389 'num_reactions': len(model.reactions), |
| 415 'num_metabolites': len(model.metabolites), | 390 'num_metabolites': len(model.metabolites), |
| 416 'num_genes': len(model.genes), | 391 'num_genes': len(model.genes), |
| 417 'num_compartments': len(model.compartments), | 392 'num_compartments': len(model.compartments), |
| 420 'reversible_reactions': len([r for r in model.reactions if r.reversibility]), | 395 'reversible_reactions': len([r for r in model.reactions if r.reversibility]), |
| 421 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]), | 396 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]), |
| 422 } | 397 } |
| 423 | 398 |
| 424 try: | 399 try: |
| 425 # Test di crescita | 400 # Growth test |
| 426 solution = model.optimize() | 401 solution = model.optimize() |
| 427 validation['growth_rate'] = solution.objective_value | 402 validation['growth_rate'] = solution.objective_value |
| 428 validation['status'] = solution.status | 403 validation['status'] = solution.status |
| 429 except Exception as e: | 404 except Exception as e: |
| 430 validation['growth_rate'] = None | 405 validation['growth_rate'] = None |
| 431 validation['status'] = f"Error: {e}" | 406 validation['status'] = f"Error: {e}" |
| 432 | 407 |
| 433 return validation | 408 return validation |
| 434 | 409 |
| 435 def convert_genes(model,annotation): | 410 def convert_genes(model, annotation): |
| 411 """Rename genes using a selected annotation key in gene.notes; returns a model copy.""" | |
| 436 from cobra.manipulation import rename_genes | 412 from cobra.manipulation import rename_genes |
| 437 model2=model.copy() | 413 model2=model.copy() |
| 438 try: | 414 try: |
| 439 dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes} | 415 dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes} |
| 440 except: | 416 except: |
| 448 def _normalize_colname(col: str) -> str: | 424 def _normalize_colname(col: str) -> str: |
| 449 return col.strip().lower().replace(' ', '_') | 425 return col.strip().lower().replace(' ', '_') |
| 450 | 426 |
| 451 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]: | 427 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]: |
| 452 """ | 428 """ |
| 453 Cerca colonne utili e ritorna dict {ensg: colname1, hgnc_id: colname2, ...} | 429 Find useful columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}. |
| 454 Lancia ValueError se non trova almeno un mapping utile. | 430 Raise ValueError if no suitable mapping is found. |
| 455 """ | 431 """ |
| 456 cols = { _normalize_colname(c): c for c in mapping_df.columns } | 432 cols = { _normalize_colname(c): c for c in mapping_df.columns } |
| 457 chosen = {} | 433 chosen = {} |
| 458 # possibili nomi per ciascuna categoria | 434 # candidate names for each category |
| 459 candidates = { | 435 candidates = { |
| 460 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'], | 436 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'], |
| 461 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'], | 437 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'], |
| 462 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'], | 438 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'], |
| 463 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'], | 439 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'], |
| 474 source_col: str, | 450 source_col: str, |
| 475 target_col: str, | 451 target_col: str, |
| 476 model_source_genes: Optional[Set[str]] = None, | 452 model_source_genes: Optional[Set[str]] = None, |
| 477 logger: Optional[logging.Logger] = None) -> None: | 453 logger: Optional[logging.Logger] = None) -> None: |
| 478 """ | 454 """ |
| 479 Verifica che, nel mapping_df (eventualmente già filtrato sui source di interesse), | 455 Check that, within the filtered mapping_df, each target maps to at most one source. |
| 480 ogni target sia associato ad al massimo un source. Se trova target associati a | 456 Log examples if duplicates are found. |
| 481 >1 source solleva ValueError mostrando esempi. | |
| 482 | |
| 483 - mapping_df: DataFrame con colonne source_col, target_col | |
| 484 - model_source_genes: se fornito, è un set di source normalizzati che stiamo traducendo | |
| 485 (se None, si usa tutto mapping_df) | |
| 486 """ | 457 """ |
| 487 if logger is None: | 458 if logger is None: |
| 488 logger = logging.getLogger(__name__) | 459 logger = logging.getLogger(__name__) |
| 489 | 460 |
| 490 if mapping_df.empty: | 461 if mapping_df.empty: |
| 491 logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.") | 462 logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.") |
| 492 return | 463 return |
| 493 | 464 |
| 494 # normalizza le colonne temporanee per gruppi (senza modificare il df originale) | 465 # normalize temporary columns for grouping (without altering the original df) |
| 495 tmp = mapping_df[[source_col, target_col]].copy() | 466 tmp = mapping_df[[source_col, target_col]].copy() |
| 496 tmp['_src_norm'] = tmp[source_col].astype(str).map(_normalize_gene_id) | 467 tmp['_src_norm'] = tmp[source_col].astype(str).map(_normalize_gene_id) |
| 497 tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip() | 468 tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip() |
| 498 | 469 |
| 499 # se è passato un insieme di geni modello, filtra qui (già fatto nella chiamata, ma doppio-check ok) | 470 # optionally filter to the set of model source genes |
| 500 if model_source_genes is not None: | 471 if model_source_genes is not None: |
| 501 tmp = tmp[tmp['_src_norm'].isin(model_source_genes)] | 472 tmp = tmp[tmp['_src_norm'].isin(model_source_genes)] |
| 502 | 473 |
| 503 if tmp.empty: | 474 if tmp.empty: |
| 504 logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.") | 475 logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.") |
| 505 return | 476 return |
| 506 | 477 |
| 507 # costruisci il reverse mapping target -> set(sources) | 478 # build reverse mapping: target -> set(sources) |
| 508 grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna())) | 479 grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna())) |
| 509 # trova target con più di 1 source | 480 # find targets with more than one source |
| 510 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1} | 481 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1} |
| 511 | 482 |
| 512 if problematic: | 483 if problematic: |
| 513 # prepara messaggio di errore con esempi (fino a 20) | 484 # prepare warning message with examples (limited subset) |
| 514 sample_items = list(problematic.items()) | 485 sample_items = list(problematic.items()) |
| 515 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."] | 486 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."] |
| 516 for tgt, sources in sample_items: | 487 for tgt, sources in sample_items: |
| 517 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}") | 488 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}") |
| 518 full_msg = "\n".join(msg_lines) | 489 full_msg = "\n".join(msg_lines) |
| 519 # loggare e sollevare errore | 490 # log warning |
| 520 logger.warning(full_msg) | 491 logger.warning(full_msg) |
| 521 | 492 |
| 522 # se tutto ok | 493 # if everything is fine |
| 523 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).") | 494 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).") |
| 524 | 495 |
| 525 | 496 |
| 526 def _normalize_gene_id(g: str) -> str: | 497 def _normalize_gene_id(g: str) -> str: |
| 527 """Rendi consistente un gene id per l'uso come chiave (rimuove prefissi come 'HGNC:' e strip).""" | 498 """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips).""" |
| 528 if g is None: | 499 if g is None: |
| 529 return "" | 500 return "" |
| 530 g = str(g).strip() | 501 g = str(g).strip() |
| 531 # remove common prefixes | 502 # remove common prefixes |
| 532 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE) | 503 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE) |
| 533 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE) | 504 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE) |
| 534 return g | 505 return g |
| 535 | 506 |
| 536 def _simplify_boolean_expression(expr: str) -> str: | 507 def _simplify_boolean_expression(expr: str) -> str: |
| 537 """ | 508 """ |
| 538 Semplifica un'espressione booleana rimuovendo duplicati e riducendo ridondanze. | 509 Simplify a boolean expression by removing duplicates and redundancies. |
| 539 Gestisce espressioni con 'and' e 'or'. | 510 Handles expressions with 'and' and 'or'. |
| 540 """ | 511 """ |
| 541 if not expr or not expr.strip(): | 512 if not expr or not expr.strip(): |
| 542 return expr | 513 return expr |
| 543 | 514 |
| 544 # Normalizza gli operatori | 515 # normalize operators |
| 545 expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') | 516 expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') |
| 546 | 517 |
| 547 # Funzione ricorsiva per processare le espressioni | 518 # recursive helper to process expressions |
| 548 def process_expression(s: str) -> str: | 519 def process_expression(s: str) -> str: |
| 549 s = s.strip() | 520 s = s.strip() |
| 550 if not s: | 521 if not s: |
| 551 return s | 522 return s |
| 552 | 523 |
| 553 # Gestisci le parentesi | 524 # handle parentheses |
| 554 while '(' in s: | 525 while '(' in s: |
| 555 # Trova la parentesi più interna | 526 # find the innermost parentheses |
| 556 start = -1 | 527 start = -1 |
| 557 for i, c in enumerate(s): | 528 for i, c in enumerate(s): |
| 558 if c == '(': | 529 if c == '(': |
| 559 start = i | 530 start = i |
| 560 elif c == ')' and start != -1: | 531 elif c == ')' and start != -1: |
| 561 # Processa il contenuto tra parentesi | 532 # process inner content |
| 562 inner = s[start+1:i] | 533 inner = s[start+1:i] |
| 563 processed_inner = process_expression(inner) | 534 processed_inner = process_expression(inner) |
| 564 s = s[:start] + processed_inner + s[i+1:] | 535 s = s[:start] + processed_inner + s[i+1:] |
| 565 break | 536 break |
| 566 else: | 537 else: |
| 567 break | 538 break |
| 568 | 539 |
| 569 # Dividi per 'or' al livello più alto | 540 # split by 'or' at top level |
| 570 or_parts = [] | 541 or_parts = [] |
| 571 current_part = "" | 542 current_part = "" |
| 572 paren_count = 0 | 543 paren_count = 0 |
| 573 | 544 |
| 574 tokens = s.split() | 545 tokens = s.split() |
| 588 i += 1 | 559 i += 1 |
| 589 | 560 |
| 590 if current_part.strip(): | 561 if current_part.strip(): |
| 591 or_parts.append(current_part.strip()) | 562 or_parts.append(current_part.strip()) |
| 592 | 563 |
| 593 # Processa ogni parte OR | 564 # process each OR part |
| 594 processed_or_parts = [] | 565 processed_or_parts = [] |
| 595 for or_part in or_parts: | 566 for or_part in or_parts: |
| 596 # Dividi per 'and' all'interno di ogni parte OR | 567 # split by 'and' within each OR part |
| 597 and_parts = [] | 568 and_parts = [] |
| 598 current_and = "" | 569 current_and = "" |
| 599 paren_count = 0 | 570 paren_count = 0 |
| 600 | 571 |
| 601 and_tokens = or_part.split() | 572 and_tokens = or_part.split() |
| 615 j += 1 | 586 j += 1 |
| 616 | 587 |
| 617 if current_and.strip(): | 588 if current_and.strip(): |
| 618 and_parts.append(current_and.strip()) | 589 and_parts.append(current_and.strip()) |
| 619 | 590 |
| 620 # Rimuovi duplicati nelle parti AND | 591 # deduplicate AND parts |
| 621 unique_and_parts = list(dict.fromkeys(and_parts)) # mantiene l'ordine | 592 unique_and_parts = list(dict.fromkeys(and_parts)) # preserves order |
| 622 | 593 |
| 623 if len(unique_and_parts) == 1: | 594 if len(unique_and_parts) == 1: |
| 624 processed_or_parts.append(unique_and_parts[0]) | 595 processed_or_parts.append(unique_and_parts[0]) |
| 625 elif len(unique_and_parts) > 1: | 596 elif len(unique_and_parts) > 1: |
| 626 processed_or_parts.append(" and ".join(unique_and_parts)) | 597 processed_or_parts.append(" and ".join(unique_and_parts)) |
| 627 | 598 |
| 628 # Rimuovi duplicati nelle parti OR | 599 # deduplicate OR parts |
| 629 unique_or_parts = list(dict.fromkeys(processed_or_parts)) | 600 unique_or_parts = list(dict.fromkeys(processed_or_parts)) |
| 630 | 601 |
| 631 if len(unique_or_parts) == 1: | 602 if len(unique_or_parts) == 1: |
| 632 return unique_or_parts[0] | 603 return unique_or_parts[0] |
| 633 elif len(unique_or_parts) > 1: | 604 elif len(unique_or_parts) > 1: |
| 636 return "" | 607 return "" |
| 637 | 608 |
| 638 try: | 609 try: |
| 639 return process_expression(expr) | 610 return process_expression(expr) |
| 640 except Exception: | 611 except Exception: |
| 641 # Se la semplificazione fallisce, ritorna l'espressione originale | 612 # if simplification fails, return the original expression |
| 642 return expr | 613 return expr |
| 643 | 614 |
| 644 # ---------- Main public function ---------- | 615 # ---------- Main public function ---------- |
| 645 def translate_model_genes(model: 'cobra.Model', | 616 def translate_model_genes(model: 'cobra.Model', |
| 646 mapping_df: 'pd.DataFrame', | 617 mapping_df: 'pd.DataFrame', |
| 647 target_nomenclature: str, | 618 target_nomenclature: str, |
| 648 source_nomenclature: str = 'hgnc_id', | 619 source_nomenclature: str = 'hgnc_id', |
| 649 allow_many_to_one: bool = False, | 620 allow_many_to_one: bool = False, |
| 650 logger: Optional[logging.Logger] = None) -> 'cobra.Model': | 621 logger: Optional[logging.Logger] = None) -> 'cobra.Model': |
| 651 """ | 622 """ |
| 652 Translate model genes from source_nomenclature to target_nomenclature. | 623 Translate model genes from source_nomenclature to target_nomenclature using a mapping table. |
| 653 mapping_df should contain at least columns that allow the mapping | 624 mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez). |
| 654 (e.g. ensg, hgnc_id, hgnc_symbol, entrez). | 625 |
| 655 | |
| 656 Args: | 626 Args: |
| 657 allow_many_to_one: Se True, permette che più source vengano mappati allo stesso target | 627 model: COBRA model to translate. |
| 658 e gestisce i duplicati nelle GPR. Se False, valida l'unicità dei target. | 628 mapping_df: DataFrame containing the mapping information. |
| 629 target_nomenclature: Desired target key (e.g., 'hgnc_symbol'). | |
| 630 source_nomenclature: Current source key in the model (default 'hgnc_id'). | |
| 631 allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs. | |
| 632 logger: Optional logger. | |
| 659 """ | 633 """ |
| 660 if logger is None: | 634 if logger is None: |
| 661 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | 635 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
| 662 logger = logging.getLogger(__name__) | 636 logger = logging.getLogger(__name__) |
| 663 | 637 |
| 709 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy() | 683 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy() |
| 710 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).map(_normalize_gene_id) | 684 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).map(_normalize_gene_id) |
| 711 | 685 |
| 712 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy() | 686 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy() |
| 713 | 687 |
| 714 # Se non ci sono righe rilevanti, avvisa | |
| 715 if filtered_map.empty: | 688 if filtered_map.empty: |
| 716 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).") | 689 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).") |
| 717 | 690 |
| 718 # --- VALIDAZIONE: opzionale in base al parametro allow_many_to_one --- | |
| 719 if not allow_many_to_one: | 691 if not allow_many_to_one: |
| 720 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger) | 692 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger) |
| 721 | 693 |
| 722 # Crea il mapping | 694 # Create the mapping |
| 723 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger) | 695 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger) |
| 737 for rxn in model_copy.reactions: | 709 for rxn in model_copy.reactions: |
| 738 gpr = rxn.gene_reaction_rule | 710 gpr = rxn.gene_reaction_rule |
| 739 if gpr and gpr.strip(): | 711 if gpr and gpr.strip(): |
| 740 new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger) | 712 new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger) |
| 741 if new_gpr != gpr: | 713 if new_gpr != gpr: |
| 742 # Semplifica l'espressione booleana per rimuovere duplicati | |
| 743 simplified_gpr = _simplify_boolean_expression(new_gpr) | 714 simplified_gpr = _simplify_boolean_expression(new_gpr) |
| 744 if simplified_gpr != new_gpr: | 715 if simplified_gpr != new_gpr: |
| 745 stats['simplified_gprs'] += 1 | 716 stats['simplified_gprs'] += 1 |
| 746 logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'") | 717 logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'") |
| 747 rxn.gene_reaction_rule = simplified_gpr | 718 rxn.gene_reaction_rule = simplified_gpr |
