456
|
1 """
|
|
2 Utilities for generating and manipulating COBRA models and related metadata.
|
|
3
|
|
4 This module includes helpers to:
|
|
5 - extract rules, reactions, bounds, objective coefficients, and compartments
|
|
6 - build a COBRA model from a tabular file
|
|
7 - set objective and medium from dataframes
|
|
8 - validate a model and convert gene identifiers
|
|
9 - translate model GPRs using mapping tables
|
|
10 """
|
418
|
11 import os
|
|
12 import cobra
|
|
13 import pandas as pd
|
419
|
14 import re
|
426
|
15 import logging
|
419
|
16 from typing import Optional, Tuple, Union, List, Dict, Set
|
426
|
17 from collections import defaultdict
|
418
|
18 import utils.rule_parsing as rulesUtils
|
419
|
19 import utils.reaction_parsing as reactionUtils
|
|
20 from cobra import Model as cobraModel, Reaction, Metabolite
|
490
|
21 import sys
|
|
22
|
|
23
|
|
24 ############################ check_methods ####################################
|
|
25 def gene_type(l :str, name :str) -> str:
|
|
26 """
|
|
27 Determine the type of gene ID.
|
|
28
|
|
29 Args:
|
|
30 l (str): The gene identifier to check.
|
|
31 name (str): The name of the dataset, used in error messages.
|
|
32
|
|
33 Returns:
|
|
34 str: The type of gene ID ('hugo_id', 'ensembl_gene_id', 'symbol', or 'entrez_id').
|
|
35
|
|
36 Raises:
|
|
37 sys.exit: If the gene ID type is not supported, the execution is aborted.
|
|
38 """
|
|
39 if check_hgnc(l):
|
|
40 return 'hugo_id'
|
|
41 elif check_ensembl(l):
|
|
42 return 'ensembl_gene_id'
|
|
43 elif check_symbol(l):
|
|
44 return 'symbol'
|
|
45 elif check_entrez(l):
|
|
46 return 'entrez_id'
|
|
47 else:
|
|
48 sys.exit('Execution aborted:\n' +
|
|
49 'gene ID type in ' + name + ' not supported. Supported ID'+
|
|
50 'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n')
|
|
51
|
|
52 def check_hgnc(l :str) -> bool:
|
|
53 """
|
|
54 Check if a gene identifier follows the HGNC format.
|
|
55
|
|
56 Args:
|
|
57 l (str): The gene identifier to check.
|
|
58
|
|
59 Returns:
|
|
60 bool: True if the gene identifier follows the HGNC format, False otherwise.
|
|
61 """
|
|
62 if len(l) > 5:
|
|
63 if (l.upper()).startswith('HGNC:'):
|
|
64 return l[5:].isdigit()
|
|
65 else:
|
|
66 return False
|
|
67 else:
|
|
68 return False
|
|
69
|
|
70 def check_ensembl(l :str) -> bool:
|
|
71 """
|
|
72 Check if a gene identifier follows the Ensembl format.
|
|
73
|
|
74 Args:
|
|
75 l (str): The gene identifier to check.
|
|
76
|
|
77 Returns:
|
|
78 bool: True if the gene identifier follows the Ensembl format, False otherwise.
|
|
79 """
|
|
80 return l.upper().startswith('ENS')
|
|
81
|
|
82
|
|
83 def check_symbol(l :str) -> bool:
|
|
84 """
|
|
85 Check if a gene identifier follows the symbol format.
|
|
86
|
|
87 Args:
|
|
88 l (str): The gene identifier to check.
|
|
89
|
|
90 Returns:
|
|
91 bool: True if the gene identifier follows the symbol format, False otherwise.
|
|
92 """
|
|
93 if len(l) > 0:
|
|
94 if l[0].isalpha() and l[1:].isalnum():
|
|
95 return True
|
|
96 else:
|
|
97 return False
|
|
98 else:
|
|
99 return False
|
|
100
|
|
101 def check_entrez(l :str) -> bool:
|
|
102 """
|
|
103 Check if a gene identifier follows the Entrez ID format.
|
|
104
|
|
105 Args:
|
|
106 l (str): The gene identifier to check.
|
|
107
|
|
108 Returns:
|
|
109 bool: True if the gene identifier follows the Entrez ID format, False otherwise.
|
|
110 """
|
|
111 if len(l) > 0:
|
|
112 return l.isdigit()
|
|
113 else:
|
|
114 return False
|
418
|
115
|
|
116 ################################- DATA GENERATION -################################
|
|
117 ReactionId = str
|
419
|
118 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
|
418
|
119 """
|
456
|
120 Generate a dictionary mapping reaction IDs to GPR rules from the model.
|
418
|
121
|
|
122 Args:
|
456
|
123 model: COBRA model to derive data from.
|
|
124 asParsed: If True, parse rules into a nested list structure; otherwise keep raw strings.
|
418
|
125
|
|
126 Returns:
|
456
|
127 Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID.
|
|
128 Dict[ReactionId, str]: Raw rules by reaction ID.
|
418
|
129 """
|
|
130 _ruleGetter = lambda reaction : reaction.gene_reaction_rule
|
|
131 ruleExtractor = (lambda reaction :
|
|
132 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter
|
|
133
|
|
134 return {
|
|
135 reaction.id : ruleExtractor(reaction)
|
|
136 for reaction in model.reactions
|
|
137 if reaction.gene_reaction_rule }
|
|
138
|
419
|
139 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
|
418
|
140 """
|
456
|
141 Generate a dictionary mapping reaction IDs to reaction formulas from the model.
|
418
|
142
|
|
143 Args:
|
456
|
144 model: COBRA model to derive data from.
|
|
145 asParsed: If True, convert formulas into a parsed representation; otherwise keep raw strings.
|
418
|
146
|
|
147 Returns:
|
456
|
148 Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested).
|
418
|
149 """
|
|
150
|
|
151 unparsedReactions = {
|
|
152 reaction.id : reaction.reaction
|
|
153 for reaction in model.reactions
|
|
154 if reaction.reaction
|
|
155 }
|
|
156
|
|
157 if not asParsed: return unparsedReactions
|
|
158
|
|
159 return reactionUtils.create_reaction_dict(unparsedReactions)
|
|
160
|
419
|
161 def get_medium(model:cobraModel) -> pd.DataFrame:
|
456
|
162 """
|
|
163 Extract the uptake reactions representing the model medium.
|
|
164
|
|
165 Returns a DataFrame with a single column 'reaction' listing exchange reactions
|
|
166 with negative lower bound and no positive stoichiometric coefficients (uptake only).
|
|
167 """
|
418
|
168 trueMedium=[]
|
|
169 for r in model.reactions:
|
|
170 positiveCoeff=0
|
|
171 for m in r.metabolites:
|
|
172 if r.get_coefficient(m.id)>0:
|
|
173 positiveCoeff=1;
|
|
174 if (positiveCoeff==0 and r.lower_bound<0):
|
|
175 trueMedium.append(r.id)
|
|
176
|
|
177 df_medium = pd.DataFrame()
|
|
178 df_medium["reaction"] = trueMedium
|
|
179 return df_medium
|
|
180
|
426
|
181 def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame:
|
|
182 """
|
456
|
183 Extract objective coefficients for each reaction.
|
|
184
|
426
|
185 Args:
|
456
|
186 model: COBRA model
|
|
187
|
426
|
188 Returns:
|
456
|
189 pd.DataFrame with columns: ReactionID, ObjectiveCoefficient
|
426
|
190 """
|
|
191 coeffs = []
|
456
|
192 # model.objective.expression is a linear expression
|
426
|
193 objective_expr = model.objective.expression.as_coefficients_dict()
|
|
194
|
|
195 for reaction in model.reactions:
|
|
196 coeff = objective_expr.get(reaction.forward_variable, 0.0)
|
|
197 coeffs.append({
|
|
198 "ReactionID": reaction.id,
|
|
199 "ObjectiveCoefficient": coeff
|
|
200 })
|
|
201
|
|
202 return pd.DataFrame(coeffs)
|
|
203
|
419
|
204 def generate_bounds(model:cobraModel) -> pd.DataFrame:
|
456
|
205 """
|
|
206 Build a DataFrame of lower/upper bounds for all reactions.
|
|
207
|
|
208 Returns:
|
|
209 pd.DataFrame indexed by reaction IDs with columns ['lower_bound', 'upper_bound'].
|
|
210 """
|
418
|
211
|
|
212 rxns = []
|
|
213 for reaction in model.reactions:
|
|
214 rxns.append(reaction.id)
|
|
215
|
|
216 bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns)
|
|
217
|
|
218 for reaction in model.reactions:
|
|
219 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
|
|
220 return bounds
|
|
221
|
|
222
|
|
223
|
419
|
224 def generate_compartments(model: cobraModel) -> pd.DataFrame:
|
418
|
225 """
|
|
226 Generates a DataFrame containing compartment information for each reaction.
|
|
227 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.)
|
|
228
|
|
229 Args:
|
|
230 model: the COBRA model to extract compartment data from.
|
|
231
|
|
232 Returns:
|
|
233 pd.DataFrame: DataFrame with ReactionID and compartment columns
|
|
234 """
|
|
235 pathway_data = []
|
|
236
|
|
237 # First pass: determine the maximum number of pathways any reaction has
|
|
238 max_pathways = 0
|
|
239 reaction_pathways = {}
|
|
240
|
|
241 for reaction in model.reactions:
|
|
242 # Get unique pathways from all metabolites in the reaction
|
503
|
243 if 'pathways' in reaction.annotation:
|
|
244 if type(reaction.annotation['pathways']) == list:
|
|
245 reaction_pathways[reaction.id] = reaction.annotation['pathways']
|
|
246 max_pathways = max(max_pathways, len(reaction.annotation['pathways']))
|
|
247 else:
|
|
248 reaction_pathways[reaction.id] = [reaction.annotation['pathways']]
|
418
|
249 else:
|
503
|
250 # No pathway annotation - use empty list
|
|
251 reaction_pathways[reaction.id] = []
|
418
|
252
|
|
253 # Create column names for pathways
|
|
254 pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]
|
|
255
|
|
256 # Second pass: create the data
|
|
257 for reaction_id, pathways in reaction_pathways.items():
|
|
258 row = {"ReactionID": reaction_id}
|
|
259
|
|
260 # Fill pathway columns
|
|
261 for i in range(max_pathways):
|
|
262 col_name = pathway_columns[i]
|
|
263 if i < len(pathways):
|
|
264 row[col_name] = pathways[i]
|
|
265 else:
|
|
266 row[col_name] = None # or "" if you prefer empty strings
|
|
267
|
|
268 pathway_data.append(row)
|
|
269
|
419
|
270 return pd.DataFrame(pathway_data)
|
|
271
|
505
|
272 def set_annotation_pathways_from_data(model: cobraModel, df: pd.DataFrame):
|
|
273 """Set reaction pathways based on 'Pathway_1', 'Pathway_2', ... columns in the dataframe."""
|
|
274 pathway_cols = [col for col in df.columns if col.startswith('Pathway_')]
|
|
275 if not pathway_cols:
|
|
276 print("No 'Pathway_' columns found, skipping pathway annotation")
|
|
277 return
|
|
278
|
|
279 pathway_data = defaultdict(list)
|
|
280
|
|
281 for idx, row in df.iterrows():
|
|
282 reaction_id = str(row['ReactionID']).strip()
|
|
283 if reaction_id not in model.reactions:
|
|
284 continue
|
|
285
|
|
286 pathways = []
|
|
287 for col in pathway_cols:
|
|
288 if pd.notna(row[col]) and str(row[col]).strip():
|
|
289 pathways.append(str(row[col]).strip())
|
|
290
|
|
291 if pathways:
|
|
292
|
|
293 reaction = model.reactions.get_by_id(reaction_id)
|
|
294 if len(pathways) == 1:
|
|
295 reaction.annotation['pathways'] = pathways[0]
|
|
296 else:
|
|
297 reaction.annotation['pathways'] = pathways
|
419
|
298
|
505
|
299 pathway_data[reaction_id] = pathways
|
|
300
|
|
301 print(f"Annotated {len(pathway_data)} reactions with pathways.")
|
419
|
302
|
|
303 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
|
|
304 """
|
456
|
305 Build a COBRApy model from a tabular file with reaction data.
|
|
306
|
419
|
307 Args:
|
456
|
308 csv_path: Path to the tab-separated file.
|
|
309 model_id: ID for the newly created model.
|
|
310
|
419
|
311 Returns:
|
456
|
312 cobra.Model: The constructed COBRApy model.
|
419
|
313 """
|
|
314
|
501
|
315 # Try to detect separator
|
|
316 with open(csv_path, 'r') as f:
|
|
317 first_line = f.readline()
|
|
318 sep = '\t' if '\t' in first_line else ','
|
|
319
|
|
320 df = pd.read_csv(csv_path, sep=sep)
|
|
321
|
|
322 # Check required columns
|
|
323 required_cols = ['ReactionID', 'Formula']
|
|
324 missing_cols = [col for col in required_cols if col not in df.columns]
|
|
325 if missing_cols:
|
|
326 raise ValueError(f"Missing required columns: {missing_cols}. Available columns: {list(df.columns)}")
|
419
|
327
|
|
328 model = cobraModel(model_id)
|
|
329
|
|
330 metabolites_dict = {}
|
|
331 compartments_dict = {}
|
|
332
|
456
|
333 print(f"Building model from {len(df)} reactions...")
|
419
|
334
|
|
335 for idx, row in df.iterrows():
|
448
|
336 reaction_formula = str(row['Formula']).strip()
|
419
|
337 if not reaction_formula or reaction_formula == 'nan':
|
|
338 continue
|
|
339
|
|
340 metabolites = extract_metabolites_from_reaction(reaction_formula)
|
|
341
|
|
342 for met_id in metabolites:
|
|
343 compartment = extract_compartment_from_metabolite(met_id)
|
|
344
|
|
345 if compartment not in compartments_dict:
|
|
346 compartments_dict[compartment] = compartment
|
|
347
|
|
348 if met_id not in metabolites_dict:
|
|
349 metabolites_dict[met_id] = Metabolite(
|
|
350 id=met_id,
|
|
351 compartment=compartment,
|
|
352 name=met_id.replace(f"_{compartment}", "").replace("__", "_")
|
|
353 )
|
|
354
|
|
355 model.compartments = compartments_dict
|
|
356
|
|
357 model.add_metabolites(list(metabolites_dict.values()))
|
|
358
|
456
|
359 print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments")
|
419
|
360
|
|
361 reactions_added = 0
|
|
362 reactions_skipped = 0
|
|
363
|
|
364 for idx, row in df.iterrows():
|
|
365
|
|
366 reaction_id = str(row['ReactionID']).strip()
|
427
|
367 reaction_formula = str(row['Formula']).strip()
|
419
|
368
|
|
369 if not reaction_formula or reaction_formula == 'nan':
|
456
|
370 raise ValueError(f"Missing reaction formula for {reaction_id}")
|
419
|
371
|
|
372 reaction = Reaction(reaction_id)
|
|
373 reaction.name = reaction_id
|
|
374
|
|
375 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
|
|
376 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
|
|
377
|
427
|
378 if pd.notna(row['GPR']) and str(row['GPR']).strip():
|
|
379 reaction.gene_reaction_rule = str(row['GPR']).strip()
|
419
|
380
|
|
381 try:
|
|
382 parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
|
|
383 except Exception as e:
|
456
|
384 print(f"Error parsing reaction {reaction_id}: {e}")
|
419
|
385 reactions_skipped += 1
|
|
386 continue
|
|
387
|
|
388 model.add_reactions([reaction])
|
|
389 reactions_added += 1
|
|
390
|
|
391
|
456
|
392 print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions")
|
419
|
393
|
430
|
394 # set objective function
|
|
395 set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient")
|
|
396
|
419
|
397 set_medium_from_data(model, df)
|
505
|
398
|
|
399 set_annotation_pathways_from_data(model, df)
|
419
|
400
|
456
|
401 print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites")
|
419
|
402
|
|
403 return model
|
|
404
|
|
405
|
|
406 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
|
499
|
407 #def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
|
|
408 # """
|
|
409 # Extract metabolite IDs from a reaction formula.
|
|
410 # Robust pattern: tokens ending with _<compartment> (e.g., _c, _m, _e),
|
|
411 # allowing leading digits/underscores.
|
|
412 # """
|
|
413 # metabolites = set()
|
|
414 # # optional coefficient followed by a token ending with _<letters>
|
|
415 # if reaction_formula[-1] == ']' and reaction_formula[-3] == '[':
|
|
416 # pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+[[A-Za-z0-9]]+)'
|
|
417 # else:
|
|
418 # pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[A-Za-z0-9]+)'
|
|
419 # matches = re.findall(pattern, reaction_formula)
|
|
420 # metabolites.update(matches)
|
|
421 # return metabolites
|
|
422
|
|
423
|
419
|
424 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
|
|
425 """
|
500
|
426 Extract metabolite IDs from a reaction formula.
|
|
427
|
|
428 Handles:
|
|
429 - optional stoichiometric coefficients (integers or decimals)
|
|
430 - compartment tags at the end of the metabolite, either [c] or _c
|
|
431
|
|
432 Returns the IDs including the compartment suffix exactly as written.
|
419
|
433 """
|
499
|
434 pattern = re.compile(
|
500
|
435 r'(?:^|(?<=\s)|(?<=\+)|(?<=,)|(?<==)|(?<=:))' # left boundary (start, space, +, comma, =, :)
|
501
|
436 r'(?:\d+(?:\.\d+)?\s+)?' # optional coefficient (requires space after)
|
|
437 r'([A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+))' # metabolite + compartment (can start with number)
|
499
|
438 )
|
|
439 return {m.group(1) for m in pattern.finditer(reaction_formula)}
|
419
|
440
|
|
441
|
500
|
442
|
419
|
443 def extract_compartment_from_metabolite(metabolite_id: str) -> str:
|
456
|
444 """Extract the compartment from a metabolite ID."""
|
500
|
445 if '_' == metabolite_id[-2]:
|
419
|
446 return metabolite_id.split('_')[-1]
|
493
|
447 if metabolite_id[-1] == ']' and metabolite_id[-3] == '[':
|
|
448 return metabolite_id[-2]
|
419
|
449 return 'c' # default cytoplasm
|
|
450
|
|
451
|
|
452 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
|
456
|
453 """Parse a reaction formula and set metabolites with their coefficients."""
|
419
|
454
|
|
455 if '<=>' in formula:
|
501
|
456 parts = formula.split('<=>')
|
419
|
457 reversible = True
|
|
458 elif '<--' in formula:
|
501
|
459 parts = formula.split('<--')
|
419
|
460 reversible = False
|
|
461 elif '-->' in formula:
|
501
|
462 parts = formula.split('-->')
|
419
|
463 reversible = False
|
|
464 elif '<-' in formula:
|
501
|
465 parts = formula.split('<-')
|
419
|
466 reversible = False
|
|
467 else:
|
456
|
468 raise ValueError(f"Unrecognized reaction format: {formula}")
|
419
|
469
|
501
|
470 # Handle cases where one side might be empty (exchange reactions)
|
|
471 if len(parts) != 2:
|
|
472 raise ValueError(f"Invalid reaction format, expected 2 parts: {formula}")
|
|
473
|
|
474 left, right = parts[0].strip(), parts[1].strip()
|
|
475
|
|
476 reactants = parse_metabolites_side(left) if left else {}
|
|
477 products = parse_metabolites_side(right) if right else {}
|
419
|
478
|
|
479 metabolites_to_add = {}
|
|
480
|
|
481 for met_id, coeff in reactants.items():
|
|
482 if met_id in metabolites_dict:
|
|
483 metabolites_to_add[metabolites_dict[met_id]] = -coeff
|
|
484
|
|
485 for met_id, coeff in products.items():
|
|
486 if met_id in metabolites_dict:
|
|
487 metabolites_to_add[metabolites_dict[met_id]] = coeff
|
|
488
|
|
489 reaction.add_metabolites(metabolites_to_add)
|
|
490
|
|
491
|
|
492 def parse_metabolites_side(side_str: str) -> Dict[str, float]:
|
456
|
493 """Parse one side of a reaction and extract metabolites with coefficients."""
|
419
|
494 metabolites = {}
|
|
495 if not side_str or side_str.strip() == '':
|
|
496 return metabolites
|
|
497
|
|
498 terms = side_str.split('+')
|
|
499 for term in terms:
|
|
500 term = term.strip()
|
|
501 if not term:
|
|
502 continue
|
|
503
|
501
|
504 # First check if term has space-separated coefficient and metabolite
|
|
505 parts = term.split()
|
|
506 if len(parts) == 2:
|
|
507 # Two parts: potential coefficient + metabolite
|
|
508 try:
|
|
509 coeff = float(parts[0])
|
|
510 met_id = parts[1]
|
|
511 # Verify the second part looks like a metabolite with compartment
|
|
512 if re.match(r'[A-Za-z0-9_]+(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', met_id):
|
|
513 metabolites[met_id] = coeff
|
|
514 continue
|
|
515 except ValueError:
|
|
516 pass
|
|
517
|
|
518 # Single term - check if it's a metabolite (no coefficient)
|
|
519 # Updated pattern to include metabolites starting with numbers
|
|
520 if re.match(r'[A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', term):
|
|
521 metabolites[term] = 1.0
|
|
522 else:
|
|
523 print(f"Warning: Could not parse metabolite term: '{term}'")
|
419
|
524
|
|
525 return metabolites
|
|
526
|
|
527
|
|
528
|
430
|
529 def set_objective_from_csv(model: cobra.Model, df: pd.DataFrame, obj_col: str = "ObjectiveCoefficient"):
|
419
|
530 """
|
430
|
531 Sets the model's objective function based on a column of coefficients in the CSV.
|
|
532 Can be any reaction(s), not necessarily biomass.
|
419
|
533 """
|
430
|
534 obj_dict = {}
|
419
|
535
|
430
|
536 for idx, row in df.iterrows():
|
|
537 reaction_id = str(row['ReactionID']).strip()
|
|
538 coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0
|
|
539 if coeff != 0:
|
|
540 if reaction_id in model.reactions:
|
|
541 obj_dict[model.reactions.get_by_id(reaction_id)] = coeff
|
|
542 else:
|
|
543 print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.")
|
|
544
|
|
545 if not obj_dict:
|
|
546 raise ValueError("No reactions found with non-zero objective coefficient.")
|
|
547
|
|
548 model.objective = obj_dict
|
|
549 print(f"Objective set with {len(obj_dict)} reactions.")
|
|
550
|
|
551
|
419
|
552
|
|
553
|
|
554 def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
|
456
|
555 """Set the medium based on the 'InMedium' column in the dataframe."""
|
501
|
556 if 'InMedium' not in df.columns:
|
|
557 print("No 'InMedium' column found, skipping medium setup")
|
|
558 return
|
|
559
|
419
|
560 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
|
|
561
|
|
562 medium_dict = {}
|
|
563 for rxn_id in medium_reactions:
|
|
564 if rxn_id in [r.id for r in model.reactions]:
|
|
565 reaction = model.reactions.get_by_id(rxn_id)
|
501
|
566 if reaction.lower_bound < 0:
|
419
|
567 medium_dict[rxn_id] = abs(reaction.lower_bound)
|
|
568
|
|
569 if medium_dict:
|
|
570 model.medium = medium_dict
|
456
|
571 print(f"Medium set with {len(medium_dict)} components")
|
501
|
572 else:
|
|
573 print("No medium components found")
|
419
|
574 def validate_model(model: cobraModel) -> Dict[str, any]:
|
456
|
575 """Validate the model and return basic statistics."""
|
419
|
576 validation = {
|
|
577 'num_reactions': len(model.reactions),
|
|
578 'num_metabolites': len(model.metabolites),
|
|
579 'num_genes': len(model.genes),
|
|
580 'num_compartments': len(model.compartments),
|
|
581 'objective': str(model.objective),
|
|
582 'medium_size': len(model.medium),
|
|
583 'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
|
|
584 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
|
|
585 }
|
|
586
|
|
587 try:
|
456
|
588 # Growth test
|
419
|
589 solution = model.optimize()
|
|
590 validation['growth_rate'] = solution.objective_value
|
|
591 validation['status'] = solution.status
|
|
592 except Exception as e:
|
|
593 validation['growth_rate'] = None
|
|
594 validation['status'] = f"Error: {e}"
|
|
595
|
|
596 return validation
|
|
597
|
456
|
598 def convert_genes(model, annotation):
|
|
599 """Rename genes using a selected annotation key in gene.notes; returns a model copy."""
|
419
|
600 from cobra.manipulation import rename_genes
|
|
601 model2=model.copy()
|
|
602 try:
|
|
603 dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes}
|
|
604 except:
|
|
605 print("No annotation in gene dict!")
|
|
606 return -1
|
|
607 rename_genes(model2,dict_genes)
|
|
608
|
426
|
609 return model2
|
|
610
|
|
611 # ---------- Utility helpers ----------
|
|
612 def _normalize_colname(col: str) -> str:
|
|
613 return col.strip().lower().replace(' ', '_')
|
|
614
|
|
615 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]:
|
|
616 """
|
456
|
617 Find useful columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}.
|
|
618 Raise ValueError if no suitable mapping is found.
|
426
|
619 """
|
|
620 cols = { _normalize_colname(c): c for c in mapping_df.columns }
|
|
621 chosen = {}
|
456
|
622 # candidate names for each category
|
426
|
623 candidates = {
|
|
624 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
|
|
625 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
|
444
|
626 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'],
|
455
|
627 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'],
|
|
628 'gene_number': ['gene_number']
|
426
|
629 }
|
|
630 for key, names in candidates.items():
|
|
631 for n in names:
|
|
632 if n in cols:
|
|
633 chosen[key] = cols[n]
|
|
634 break
|
|
635 return chosen
|
|
636
|
|
637 def _validate_target_uniqueness(mapping_df: 'pd.DataFrame',
|
|
638 source_col: str,
|
|
639 target_col: str,
|
|
640 model_source_genes: Optional[Set[str]] = None,
|
|
641 logger: Optional[logging.Logger] = None) -> None:
|
|
642 """
|
456
|
643 Check that, within the filtered mapping_df, each target maps to at most one source.
|
|
644 Log examples if duplicates are found.
|
426
|
645 """
|
|
646 if logger is None:
|
|
647 logger = logging.getLogger(__name__)
|
|
648
|
|
649 if mapping_df.empty:
|
|
650 logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.")
|
|
651 return
|
|
652
|
456
|
653 # normalize temporary columns for grouping (without altering the original df)
|
426
|
654 tmp = mapping_df[[source_col, target_col]].copy()
|
503
|
655 tmp['_src_norm'] = tmp[source_col].astype(str).apply(_normalize_gene_id)
|
426
|
656 tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip()
|
|
657
|
456
|
658 # optionally filter to the set of model source genes
|
426
|
659 if model_source_genes is not None:
|
|
660 tmp = tmp[tmp['_src_norm'].isin(model_source_genes)]
|
|
661
|
|
662 if tmp.empty:
|
|
663 logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.")
|
|
664 return
|
|
665
|
456
|
666 # build reverse mapping: target -> set(sources)
|
426
|
667 grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna()))
|
456
|
668 # find targets with more than one source
|
426
|
669 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}
|
|
670
|
|
671 if problematic:
|
456
|
672 # prepare warning message with examples (limited subset)
|
455
|
673 sample_items = list(problematic.items())
|
426
|
674 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
|
|
675 for tgt, sources in sample_items:
|
|
676 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}")
|
|
677 full_msg = "\n".join(msg_lines)
|
456
|
678 # log warning
|
455
|
679 logger.warning(full_msg)
|
426
|
680
|
456
|
681 # if everything is fine
|
426
|
682 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
|
|
683
|
|
684
|
|
685 def _normalize_gene_id(g: str) -> str:
|
456
|
686 """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips)."""
|
426
|
687 if g is None:
|
|
688 return ""
|
|
689 g = str(g).strip()
|
|
690 # remove common prefixes
|
|
691 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE)
|
|
692 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
|
|
693 return g
|
|
694
|
493
|
695 def _is_or_only_expression(expr: str) -> bool:
|
|
696 """
|
|
697 Check if a GPR expression contains only OR operators (no AND operators).
|
|
698
|
|
699 Args:
|
|
700 expr: GPR expression string
|
|
701
|
|
702 Returns:
|
|
703 bool: True if expression contains only OR (and parentheses) and has multiple genes, False otherwise
|
|
704 """
|
|
705 if not expr or not expr.strip():
|
|
706 return False
|
|
707
|
|
708 # Normalize the expression
|
|
709 normalized = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
|
|
710
|
|
711 # Check if it contains any AND operators
|
|
712 has_and = ' and ' in normalized.lower()
|
|
713
|
|
714 # Check if it contains OR operators
|
|
715 has_or = ' or ' in normalized.lower()
|
|
716
|
|
717 # Must have OR operators and no AND operators
|
|
718 return has_or and not has_and
|
|
719
|
|
720
|
|
721 def _flatten_or_only_gpr(expr: str) -> str:
|
|
722 """
|
|
723 Flatten a GPR expression that contains only OR operators by:
|
|
724 1. Removing all parentheses
|
|
725 2. Extracting unique gene names
|
|
726 3. Joining them with ' or '
|
|
727
|
|
728 Args:
|
|
729 expr: GPR expression string with only OR operators
|
|
730
|
|
731 Returns:
|
|
732 str: Flattened GPR expression
|
|
733 """
|
|
734 if not expr or not expr.strip():
|
|
735 return expr
|
|
736
|
|
737 # Extract all gene tokens (exclude logical operators and parentheses)
|
|
738 gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
|
|
739 logical = {'and', 'or', 'AND', 'OR', '(', ')'}
|
|
740
|
|
741 tokens = re.findall(gene_pattern, expr)
|
|
742 genes = [t for t in tokens if t not in logical]
|
|
743
|
|
744 # Create set to remove duplicates, then convert back to list to maintain some order
|
|
745 unique_genes = list(dict.fromkeys(genes)) # Preserves insertion order
|
|
746
|
|
747 if len(unique_genes) == 0:
|
|
748 return expr
|
|
749 elif len(unique_genes) == 1:
|
|
750 return unique_genes[0]
|
|
751 else:
|
|
752 return ' or '.join(unique_genes)
|
|
753
|
|
754
|
455
|
755 def _simplify_boolean_expression(expr: str) -> str:
|
|
756 """
|
490
|
757 Simplify a boolean expression by removing duplicates while strictly preserving semantics.
|
|
758 This function handles simple duplicates within parentheses while being conservative about
|
|
759 complex expressions that could change semantics.
|
455
|
760 """
|
|
761 if not expr or not expr.strip():
|
|
762 return expr
|
|
763
|
490
|
764 # Normalize operators and whitespace
|
455
|
765 expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
|
490
|
766 expr = ' '.join(expr.split()) # Normalize whitespace
|
455
|
767
|
490
|
768 def simplify_parentheses_content(match_obj):
|
|
769 """Helper function to simplify content within parentheses."""
|
|
770 content = match_obj.group(1) # Content inside parentheses
|
455
|
771
|
490
|
772 # Only simplify if it's a pure OR or pure AND chain
|
|
773 if ' or ' in content and ' and ' not in content:
|
|
774 # Pure OR chain - safe to deduplicate
|
|
775 parts = [p.strip() for p in content.split(' or ') if p.strip()]
|
|
776 unique_parts = []
|
|
777 seen = set()
|
|
778 for part in parts:
|
|
779 if part not in seen:
|
|
780 unique_parts.append(part)
|
|
781 seen.add(part)
|
455
|
782
|
490
|
783 if len(unique_parts) == 1:
|
|
784 return unique_parts[0] # Remove unnecessary parentheses for single items
|
|
785 else:
|
|
786 return '(' + ' or '.join(unique_parts) + ')'
|
|
787
|
|
788 elif ' and ' in content and ' or ' not in content:
|
|
789 # Pure AND chain - safe to deduplicate
|
|
790 parts = [p.strip() for p in content.split(' and ') if p.strip()]
|
|
791 unique_parts = []
|
|
792 seen = set()
|
|
793 for part in parts:
|
|
794 if part not in seen:
|
|
795 unique_parts.append(part)
|
|
796 seen.add(part)
|
455
|
797
|
490
|
798 if len(unique_parts) == 1:
|
|
799 return unique_parts[0] # Remove unnecessary parentheses for single items
|
|
800 else:
|
|
801 return '(' + ' and '.join(unique_parts) + ')'
|
|
802 else:
|
|
803 # Mixed operators or single item - return with parentheses as-is
|
|
804 return '(' + content + ')'
|
|
805
|
|
806 def remove_duplicates_simple(parts_str: str, separator: str) -> str:
|
|
807 """Remove duplicates from a simple chain of operations."""
|
|
808 parts = [p.strip() for p in parts_str.split(separator) if p.strip()]
|
455
|
809
|
490
|
810 # Remove duplicates while preserving order
|
|
811 unique_parts = []
|
|
812 seen = set()
|
|
813 for part in parts:
|
|
814 if part not in seen:
|
|
815 unique_parts.append(part)
|
|
816 seen.add(part)
|
455
|
817
|
490
|
818 if len(unique_parts) == 1:
|
|
819 return unique_parts[0]
|
455
|
820 else:
|
490
|
821 return f' {separator} '.join(unique_parts)
|
455
|
822
|
|
823 try:
|
490
|
824 import re
|
|
825
|
|
826 # First, simplify content within parentheses
|
|
827 # This handles cases like (A or A) -> A and (B and B) -> B
|
|
828 expr_simplified = re.sub(r'\(([^()]+)\)', simplify_parentheses_content, expr)
|
|
829
|
|
830 # Check if the resulting expression has mixed operators
|
|
831 has_and = ' and ' in expr_simplified
|
|
832 has_or = ' or ' in expr_simplified
|
|
833
|
|
834 # Only simplify top-level if it's pure AND or pure OR
|
|
835 if has_and and not has_or and '(' not in expr_simplified:
|
|
836 # Pure AND chain at top level - safe to deduplicate
|
|
837 return remove_duplicates_simple(expr_simplified, 'and')
|
|
838 elif has_or and not has_and and '(' not in expr_simplified:
|
|
839 # Pure OR chain at top level - safe to deduplicate
|
|
840 return remove_duplicates_simple(expr_simplified, 'or')
|
|
841 else:
|
|
842 # Mixed operators or has parentheses - return the simplified version (with parentheses content cleaned)
|
|
843 return expr_simplified
|
|
844
|
455
|
845 except Exception:
|
490
|
846 # If anything goes wrong, return the original expression
|
455
|
847 return expr
|
|
848
|
492
|
849
|
426
|
850 def translate_model_genes(model: 'cobra.Model',
|
|
851 mapping_df: 'pd.DataFrame',
|
|
852 target_nomenclature: str,
|
|
853 source_nomenclature: str = 'hgnc_id',
|
455
|
854 allow_many_to_one: bool = False,
|
490
|
855 logger: Optional[logging.Logger] = None) -> Tuple['cobra.Model', Dict[str, str]]:
|
426
|
856 """
|
456
|
857 Translate model genes from source_nomenclature to target_nomenclature using a mapping table.
|
|
858 mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez).
|
|
859
|
455
|
860 Args:
|
456
|
861 model: COBRA model to translate.
|
|
862 mapping_df: DataFrame containing the mapping information.
|
|
863 target_nomenclature: Desired target key (e.g., 'hgnc_symbol').
|
|
864 source_nomenclature: Current source key in the model (default 'hgnc_id').
|
|
865 allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs.
|
|
866 logger: Optional logger.
|
490
|
867
|
|
868 Returns:
|
|
869 Tuple containing:
|
|
870 - Translated COBRA model
|
|
871 - Dictionary mapping reaction IDs to translation issue descriptions
|
426
|
872 """
|
|
873 if logger is None:
|
|
874 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
875 logger = logging.getLogger(__name__)
|
|
876
|
|
877 logger.info(f"Translating genes from '{source_nomenclature}' to '{target_nomenclature}'")
|
|
878
|
|
879 # normalize column names and choose relevant columns
|
|
880 chosen = _choose_columns(mapping_df)
|
|
881 if not chosen:
|
|
882 raise ValueError("Could not detect useful columns in mapping_df. Expected at least one of: ensg, hgnc_id, hgnc_symbol, entrez.")
|
|
883
|
|
884 # map source/target to actual dataframe column names (allow user-specified source/target keys)
|
|
885 # normalize input args
|
|
886 src_key = source_nomenclature.strip().lower()
|
|
887 tgt_key = target_nomenclature.strip().lower()
|
|
888
|
|
889 # try to find the actual column names for requested keys
|
|
890 col_for_src = None
|
|
891 col_for_tgt = None
|
|
892 # first, try exact match
|
|
893 for k, actual in chosen.items():
|
|
894 if k == src_key:
|
|
895 col_for_src = actual
|
|
896 if k == tgt_key:
|
|
897 col_for_tgt = actual
|
|
898
|
|
899 # if not found, try mapping common names
|
|
900 if col_for_src is None:
|
|
901 possible_src_names = {k: v for k, v in chosen.items()}
|
|
902 # try to match by contained substring
|
|
903 for k, actual in possible_src_names.items():
|
|
904 if src_key in k:
|
|
905 col_for_src = actual
|
|
906 break
|
|
907
|
|
908 if col_for_tgt is None:
|
|
909 for k, actual in chosen.items():
|
|
910 if tgt_key in k:
|
|
911 col_for_tgt = actual
|
|
912 break
|
|
913
|
|
914 if col_for_src is None:
|
|
915 raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.")
|
|
916 if col_for_tgt is None:
|
|
917 raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.")
|
|
918
|
|
919 model_source_genes = { _normalize_gene_id(g.id) for g in model.genes }
|
|
920 logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).")
|
|
921
|
|
922 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy()
|
503
|
923 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).apply(_normalize_gene_id)
|
426
|
924
|
|
925 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()
|
|
926
|
|
927 if filtered_map.empty:
|
|
928 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")
|
|
929
|
455
|
930 if not allow_many_to_one:
|
|
931 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
|
426
|
932
|
455
|
933 # Crea il mapping
|
426
|
934 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)
|
|
935
|
|
936 # copy model
|
|
937 model_copy = model.copy()
|
|
938
|
|
939 # statistics
|
493
|
940 stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0, 'flattened_or_gprs': 0}
|
426
|
941 unmapped = []
|
|
942 multi = []
|
490
|
943
|
|
944 # Dictionary to store translation issues per reaction
|
|
945 reaction_translation_issues = {}
|
426
|
946
|
|
947 original_genes = {g.id for g in model_copy.genes}
|
|
948 logger.info(f"Original genes count: {len(original_genes)}")
|
|
949
|
|
950 # translate GPRs
|
|
951 for rxn in model_copy.reactions:
|
|
952 gpr = rxn.gene_reaction_rule
|
|
953 if gpr and gpr.strip():
|
490
|
954 new_gpr, rxn_issues = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger, track_issues=True)
|
|
955 if rxn_issues:
|
|
956 reaction_translation_issues[rxn.id] = rxn_issues
|
|
957
|
426
|
958 if new_gpr != gpr:
|
493
|
959 # Check if this GPR has translation issues and contains only OR operators
|
|
960 if rxn_issues and _is_or_only_expression(new_gpr):
|
|
961 # Flatten the GPR: remove parentheses and create set of unique genes
|
|
962 flattened_gpr = _flatten_or_only_gpr(new_gpr)
|
|
963 if flattened_gpr != new_gpr:
|
|
964 stats['flattened_or_gprs'] += 1
|
|
965 logger.debug(f"Flattened OR-only GPR with issues for {rxn.id}: '{new_gpr}' -> '{flattened_gpr}'")
|
|
966 new_gpr = flattened_gpr
|
|
967
|
455
|
968 simplified_gpr = _simplify_boolean_expression(new_gpr)
|
|
969 if simplified_gpr != new_gpr:
|
|
970 stats['simplified_gprs'] += 1
|
|
971 logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
|
|
972 rxn.gene_reaction_rule = simplified_gpr
|
|
973 logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'")
|
426
|
974
|
|
975 # update model genes based on new GPRs
|
|
976 _update_model_genes(model_copy, logger)
|
|
977
|
|
978 # final logging
|
|
979 _log_translation_statistics(stats, unmapped, multi, original_genes, model_copy.genes, logger)
|
|
980
|
|
981 logger.info("Translation finished")
|
490
|
982 return model_copy, reaction_translation_issues
|
426
|
983
|
|
984
|
|
985 # ---------- helper functions ----------
|
|
986 def _create_gene_mapping(mapping_df, source_col: str, target_col: str, logger: logging.Logger) -> Dict[str, List[str]]:
|
|
987 """
|
|
988 Build mapping dict: source_id -> list of target_ids
|
|
989 Normalizes IDs (removes prefixes like 'HGNC:' etc).
|
|
990 """
|
|
991 df = mapping_df[[source_col, target_col]].dropna().copy()
|
|
992 # normalize to string
|
503
|
993 df[source_col] = df[source_col].astype(str).apply(_normalize_gene_id)
|
426
|
994 df[target_col] = df[target_col].astype(str).str.strip()
|
|
995
|
|
996 df = df.drop_duplicates()
|
|
997
|
|
998 logger.info(f"Creating mapping from {len(df)} rows")
|
|
999
|
|
1000 mapping = defaultdict(list)
|
|
1001 for _, row in df.iterrows():
|
|
1002 s = row[source_col]
|
|
1003 t = row[target_col]
|
|
1004 if t not in mapping[s]:
|
|
1005 mapping[s].append(t)
|
|
1006
|
|
1007 # stats
|
|
1008 one_to_one = sum(1 for v in mapping.values() if len(v) == 1)
|
|
1009 one_to_many = sum(1 for v in mapping.values() if len(v) > 1)
|
|
1010 logger.info(f"Mapping: {len(mapping)} source keys, {one_to_one} 1:1, {one_to_many} 1:many")
|
|
1011 return dict(mapping)
|
|
1012
|
|
1013
|
|
1014 def _translate_gpr(gpr_string: str,
|
|
1015 gene_mapping: Dict[str, List[str]],
|
|
1016 stats: Dict[str, int],
|
|
1017 unmapped_genes: List[str],
|
|
1018 multi_mapping_genes: List[Tuple[str, List[str]]],
|
490
|
1019 logger: logging.Logger,
|
|
1020 track_issues: bool = False) -> Union[str, Tuple[str, str]]:
|
426
|
1021 """
|
|
1022 Translate genes inside a GPR string using gene_mapping.
|
490
|
1023 Returns new GPR string, and optionally translation issues if track_issues=True.
|
426
|
1024 """
|
|
1025 # Generic token pattern: letters, digits, :, _, -, ., (captures HGNC:1234, ENSG000..., symbols)
|
|
1026 token_pattern = r'\b[A-Za-z0-9:_.-]+\b'
|
|
1027 tokens = re.findall(token_pattern, gpr_string)
|
|
1028
|
|
1029 logical = {'and', 'or', 'AND', 'OR', '(', ')'}
|
|
1030 tokens = [t for t in tokens if t not in logical]
|
|
1031
|
|
1032 new_gpr = gpr_string
|
490
|
1033 issues = []
|
426
|
1034
|
|
1035 for token in sorted(set(tokens), key=lambda x: -len(x)): # longer tokens first to avoid partial replacement
|
|
1036 norm = _normalize_gene_id(token)
|
|
1037 if norm in gene_mapping:
|
|
1038 targets = gene_mapping[norm]
|
|
1039 stats['translated'] += 1
|
|
1040 if len(targets) == 1:
|
|
1041 stats['one_to_one'] += 1
|
|
1042 replacement = targets[0]
|
|
1043 else:
|
|
1044 stats['one_to_many'] += 1
|
|
1045 multi_mapping_genes.append((token, targets))
|
|
1046 replacement = "(" + " or ".join(targets) + ")"
|
490
|
1047 if track_issues:
|
|
1048 issues.append(f"{token} -> {' or '.join(targets)}")
|
426
|
1049
|
|
1050 pattern = r'\b' + re.escape(token) + r'\b'
|
|
1051 new_gpr = re.sub(pattern, replacement, new_gpr)
|
|
1052 else:
|
|
1053 stats['not_found'] += 1
|
|
1054 if token not in unmapped_genes:
|
|
1055 unmapped_genes.append(token)
|
|
1056 logger.debug(f"Token not found in mapping (left as-is): {token}")
|
|
1057
|
490
|
1058 # Check for many-to-one cases (multiple source genes mapping to same target)
|
|
1059 if track_issues:
|
|
1060 # Build reverse mapping to detect many-to-one cases from original tokens
|
|
1061 original_to_target = {}
|
|
1062
|
|
1063 for orig_token in tokens:
|
|
1064 norm = _normalize_gene_id(orig_token)
|
|
1065 if norm in gene_mapping:
|
|
1066 targets = gene_mapping[norm]
|
|
1067 for target in targets:
|
|
1068 if target not in original_to_target:
|
|
1069 original_to_target[target] = []
|
|
1070 if orig_token not in original_to_target[target]:
|
|
1071 original_to_target[target].append(orig_token)
|
|
1072
|
|
1073 # Identify many-to-one mappings in this specific GPR
|
|
1074 for target, original_genes in original_to_target.items():
|
|
1075 if len(original_genes) > 1:
|
|
1076 issues.append(f"{' or '.join(original_genes)} -> {target}")
|
|
1077
|
|
1078 issue_text = "; ".join(issues) if issues else ""
|
|
1079
|
|
1080 if track_issues:
|
|
1081 return new_gpr, issue_text
|
|
1082 else:
|
|
1083 return new_gpr
|
426
|
1084
|
|
1085
|
|
1086 def _update_model_genes(model: 'cobra.Model', logger: logging.Logger):
|
|
1087 """
|
|
1088 Rebuild model.genes from gene_reaction_rule content.
|
|
1089 Removes genes not referenced and adds missing ones.
|
|
1090 """
|
|
1091 # collect genes in GPRs
|
|
1092 gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
|
|
1093 logical = {'and', 'or', 'AND', 'OR', '(', ')'}
|
|
1094 genes_in_gpr: Set[str] = set()
|
|
1095
|
|
1096 for rxn in model.reactions:
|
|
1097 gpr = rxn.gene_reaction_rule
|
|
1098 if gpr and gpr.strip():
|
|
1099 toks = re.findall(gene_pattern, gpr)
|
|
1100 toks = [t for t in toks if t not in logical]
|
|
1101 # normalize IDs consistent with mapping normalization
|
|
1102 toks = [_normalize_gene_id(t) for t in toks]
|
|
1103 genes_in_gpr.update(toks)
|
|
1104
|
|
1105 # existing gene ids
|
|
1106 existing = {g.id for g in model.genes}
|
|
1107
|
|
1108 # remove obsolete genes
|
|
1109 to_remove = [gid for gid in existing if gid not in genes_in_gpr]
|
|
1110 removed = 0
|
|
1111 for gid in to_remove:
|
|
1112 try:
|
|
1113 gene_obj = model.genes.get_by_id(gid)
|
|
1114 model.genes.remove(gene_obj)
|
|
1115 removed += 1
|
|
1116 except Exception:
|
|
1117 # safe-ignore
|
|
1118 pass
|
|
1119
|
|
1120 # add new genes
|
|
1121 added = 0
|
|
1122 for gid in genes_in_gpr:
|
|
1123 if gid not in existing:
|
|
1124 new_gene = cobra.Gene(gid)
|
|
1125 try:
|
|
1126 model.genes.add(new_gene)
|
|
1127 except Exception:
|
|
1128 # fallback: if model.genes doesn't support add, try append or model.add_genes
|
|
1129 try:
|
|
1130 model.genes.append(new_gene)
|
|
1131 except Exception:
|
|
1132 try:
|
|
1133 model.add_genes([new_gene])
|
|
1134 except Exception:
|
|
1135 logger.warning(f"Could not add gene object for {gid}")
|
|
1136 added += 1
|
|
1137
|
|
1138 logger.info(f"Model genes updated: removed {removed}, added {added}")
|
|
1139
|
|
1140
|
|
1141 def _log_translation_statistics(stats: Dict[str, int],
|
|
1142 unmapped_genes: List[str],
|
|
1143 multi_mapping_genes: List[Tuple[str, List[str]]],
|
|
1144 original_genes: Set[str],
|
|
1145 final_genes,
|
|
1146 logger: logging.Logger):
|
|
1147 logger.info("=== TRANSLATION STATISTICS ===")
|
|
1148 logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})")
|
|
1149 logger.info(f"Not found tokens: {stats.get('not_found', 0)}")
|
455
|
1150 logger.info(f"Simplified GPRs: {stats.get('simplified_gprs', 0)}")
|
493
|
1151 logger.info(f"Flattened OR-only GPRs with issues: {stats.get('flattened_or_gprs', 0)}")
|
426
|
1152
|
|
1153 final_ids = {g.id for g in final_genes}
|
|
1154 logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}")
|
|
1155
|
|
1156 if unmapped_genes:
|
|
1157 logger.warning(f"Unmapped tokens ({len(unmapped_genes)}): {', '.join(unmapped_genes[:20])}{(' ...' if len(unmapped_genes)>20 else '')}")
|
|
1158 if multi_mapping_genes:
|
|
1159 logger.info(f"Multi-mapping examples ({len(multi_mapping_genes)}):")
|
|
1160 for orig, targets in multi_mapping_genes[:10]:
|
490
|
1161 logger.info(f" {orig} -> {', '.join(targets)}")
|
493
|
1162
|
|
1163 # Log summary of flattened GPRs if any
|
|
1164 if stats.get('flattened_or_gprs', 0) > 0:
|
|
1165 logger.info(f"Flattened {stats['flattened_or_gprs']} OR-only GPRs that had translation issues (removed parentheses, created unique gene sets)")
|
490
|
1166
|
|
1167 |