comparison COBRAxy/utils/model_utils.py @ 490:c6ea189ea7e9 (draft)

description  Uploaded
author       francesco_lapi
date         Mon, 29 Sep 2025 15:13:21 +0000
parents      a6e45049c1b9
children     4ed95023af20

comparing 489:97eea560a10f with 490:c6ea189ea7e9
@@ -16,10 +16,104 @@
 from typing import Optional, Tuple, Union, List, Dict, Set
 from collections import defaultdict
 import utils.rule_parsing as rulesUtils
 import utils.reaction_parsing as reactionUtils
 from cobra import Model as cobraModel, Reaction, Metabolite
+import sys
+
+
+############################ check_methods ####################################
+def gene_type(l :str, name :str) -> str:
+    """
+    Determine the type of gene ID.
+
+    Args:
+        l (str): The gene identifier to check.
+        name (str): The name of the dataset, used in error messages.
+
+    Returns:
+        str: The type of gene ID ('hugo_id', 'ensembl_gene_id', 'symbol', or 'entrez_id').
+
+    Raises:
+        SystemExit: If the gene ID type is not supported, execution is aborted.
+    """
+    if check_hgnc(l):
+        return 'hugo_id'
+    elif check_ensembl(l):
+        return 'ensembl_gene_id'
+    elif check_symbol(l):
+        return 'symbol'
+    elif check_entrez(l):
+        return 'entrez_id'
+    else:
+        sys.exit('Execution aborted:\n' +
+                 'gene ID type in ' + name + ' not supported. Supported ID ' +
+                 'types are: HUGO ID, Ensembl ID, HUGO symbol, Entrez ID\n')
+
+def check_hgnc(l :str) -> bool:
+    """
+    Check if a gene identifier follows the HGNC format.
+
+    Args:
+        l (str): The gene identifier to check.
+
+    Returns:
+        bool: True if the gene identifier follows the HGNC format, False otherwise.
+    """
+    if len(l) > 5:
+        if (l.upper()).startswith('HGNC:'):
+            return l[5:].isdigit()
+        else:
+            return False
+    else:
+        return False
+
+def check_ensembl(l :str) -> bool:
+    """
+    Check if a gene identifier follows the Ensembl format.
+
+    Args:
+        l (str): The gene identifier to check.
+
+    Returns:
+        bool: True if the gene identifier follows the Ensembl format, False otherwise.
+    """
+    return l.upper().startswith('ENS')
+
+
+def check_symbol(l :str) -> bool:
+    """
+    Check if a gene identifier follows the symbol format.
+
+    Args:
+        l (str): The gene identifier to check.
+
+    Returns:
+        bool: True if the gene identifier follows the symbol format, False otherwise.
+    """
+    if len(l) > 0:
+        if l[0].isalpha() and l[1:].isalnum():
+            return True
+        else:
+            return False
+    else:
+        return False
+
+def check_entrez(l :str) -> bool:
+    """
+    Check if a gene identifier follows the Entrez ID format.
+
+    Args:
+        l (str): The gene identifier to check.
+
+    Returns:
+        bool: True if the gene identifier follows the Entrez ID format, False otherwise.
+    """
+    if len(l) > 0:
+        return l.isdigit()
+    else:
+        return False
 
 ################################- DATA GENERATION -################################
 ReactionId = str
 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
     """
@@ -504,110 +598,94 @@
     g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
     return g
 
 def _simplify_boolean_expression(expr: str) -> str:
     """
-    Simplify a boolean expression by removing duplicates and redundancies.
-    Handles expressions with 'and' and 'or'.
+    Simplify a boolean expression by removing duplicates while strictly preserving semantics.
+    This function handles simple duplicates within parentheses while being conservative about
+    complex expressions that could change semantics.
     """
     if not expr or not expr.strip():
         return expr
 
-    # normalize operators
+    # Normalize operators and whitespace
     expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
+    expr = ' '.join(expr.split())  # Normalize whitespace
 
-    # recursive helper to process expressions
-    def process_expression(s: str) -> str:
-        s = s.strip()
-        if not s:
-            return s
-
-        # handle parentheses
-        while '(' in s:
-            # find the innermost parentheses
-            start = -1
-            for i, c in enumerate(s):
-                if c == '(':
-                    start = i
-                elif c == ')' and start != -1:
-                    # process inner content
-                    inner = s[start+1:i]
-                    processed_inner = process_expression(inner)
-                    s = s[:start] + processed_inner + s[i+1:]
-                    break
-            else:
-                break
-
-        # split by 'or' at top level
-        or_parts = []
-        current_part = ""
-        paren_count = 0
-
-        tokens = s.split()
-        i = 0
-        while i < len(tokens):
-            token = tokens[i]
-            if token == 'or' and paren_count == 0:
-                if current_part.strip():
-                    or_parts.append(current_part.strip())
-                current_part = ""
-            else:
-                if token.count('(') > token.count(')'):
-                    paren_count += token.count('(') - token.count(')')
-                elif token.count(')') > token.count('('):
-                    paren_count -= token.count(')') - token.count('(')
-                current_part += token + " "
-            i += 1
-
-        if current_part.strip():
-            or_parts.append(current_part.strip())
-
-        # process each OR part
-        processed_or_parts = []
-        for or_part in or_parts:
-            # split by 'and' within each OR part
-            and_parts = []
-            current_and = ""
-            paren_count = 0
-
-            and_tokens = or_part.split()
-            j = 0
-            while j < len(and_tokens):
-                token = and_tokens[j]
-                if token == 'and' and paren_count == 0:
-                    if current_and.strip():
-                        and_parts.append(current_and.strip())
-                    current_and = ""
-                else:
-                    if token.count('(') > token.count(')'):
-                        paren_count += token.count('(') - token.count(')')
-                    elif token.count(')') > token.count('('):
-                        paren_count -= token.count(')') - token.count('(')
-                    current_and += token + " "
-                j += 1
-
-            if current_and.strip():
-                and_parts.append(current_and.strip())
-
-            # deduplicate AND parts
-            unique_and_parts = list(dict.fromkeys(and_parts))  # preserves order
-
-            if len(unique_and_parts) == 1:
-                processed_or_parts.append(unique_and_parts[0])
-            elif len(unique_and_parts) > 1:
-                processed_or_parts.append(" and ".join(unique_and_parts))
-
-        # deduplicate OR parts
-        unique_or_parts = list(dict.fromkeys(processed_or_parts))
-
-        if len(unique_or_parts) == 1:
-            return unique_or_parts[0]
-        elif len(unique_or_parts) > 1:
-            return " or ".join(unique_or_parts)
-        else:
-            return ""
-
-    try:
-        return process_expression(expr)
+    def simplify_parentheses_content(match_obj):
+        """Helper function to simplify content within parentheses."""
+        content = match_obj.group(1)  # Content inside parentheses
+
+        # Only simplify if it's a pure OR or pure AND chain
+        if ' or ' in content and ' and ' not in content:
+            # Pure OR chain - safe to deduplicate
+            parts = [p.strip() for p in content.split(' or ') if p.strip()]
+            unique_parts = []
+            seen = set()
+            for part in parts:
+                if part not in seen:
+                    unique_parts.append(part)
+                    seen.add(part)
+
+            if len(unique_parts) == 1:
+                return unique_parts[0]  # Remove unnecessary parentheses for single items
+            else:
+                return '(' + ' or '.join(unique_parts) + ')'
+
+        elif ' and ' in content and ' or ' not in content:
+            # Pure AND chain - safe to deduplicate
+            parts = [p.strip() for p in content.split(' and ') if p.strip()]
+            unique_parts = []
+            seen = set()
+            for part in parts:
+                if part not in seen:
+                    unique_parts.append(part)
+                    seen.add(part)
+
+            if len(unique_parts) == 1:
+                return unique_parts[0]  # Remove unnecessary parentheses for single items
+            else:
+                return '(' + ' and '.join(unique_parts) + ')'
+        else:
+            # Mixed operators or single item - return with parentheses as-is
+            return '(' + content + ')'
+
+    def remove_duplicates_simple(parts_str: str, separator: str) -> str:
+        """Remove duplicates from a simple chain of operations."""
+        # Split on the spaced operator so gene IDs that merely contain 'and'/'or' stay intact
+        parts = [p.strip() for p in parts_str.split(f' {separator} ') if p.strip()]
+
+        # Remove duplicates while preserving order
+        unique_parts = []
+        seen = set()
+        for part in parts:
+            if part not in seen:
+                unique_parts.append(part)
+                seen.add(part)
+
+        if len(unique_parts) == 1:
+            return unique_parts[0]
+        else:
+            return f' {separator} '.join(unique_parts)
+
+    try:
+        # First, simplify content within parentheses
+        # This handles cases like (A or A) -> A and (B and B) -> B
+        expr_simplified = re.sub(r'\(([^()]+)\)', simplify_parentheses_content, expr)
+
+        # Check if the resulting expression has mixed operators
+        has_and = ' and ' in expr_simplified
+        has_or = ' or ' in expr_simplified
+
+        # Only simplify top-level if it's pure AND or pure OR
+        if has_and and not has_or and '(' not in expr_simplified:
+            # Pure AND chain at top level - safe to deduplicate
+            return remove_duplicates_simple(expr_simplified, 'and')
+        elif has_or and not has_and and '(' not in expr_simplified:
+            # Pure OR chain at top level - safe to deduplicate
+            return remove_duplicates_simple(expr_simplified, 'or')
+        else:
+            # Mixed operators or parentheses remain - return the version with
+            # parenthesized content already cleaned
+            return expr_simplified
+
     except Exception:
-        # if simplification fails, return the original expression
+        # If anything goes wrong, return the original expression
        return expr
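
A sketch of what the rewritten simplifier does and deliberately leaves alone. The conservatism matters: the old recursive version stripped parentheses, so '(A or B) and C' became 'A or B and C', which Python-style precedence reads as 'A or (B and C)'. Calls assume the private function is importable from utils.model_utils:

# Illustrative inputs for _simplify_boolean_expression (hypothetical GPRs).
from utils.model_utils import _simplify_boolean_expression

_simplify_boolean_expression('(GENE1 or GENE1)')            # -> 'GENE1'
_simplify_boolean_expression('GENE1 and GENE1 and GENE2')   # -> 'GENE1 and GENE2'
_simplify_boolean_expression('(GENE1 or GENE2) and GENE3')  # unchanged: mixed operators are not touched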
@@ -615,12 +695,12 @@
 # ---------- Main public function ----------
 def translate_model_genes(model: 'cobra.Model',
                           mapping_df: 'pd.DataFrame',
                           target_nomenclature: str,
                           source_nomenclature: str = 'hgnc_id',
                           allow_many_to_one: bool = False,
-                          logger: Optional[logging.Logger] = None) -> 'cobra.Model':
+                          logger: Optional[logging.Logger] = None) -> Tuple['cobra.Model', Dict[str, str]]:
     """
     Translate model genes from source_nomenclature to target_nomenclature using a mapping table.
     mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez).
 
     Args:
@@ -628,10 +708,15 @@
         mapping_df: DataFrame containing the mapping information.
         target_nomenclature: Desired target key (e.g., 'hgnc_symbol').
         source_nomenclature: Current source key in the model (default 'hgnc_id').
         allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs.
         logger: Optional logger.
+
+    Returns:
+        Tuple containing:
+        - Translated COBRA model
+        - Dictionary mapping reaction IDs to translation issue descriptions
     """
     if logger is None:
         logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
         logger = logging.getLogger(__name__)
 
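
Since the public function now returns a pair, call sites unpack it. A minimal sketch, with a hypothetical mapping table and a cobra model assumed to be in scope:

# Sketch: the updated call site (mapping columns and model are illustrative).
import pandas as pd
from utils.model_utils import translate_model_genes

mapping_df = pd.DataFrame({
    'hgnc_id': ['HGNC:5', 'HGNC:7'],
    'hgnc_symbol': ['A1BG', 'A2M'],
})
translated, issues = translate_model_genes(
    model,                      # an existing cobra.Model (assumed in scope)
    mapping_df,
    target_nomenclature='hgnc_symbol',
    source_nomenclature='hgnc_id',
)
for rxn_id, desc in issues.items():
    print(rxn_id, '->', desc)   # one-to-many / many-to-one notes per reaction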
@@ -699,19 +784,25 @@
 
     # statistics
     stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0}
     unmapped = []
     multi = []
+
+    # Dictionary to store translation issues per reaction
+    reaction_translation_issues = {}
 
     original_genes = {g.id for g in model_copy.genes}
     logger.info(f"Original genes count: {len(original_genes)}")
 
     # translate GPRs
     for rxn in model_copy.reactions:
         gpr = rxn.gene_reaction_rule
         if gpr and gpr.strip():
-            new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger)
+            new_gpr, rxn_issues = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger, track_issues=True)
+            if rxn_issues:
+                reaction_translation_issues[rxn.id] = rxn_issues
+
             if new_gpr != gpr:
                 simplified_gpr = _simplify_boolean_expression(new_gpr)
                 if simplified_gpr != new_gpr:
                     stats['simplified_gprs'] += 1
                     logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
@@ -723,11 +814,11 @@
 
     # final logging
     _log_translation_statistics(stats, unmapped, multi, original_genes, model_copy.genes, logger)
 
     logger.info("Translation finished")
-    return model_copy
+    return model_copy, reaction_translation_issues
 
 
 # ---------- helper functions ----------
 def _create_gene_mapping(mapping_df, source_col: str, target_col: str, logger: logging.Logger) -> Dict[str, List[str]]:
     """
@@ -760,23 +851,25 @@
 def _translate_gpr(gpr_string: str,
                    gene_mapping: Dict[str, List[str]],
                    stats: Dict[str, int],
                    unmapped_genes: List[str],
                    multi_mapping_genes: List[Tuple[str, List[str]]],
-                   logger: logging.Logger) -> str:
+                   logger: logging.Logger,
+                   track_issues: bool = False) -> Union[str, Tuple[str, str]]:
     """
     Translate genes inside a GPR string using gene_mapping.
-    Returns new GPR string.
+    Returns the new GPR string and, when track_issues=True, a description of translation issues.
     """
     # Generic token pattern: letters, digits, ':', '_', '-', '.' (captures HGNC:1234, ENSG000..., symbols)
     token_pattern = r'\b[A-Za-z0-9:_.-]+\b'
     tokens = re.findall(token_pattern, gpr_string)
 
     logical = {'and', 'or', 'AND', 'OR', '(', ')'}
     tokens = [t for t in tokens if t not in logical]
 
     new_gpr = gpr_string
+    issues = []
 
     for token in sorted(set(tokens), key=lambda x: -len(x)):  # longer tokens first to avoid partial replacement
         norm = _normalize_gene_id(token)
         if norm in gene_mapping:
             targets = gene_mapping[norm]
@@ -786,20 +879,47 @@
                 replacement = targets[0]
             else:
                 stats['one_to_many'] += 1
                 multi_mapping_genes.append((token, targets))
                 replacement = "(" + " or ".join(targets) + ")"
+                if track_issues:
+                    issues.append(f"{token} -> {' or '.join(targets)}")
 
             pattern = r'\b' + re.escape(token) + r'\b'
             new_gpr = re.sub(pattern, replacement, new_gpr)
         else:
             stats['not_found'] += 1
             if token not in unmapped_genes:
                 unmapped_genes.append(token)
             logger.debug(f"Token not found in mapping (left as-is): {token}")
 
-    return new_gpr
+    # Check for many-to-one cases (multiple source genes mapping to the same target)
+    if track_issues:
+        # Build reverse mapping to detect many-to-one cases from original tokens
+        original_to_target = {}
+
+        for orig_token in tokens:
+            norm = _normalize_gene_id(orig_token)
+            if norm in gene_mapping:
+                targets = gene_mapping[norm]
+                for target in targets:
+                    if target not in original_to_target:
+                        original_to_target[target] = []
+                    if orig_token not in original_to_target[target]:
+                        original_to_target[target].append(orig_token)
+
+        # Identify many-to-one mappings in this specific GPR
+        for target, original_genes in original_to_target.items():
+            if len(original_genes) > 1:
+                issues.append(f"{' or '.join(original_genes)} -> {target}")
+
+    issue_text = "; ".join(issues) if issues else ""
+
+    if track_issues:
+        return new_gpr, issue_text
+    else:
+        return new_gpr
 
 
 def _update_model_genes(model: 'cobra.Model', logger: logging.Logger):
     """
     Rebuild model.genes from gene_reaction_rule content.
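
The issue string produced for a single GPR can be seen by calling the helper directly. A sketch with a hypothetical, deliberately many-to-one gene_mapping; it assumes _normalize_gene_id leaves these plain IDs unchanged:

# Sketch: _translate_gpr with track_issues=True (hypothetical mapping).
import logging

stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0,
         'not_found': 0, 'simplified_gprs': 0}
unmapped, multi = [], []
gene_mapping = {'GENE1': ['SYMB1'], 'GENE2': ['SYMB1']}  # two sources, one target

new_gpr, issue_text = _translate_gpr('GENE1 and GENE2', gene_mapping,
                                     stats, unmapped, multi,
                                     logging.getLogger(__name__),
                                     track_issues=True)
# new_gpr    == 'SYMB1 and SYMB1'   (collapsed later by _simplify_boolean_expression)
# issue_text == 'GENE1 or GENE2 -> SYMB1'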
@@ -873,5 +993,7 @@
         logger.warning(f"Unmapped tokens ({len(unmapped_genes)}): {', '.join(unmapped_genes[:20])}{(' ...' if len(unmapped_genes) > 20 else '')}")
     if multi_mapping_genes:
         logger.info(f"Multi-mapping examples ({len(multi_mapping_genes)}):")
         for orig, targets in multi_mapping_genes[:10]:
             logger.info(f"  {orig} -> {', '.join(targets)}")
+
+