Mercurial > repos > bimib > cobraxy
comparison COBRAxy/utils/general_utils.py @ 408:f413b78d61bf draft
Uploaded
author | francesco_lapi |
---|---|
date | Mon, 08 Sep 2025 17:12:35 +0000 |
parents | a0b53ccc73a8 |
children | 71850bdf9e1e |
comparison
equal
deleted
inserted
replaced
407:6619f237aebe | 408:f413b78d61bf |
---|---|
5 import pickle | 5 import pickle |
6 import lxml.etree as ET | 6 import lxml.etree as ET |
7 | 7 |
8 from enum import Enum | 8 from enum import Enum |
9 from itertools import count | 9 from itertools import count |
10 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union | 10 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Set, Tuple |
11 | 11 |
12 import pandas as pd | 12 import pandas as pd |
13 import cobra | 13 import cobra |
14 from cobra import Model, Reaction, Metabolite | |
14 | 15 |
15 import zipfile | 16 import zipfile |
16 import gzip | 17 import gzip |
17 import bz2 | 18 import bz2 |
18 from io import StringIO | 19 from io import StringIO |
712 print("No annotation in gene dict!") | 713 print("No annotation in gene dict!") |
713 return -1 | 714 return -1 |
714 rename_genes(model2,dict_genes) | 715 rename_genes(model2,dict_genes) |
715 | 716 |
716 return model2 | 717 return model2 |
718 | |
719 | |
720 def build_cobra_model_from_csv(csv_path: str, model_id: str = "ENGRO2_custom") -> cobra.Model: | |
721 """ | |
722 Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni. | |
723 | |
724 Args: | |
725 csv_path: Path al file CSV (separato da tab) | |
726 model_id: ID del modello da creare | |
727 | |
728 Returns: | |
729 cobra.Model: Il modello COBRApy costruito | |
730 """ | |
731 | |
732 # Leggi i dati dal CSV | |
733 df = pd.read_csv(csv_path, sep='\t') | |
734 | |
735 # Crea il modello vuoto | |
736 model = Model(model_id) | |
737 | |
738 # Dict per tenere traccia di metaboliti e compartimenti | |
739 metabolites_dict = {} | |
740 compartments_dict = {} | |
741 | |
742 print(f"Costruendo modello da {len(df)} reazioni...") | |
743 | |
744 # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni | |
745 for idx, row in df.iterrows(): | |
746 reaction_formula = str(row['Reaction']).strip() | |
747 if not reaction_formula or reaction_formula == 'nan': | |
748 continue | |
749 | |
750 # Estrai metaboliti dalla formula della reazione | |
751 metabolites = extract_metabolites_from_reaction(reaction_formula) | |
752 | |
753 for met_id in metabolites: | |
754 compartment = extract_compartment_from_metabolite(met_id) | |
755 | |
756 # Aggiungi compartimento se non esiste | |
757 if compartment not in compartments_dict: | |
758 compartments_dict[compartment] = compartment | |
759 | |
760 # Aggiungi metabolita se non esiste | |
761 if met_id not in metabolites_dict: | |
762 metabolites_dict[met_id] = Metabolite( | |
763 id=met_id, | |
764 compartment=compartment, | |
765 name=met_id.replace(f"_{compartment}", "").replace("__", "_") | |
766 ) | |
767 | |
768 # Aggiungi compartimenti al modello | |
769 model.compartments = compartments_dict | |
770 | |
771 # Aggiungi metaboliti al modello | |
772 model.add_metabolites(list(metabolites_dict.values())) | |
773 | |
774 print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti") | |
775 | |
776 # Seconda passata: aggiungi le reazioni | |
777 reactions_added = 0 | |
778 reactions_skipped = 0 | |
779 | |
780 for idx, row in df.iterrows(): | |
781 try: | |
782 reaction_id = str(row['ReactionID']).strip() | |
783 reaction_formula = str(row['Reaction']).strip() | |
784 | |
785 # Salta reazioni senza formula | |
786 if not reaction_formula or reaction_formula == 'nan': | |
787 reactions_skipped += 1 | |
788 continue | |
789 | |
790 # Crea la reazione | |
791 reaction = Reaction(reaction_id) | |
792 reaction.name = reaction_id | |
793 | |
794 # Imposta bounds | |
795 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0 | |
796 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0 | |
797 | |
798 # Aggiungi gene rule se presente | |
799 if pd.notna(row['Rule']) and str(row['Rule']).strip(): | |
800 reaction.gene_reaction_rule = str(row['Rule']).strip() | |
801 | |
802 # Parse della formula della reazione | |
803 try: | |
804 parse_reaction_formula(reaction, reaction_formula, metabolites_dict) | |
805 except Exception as e: | |
806 print(f"Errore nel parsing della reazione {reaction_id}: {e}") | |
807 reactions_skipped += 1 | |
808 continue | |
809 | |
810 # Aggiungi la reazione al modello | |
811 model.add_reactions([reaction]) | |
812 reactions_added += 1 | |
813 | |
814 except Exception as e: | |
815 print(f"Errore nell'aggiungere la reazione {reaction_id}: {e}") | |
816 reactions_skipped += 1 | |
817 continue | |
818 | |
819 print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni") | |
820 | |
821 # Imposta l'obiettivo di biomassa | |
822 set_biomass_objective(model) | |
823 | |
824 # Imposta il medium | |
825 set_medium_from_data(model, df) | |
826 | |
827 print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti") | |
828 | |
829 return model | |
830 | |
831 | |
832 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore) | |
833 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: | |
834 """ | |
835 Estrae gli ID dei metaboliti da una formula di reazione. | |
836 Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e) | |
837 e permette che comincino con cifre o underscore. | |
838 """ | |
839 metabolites = set() | |
840 # coefficiente opzionale seguito da un token che termina con _<letters> | |
841 pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)' | |
842 matches = re.findall(pattern, reaction_formula) | |
843 metabolites.update(matches) | |
844 return metabolites | |
845 | |
846 | |
847 def extract_compartment_from_metabolite(metabolite_id: str) -> str: | |
848 """ | |
849 Estrae il compartimento dall'ID del metabolita. | |
850 """ | |
851 # Il compartimento รจ solitamente l'ultima lettera dopo l'underscore | |
852 if '_' in metabolite_id: | |
853 return metabolite_id.split('_')[-1] | |
854 return 'c' # default cytoplasm | |
855 | |
856 | |
857 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]): | |
858 """ | |
859 Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti. | |
860 """ | |
861 | |
862 if reaction.id == 'EX_thbpt_e': | |
863 print(reaction.id) | |
864 print(formula) | |
865 # Dividi in parte sinistra e destra | |
866 if '<=>' in formula: | |
867 left, right = formula.split('<=>') | |
868 reversible = True | |
869 elif '<--' in formula: | |
870 left, right = formula.split('<--') | |
871 reversible = False | |
872 left, right = left, right | |
873 elif '-->' in formula: | |
874 left, right = formula.split('-->') | |
875 reversible = False | |
876 elif '<-' in formula: | |
877 left, right = formula.split('<-') | |
878 reversible = False | |
879 left, right = left, right | |
880 else: | |
881 raise ValueError(f"Formato reazione non riconosciuto: {formula}") | |
882 | |
883 # Parse dei metaboliti e coefficienti | |
884 reactants = parse_metabolites_side(left.strip()) | |
885 products = parse_metabolites_side(right.strip()) | |
886 | |
887 # Aggiungi metaboliti alla reazione | |
888 metabolites_to_add = {} | |
889 | |
890 # Reagenti (coefficienti negativi) | |
891 for met_id, coeff in reactants.items(): | |
892 if met_id in metabolites_dict: | |
893 metabolites_to_add[metabolites_dict[met_id]] = -coeff | |
894 | |
895 # Prodotti (coefficienti positivi) | |
896 for met_id, coeff in products.items(): | |
897 if met_id in metabolites_dict: | |
898 metabolites_to_add[metabolites_dict[met_id]] = coeff | |
899 | |
900 reaction.add_metabolites(metabolites_to_add) | |
901 | |
902 | |
903 def parse_metabolites_side(side_str: str) -> Dict[str, float]: | |
904 """ | |
905 Parsa un lato della reazione per estrarre metaboliti e coefficienti. | |
906 """ | |
907 metabolites = {} | |
908 if not side_str or side_str.strip() == '': | |
909 return metabolites | |
910 | |
911 terms = side_str.split('+') | |
912 for term in terms: | |
913 term = term.strip() | |
914 if not term: | |
915 continue | |
916 | |
917 # pattern allineato: coefficiente opzionale + id che termina con _<compartimento> | |
918 match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term) | |
919 if match: | |
920 coeff_str, met_id = match.groups() | |
921 coeff = float(coeff_str) if coeff_str else 1.0 | |
922 metabolites[met_id] = coeff | |
923 | |
924 return metabolites | |
925 | |
926 | |
927 | |
928 def set_biomass_objective(model: Model): | |
929 """ | |
930 Imposta la reazione di biomassa come obiettivo. | |
931 """ | |
932 biomass_reactions = [r for r in model.reactions if 'biomass' in r.id.lower()] | |
933 | |
934 if biomass_reactions: | |
935 model.objective = biomass_reactions[0].id | |
936 print(f"Obiettivo impostato su: {biomass_reactions[0].id}") | |
937 else: | |
938 print("Nessuna reazione di biomassa trovata") | |
939 | |
940 | |
941 def set_medium_from_data(model: Model, df: pd.DataFrame): | |
942 """ | |
943 Imposta il medium basato sulla colonna InMedium. | |
944 """ | |
945 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist() | |
946 | |
947 medium_dict = {} | |
948 for rxn_id in medium_reactions: | |
949 if rxn_id in [r.id for r in model.reactions]: | |
950 reaction = model.reactions.get_by_id(rxn_id) | |
951 if reaction.lower_bound < 0: # Solo reazioni di uptake | |
952 medium_dict[rxn_id] = abs(reaction.lower_bound) | |
953 | |
954 if medium_dict: | |
955 model.medium = medium_dict | |
956 print(f"Medium impostato con {len(medium_dict)} componenti") | |
957 | |
958 | |
959 def validate_model(model: Model) -> Dict[str, any]: | |
960 """ | |
961 Valida il modello e fornisce statistiche di base. | |
962 """ | |
963 validation = { | |
964 'num_reactions': len(model.reactions), | |
965 'num_metabolites': len(model.metabolites), | |
966 'num_genes': len(model.genes), | |
967 'num_compartments': len(model.compartments), | |
968 'objective': str(model.objective), | |
969 'medium_size': len(model.medium), | |
970 'reversible_reactions': len([r for r in model.reactions if r.reversibility]), | |
971 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]), | |
972 } | |
973 | |
974 try: | |
975 # Test di crescita | |
976 solution = model.optimize() | |
977 validation['growth_rate'] = solution.objective_value | |
978 validation['status'] = solution.status | |
979 except Exception as e: | |
980 validation['growth_rate'] = None | |
981 validation['status'] = f"Error: {e}" | |
982 | |
983 return validation |