comparison COBRAxy/utils/model_utils.py @ 490:c6ea189ea7e9 draft
author:   francesco_lapi
date:     Mon, 29 Sep 2025 15:13:21 +0000
parents:  a6e45049c1b9
children: 4ed95023af20
comparing 489:97eea560a10f with 490:c6ea189ea7e9

--- a/COBRAxy/utils/model_utils.py	489:97eea560a10f
+++ b/COBRAxy/utils/model_utils.py	490:c6ea189ea7e9
@@ -16,10 +16,104 @@
 from typing import Optional, Tuple, Union, List, Dict, Set
 from collections import defaultdict
 import utils.rule_parsing as rulesUtils
 import utils.reaction_parsing as reactionUtils
 from cobra import Model as cobraModel, Reaction, Metabolite
+import sys
+
+
+############################ check_methods ####################################
+def gene_type(l :str, name :str) -> str:
+    """
+    Determine the type of gene ID.
+
+    Args:
+        l (str): The gene identifier to check.
+        name (str): The name of the dataset, used in error messages.
+
+    Returns:
+        str: The type of gene ID ('hugo_id', 'ensembl_gene_id', 'symbol', or 'entrez_id').
+
+    Raises:
+        SystemExit: If the gene ID type is not supported, execution is aborted.
+    """
+    if check_hgnc(l):
+        return 'hugo_id'
+    elif check_ensembl(l):
+        return 'ensembl_gene_id'
+    elif check_symbol(l):
+        return 'symbol'
+    elif check_entrez(l):
+        return 'entrez_id'
+    else:
+        sys.exit('Execution aborted:\n' +
+                 'gene ID type in ' + name + ' not supported. Supported ID ' +
+                 'types are: HUGO ID, Ensembl ID, HUGO symbol, Entrez ID\n')
+
+def check_hgnc(l :str) -> bool:
+    """
+    Check if a gene identifier follows the HGNC format.
+
+    Args:
+        l (str): The gene identifier to check.
+
+    Returns:
+        bool: True if the gene identifier follows the HGNC format, False otherwise.
+    """
+    if len(l) > 5:
+        if (l.upper()).startswith('HGNC:'):
+            return l[5:].isdigit()
+        else:
+            return False
+    else:
+        return False
+
+def check_ensembl(l :str) -> bool:
+    """
+    Check if a gene identifier follows the Ensembl format.
+
+    Args:
+        l (str): The gene identifier to check.
+
+    Returns:
+        bool: True if the gene identifier follows the Ensembl format, False otherwise.
+    """
+    return l.upper().startswith('ENS')
+
+
+def check_symbol(l :str) -> bool:
+    """
+    Check if a gene identifier follows the symbol format.
+
+    Args:
+        l (str): The gene identifier to check.
+
+    Returns:
+        bool: True if the gene identifier follows the symbol format, False otherwise.
+    """
+    if len(l) > 0:
+        if l[0].isalpha() and l[1:].isalnum():
+            return True
+        else:
+            return False
+    else:
+        return False
+
+def check_entrez(l :str) -> bool:
+    """
+    Check if a gene identifier follows the Entrez ID format.
+
+    Args:
+        l (str): The gene identifier to check.
+
+    Returns:
+        bool: True if the gene identifier follows the Entrez ID format, False otherwise.
+    """
+    if len(l) > 0:
+        return l.isdigit()
+    else:
+        return False
 
 ################################- DATA GENERATION -################################
 ReactionId = str
 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
     """
@@ -504,111 +598,97 @@
         g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
     return g
 
 def _simplify_boolean_expression(expr: str) -> str:
     """
-    Simplify a boolean expression by removing duplicates and redundancies.
-    Handles expressions with 'and' and 'or'.
+    Simplify a boolean expression by removing duplicates while strictly preserving semantics.
+    This function handles simple duplicates within parentheses while being conservative about
+    complex expressions that could change semantics.
     """
     if not expr or not expr.strip():
         return expr
 
-    # normalize operators
+    # Normalize operators and whitespace
     expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
+    expr = ' '.join(expr.split())  # Normalize whitespace
 
-    # recursive helper to process expressions
-    def process_expression(s: str) -> str:
-        s = s.strip()
-        if not s:
-            return s
-
-        # handle parentheses
-        while '(' in s:
-            # find the innermost parentheses
-            start = -1
-            for i, c in enumerate(s):
-                if c == '(':
-                    start = i
-                elif c == ')' and start != -1:
-                    # process inner content
-                    inner = s[start+1:i]
-                    processed_inner = process_expression(inner)
-                    s = s[:start] + processed_inner + s[i+1:]
-                    break
-            else:
-                break
-
-        # split by 'or' at top level
-        or_parts = []
-        current_part = ""
-        paren_count = 0
-
-        tokens = s.split()
-        i = 0
-        while i < len(tokens):
-            token = tokens[i]
-            if token == 'or' and paren_count == 0:
-                if current_part.strip():
-                    or_parts.append(current_part.strip())
-                    current_part = ""
-            else:
-                if token.count('(') > token.count(')'):
-                    paren_count += token.count('(') - token.count(')')
-                elif token.count(')') > token.count('('):
-                    paren_count -= token.count(')') - token.count('(')
-                current_part += token + " "
-            i += 1
-
-        if current_part.strip():
-            or_parts.append(current_part.strip())
-
-        # process each OR part
-        processed_or_parts = []
-        for or_part in or_parts:
-            # split by 'and' within each OR part
-            and_parts = []
-            current_and = ""
-            paren_count = 0
-
-            and_tokens = or_part.split()
-            j = 0
-            while j < len(and_tokens):
-                token = and_tokens[j]
-                if token == 'and' and paren_count == 0:
-                    if current_and.strip():
-                        and_parts.append(current_and.strip())
-                        current_and = ""
-                else:
-                    if token.count('(') > token.count(')'):
-                        paren_count += token.count('(') - token.count(')')
-                    elif token.count(')') > token.count('('):
-                        paren_count -= token.count(')') - token.count('(')
-                    current_and += token + " "
-                j += 1
-
-            if current_and.strip():
-                and_parts.append(current_and.strip())
-
-            # deduplicate AND parts
-            unique_and_parts = list(dict.fromkeys(and_parts))  # preserves order
-
-            if len(unique_and_parts) == 1:
-                processed_or_parts.append(unique_and_parts[0])
-            elif len(unique_and_parts) > 1:
-                processed_or_parts.append(" and ".join(unique_and_parts))
-
-        # deduplicate OR parts
-        unique_or_parts = list(dict.fromkeys(processed_or_parts))
-
-        if len(unique_or_parts) == 1:
-            return unique_or_parts[0]
-        elif len(unique_or_parts) > 1:
-            return " or ".join(unique_or_parts)
-        else:
-            return ""
+    def simplify_parentheses_content(match_obj):
+        """Helper function to simplify content within parentheses."""
+        content = match_obj.group(1)  # Content inside parentheses
+
+        # Only simplify if it's a pure OR or pure AND chain
+        if ' or ' in content and ' and ' not in content:
+            # Pure OR chain - safe to deduplicate
+            parts = [p.strip() for p in content.split(' or ') if p.strip()]
+            unique_parts = []
+            seen = set()
+            for part in parts:
+                if part not in seen:
+                    unique_parts.append(part)
+                    seen.add(part)
+
+            if len(unique_parts) == 1:
+                return unique_parts[0]  # Remove unnecessary parentheses for single items
+            else:
+                return '(' + ' or '.join(unique_parts) + ')'
+
+        elif ' and ' in content and ' or ' not in content:
+            # Pure AND chain - safe to deduplicate
+            parts = [p.strip() for p in content.split(' and ') if p.strip()]
+            unique_parts = []
+            seen = set()
+            for part in parts:
+                if part not in seen:
+                    unique_parts.append(part)
+                    seen.add(part)
+
+            if len(unique_parts) == 1:
+                return unique_parts[0]  # Remove unnecessary parentheses for single items
+            else:
+                return '(' + ' and '.join(unique_parts) + ')'
+        else:
+            # Mixed operators or single item - return with parentheses as-is
+            return '(' + content + ')'
+
+    def remove_duplicates_simple(parts_str: str, separator: str) -> str:
+        """Remove duplicates from a simple chain of operations."""
+        # Split on the padded separator so gene IDs containing 'and'/'or' stay intact
+        parts = [p.strip() for p in parts_str.split(f' {separator} ') if p.strip()]
+
+        # Remove duplicates while preserving order
+        unique_parts = []
+        seen = set()
+        for part in parts:
+            if part not in seen:
+                unique_parts.append(part)
+                seen.add(part)
+
+        if len(unique_parts) == 1:
+            return unique_parts[0]
+        else:
+            return f' {separator} '.join(unique_parts)
 
     try:
-        return process_expression(expr)
+        import re
+
+        # First, simplify content within parentheses
+        # This handles cases like (A or A) -> A and (B and B) -> B
+        expr_simplified = re.sub(r'\(([^()]+)\)', simplify_parentheses_content, expr)
+
+        # Check if the resulting expression has mixed operators
+        has_and = ' and ' in expr_simplified
+        has_or = ' or ' in expr_simplified
+
+        # Only simplify the top level if it's pure AND or pure OR
+        if has_and and not has_or and '(' not in expr_simplified:
+            # Pure AND chain at top level - safe to deduplicate
+            return remove_duplicates_simple(expr_simplified, 'and')
+        elif has_or and not has_and and '(' not in expr_simplified:
+            # Pure OR chain at top level - safe to deduplicate
+            return remove_duplicates_simple(expr_simplified, 'or')
+        else:
+            # Mixed operators or parentheses remain - return the version with
+            # parenthesized content cleaned
+            return expr_simplified
+
     except Exception:
-        # if simplification fails, return the original expression
+        # If anything goes wrong, return the original expression
         return expr
 
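The rewritten simplifier only deduplicates pure AND or pure OR chains and leaves anything with mixed operators untouched, trading completeness for safety. A minimal sketch of the expected behavior (hypothetical gene tokens):

    _simplify_boolean_expression('(HGNC:5 or HGNC:5) and HGNC:7')
    # -> 'HGNC:5 and HGNC:7'   (duplicate collapsed, redundant parentheses dropped)

    _simplify_boolean_expression('A and B or A and B')
    # -> 'A and B or A and B'  (mixed operators at the top level: returned as-is)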
@@ -615,12 +695,12 @@
 # ---------- Main public function ----------
 def translate_model_genes(model: 'cobra.Model',
                           mapping_df: 'pd.DataFrame',
                           target_nomenclature: str,
                           source_nomenclature: str = 'hgnc_id',
                           allow_many_to_one: bool = False,
-                          logger: Optional[logging.Logger] = None) -> 'cobra.Model':
+                          logger: Optional[logging.Logger] = None) -> Tuple['cobra.Model', Dict[str, str]]:
     """
     Translate model genes from source_nomenclature to target_nomenclature using a mapping table.
     mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez).
 
     Args:
@@ -628,10 +708,15 @@
         mapping_df: DataFrame containing the mapping information.
         target_nomenclature: Desired target key (e.g., 'hgnc_symbol').
         source_nomenclature: Current source key in the model (default 'hgnc_id').
         allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs.
         logger: Optional logger.
+
+    Returns:
+        Tuple containing:
+            - Translated COBRA model
+            - Dictionary mapping reaction IDs to translation issue descriptions
     """
     if logger is None:
         logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
         logger = logging.getLogger(__name__)
 
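Callers now unpack two values instead of one. A hedged usage sketch (model and mapping_df are assumed to exist, with mapping_df carrying hgnc_id and hgnc_symbol columns as the docstring suggests):

    translated_model, issues = translate_model_genes(
        model, mapping_df,
        target_nomenclature='hgnc_symbol',
        source_nomenclature='hgnc_id',
    )
    for rxn_id, description in issues.items():
        print(rxn_id, '->', description)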
@@ -699,19 +784,25 @@
 
     # statistics
     stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0}
     unmapped = []
     multi = []
+
+    # Dictionary to store translation issues per reaction
+    reaction_translation_issues = {}
 
     original_genes = {g.id for g in model_copy.genes}
     logger.info(f"Original genes count: {len(original_genes)}")
 
     # translate GPRs
     for rxn in model_copy.reactions:
         gpr = rxn.gene_reaction_rule
         if gpr and gpr.strip():
-            new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger)
+            new_gpr, rxn_issues = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger, track_issues=True)
+            if rxn_issues:
+                reaction_translation_issues[rxn.id] = rxn_issues
+
             if new_gpr != gpr:
                 simplified_gpr = _simplify_boolean_expression(new_gpr)
                 if simplified_gpr != new_gpr:
                     stats['simplified_gprs'] += 1
                     logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
@@ -723,11 +814,11 @@
 
     # final logging
     _log_translation_statistics(stats, unmapped, multi, original_genes, model_copy.genes, logger)
 
     logger.info("Translation finished")
-    return model_copy
+    return model_copy, reaction_translation_issues
 
 
 # ---------- helper functions ----------
 def _create_gene_mapping(mapping_df, source_col: str, target_col: str, logger: logging.Logger) -> Dict[str, List[str]]:
     """
@@ -760,23 +851,25 @@
 def _translate_gpr(gpr_string: str,
                    gene_mapping: Dict[str, List[str]],
                    stats: Dict[str, int],
                    unmapped_genes: List[str],
                    multi_mapping_genes: List[Tuple[str, List[str]]],
-                   logger: logging.Logger) -> str:
+                   logger: logging.Logger,
+                   track_issues: bool = False) -> Union[str, Tuple[str, str]]:
     """
     Translate genes inside a GPR string using gene_mapping.
-    Returns new GPR string.
+    Returns the new GPR string and, if track_issues=True, a string describing translation issues.
     """
     # Generic token pattern: letters, digits, :, _, -, . (captures HGNC:1234, ENSG000..., symbols)
     token_pattern = r'\b[A-Za-z0-9:_.-]+\b'
     tokens = re.findall(token_pattern, gpr_string)
 
     logical = {'and', 'or', 'AND', 'OR', '(', ')'}
     tokens = [t for t in tokens if t not in logical]
 
     new_gpr = gpr_string
+    issues = []
 
     for token in sorted(set(tokens), key=lambda x: -len(x)):  # longer tokens first to avoid partial replacement
         norm = _normalize_gene_id(token)
         if norm in gene_mapping:
             targets = gene_mapping[norm]
@@ -786,20 +879,47 @@
                 replacement = targets[0]
             else:
                 stats['one_to_many'] += 1
                 multi_mapping_genes.append((token, targets))
                 replacement = "(" + " or ".join(targets) + ")"
+                if track_issues:
+                    issues.append(f"{token} -> {' or '.join(targets)}")
 
             pattern = r'\b' + re.escape(token) + r'\b'
             new_gpr = re.sub(pattern, replacement, new_gpr)
         else:
             stats['not_found'] += 1
             if token not in unmapped_genes:
                 unmapped_genes.append(token)
             logger.debug(f"Token not found in mapping (left as-is): {token}")
 
-    return new_gpr
+    # Check for many-to-one cases (multiple source genes mapping to the same target)
+    if track_issues:
+        # Build reverse mapping to detect many-to-one cases from the original tokens
+        original_to_target = {}
+
+        for orig_token in tokens:
+            norm = _normalize_gene_id(orig_token)
+            if norm in gene_mapping:
+                targets = gene_mapping[norm]
+                for target in targets:
+                    if target not in original_to_target:
+                        original_to_target[target] = []
+                    if orig_token not in original_to_target[target]:
+                        original_to_target[target].append(orig_token)
+
+        # Identify many-to-one mappings in this specific GPR
+        for target, original_genes in original_to_target.items():
+            if len(original_genes) > 1:
+                issues.append(f"{' or '.join(original_genes)} -> {target}")
+
+    issue_text = "; ".join(issues) if issues else ""
+
+    if track_issues:
+        return new_gpr, issue_text
+    else:
+        return new_gpr
 
 
 def _update_model_genes(model: 'cobra.Model', logger: logging.Logger):
     """
     Rebuild model.genes from gene_reaction_rule content.
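_translate_gpr now has two calling conventions: the legacy one returning just the rewritten rule, and the tracking one returning a (rule, issue_text) pair. A small sketch, assuming _normalize_gene_id leaves plain tokens like these unchanged (the mapping below is hypothetical):

    import logging

    log = logging.getLogger(__name__)
    stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0}
    mapping = {'GENE1': ['SYM1'], 'GENE2': ['SYM2A', 'SYM2B']}

    new_gpr, issue_text = _translate_gpr('GENE1 and GENE2', mapping, stats, [], [], log, track_issues=True)
    # new_gpr    -> 'SYM1 and (SYM2A or SYM2B)'
    # issue_text -> 'GENE2 -> SYM2A or SYM2B'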
@@ -873,5 +993,7 @@
         logger.warning(f"Unmapped tokens ({len(unmapped_genes)}): {', '.join(unmapped_genes[:20])}{(' ...' if len(unmapped_genes)>20 else '')}")
     if multi_mapping_genes:
         logger.info(f"Multi-mapping examples ({len(multi_mapping_genes)}):")
         for orig, targets in multi_mapping_genes[:10]:
             logger.info(f"    {orig} -> {', '.join(targets)}")
+
+